You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/03/27 18:35:44 UTC

[1/3] phoenix git commit: Secondary indexing with txns

Repository: phoenix
Updated Branches:
  refs/heads/txn 826ebf5ce -> 5a558e16c


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index eb117bf..1e28766 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -18,91 +18,41 @@
 package org.apache.phoenix.index;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
-import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.types.PVarbinary;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
-import com.google.common.collect.Lists;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.util.IndexUtil;
 
 /**
  * Index builder for covered-columns index that ties into phoenix for faster use.
  */
-public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder {
+public class PhoenixIndexBuilder extends NonTxIndexBuilder {
+
+    @Override
+    public void setup(RegionCoprocessorEnvironment env) throws IOException {
+        super.setup(env);
+        Configuration conf = env.getConfiguration();
+        // Install handler that will attempt to disable the index first before killing the region
+        // server
+        conf.setIfUnset(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY,
+            PhoenixIndexFailurePolicy.class.getName());
+    }
 
     @Override
     public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
         // The entire purpose of this method impl is to get the existing rows for the
         // table rows being indexed into the block cache, as the index maintenance code
-        // does a point scan per row
-        List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
-        Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
-                new HashMap<ImmutableBytesWritable, IndexMaintainer>();
-        ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
-        for (int i = 0; i < miniBatchOp.size(); i++) {
-            Mutation m = miniBatchOp.getOperation(i);
-            keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
-            List<IndexMaintainer> indexMaintainers = getCodec().getIndexMetaData(m.getAttributesMap()).getIndexMaintainers();
-            
-            for(IndexMaintainer indexMaintainer: indexMaintainers) {
-                if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
-                indexTableName.set(indexMaintainer.getIndexTableName());
-                if (maintainers.get(indexTableName) != null) continue;
-                maintainers.put(indexTableName, indexMaintainer);
-            }
-            
-        }
-        if (maintainers.isEmpty()) return;
-        Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
-        ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
-        scanRanges.initializeScan(scan);
-        scan.setFilter(scanRanges.getSkipScanFilter());
-        HRegion region = this.env.getRegion();
-        RegionScanner scanner = region.getScanner(scan);
-        // Run through the scanner using internal nextRaw method
-        region.startRegionOperation();
-        try {
-            synchronized (scanner) {
-                boolean hasMore;
-                do {
-                    List<Cell> results = Lists.newArrayList();
-                    // Results are potentially returned even when the return value of s.next is
-                    // false since this is an indication of whether or not there are more values
-                    // after the ones returned
-                    hasMore = scanner.nextRaw(results);
-                } while (hasMore);
-            }
-        } finally {
-            try {
-                scanner.close();
-            } finally {
-                region.closeRegionOperation();
-            }
-        }
+        // does a point scan per row.
+        // TODO: provide a means for the transactional case to just return the Scanner
+        // for when this is executed as it seems like that would be more efficient.
+        IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache());
     }
 
     private PhoenixIndexCodec getCodec() {
         return (PhoenixIndexCodec)this.codec;
     }
-    
-    @Override
-    public byte[] getBatchId(Mutation m){
-        return this.codec.getBatchId(m);
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 8b507b6..7b77c37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.index;
 
@@ -25,7 +17,6 @@ import java.util.Map;
 
 import co.cask.tephra.Transaction;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -41,13 +32,12 @@ import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.TableState;
+import org.apache.phoenix.hbase.index.covered.TxIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ServerUtil;
@@ -63,42 +53,28 @@ import com.google.common.collect.Lists;
 public class PhoenixIndexCodec extends BaseIndexCodec {
     public static final String INDEX_MD = "IdxMD";
     public static final String INDEX_UUID = "IdxUUID";
+    public static final String INDEX_MAINTAINERS = "IndexMaintainers";
 
     private RegionCoprocessorEnvironment env;
-    private KeyValueBuilder kvBuilder;
+    private KeyValueBuilder kvBuilder = GenericKeyValueBuilder.INSTANCE;;
 
     @Override
-    public void initialize(RegionCoprocessorEnvironment env) {
+    public void initialize(RegionCoprocessorEnvironment env) throws IOException {
+        super.initialize(env);
         this.env = env;
-        Configuration conf = env.getConfiguration();
-        // Install handler that will attempt to disable the index first before killing the region
-        // server
-        conf.setIfUnset(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY,
-            PhoenixIndexFailurePolicy.class.getName());
-        // Use the GenericKeyValueBuilder, as it's been shown in perf testing that ClientKeyValue doesn't help
-        // TODO: Jesse to investigate more
-        this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
     }
 
     boolean hasIndexMaintainers(Map<String, byte[]> attributes) {
-        if (attributes == null) {
-            return false;
-        }
+        if (attributes == null) { return false; }
         byte[] uuid = attributes.get(INDEX_UUID);
-        if (uuid == null) {
-            return false;
-        }
+        if (uuid == null) { return false; }
         return true;
     }
-    
-    IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException{
-        if (attributes == null) {
-            return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE;
-        }
+
+    public IndexMetaDataCache getIndexMetaData(Map<String, byte[]> attributes) throws IOException {
+        if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
         byte[] uuid = attributes.get(INDEX_UUID);
-        if (uuid == null) {
-            return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE;
-        }
+        if (uuid == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
         byte[] md = attributes.get(INDEX_MD);
         byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE);
         if (md != null) {
@@ -107,8 +83,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             return new IndexMetaDataCache() {
 
                 @Override
-                public void close() throws IOException {
-                }
+                public void close() throws IOException {}
 
                 @Override
                 public List<IndexMaintainer> getIndexMaintainers() {
@@ -119,26 +94,24 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
                 public Transaction getTransaction() {
                     return txn;
                 }
-                
+
             };
         } else {
             byte[] tenantIdBytes = attributes.get(PhoenixRuntime.TENANT_ID_ATTRIB);
-            ImmutableBytesWritable tenantId =
-                tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
+            ImmutableBytesWritable tenantId = tenantIdBytes == null ? null : new ImmutableBytesWritable(tenantIdBytes);
             TenantCache cache = GlobalCache.getTenantCache(env, tenantId);
-            IndexMetaDataCache indexCache =
-                (IndexMetaDataCache) cache.getServerCache(new ImmutableBytesPtr(uuid));
+            IndexMetaDataCache indexCache = (IndexMetaDataCache)cache.getServerCache(new ImmutableBytesPtr(uuid));
             if (indexCache == null) {
-                String msg = "key="+ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
+                String msg = "key=" + ServerCacheClient.idToString(uuid) + " region=" + env.getRegion();
                 SQLException e = new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_METADATA_NOT_FOUND)
-                    .setMessage(msg).build().buildException();
+                        .setMessage(msg).build().buildException();
                 ServerUtil.throwIOException("Index update failed", e); // will not return
             }
             return indexCache;
         }
-    
+
     }
-    
+
     @Override
     public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
         return getIndexUpdates(state, true);
@@ -150,18 +123,16 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
     }
 
     /**
-     * 
      * @param state
-     * @param upsert prepare index upserts if it's true otherwise prepare index deletes. 
+     * @param upsert
+     *            prepare index upserts if it's true otherwise prepare index deletes.
      * @return
      * @throws IOException
      */
     private Iterable<IndexUpdate> getIndexUpdates(TableState state, boolean upsert) throws IOException {
-        IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes());
-        List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers();
-        if (indexMaintainers.isEmpty()) {
-            return Collections.emptyList();
-        }
+        @SuppressWarnings("unchecked")
+        List<IndexMaintainer> indexMaintainers = (List<IndexMaintainer>)state.getContext().get(INDEX_MAINTAINERS);
+        if (indexMaintainers.isEmpty()) { return Collections.emptyList(); }
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         // TODO: state.getCurrentRowKey() should take an ImmutableBytesWritable arg to prevent byte copy
@@ -171,7 +142,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         ValueGetter valueGetter = null;
         Scanner scanner = null;
         for (IndexMaintainer maintainer : indexMaintainers) {
-            if(upsert) {
+            if (upsert) {
                 // Short-circuit building state when we know it's a row deletion
                 if (maintainer.isRowDeleted(state.getPendingUpdate())) {
                     continue;
@@ -180,31 +151,25 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             IndexUpdate indexUpdate = null;
             if (maintainer.isImmutableRows()) {
                 indexUpdate = new IndexUpdate(new ColumnTracker(maintainer.getAllColumns()));
-                if(maintainer.isLocalIndex()) {
+                if (maintainer.isLocalIndex()) {
                     indexUpdate.setTable(localIndexTableName);
                 } else {
                     indexUpdate.setTable(maintainer.getIndexTableName());
                 }
                 valueGetter = maintainer.createGetterFromKeyValues(dataRowKey, state.getPendingUpdate());
             } else {
-                // TODO: if more efficient, I could do this just once with all columns in all indexes
-                Pair<Scanner,IndexUpdate> statePair = state.getIndexedColumnsTableState(maintainer.getAllColumns());
-                scanner = statePair.getFirst();
+                Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+                valueGetter = statePair.getFirst();
                 indexUpdate = statePair.getSecond();
                 indexUpdate.setTable(maintainer.getIndexTableName());
-                valueGetter = IndexManagementUtil.createGetterFromScanner(scanner, dataRowKey);
             }
             Mutation mutation = null;
             if (upsert) {
-                mutation =
-                        maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state
-                                .getCurrentTimestamp(), env.getRegion().getStartKey(), env
-                                .getRegion().getEndKey());
+                mutation = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, state.getCurrentTimestamp(), env
+                        .getRegion().getStartKey(), env.getRegion().getEndKey());
             } else {
-                mutation =
-                        maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state
-                                .getPendingUpdate(), state.getCurrentTimestamp(), env.getRegion()
-                                .getStartKey(), env.getRegion().getEndKey());
+                mutation = maintainer.buildDeleteMutation(kvBuilder, valueGetter, ptr, state.getPendingUpdate(),
+                        state.getCurrentTimestamp(), env.getRegion().getStartKey(), env.getRegion().getEndKey());
             }
             indexUpdate.setUpdate(mutation);
             if (scanner != null) {
@@ -215,15 +180,25 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         }
         return indexUpdates;
     }
-    
-  @Override
-  public boolean isEnabled(Mutation m) throws IOException {
-      return !hasIndexMaintainers(m.getAttributesMap());
-  }
-  
-  @Override
-  public byte[] getBatchId(Mutation m) {
-    Map<String, byte[]> attributes = m.getAttributesMap();
-    return attributes.get(INDEX_UUID);
-  }
+
+    @Override
+    public boolean isEnabled(Mutation m) throws IOException {
+        return !hasIndexMaintainers(m.getAttributesMap());
+    }
+
+    @Override
+    public byte[] getBatchId(Mutation m) {
+        Map<String, byte[]> attributes = m.getAttributesMap();
+        return attributes.get(INDEX_UUID);
+    }
+
+    @Override
+    public void setContext(TableState state, Mutation mutation) throws IOException {
+        IndexMetaDataCache indexCache = getIndexMetaData(state.getUpdateAttributes());
+        List<IndexMaintainer> indexMaintainers = indexCache.getIndexMaintainers();
+        Map<String,Object> context = state.getContext();
+        context.clear();
+        context.put(INDEX_MAINTAINERS, indexMaintainers);
+        context.put(TxIndexBuilder.TRANSACTION, indexCache.getTransaction());
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 2fd168a..806a20a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -75,7 +75,7 @@ import com.google.common.collect.Multimap;
  * 
  * @since 2.1
  */
-public class PhoenixIndexFailurePolicy extends  KillServerOnFailurePolicy {
+public class PhoenixIndexFailurePolicy extends KillServerOnFailurePolicy {
     private static final Log LOG = LogFactory.getLog(PhoenixIndexFailurePolicy.class);
     private RegionCoprocessorEnvironment env;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
new file mode 100644
index 0000000..c471df6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexBuilder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.phoenix.hbase.index.covered.TxIndexBuilder;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.util.IndexUtil;
+
+public class PhoenixTxIndexBuilder extends TxIndexBuilder {
+    @Override
+    public void setup(RegionCoprocessorEnvironment env) throws IOException {
+        super.setup(env);
+        Configuration conf = env.getConfiguration();
+        // Install failure policy that just re-throws exception instead of killing RS
+        // or disabling the index
+        conf.set(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY, PhoenixTxIndexFailurePolicy.class.getName());
+    }
+
+    @Override
+    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+        // The entire purpose of this method impl is to get the existing rows for the
+        // table rows being indexed into the block cache, as the index maintenance code
+        // does a point scan per row.
+        // TODO: provide a means for the transactional case to just return the Scanner
+        // for when this is executed as it seems like that would be more efficient.
+        IndexUtil.loadMutatingRowsIntoBlockCache(this.env.getRegion(), getCodec(), miniBatchOp, useRawScanToPrimeBlockCache());
+    }
+
+    private PhoenixIndexCodec getCodec() {
+        return (PhoenixIndexCodec)this.codec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
new file mode 100644
index 0000000..fa70cc9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTxIndexFailurePolicy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
+
+import com.google.common.collect.Multimap;
+
+public class PhoenixTxIndexFailurePolicy implements IndexFailurePolicy {
+    private Stoppable parent;
+
+    @Override
+    public void stop(String why) {
+        if (parent != null) {
+            parent.stop(why);
+        }
+    }
+
+    @Override
+    public boolean isStopped() {
+        return parent == null ? false : parent.isStopped();
+    }
+
+    @Override
+    public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+        this.parent = parent;
+    }
+
+    @Override
+    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+            throws IOException {
+        if (cause instanceof IOException) {
+            throw (IOException)cause;
+        } else if (cause instanceof RuntimeException) { throw (RuntimeException)cause; }
+        throw new IOException(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 5abf151..c14aa60 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -107,11 +107,12 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.index.PhoenixTxIndexBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
@@ -696,6 +697,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
                 descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null);
             }
+            boolean isTransactional = Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name()));
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
             // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use
@@ -705,8 +707,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     && !SchemaUtil.isStatsTable(tableName)
                     && !descriptor.hasCoprocessor(Indexer.class.getName())) {
                 Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
-                opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
-                Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+                opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+                Indexer.enableIndexing(descriptor, isTransactional ? PhoenixTxIndexBuilder.class : PhoenixIndexBuilder.class, opts, priority);
             }
             if (SchemaUtil.isStatsTable(tableName) && !descriptor.hasCoprocessor(MultiRowMutationEndpoint.class.getName())) {
                 descriptor.addCoprocessor(MultiRowMutationEndpoint.class.getName(),
@@ -743,7 +745,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 }
             }
             
-            if (Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) && !descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
+            if (isTransactional && !descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
                 descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null);
             }
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
index 6fc480e..1f271f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.schema.tuple;
 
-import static org.apache.phoenix.hbase.index.util.ImmutableBytesPtr.copyBytesIfNecessary;
-
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.HConstants;
@@ -28,7 +26,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 /**
  * 
@@ -56,13 +53,22 @@ public class ValueGetterTuple extends BaseTuple {
 
     @Override
     public KeyValue getValue(byte[] family, byte[] qualifier) {
-    	ImmutableBytesPtr value = null;
+        ImmutableBytesWritable value = null;
         try {
             value = valueGetter.getLatestValue(new ColumnReference(family, qualifier));
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-    	return new KeyValue(valueGetter.getRowKey(), family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Put, value!=null? copyBytesIfNecessary(value) : null);
+        byte[] rowKey = valueGetter.getRowKey();
+        int valueOffset = 0;
+        int valueLength = 0;
+        byte[] valueBytes = HConstants.EMPTY_BYTE_ARRAY;
+        if (value != null) {
+            valueBytes = value.get();
+            valueOffset = value.getOffset();
+            valueLength = value.getLength();
+        }
+    	return new KeyValue(rowKey, 0, rowKey.length, family, 0, family.length, qualifier, 0, qualifier.length, HConstants.LATEST_TIMESTAMP, Type.Put, valueBytes, valueOffset, valueLength);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 49956f9..1fcf16a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -24,6 +24,8 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -41,12 +43,15 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.IndexStatementRewriter;
+import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -60,13 +65,16 @@ import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -235,7 +243,7 @@ public class IndexUtil {
                     	}
         
                         @Override
-                        public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
+                        public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
                             // Always return null for our empty key value, as this will cause the index
                             // maintainer to always treat this Put as a new row.
                             if (isEmptyKeyValue(table, ref)) {
@@ -636,4 +644,58 @@ public class IndexUtil {
         return col.getExpressionStr() == null ? IndexUtil.getCaseSensitiveDataColumnFullName(col.getName().getString())
                 : col.getExpressionStr();
     }
+    
+    /*
+     * The entire purpose of this method is to get the existing rows for the table rows being indexed into the
+     * block cache, as the index maintenance code does a point scan per row. Though for the transactional
+     * case we may be loading more than we need, since we're not applying the transaction filters, that
+     * should still be ok.
+     */
+    public static void loadMutatingRowsIntoBlockCache(HRegion region, PhoenixIndexCodec codec, MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean useRawScan) 
+    throws IOException {
+        List<KeyRange> keys = Lists.newArrayListWithExpectedSize(miniBatchOp.size());
+        Map<ImmutableBytesWritable, IndexMaintainer> maintainers =
+                new HashMap<ImmutableBytesWritable, IndexMaintainer>();
+        ImmutableBytesWritable indexTableName = new ImmutableBytesWritable();
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));
+            List<IndexMaintainer> indexMaintainers = codec.getIndexMetaData(m.getAttributesMap()).getIndexMaintainers();
+            
+            for(IndexMaintainer indexMaintainer: indexMaintainers) {
+                if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue;
+                indexTableName.set(indexMaintainer.getIndexTableName());
+                if (maintainers.get(indexTableName) != null) continue;
+                maintainers.put(indexTableName, indexMaintainer);
+            }
+            
+        }
+        if (maintainers.isEmpty()) return;
+        Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values()));
+        scan.setRaw(useRawScan);
+        ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN);
+        scanRanges.initializeScan(scan);
+        scan.setFilter(scanRanges.getSkipScanFilter());
+        RegionScanner scanner = region.getScanner(scan);
+        // Run through the scanner using internal nextRaw method
+        region.startRegionOperation();
+        try {
+            synchronized (scanner) {
+                boolean hasMore;
+                do {
+                    List<Cell> results = Lists.newArrayList();
+                    // Results are potentially returned even when the return value of s.next is
+                    // false since this is an indication of whether or not there are more values
+                    // after the ones returned
+                    hasMore = scanner.nextRaw(results);
+                } while (hasMore);
+            }
+        } finally {
+            try {
+                scanner.close();
+            } finally {
+                region.closeRegionOperation();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
index cb63380..7e73a81 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/CoveredIndexCodecForTesting.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.covered;
 
@@ -24,51 +16,50 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import org.apache.phoenix.hbase.index.covered.IndexCodec;
-import org.apache.phoenix.hbase.index.covered.IndexUpdate;
-import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.index.BaseIndexCodec;
 
 /**
- * An {@link IndexCodec} for testing that allow you to specify the index updates/deletes, regardless
- * of the current tables' state.
+ * An {@link IndexCodec} for testing that allow you to specify the index updates/deletes, regardless of the current
+ * tables' state.
  */
 public class CoveredIndexCodecForTesting extends BaseIndexCodec {
 
-  private List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
-  private List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+    private List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
+    private List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+
+    public void addIndexDelete(IndexUpdate... deletes) {
+        this.deletes.addAll(Arrays.asList(deletes));
+    }
+
+    public void addIndexUpserts(IndexUpdate... updates) {
+        this.updates.addAll(Arrays.asList(updates));
+    }
+
+    public void clear() {
+        this.deletes.clear();
+        this.updates.clear();
+    }
 
-  public void addIndexDelete(IndexUpdate... deletes) {
-    this.deletes.addAll(Arrays.asList(deletes));
-  }
-  
-  public void addIndexUpserts(IndexUpdate... updates) {
-    this.updates.addAll(Arrays.asList(updates));
-  }
+    @Override
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+        return this.deletes;
+    }
 
-  public void clear() {
-    this.deletes.clear();
-    this.updates.clear();
-  }
-  
-  @Override
-  public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
-    return this.deletes;
-  }
+    @Override
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+        return this.updates;
+    }
 
-  @Override
-  public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
-    return this.updates;
-  }
+    @Override
+    public void initialize(RegionCoprocessorEnvironment env) throws IOException {
+        // noop
+    }
 
-  @Override
-  public void initialize(RegionCoprocessorEnvironment env) throws IOException {
-    // noop
-  }
+    @Override
+    public boolean isEnabled(Mutation m) {
+        return true;
+    }
 
-  @Override
-  public boolean isEnabled(Mutation m) {
-    return true;
-  }
+    @Override
+    public void setContext(TableState state, Mutation mutation) throws IOException {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
index 8c15551..3024820 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
@@ -23,9 +23,10 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -33,17 +34,14 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import org.apache.phoenix.hbase.index.covered.IndexUpdate;
-import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  *
@@ -86,7 +84,7 @@ public class TestLocalTableState {
     LocalHBaseState state = new LocalTable(env);
     LocalTableState table = new LocalTableState(env, state, m);
     //add the kvs from the mutation
-    table.addPendingUpdates(KeyValueUtil.ensureKeyValues(m.get(fam, qual)));
+    table.addPendingUpdates(m.get(fam, qual));
 
     // setup the lookup
     ColumnReference col = new ColumnReference(fam, qual);
@@ -128,8 +126,8 @@ public class TestLocalTableState {
     LocalHBaseState state = new LocalTable(env);
     LocalTableState table = new LocalTableState(env, state, m);
     // add the kvs from the mutation
-    KeyValue kv = KeyValueUtil.ensureKeyValue(m.get(fam, qual).get(0));
-    kv.setMvccVersion(0);
+    Cell kv = m.get(fam, qual).get(0);
+    KeyValueUtil.ensureKeyValue(kv).setMvccVersion(0);
     table.addPendingUpdates(kv);
 
     // setup the lookup

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
index 1b61ef0..f0c1483 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
@@ -32,26 +32,22 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import com.google.common.collect.Lists;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.example.ColumnGroup;
-import org.apache.phoenix.hbase.index.covered.example.CoveredColumn;
-import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec;
 import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexCodec.ColumnEntry;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
 
 public class TestCoveredColumnIndexCodec {
   private static final byte[] PK = new byte[] { 'a' };
@@ -190,7 +186,7 @@ public class TestCoveredColumnIndexCodec {
 
     // get the updates with the pending update
     state.setCurrentTimestamp(1);
-    state.addPendingUpdates(KeyValueUtil.ensureKeyValues(kvs));
+    state.addPendingUpdates(kvs);
     updates = codec.getIndexUpserts(state);
     assertTrue("Didn't find index updates for pending primary table update!", updates.iterator()
         .hasNext());
@@ -243,7 +239,7 @@ public class TestCoveredColumnIndexCodec {
     LocalTableState state = new LocalTableState(env, table, d);
     state.setCurrentTimestamp(d.getTimeStamp());
     // now we shouldn't see anything when getting the index update
-    state.addPendingUpdates(KeyValueUtil.ensureKeyValues(d.getFamilyCellMap().get(FAMILY)));
+    state.addPendingUpdates(d.getFamilyCellMap().get(FAMILY));
     Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
     for (IndexUpdate update : updates) {
       assertFalse("Had some index updates, though it should have been covered by the delete",

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
index 592ac7c..6e1c28f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
@@ -73,7 +73,7 @@ public class IndexMaintainerTest  extends BaseConnectionlessQueryTest {
         return new ValueGetter() {
 
             @Override
-            public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
+            public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
                 return new ImmutableBytesPtr(valueMap.get(ref));
             }
 


[2/3] phoenix git commit: Secondary indexing with txns

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
new file mode 100644
index 0000000..79a485c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.data.LocalTable;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+
+/**
+ * Build covered indexes for phoenix updates.
+ * <p>
+ * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't need to do any extra
+ * synchronization in the IndexBuilder.
+ * <p>
+ * NOTE: This implementation doesn't cleanup the index when we remove a key-value on compaction or flush, leading to a
+ * bloated index that needs to be cleaned up by a background process.
+ */
+public class NonTxIndexBuilder extends BaseIndexBuilder {
+    private static final Log LOG = LogFactory.getLog(NonTxIndexBuilder.class);
+
+    protected LocalHBaseState localTable;
+
+    @Override
+    public void setup(RegionCoprocessorEnvironment env) throws IOException {
+        super.setup(env);
+        this.localTable = new LocalTable(env);
+    }
+
+    @Override
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
+        // build the index updates for each group
+        IndexUpdateManager updateMap = new IndexUpdateManager();
+
+        batchMutationAndAddUpdates(updateMap, mutation);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Found index updates for Mutation: " + mutation + "\n" + updateMap);
+        }
+
+        return updateMap.toMap();
+    }
+
+    /**
+     * Split the mutation into batches based on the timestamps of each keyvalue. We need to check each key-value in the
+     * update to see if it matches the others. Generally, this will be the case, but you can add kvs to a mutation that
+     * don't all have the timestamp, so we need to manage everything in batches based on timestamp.
+     * <p>
+     * Adds all the updates in the {@link Mutation} to the state, as a side-effect.
+     * 
+     * @param updateMap
+     *            index updates into which to add new updates. Modified as a side-effect.
+     * @param state
+     *            current state of the row for the mutation.
+     * @param m
+     *            mutation to batch
+     * @throws IOException
+     */
+    private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException {
+        // split the mutation into timestamp-based batches
+        Collection<Batch> batches = createTimestampBatchesFromMutation(m);
+
+        // create a state manager, so we can manage each batch
+        LocalTableState state = new LocalTableState(env, localTable, m);
+
+        // go through each batch of keyvalues and build separate index entries for each
+        boolean cleanupCurrentState = true;
+        for (Batch batch : batches) {
+            /*
+             * We have to split the work between the cleanup and the update for each group because when we update the
+             * current state of the row for the current batch (appending the mutations for the current batch) the next
+             * group will see that as the current state, which will can cause the a delete and a put to be created for
+             * the next group.
+             */
+            if (addMutationsForBatch(manager, batch, state, cleanupCurrentState)) {
+                cleanupCurrentState = false;
+            }
+        }
+    }
+
+    /**
+     * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any {@link KeyValue} with a timestamp
+     * == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called.
+     * 
+     * @param m
+     *            {@link Mutation} from which to extract the {@link KeyValue}s
+     * @return the mutation, broken into batches and sorted in ascending order (smallest first)
+     */
+    protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
+        Map<Long, Batch> batches = new HashMap<Long, Batch>();
+        for (List<Cell> family : m.getFamilyCellMap().values()) {
+            List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
+            createTimestampBatchesFromKeyValues(familyKVs, batches);
+        }
+        // sort the batches
+        List<Batch> sorted = new ArrayList<Batch>(batches.values());
+        Collections.sort(sorted, new Comparator<Batch>() {
+            @Override
+            public int compare(Batch o1, Batch o2) {
+                return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
+            }
+        });
+        return sorted;
+    }
+
+    /**
+     * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any {@link KeyValue} with a
+     * timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at the time the method is called.
+     * 
+     * @param kvs
+     *            {@link KeyValue}s to break into batches
+     * @param batches
+     *            to update with the given kvs
+     */
+    protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs, Map<Long, Batch> batches) {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        byte[] nowBytes = Bytes.toBytes(now);
+
+        // batch kvs by timestamp
+        for (KeyValue kv : kvs) {
+            long ts = kv.getTimestamp();
+            // override the timestamp to the current time, so the index and primary tables match
+            // all the keys with LATEST_TIMESTAMP will then be put into the same batch
+            if (kv.updateLatestStamp(nowBytes)) {
+                ts = now;
+            }
+            Batch batch = batches.get(ts);
+            if (batch == null) {
+                batch = new Batch(ts);
+                batches.put(ts, batch);
+            }
+            batch.add(kv);
+        }
+    }
+
+    /**
+     * For a single batch, get all the index updates and add them to the updateMap
+     * <p>
+     * This method manages cleaning up the entire history of the row from the given timestamp forward for out-of-order
+     * (e.g. 'back in time') updates.
+     * <p>
+     * If things arrive out of order (client is using custom timestamps) we should still see the index in the correct
+     * order (assuming we scan after the out-of-order update in finished). Therefore, we when we aren't the most recent
+     * update to the index, we need to delete the state at the current timestamp (similar to above), but also issue a
+     * delete for the added index updates at the next newest timestamp of any of the columns in the update; we need to
+     * cleanup the insert so it looks like it was also deleted at that next newest timestamp. However, its not enough to
+     * just update the one in front of us - that column will likely be applied to index entries up the entire history in
+     * front of us, which also needs to be fixed up.
+     * <p>
+     * However, the current update usually will be the most recent thing to be added. In that case, all we need to is
+     * issue a delete for the previous index row (the state of the row, without the update applied) at the current
+     * timestamp. This gets rid of anything currently in the index for the current state of the row (at the timestamp).
+     * Then we can just follow that by applying the pending update and building the index update based on the new row
+     * state.
+     * 
+     * @param updateMap
+     *            map to update with new index elements
+     * @param batch
+     *            timestamp-based batch of edits
+     * @param state
+     *            local state to update and pass to the codec
+     * @param requireCurrentStateCleanup
+     *            <tt>true</tt> if we should should attempt to cleanup the current state of the table, in the event of a
+     *            'back in time' batch. <tt>false</tt> indicates we should not attempt the cleanup, e.g. an earlier
+     *            batch already did the cleanup.
+     * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put), <tt>false</tt>
+     *         otherwise
+     * @throws IOException
+     */
+    private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, LocalTableState state,
+            boolean requireCurrentStateCleanup) throws IOException {
+
+        // need a temporary manager for the current batch. It should resolve any conflicts for the
+        // current batch. Essentially, we can get the case where a batch doesn't change the current
+        // state of the index (all Puts are covered by deletes), in which case we don't want to add
+        // anything
+        // A. Get the correct values for the pending state in the batch
+        // A.1 start by cleaning up the current state - as long as there are key-values in the batch
+        // that are indexed, we need to change the current state of the index. Its up to the codec to
+        // determine if we need to make any cleanup given the pending update.
+        long batchTs = batch.getTimestamp();
+        state.setPendingUpdates(batch.getKvs());
+        addCleanupForCurrentBatch(updateMap, batchTs, state);
+
+        // A.2 do a single pass first for the updates to the current state
+        state.applyPendingUpdates();
+        long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap);
+        // if all the updates are the latest thing in the index, we are done - don't go and fix history
+        if (ColumnTracker.isNewestTime(minTs)) { return false; }
+
+        // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
+        // index. after this, we have the correct view of the index, from the batch up to the index
+        while (!ColumnTracker.isNewestTime(minTs)) {
+            minTs = addUpdateForGivenTimestamp(minTs, state, updateMap);
+        }
+
+        // B. only cleanup the current state if we need to - its a huge waste of effort otherwise.
+        if (requireCurrentStateCleanup) {
+            // roll back the pending update. This is needed so we can remove all the 'old' index entries.
+            // We don't need to do the puts here, but just the deletes at the given timestamps since we
+            // just want to completely hide the incorrect entries.
+            state.rollback(batch.getKvs());
+            // setup state
+            state.setPendingUpdates(batch.getKvs());
+
+            // cleanup the pending batch. If anything in the correct history is covered by Deletes used to
+            // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
+            // because the update may have a different set of columns or value based on the update).
+            cleanupIndexStateFromBatchOnward(updateMap, batchTs, state);
+
+            // have to roll the state forward again, so the current state is correct
+            state.applyPendingUpdates();
+            return true;
+        }
+        return false;
+    }
+
+    private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap)
+            throws IOException {
+        state.setCurrentTimestamp(ts);
+        ts = addCurrentStateMutationsForBatch(updateMap, state);
+        return ts;
+    }
+
+    private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs, LocalTableState state)
+            throws IOException {
+        // get the cleanup for the current state
+        state.setCurrentTimestamp(batchTs);
+        addDeleteUpdatesToMap(updateMap, state, batchTs);
+        // ignore any index tracking from the delete
+        state.resetTrackedColumns();
+    }
+
+    /**
+     * Add the necessary mutations for the pending batch on the local state. Handles rolling up through history to
+     * determine the index changes after applying the batch (for the case where the batch is back in time).
+     * 
+     * @param updateMap
+     *            to update with index mutations
+     * @param batch
+     *            to apply to the current state
+     * @param state
+     *            current state of the table
+     * @return the minimum timestamp across all index columns requested. If {@link ColumnTracker#isNewestTime(long)}
+     *         returns <tt>true</tt> on the returned timestamp, we know that this <i>was not a back-in-time update</i>.
+     * @throws IOException
+     */
+    private long addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state)
+            throws IOException {
+
+        // get the index updates for this current batch
+        Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
+        state.resetTrackedColumns();
+
+        /*
+         * go through all the pending updates. If we are sure that all the entries are the latest timestamp, we can just
+         * add the index updates and move on. However, if there are columns that we skip past (based on the timestamp of
+         * the batch), we need to roll back up the history. Regardless of whether or not they are the latest timestamp,
+         * the entries here are going to be correct for the current batch timestamp, so we add them to the updates. The
+         * only thing we really care about it if we need to roll up the history and fix it as we go.
+         */
+        // timestamp of the next update we need to track
+        long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+        List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>();
+        for (IndexUpdate update : upserts) {
+            // this is the one bit where we check the timestamps
+            final ColumnTracker tracker = update.getIndexedColumns();
+            long trackerTs = tracker.getTS();
+            // update the next min TS we need to track
+            if (trackerTs < minTs) {
+                minTs = tracker.getTS();
+            }
+            // track index hints for the next round. Hint if we need an update for that column for the
+            // next timestamp. These columns clearly won't need to update as we go through time as they
+            // already match the most recent possible thing.
+            boolean needsCleanup = false;
+            if (tracker.hasNewerTimestamps()) {
+                columnHints.add(tracker);
+                // this update also needs to be cleaned up at the next timestamp because it not the latest.
+                needsCleanup = true;
+            }
+
+            // only make the put if the index update has been setup
+            if (update.isValid()) {
+                byte[] table = update.getTableName();
+                Mutation mutation = update.getUpdate();
+                updateMap.addIndexUpdate(table, mutation);
+
+                // only make the cleanup if we made a put and need cleanup
+                if (needsCleanup) {
+                    // there is a TS for the interested columns that is greater than the columns in the
+                    // put. Therefore, we need to issue a delete at the same timestamp
+                    Delete d = new Delete(mutation.getRow());
+                    d.setTimestamp(tracker.getTS());
+                    updateMap.addIndexUpdate(table, d);
+                }
+            }
+        }
+        return minTs;
+    }
+
+    /**
+     * Cleanup the index based on the current state from the given batch. Iterates over each timestamp (for the indexed
+     * rows) for the current state of the table and cleans up all the existing entries generated by the codec.
+     * <p>
+     * Adds all pending updates to the updateMap
+     * 
+     * @param updateMap
+     *            updated with the pending index updates from the codec
+     * @param batchTs
+     *            timestamp from which we should cleanup
+     * @param state
+     *            current state of the primary table. Should already by setup to the correct state from which we want to
+     *            cleanup.
+     * @throws IOException
+     */
+    private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state)
+            throws IOException {
+        // get the cleanup for the current state
+        state.setCurrentTimestamp(batchTs);
+        addDeleteUpdatesToMap(updateMap, state, batchTs);
+        Set<ColumnTracker> trackers = state.getTrackedColumns();
+        long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+        for (ColumnTracker tracker : trackers) {
+            if (tracker.getTS() < minTs) {
+                minTs = tracker.getTS();
+            }
+        }
+        state.resetTrackedColumns();
+        if (!ColumnTracker.isNewestTime(minTs)) {
+            state.setHints(Lists.newArrayList(trackers));
+            cleanupIndexStateFromBatchOnward(updateMap, minTs, state);
+        }
+    }
+
+    /**
+     * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then add them to the
+     * update map.
+     * <p>
+     * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates applied, etc).
+     * 
+     * @throws IOException
+     */
+    protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts)
+            throws IOException {
+        Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state);
+        if (cleanup != null) {
+            for (IndexUpdate d : cleanup) {
+                if (!d.isValid()) {
+                    continue;
+                }
+                // override the timestamps in the delete to match the current batch.
+                Delete remove = (Delete)d.getUpdate();
+                remove.setTimestamp(ts);
+                updateMap.addIndexUpdate(d.getTableName(), remove);
+            }
+        }
+    }
+
+    @Override
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered)
+            throws IOException {
+        // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
+        return null;
+    }
+
+    @Override
+    protected boolean useRawScanToPrimeBlockCache() {
+        return false;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index 4c4d0b0..b8b2f19 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -23,14 +23,13 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Pair;
-
+import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.scanner.Scanner;
 
 /**
  * Interface for the current state of the table. This is generally going to be as of a timestamp - a
@@ -52,46 +51,14 @@ public interface TableState {
   public long getCurrentTimestamp();
 
   /**
-   * Set the current timestamp up to which the table should allow access to the underlying table.
-   * This overrides the timestamp view provided by the indexer - use with care!
-   * @param timestamp timestamp up to which the table should allow access.
-   */
-  public void setCurrentTimestamp(long timestamp);
-
-  /**
    * @return the attributes attached to the current update (e.g. {@link Mutation}).
    */
   public Map<String, byte[]> getUpdateAttributes();
 
   /**
-   * Get a scanner on the columns that are needed by the index.
-   * <p>
-   * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given
-   * columns with a timestamp earlier than the timestamp to which the table is currently set (the
-   * current state of the table for which we need to build an update).
-   * <p>
-   * If none of the passed columns matches any of the columns in the pending update (as determined
-   * by {@link ColumnReference#matchesFamily(byte[])} and
-   * {@link ColumnReference#matchesQualifier(byte[])}, then an empty scanner will be returned. This
-   * is because it doesn't make sense to build index updates when there is no change in the table
-   * state for any of the columns you are indexing.
-   * <p>
-   * <i>NOTE:</i> This method should <b>not</b> be used during
-   * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been
-   * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i>
-   * need to track the indexed columns.
-   * <p>
-   * As a side-effect, we update a timestamp for the next-most-recent timestamp for the columns you
-   * request - you will never see a column with the timestamp we are tracking, but the next oldest
-   * timestamp for that column.
-   * @param indexedColumns the columns to that will be indexed
-   * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
-   *         the builder. Even if no update is necessary for the requested columns, you still need
-   *         to return the {@link IndexUpdate}, just don't set the update for the
-   *         {@link IndexUpdate}.
-   * @throws IOException
+   * Get a getter interface for the state of the index row
    */
-  Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+  Pair<ValueGetter, IndexUpdate> getIndexUpdateState(
       Collection<? extends ColumnReference> indexedColumns) throws IOException;
 
   /**
@@ -112,5 +79,7 @@ public interface TableState {
    * Can be used to help the codec to determine which columns it should attempt to index.
    * @return the keyvalues in the pending update to the table.
    */
-  Collection<KeyValue> getPendingUpdate();
+  Collection<Cell> getPendingUpdate();
+  
+  Map<String,Object> getContext();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
new file mode 100644
index 0000000..d90edc1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TxIndexBuilder.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import co.cask.tephra.Transaction;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class TxIndexBuilder extends BaseIndexBuilder {
+    private static final Log LOG = LogFactory.getLog(TxIndexBuilder.class);
+    public static final String TRANSACTION = "TRANSACTION";
+    private TransactionAwareHTable txTable;
+    
+    @Override
+    public void setup(RegionCoprocessorEnvironment env) throws IOException {
+        super.setup(env);
+        HTableInterface htable = env.getTable(env.getRegion().getRegionInfo().getTable());
+        this.txTable = new TransactionAwareHTable(htable); // TODO: close?
+    }
+
+    @Override
+    public void stop(String why) {
+        try {
+            if (this.txTable != null) txTable.close();
+        } catch (IOException e) {
+            LOG.warn("Unable to close txTable", e);
+        } finally {
+            super.stop(why);
+        }
+    }
+
+    @Override
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
+        // get the index updates for this current batch
+        TxTableState state = new TxTableState(mutation);
+        codec.setContext(state, mutation);
+        Transaction tx = (Transaction)state.getContext().get(TRANSACTION);
+        state.setCurrentTransaction(tx);
+        Collection<Pair<Mutation, byte[]>> indexUpdates = Lists.newArrayListWithExpectedSize(2);
+        Iterable<IndexUpdate> deletes = codec.getIndexDeletes(state);
+        for (IndexUpdate delete : deletes) {
+            indexUpdates.add(new Pair<Mutation, byte[]>(delete.getUpdate(),delete.getTableName()));
+        }
+        state.addPendingUpdates(mutation);
+        // TODO: state may need to maintain the old state with the new state super imposed
+        // An alternate easier way would be to calculate the state after the data mutations
+        // have been applied.
+        Iterable<IndexUpdate> updates = codec.getIndexUpserts(state);
+        for (IndexUpdate update : updates) {
+            indexUpdates.add(new Pair<Mutation, byte[]>(update.getUpdate(),update.getTableName()));
+        }
+        return indexUpdates;
+    }
+
+    private class TxTableState implements TableState {
+        private Put put;
+        private Map<String, byte[]> attributes;
+        private List<Cell> pendingUpdates = Lists.newArrayList();
+        private Transaction transaction;
+        private final Map<String,Object> context = Maps.newHashMap();
+        
+        public TxTableState(Mutation m) {
+            this.put = new Put(m.getRow());
+            this.attributes = m.getAttributesMap();
+        }
+        
+        @Override
+        public RegionCoprocessorEnvironment getEnvironment() {
+            return env;
+        }
+
+        @Override
+        public long getCurrentTimestamp() {
+            return transaction.getReadPointer();
+        }
+
+        @Override
+        public Map<String, byte[]> getUpdateAttributes() {
+            return attributes;
+        }
+
+        @Override
+        public byte[] getCurrentRowKey() {
+            return put.getRow();
+        }
+
+        @Override
+        public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+            return Collections.emptyList();
+        }
+
+        public void addPendingUpdate(Cell cell) throws IOException {
+            put.add(cell);
+        }
+        
+        public void addPendingUpdates(Mutation m) throws IOException {
+            if (m instanceof Delete) {
+                put.getFamilyCellMap().clear();
+            } else {
+                CellScanner scanner = m.cellScanner();
+                while (scanner.advance()) {
+                    Cell cell = scanner.current();
+                    if (cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
+                        byte[] family = CellUtil.cloneFamily(cell);
+                        byte[] qualifier = CellUtil.cloneQualifier(cell);
+                        put.add(family, qualifier, HConstants.EMPTY_BYTE_ARRAY);
+                    } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
+                        byte[] family = CellUtil.cloneFamily(cell);
+                        put.getFamilyCellMap().remove(family);
+                    } else {
+                        put.add(cell);
+                    }
+                }
+            }
+        }
+        
+        @Override
+        public Collection<Cell> getPendingUpdate() {
+            return pendingUpdates;
+        }
+
+        public void setCurrentTransaction(Transaction tx) {
+            this.transaction = tx;
+        }
+
+        @Override
+        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+                throws IOException {
+            ColumnTracker tracker = new ColumnTracker(indexedColumns);
+            IndexUpdate indexUpdate = new IndexUpdate(tracker);
+            final byte[] rowKey = getCurrentRowKey();
+            if (!pendingUpdates.isEmpty()) {
+                final Map<ColumnReference, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
+                        .size());
+                for (Cell kv : pendingUpdates) {
+                    // create new pointers to each part of the kv
+                    ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+                    valueMap.put(new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
+                }
+                ValueGetter getter = new ValueGetter() {
+                    @Override
+                    public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
+                        // TODO: from IndexMaintainer we return null if ref is empty key value. Needed?
+                        return valueMap.get(ref);
+                    }
+                    @Override
+                    public byte[] getRowKey() {
+                        return rowKey;
+                    }
+                };
+                return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate);
+            }
+            // Establish initial state of table by finding the old values
+            // We'll apply the Mutation to this next
+            Get get = new Get(rowKey);
+            get.setMaxVersions();
+            for (ColumnReference ref : indexedColumns) {
+                get.addColumn(ref.getFamily(), ref.getQualifier());
+            }
+            txTable.startTx(transaction);
+            final Result result = txTable.get(get);
+            ValueGetter getter = new ValueGetter() {
+
+                @Override
+                public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
+                    Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+                    if (cell == null) {
+                        return null;
+                    }
+                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                    ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                    return ptr;
+                }
+
+                @Override
+                public byte[] getRowKey() {
+                    return rowKey;
+                }
+                
+            };
+            for (ColumnReference ref : indexedColumns) {
+                Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+                if (cell != null) {
+                    addPendingUpdate(cell);
+                }
+            }
+            return new Pair<ValueGetter, IndexUpdate>(getter, indexUpdate);
+        }
+
+        @Override
+        public Map<String, Object> getContext() {
+            return context;
+        }
+        
+    }
+
+    @Override
+    protected boolean useRawScanToPrimeBlockCache() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
index 96a7410..52076a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
@@ -22,8 +22,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
-
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
@@ -36,7 +38,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 public class LazyValueGetter implements ValueGetter {
 
   private Scanner scan;
-  private volatile Map<ColumnReference, ImmutableBytesPtr> values;
+  private volatile Map<ColumnReference, ImmutableBytesWritable> values;
   private byte[] row;
   
   /**
@@ -50,16 +52,16 @@ public class LazyValueGetter implements ValueGetter {
   }
 
   @Override
-  public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException {
+  public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
     // ensure we have a backing map
     if (values == null) {
       synchronized (this) {
-        values = Collections.synchronizedMap(new HashMap<ColumnReference, ImmutableBytesPtr>());
+        values = Collections.synchronizedMap(new HashMap<ColumnReference, ImmutableBytesWritable>());
       }
     }
 
     // check the value in the map
-    ImmutableBytesPtr value = values.get(ref);
+    ImmutableBytesWritable value = values.get(ref);
     if (value == null) {
       value = get(ref);
       values.put(ref, value);
@@ -78,9 +80,9 @@ public class LazyValueGetter implements ValueGetter {
       return null;
     }
     // there is a next value - we only care about the current value, so we can just snag that
-    KeyValue next = scan.next();
-    if (ref.matches(next)) {
-      return new ImmutableBytesPtr(next.getBuffer(), next.getValueOffset(), next.getValueLength());
+    Cell next = scan.next();
+    if (ref.matches(KeyValueUtil.ensureKeyValue(next))) {
+      return new ImmutableBytesPtr(next.getValueArray(), next.getValueOffset(), next.getValueLength());
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index aa209a5..d4bd460 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -1,19 +1,11 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.covered.example;
 
@@ -24,6 +16,7 @@ import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -31,336 +24,343 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.Lists;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.TableState;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.index.BaseIndexCodec;
 
+import com.google.common.collect.Lists;
+
 /**
  *
  */
 public class CoveredColumnIndexCodec extends BaseIndexCodec {
 
-  private static final byte[] EMPTY_BYTES = new byte[0];
-  public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS");
-
-  private List<ColumnGroup> groups;
-
-  /**
-   * @param groups to initialize the codec with
-   * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing
-   *         purposes
-   */
-  public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) {
-    CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
-    codec.groups = Lists.newArrayList(groups);
-    return codec;
-  }
-
-  @Override
-  public void initialize(RegionCoprocessorEnvironment env) {
-    groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
-  }
-
-  @Override
-  public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
-    List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
-    for (ColumnGroup group : groups) {
-      IndexUpdate update = getIndexUpdateForGroup(group, state);
-      updates.add(update);
-    }
-    return updates;
-  }
-
-  /**
-   * @param group
-   * @param state
-   * @return the update that should be made to the table
-   */
-  private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) {
-    List<CoveredColumn> refs = group.getColumns();
-    try {
-      Pair<Scanner, IndexUpdate> stateInfo = state.getIndexedColumnsTableState(refs);
-      Scanner kvs = stateInfo.getFirst();
-      Pair<Integer, List<ColumnEntry>> columns =
-          getNextEntries(refs, kvs, state.getCurrentRowKey());
-      // make sure we close the scanner
-      kvs.close();
-      if (columns.getFirst().intValue() == 0) {
-        return stateInfo.getSecond();
-      }
-      // have all the column entries, so just turn it into a Delete for the row
-      // convert the entries to the needed values
-      byte[] rowKey =
-          composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
-      Put p = new Put(rowKey, state.getCurrentTimestamp());
-      // add the columns to the put
-      addColumnsToPut(p, columns.getSecond());
-
-      // update the index info
-      IndexUpdate update = stateInfo.getSecond();
-      update.setTable(Bytes.toBytes(group.getTable()));
-      update.setUpdate(p);
-      return update;
-    } catch (IOException e) {
-      throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
-    }
-  }
-
-  private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) {
-    // add each of the corresponding families to the put
-    int count = 0;
-    for (ColumnEntry column : columns) {
-      indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
-        ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null);
+    private static final byte[] EMPTY_BYTES = new byte[0];
+    public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS");
+
+    private List<ColumnGroup> groups;
+
+    /**
+     * @param groups
+     *            to initialize the codec with
+     * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing purposes
+     */
+    public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) {
+        CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
+        codec.groups = Lists.newArrayList(groups);
+        return codec;
     }
-  }
-
-  private static byte[] toIndexQualifier(CoveredColumn column) {
-    return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR),
-      column.getQualifier());
-  }
-
-  @Override
-  public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
-    List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
-    for (ColumnGroup group : groups) {
-      deletes.add(getDeleteForGroup(group, state));
+
+    @Override
+    public void initialize(RegionCoprocessorEnvironment env) {
+        groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
     }
-    return deletes;
-  }
-
-
-  /**
-   * Get all the deletes necessary for a group of columns - logically, the cleanup the index table
-   * for a given index.
-   * @param group index information
-   * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary
-   */
-  private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
-    List<CoveredColumn> refs = group.getColumns();
-    try {
-      Pair<Scanner, IndexUpdate> kvs = state.getIndexedColumnsTableState(refs);
-      Pair<Integer, List<ColumnEntry>> columns =
-          getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
-      // make sure we close the scanner reference
-      kvs.getFirst().close();
-      // no change, just return the passed update
-      if (columns.getFirst() == 0) {
-        return kvs.getSecond();
-      }
-      // have all the column entries, so just turn it into a Delete for the row
-      // convert the entries to the needed values
-      byte[] rowKey =
-          composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
-      Delete d = new Delete(rowKey);
-      d.setTimestamp(state.getCurrentTimestamp());
-      IndexUpdate update = kvs.getSecond();
-      update.setUpdate(d);
-      update.setTable(Bytes.toBytes(group.getTable()));
-      return update;
-    } catch (IOException e) {
-      throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
+
+    @Override
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+        List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+        for (ColumnGroup group : groups) {
+            IndexUpdate update = getIndexUpdateForGroup(group, state);
+            updates.add(update);
+        }
+        return updates;
     }
-  }
-
-  /**
-   * Get the next batch of primary table values for the given columns
-   * @param refs columns to match against
-   * @param state
-   * @return the total length of all values found and the entries to add for the index
-   */
-  private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs,
-      byte[] currentRow) throws IOException {
-    int totalValueLength = 0;
-    List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
-
-    // pull out the latest state for each column reference, in order
-    for (CoveredColumn ref : refs) {
-      KeyValue first = ref.getFirstKeyValueForRow(currentRow);
-      if (!kvs.seek(first)) {
-        // no more keys, so add a null value
-        entries.add(new ColumnEntry(null, ref));
-        continue;
-      }
-      // there is a next value - we only care about the current value, so we can just snag that
-      KeyValue next = kvs.next();
-      if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) {
-        byte[] v = next.getValue();
-        totalValueLength += v.length;
-        entries.add(new ColumnEntry(v, ref));
-      } else {
-        // this first one didn't match at all, so we have to put in a null entry
-        entries.add(new ColumnEntry(null, ref));
-        continue;
-      }
-      // here's where is gets a little tricky - we either need to decide if we should continue
-      // adding entries (matches all qualifiers) or if we are done (matches a single qualifier)
-      if (!ref.allColumns()) {
-        continue;
-      }
-      // matches all columns, so we need to iterate until we hit the next column with the same
-      // family as the current key
-      byte[] lastQual = next.getQualifier();
-      byte[] nextQual = null;
-      while ((next = kvs.next()) != null) {
-        // different family, done with this column
-        if (!ref.matchesFamily(next.getFamily())) {
-          break;
+
+    /**
+     * @param group
+     * @param state
+     * @return the update that should be made to the table
+     */
+    private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) {
+        List<CoveredColumn> refs = group.getColumns();
+        try {
+            Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+            Scanner kvs = stateInfo.getFirst();
+            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey());
+            // make sure we close the scanner
+            kvs.close();
+            if (columns.getFirst().intValue() == 0) { return stateInfo.getSecond(); }
+            // have all the column entries, so just turn it into a Delete for the row
+            // convert the entries to the needed values
+            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
+            Put p = new Put(rowKey, state.getCurrentTimestamp());
+            // add the columns to the put
+            addColumnsToPut(p, columns.getSecond());
+
+            // update the index info
+            IndexUpdate update = stateInfo.getSecond();
+            update.setTable(Bytes.toBytes(group.getTable()));
+            update.setUpdate(p);
+            return update;
+        } catch (IOException e) {
+            throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
         }
-        nextQual = next.getQualifier();
-        // we are still on the same qualifier - skip it, since we already added a column for it
-        if (Arrays.equals(lastQual, nextQual)) {
-          continue;
+    }
+
+    private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) {
+        // add each of the corresponding families to the put
+        int count = 0;
+        for (ColumnEntry column : columns) {
+            indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
+                    ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null);
         }
-        // this must match the qualifier since its an all-qualifiers specifier, so we add it
-        byte[] v = next.getValue();
-        totalValueLength += v.length;
-        entries.add(new ColumnEntry(v, ref));
-        // update the last qualifier to check against
-        lastQual = nextQual;
-      }
     }
-    return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
-  }
 
-  static class ColumnEntry {
-    byte[] value = EMPTY_BYTES;
-    CoveredColumn ref;
+    private static byte[] toIndexQualifier(CoveredColumn column) {
+        return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR), column.getQualifier());
+    }
 
-    public ColumnEntry(byte[] value, CoveredColumn ref) {
-      this.value = value == null ? EMPTY_BYTES : value;
-      this.ref = ref;
+    @Override
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+        List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
+        for (ColumnGroup group : groups) {
+            deletes.add(getDeleteForGroup(group, state));
+        }
+        return deletes;
+    }
+
+    /**
+     * Get all the deletes necessary for a group of columns - logically, the cleanup the index table for a given index.
+     * 
+     * @param group
+     *            index information
+     * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary
+     */
+    private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
+        List<CoveredColumn> refs = group.getColumns();
+        try {
+            Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+            Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
+            // make sure we close the scanner reference
+            kvs.getFirst().close();
+            // no change, just return the passed update
+            if (columns.getFirst() == 0) { return kvs.getSecond(); }
+            // have all the column entries, so just turn it into a Delete for the row
+            // convert the entries to the needed values
+            byte[] rowKey = composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
+            Delete d = new Delete(rowKey);
+            d.setTimestamp(state.getCurrentTimestamp());
+            IndexUpdate update = kvs.getSecond();
+            update.setUpdate(d);
+            update.setTable(Bytes.toBytes(group.getTable()));
+            return update;
+        } catch (IOException e) {
+            throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
+        }
     }
-  }
-
-  /**
-   * Compose the final index row key.
-   * <p>
-   * This is faster than adding each value independently as we can just build a single a array and
-   * copy everything over once.
-   * @param pk primary key of the original row
-   * @param length total number of bytes of all the values that should be added
-   * @param values to use when building the key
-   */
-  static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
-    // now build up expected row key, each of the values, in order, followed by the PK and then some
-    // info about lengths so we can deserialize each value
-    byte[] output = new byte[length + pk.length];
-    int pos = 0;
-    int[] lengths = new int[values.size()];
-    int i = 0;
-    for (ColumnEntry entry : values) {
-      byte[] v = entry.value;
-      // skip doing the copy attempt, if we don't need to
-      if (v.length != 0) {
-        System.arraycopy(v, 0, output, pos, v.length);
-        pos += v.length;
-      }
-      lengths[i++] = v.length;
+
+    /**
+     * Get the next batch of primary table values for the given columns
+     * 
+     * @param refs
+     *            columns to match against
+     * @param state
+     * @return the total length of all values found and the entries to add for the index
+     */
+    private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs, byte[] currentRow)
+            throws IOException {
+        int totalValueLength = 0;
+        List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
+
+        // pull out the latest state for each column reference, in order
+        for (CoveredColumn ref : refs) {
+            KeyValue first = ref.getFirstKeyValueForRow(currentRow);
+            if (!kvs.seek(first)) {
+                // no more keys, so add a null value
+                entries.add(new ColumnEntry(null, ref));
+                continue;
+            }
+            // there is a next value - we only care about the current value, so we can just snag that
+            Cell next = kvs.next();
+            if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) {
+                byte[] v = next.getValue();
+                totalValueLength += v.length;
+                entries.add(new ColumnEntry(v, ref));
+            } else {
+                // this first one didn't match at all, so we have to put in a null entry
+                entries.add(new ColumnEntry(null, ref));
+                continue;
+            }
+            // here's where is gets a little tricky - we either need to decide if we should continue
+            // adding entries (matches all qualifiers) or if we are done (matches a single qualifier)
+            if (!ref.allColumns()) {
+                continue;
+            }
+            // matches all columns, so we need to iterate until we hit the next column with the same
+            // family as the current key
+            byte[] lastQual = next.getQualifier();
+            byte[] nextQual = null;
+            while ((next = kvs.next()) != null) {
+                // different family, done with this column
+                if (!ref.matchesFamily(next.getFamily())) {
+                    break;
+                }
+                nextQual = next.getQualifier();
+                // we are still on the same qualifier - skip it, since we already added a column for it
+                if (Arrays.equals(lastQual, nextQual)) {
+                    continue;
+                }
+                // this must match the qualifier since its an all-qualifiers specifier, so we add it
+                byte[] v = next.getValue();
+                totalValueLength += v.length;
+                entries.add(new ColumnEntry(v, ref));
+                // update the last qualifier to check against
+                lastQual = nextQual;
+            }
+        }
+        return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
     }
 
-    // add the primary key to the end of the row key
-    System.arraycopy(pk, 0, output, pos, pk.length);
+    static class ColumnEntry {
+        byte[] value = EMPTY_BYTES;
+        CoveredColumn ref;
 
-    // add the lengths as suffixes so we can deserialize the elements again
-    for (int l : lengths) {
-      output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+        public ColumnEntry(byte[] value, CoveredColumn ref) {
+            this.value = value == null ? EMPTY_BYTES : value;
+            this.ref = ref;
+        }
     }
 
-    // and the last integer is the number of values
-    return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
-  }
-
-  /**
-   * Essentially a short-cut from building a {@link Put}.
-   * @param pk row key
-   * @param timestamp timestamp of all the keyvalues
-   * @param values expected value--column pair
-   * @return a keyvalues that the index contains for a given row at a timestamp with the given value
-   *         -- column pairs.
-   */
-  public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp,
-      List<Pair<byte[], CoveredColumn>> values) {
-  
-    int length = 0;
-    List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
-    for (Pair<byte[], CoveredColumn> value : values) {
-      ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond());
-      length += value.getFirst().length;
-      expected.add(entry);
+    /**
+     * Compose the final index row key.
+     * <p>
+     * This is faster than adding each value independently as we can just build a single a array and copy everything
+     * over once.
+     * 
+     * @param pk
+     *            primary key of the original row
+     * @param length
+     *            total number of bytes of all the values that should be added
+     * @param values
+     *            to use when building the key
+     */
+    static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
+        // now build up expected row key, each of the values, in order, followed by the PK and then some
+        // info about lengths so we can deserialize each value
+        byte[] output = new byte[length + pk.length];
+        int pos = 0;
+        int[] lengths = new int[values.size()];
+        int i = 0;
+        for (ColumnEntry entry : values) {
+            byte[] v = entry.value;
+            // skip doing the copy attempt, if we don't need to
+            if (v.length != 0) {
+                System.arraycopy(v, 0, output, pos, v.length);
+                pos += v.length;
+            }
+            lengths[i++] = v.length;
+        }
+
+        // add the primary key to the end of the row key
+        System.arraycopy(pk, 0, output, pos, pk.length);
+
+        // add the lengths as suffixes so we can deserialize the elements again
+        for (int l : lengths) {
+            output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+        }
+
+        // and the last integer is the number of values
+        return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
     }
-  
-    byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected);
-    Put p = new Put(rowKey, timestamp);
-    CoveredColumnIndexCodec.addColumnsToPut(p, expected);
-    List<KeyValue> kvs = new ArrayList<KeyValue>();
-    for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
-      kvs.addAll(entry.getValue());
+
+    /**
+     * Essentially a short-cut from building a {@link Put}.
+     * 
+     * @param pk
+     *            row key
+     * @param timestamp
+     *            timestamp of all the keyvalues
+     * @param values
+     *            expected value--column pair
+     * @return a keyvalues that the index contains for a given row at a timestamp with the given value -- column pairs.
+     */
+    public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp,
+            List<Pair<byte[], CoveredColumn>> values) {
+
+        int length = 0;
+        List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
+        for (Pair<byte[], CoveredColumn> value : values) {
+            ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond());
+            length += value.getFirst().length;
+            expected.add(entry);
+        }
+
+        byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected);
+        Put p = new Put(rowKey, timestamp);
+        CoveredColumnIndexCodec.addColumnsToPut(p, expected);
+        List<KeyValue> kvs = new ArrayList<KeyValue>();
+        for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
+            kvs.addAll(entry.getValue());
+        }
+
+        return kvs;
     }
-  
-    return kvs;
-  }
-
-  public static List<byte[]> getValues(byte[] bytes) {
-    // get the total number of keys in the bytes
-    int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
-    List<byte[]> keys = new ArrayList<byte[]>(keyCount);
-    int[] lengths = new int[keyCount];
-    int lengthPos = keyCount - 1;
-    int pos = bytes.length - Bytes.SIZEOF_INT;
-    // figure out the length of each key
-    for (int i = 0; i < keyCount; i++) {
-      lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
-      pos -= Bytes.SIZEOF_INT;
+
+    public static List<byte[]> getValues(byte[] bytes) {
+        // get the total number of keys in the bytes
+        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
+        List<byte[]> keys = new ArrayList<byte[]>(keyCount);
+        int[] lengths = new int[keyCount];
+        int lengthPos = keyCount - 1;
+        int pos = bytes.length - Bytes.SIZEOF_INT;
+        // figure out the length of each key
+        for (int i = 0; i < keyCount; i++) {
+            lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+            pos -= Bytes.SIZEOF_INT;
+        }
+
+        int current = 0;
+        for (int length : lengths) {
+            byte[] key = Arrays.copyOfRange(bytes, current, current + length);
+            keys.add(key);
+            current += length;
+        }
+
+        return keys;
     }
 
-    int current = 0;
-    for (int length : lengths) {
-      byte[] key = Arrays.copyOfRange(bytes, current, current + length);
-      keys.add(key);
-      current += length;
+    /**
+     * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
+     * 
+     * @param bytes
+     *            array to read from
+     * @param start
+     *            start point, backwards from which to read. For example, if specifying "25", we would try to read an
+     *            integer from 21 -> 25
+     * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} bytes, if it exists.
+     */
+    private static int getPreviousInteger(byte[] bytes, int start) {
+        return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
     }
 
-    return keys;
-  }
-
-  /**
-   * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
-   * @param bytes array to read from
-   * @param start start point, backwards from which to read. For example, if specifying "25", we
-   *          would try to read an integer from 21 -> 25
-   * @return an integer from the proceeding {@value Bytes#SIZEOF_INT} bytes, if it exists.
-   */
-  private static int getPreviousInteger(byte[] bytes, int start) {
-    return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
-  }
-
-  /**
-   * Check to see if an row key just contains a list of null values.
-   * @param bytes row key to examine
-   * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise
-   */
-  public static boolean checkRowKeyForAllNulls(byte[] bytes) {
-    int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
-    int pos = bytes.length - Bytes.SIZEOF_INT;
-    for (int i = 0; i < keyCount; i++) {
-      int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
-      if (next > 0) {
-        return false;
-      }
-      pos -= Bytes.SIZEOF_INT;
+    /**
+     * Check to see if an row key just contains a list of null values.
+     * 
+     * @param bytes
+     *            row key to examine
+     * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise
+     */
+    public static boolean checkRowKeyForAllNulls(byte[] bytes) {
+        int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
+        int pos = bytes.length - Bytes.SIZEOF_INT;
+        for (int i = 0; i < keyCount; i++) {
+            int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+            if (next > 0) { return false; }
+            pos -= Bytes.SIZEOF_INT;
+        }
+
+        return true;
     }
 
-    return true;
-  }
+    @Override
+    public boolean isEnabled(Mutation m) {
+        // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this
+        // simple check for the moment.
+        return groups.size() > 0;
+    }
 
-  @Override
-  public boolean isEnabled(Mutation m) {
-    // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this
-    // simple check for the moment.
-    return groups.size() > 0;
-  }
+    @Override
+    public void setContext(TableState state, Mutation mutation) throws IOException {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
index 6ac89d1..48c714d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 
 /**
@@ -136,7 +136,7 @@ public class CoveredColumnIndexSpecifierBuilder {
   void build(HTableDescriptor desc, Class<? extends IndexCodec> clazz) throws IOException {
     // add the codec for the index to the map of options
     Map<String, String> opts = this.convertToMap();
-    opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
+    opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
     Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts, Coprocessor.PRIORITY_USER);
   }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
index f80cf41..de8f752 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -35,8 +36,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.covered.Batch;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
 
 /**
@@ -90,7 +91,7 @@ import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
  * <b>NOTE:</b> this means that we need to do a lookup (point {@link Get}) of the entire row
  * <i>every time there is a write to the table</i>.
  */
-public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
+public class CoveredColumnIndexer extends NonTxIndexBuilder {
 
   /**
    * Create the specified index table with the necessary columns
@@ -125,9 +126,9 @@ public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
     Collection<Batch> batches = batchByRow(filtered);
 
     for (Batch batch : batches) {
-      KeyValue curKV = batch.getKvs().iterator().next();
+      Cell curKV = batch.getKvs().iterator().next();
       Put p = new Put(curKV.getRowArray(), curKV.getRowOffset(), curKV.getRowLength());
-      for (KeyValue kv : batch.getKvs()) {
+      for (Cell kv : batch.getKvs()) {
         // we only need to cleanup Put entries
         byte type = kv.getTypeByte();
         Type t = KeyValue.Type.codeToType(type);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
index b9f3858..7c69493 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/ColumnTracker.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.hbase.index.covered.update;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 
@@ -45,7 +44,7 @@ public class ColumnTracker implements IndexedColumnGroup {
   public ColumnTracker(Collection<? extends ColumnReference> columns) {
     this.columns = new ArrayList<ColumnReference>(columns);
     // sort the columns
-    Collections.sort(this.columns);
+    // no need to do this: Collections.sort(this.columns);
     this.hashCode = calcHashCode(this.columns);
   }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
index d09498a..26f620f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/update/IndexUpdateManager.java
@@ -30,12 +30,11 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-
 /**
  * Keeps track of the index updates
  */
@@ -182,8 +181,6 @@ public class IndexUpdateManager {
     for (Entry<ImmutableBytesPtr, Collection<Mutation>> updates : map.entrySet()) {
       // get is ok because we always set with just the bytes
       byte[] tableName = updates.getKey().get();
-      // TODO replace this as just storing a byte[], to avoid all the String <-> byte[] swapping
-      // HBase does
       for (Mutation m : updates.getValue()) {
         // skip elements that have been marked for delete
         if (shouldBeRemoved(m)) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
index a28268c..884cca6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.hbase.index.scanner;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
 
 
 /**
@@ -29,17 +29,17 @@ import org.apache.hadoop.hbase.KeyValue;
 public class EmptyScanner implements Scanner {
 
   @Override
-  public KeyValue next() throws IOException {
+  public Cell next() throws IOException {
     return null;
   }
 
   @Override
-  public boolean seek(KeyValue next) throws IOException {
+  public boolean seek(Cell next) throws IOException {
     return false;
   }
 
   @Override
-  public KeyValue peek() throws IOException {
+  public Cell peek() throws IOException {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
index 868e892..9454de5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.hbase.index.scanner;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 
 /**
@@ -33,7 +34,7 @@ public interface Scanner extends Closeable {
    * @return the next keyvalue in the scanner or <tt>null</tt> if there is no next {@link KeyValue}
    * @throws IOException if there is an underlying error reading the data
    */
-  public KeyValue next() throws IOException;
+  public Cell next() throws IOException;
 
   /**
    * Seek to immediately before the given {@link KeyValue}. If that exact {@link KeyValue} is
@@ -43,7 +44,7 @@ public interface Scanner extends Closeable {
    * @return <tt>true</tt> if there are values left in <tt>this</tt>, <tt>false</tt> otherwise
    * @throws IOException if there is an error reading the underlying data.
    */
-  public boolean seek(KeyValue next) throws IOException;
+  public boolean seek(Cell next) throws IOException;
 
   /**
    * Read the {@link KeyValue} at the top of <tt>this</tt> without 'popping' it off the top of the
@@ -51,5 +52,5 @@ public interface Scanner extends Closeable {
    * @return the next {@link KeyValue} or <tt>null</tt> if there are no more values in <tt>this</tt>
    * @throws IOException if there is an error reading the underlying data.
    */
-  public KeyValue peek() throws IOException;
+  public Cell peek() throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index 32e4d84..362ef18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -23,7 +23,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -126,7 +128,7 @@ public class ScannerBuilder {
     return new Scanner() {
 
       @Override
-      public KeyValue next() {
+      public Cell next() {
         try {
           return kvScanner.next();
         } catch (IOException e) {
@@ -135,7 +137,7 @@ public class ScannerBuilder {
       }
 
       @Override
-      public boolean seek(KeyValue next) throws IOException {
+      public boolean seek(Cell next) throws IOException {
         // check to see if the next kv is after the current key, in which case we can use reseek,
         // which will be more efficient
         KeyValue peek = kvScanner.peek();
@@ -143,17 +145,17 @@ public class ScannerBuilder {
         if (peek != null) {
           int compare = KeyValue.COMPARATOR.compare(peek, next);
           if (compare < 0) {
-            return kvScanner.reseek(next);
+            return kvScanner.reseek(KeyValueUtil.ensureKeyValue(next));
           } else if (compare == 0) {
             // we are already at the given key!
             return true;
           }
         }
-        return kvScanner.seek(next);
+        return kvScanner.seek(KeyValueUtil.ensureKeyValue(next));
       }
 
       @Override
-      public KeyValue peek() throws IOException {
+      public Cell peek() throws IOException {
         return kvScanner.peek();
       }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
index 5964647..3335aaa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
@@ -22,16 +22,14 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 
 import com.google.common.collect.Multimap;
 
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-
 /**
  * Handle failures to write to the index tables.
  */
 public interface IndexFailurePolicy extends Stoppable {
-
   public void setup(Stoppable parent, RegionCoprocessorEnvironment env);
 
   /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 4f785eb..3500dd2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -766,7 +766,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         int i = 0;
         for (ColumnReference ref : this.getCoverededColumns()) {
             ImmutableBytesPtr cq = this.indexQualifiers.get(i++);
-            ImmutableBytesPtr value = valueGetter.getLatestValue(ref);
+            ImmutableBytesWritable value = valueGetter.getLatestValue(ref);
             byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey);
             ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
             if (value != null) {
@@ -781,9 +781,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return put;
     }
 
-    public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) {
+    public boolean isRowDeleted(Collection<Cell> pendingUpdates) {
         int nDeleteCF = 0;
-        for (KeyValue kv : pendingUpdates) {
+        for (Cell kv : pendingUpdates) {
             if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode()) {
                 nDeleteCF++;
                 boolean isEmptyCF = Bytes.compareTo(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), 
@@ -798,18 +798,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         return nDeleteCF == this.nDataCFs;
     }
     
-    private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<KeyValue> pendingUpdates) throws IOException {
+    private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<Cell> pendingUpdates) throws IOException {
         if (pendingUpdates.isEmpty()) {
             return false;
         }
-        Map<ColumnReference,KeyValue> newState = Maps.newHashMapWithExpectedSize(pendingUpdates.size()); 
-        for (KeyValue kv : pendingUpdates) {
+        Map<ColumnReference,Cell> newState = Maps.newHashMapWithExpectedSize(pendingUpdates.size()); 
+        for (Cell kv : pendingUpdates) {
             newState.put(new ColumnReference(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv)), kv);
         }
         for (ColumnReference ref : indexedColumns) {
-        	KeyValue newValue = newState.get(ref);
+        	Cell newValue = newState.get(ref);
         	if (newValue != null) { // Indexed column has potentially changed
-        		ImmutableBytesPtr oldValue = oldState.getLatestValue(ref);
+        	    ImmutableBytesWritable oldValue = oldState.getLatestValue(ref);
         		boolean newValueSetAsNull = newValue.getTypeByte() == Type.DeleteColumn.getCode();
         		//If the new column value has to be set as null and the older value is null too,
         		//then just skip to the next indexed column.
@@ -834,11 +834,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
      * since we can build the corresponding index row key.
      */
     public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ImmutableBytesWritable dataRowKeyPtr, long ts) throws IOException {
-        return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<KeyValue>emptyList(), ts, null, null);
+        return buildDeleteMutation(kvBuilder, null, dataRowKeyPtr, Collections.<Cell>emptyList(), ts, null, null);
     }
     
     @SuppressWarnings("deprecation")
-    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
+    public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<Cell> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException {
         byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey);
         // Delete the entire row if any of the indexed columns changed
         if (oldState == null || isRowDeleted(pendingUpdates) || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row
@@ -848,7 +848,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         Delete delete = null;
         // Delete columns for missing key values
-        for (KeyValue kv : pendingUpdates) {
+        for (Cell kv : pendingUpdates) {
             if (kv.getTypeByte() != KeyValue.Type.Put.getCode()) {
                 ColumnReference ref = new ColumnReference(kv.getFamily(), kv.getQualifier());
                 if (coveredColumns.contains(ref)) {
@@ -1304,14 +1304,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
                 .size());
         for (Cell kv : pendingUpdates) {
             // create new pointers to each part of the kv
-            ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getRowArray(),kv.getFamilyOffset(),kv.getFamilyLength());
-            ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getRowArray(), kv.getQualifierOffset(), kv.getQualifierLength());
             ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
-            valueMap.put(new ColumnReference(kv.getRowArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getRowArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
+            valueMap.put(new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value);
         }
         return new ValueGetter() {
             @Override
-            public ImmutableBytesPtr getLatestValue(ColumnReference ref) {
+            public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
                 if(ref.equals(dataEmptyKeyValueRef)) return null;
                 return valueMap.get(ref);
             }


[3/3] phoenix git commit: Secondary indexing with txns

Posted by ja...@apache.org.
Secondary indexing with txns


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5a558e16
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5a558e16
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5a558e16

Branch: refs/heads/txn
Commit: 5a558e16cd7b1e882b274683aff6b655f952dac1
Parents: 826ebf5
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Mar 27 10:35:37 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Mar 27 10:35:37 2015 -0700

----------------------------------------------------------------------
 .../EndToEndCoveredColumnsIndexBuilderIT.java   |  12 +-
 .../covered/example/FailWithoutRetriesIT.java   | 198 +++---
 .../apache/phoenix/hbase/index/ValueGetter.java |   4 +-
 .../hbase/index/builder/BaseIndexBuilder.java   | 170 +++--
 .../hbase/index/builder/IndexBuildManager.java  |  14 -
 .../hbase/index/builder/IndexBuilder.java       |  47 +-
 .../phoenix/hbase/index/covered/Batch.java      |  11 +-
 .../covered/CoveredColumnsIndexBuilder.java     | 491 ---------------
 .../phoenix/hbase/index/covered/IndexCodec.java | 172 +++--
 .../hbase/index/covered/LocalTableState.java    | 453 +++++++------
 .../hbase/index/covered/NonTxIndexBuilder.java  | 405 ++++++++++++
 .../phoenix/hbase/index/covered/TableState.java |  45 +-
 .../hbase/index/covered/TxIndexBuilder.java     | 247 ++++++++
 .../index/covered/data/LazyValueGetter.java     |  18 +-
 .../example/CoveredColumnIndexCodec.java        | 628 +++++++++----------
 .../CoveredColumnIndexSpecifierBuilder.java     |   4 +-
 .../covered/example/CoveredColumnIndexer.java   |   9 +-
 .../index/covered/update/ColumnTracker.java     |   3 +-
 .../covered/update/IndexUpdateManager.java      |   5 +-
 .../hbase/index/scanner/EmptyScanner.java       |   8 +-
 .../phoenix/hbase/index/scanner/Scanner.java    |   7 +-
 .../hbase/index/scanner/ScannerBuilder.java     |  12 +-
 .../hbase/index/write/IndexFailurePolicy.java   |   4 +-
 .../apache/phoenix/index/IndexMaintainer.java   |  28 +-
 .../phoenix/index/PhoenixIndexBuilder.java      |  90 +--
 .../apache/phoenix/index/PhoenixIndexCodec.java | 145 ++---
 .../index/PhoenixIndexFailurePolicy.java        |   2 +-
 .../phoenix/index/PhoenixTxIndexBuilder.java    |  53 ++
 .../index/PhoenixTxIndexFailurePolicy.java      |  50 ++
 .../query/ConnectionQueryServicesImpl.java      |  10 +-
 .../phoenix/schema/tuple/ValueGetterTuple.java  |  16 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |  64 +-
 .../covered/CoveredIndexCodecForTesting.java    |  93 ++-
 .../index/covered/TestLocalTableState.java      |  20 +-
 .../example/TestCoveredColumnIndexCodec.java    |  16 +-
 .../phoenix/index/IndexMaintainerTest.java      |   2 +-
 36 files changed, 1904 insertions(+), 1652 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index 84c6827..fa85f00 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -59,7 +59,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
- * End-to-End test of just the {@link CoveredColumnsIndexBuilder}, but with a simple
+ * End-to-End test of just the {@link NonTxIndexBuilder}, but with a simple
  * {@link IndexCodec} and BatchCache implementation.
  */
 @Category(NeedsOwnMiniClusterTest.class)
@@ -151,7 +151,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
             ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns)).getFirst();
 
         int count = 0;
-        KeyValue kv;
+        Cell kv;
         while ((kv = kvs.next()) != null) {
           Cell next = expectedKvs.get(count++);
           assertEquals(
@@ -302,9 +302,9 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
     Map<String, String> indexerOpts = new HashMap<String, String>();
     // just need to set the codec - we are going to set it later, but we need something here or the
     // initializer blows up.
-    indexerOpts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY,
+    indexerOpts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY,
       CoveredIndexCodecForTesting.class.getName());
-    Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER);
+    Indexer.enableIndexing(desc, NonTxIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER);
 
     // create the table
     HBaseAdmin admin = UTIL.getHBaseAdmin();
@@ -315,8 +315,8 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
     HRegion region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0);
     Indexer indexer =
         (Indexer) region.getCoprocessorHost().findCoprocessor(Indexer.class.getName());
-    CoveredColumnsIndexBuilder builder =
-        (CoveredColumnsIndexBuilder) indexer.getBuilderForTesting();
+    NonTxIndexBuilder builder =
+        (NonTxIndexBuilder) indexer.getBuilderForTesting();
     VerifyingIndexCodec codec = new VerifyingIndexCodec();
     builder.setIndexCodecForTesting(codec);
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
index dbe78e7..281ad63 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.covered.example;
 
@@ -32,6 +24,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -49,105 +42,106 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-
 /**
- * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String}
- * constructor), {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize
- * the exception, and just return <tt>null</tt> to the client, which then just goes and retries.
+ * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String} constructor),
+ * {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize the exception, and just return
+ * <tt>null</tt> to the client, which then just goes and retries.
  */
 @Category(NeedsOwnMiniClusterTest.class)
 public class FailWithoutRetriesIT {
 
-  private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class);
-  @Rule
-  public TableName table = new TableName();
+    private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class);
+    @Rule
+    public TableName table = new TableName();
+
+    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+    private String getIndexTableName() {
+        return Bytes.toString(table.getTableName()) + "_index";
+    }
 
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+    public static class FailingTestCodec extends BaseIndexCodec {
 
-  private String getIndexTableName() {
-    return Bytes.toString(table.getTableName()) + "_index";
-  }
+        @Override
+        public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
+            throw new RuntimeException("Intentionally failing deletes for " + FailWithoutRetriesIT.class.getName());
+        }
 
-  public static class FailingTestCodec extends BaseIndexCodec {
+        @Override
+        public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
+            throw new RuntimeException("Intentionally failing upserts for " + FailWithoutRetriesIT.class.getName());
+        }
+
+        @Override
+        public void setContext(TableState state, Mutation mutation) throws IOException {}
+
+    }
 
-    @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
-      throw new RuntimeException("Intentionally failing deletes for "
-          + FailWithoutRetriesIT.class.getName());
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        // setup and verify the config
+        Configuration conf = UTIL.getConfiguration();
+        setUpConfigForMiniCluster(conf);
+        IndexTestingUtils.setupConfig(conf);
+        IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
+        // start the cluster
+        UTIL.startMiniCluster();
     }
 
-    @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
-      throw new RuntimeException("Intentionally failing upserts for "
-          + FailWithoutRetriesIT.class.getName());
+    @AfterClass
+    public static void teardownCluster() throws Exception {
+        UTIL.shutdownMiniCluster();
     }
 
-  }
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    // setup and verify the config
-    Configuration conf = UTIL.getConfiguration();
-    setUpConfigForMiniCluster(conf);
-    IndexTestingUtils.setupConfig(conf);
-    IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
-    // start the cluster
-    UTIL.startMiniCluster();
-  }
-
-  @AfterClass
-  public static void teardownCluster() throws Exception {
-    UTIL.shutdownMiniCluster();
-  }
-
-  /**
-   * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't
-   * rethrowing the exception correctly?
-   * <p>
-   * We use a custom codec to enforce the thrown exception.
-   * @throws Exception
-   */
-  @Test(timeout = 300000)
-  public void testQuickFailure() throws Exception {
-    // incorrectly setup indexing for the primary table - target index table doesn't exist, which
-    // should quickly return to the client
-    byte[] family = Bytes.toBytes("family");
-    ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
-    // values are [col1]
-    fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
-    CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
-    // add the index family
-    builder.addIndexGroup(fam1);
-    // usually, we would create the index table here, but we don't for the sake of the test.
-
-    // setup the primary table
-    String primaryTable = Bytes.toString(table.getTableName());
-    @SuppressWarnings("deprecation")
-    HTableDescriptor pTable = new HTableDescriptor(primaryTable);
-    pTable.addFamily(new HColumnDescriptor(family));
-    // override the codec so we can use our test one
-    builder.build(pTable, FailingTestCodec.class);
-
-    // create the primary table
-    HBaseAdmin admin = UTIL.getHBaseAdmin();
-    admin.createTable(pTable);
-    Configuration conf = new Configuration(UTIL.getConfiguration());
-    // up the number of retries/wait time to make it obvious that we are failing with retries here
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
-    conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
-    HTable primary = new HTable(conf, primaryTable);
-    primary.setAutoFlush(false, true);
-
-    // do a simple put that should be indexed
-    Put p = new Put(Bytes.toBytes("row"));
-    p.add(family, null, Bytes.toBytes("value"));
-    primary.put(p);
-    try {
-      primary.flushCommits();
-      fail("Shouldn't have gotten a successful write to the primary table");
-    } catch (RetriesExhaustedWithDetailsException e) {
-      LOG.info("Correclty got a failure of the put!");
+    /**
+     * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't rethrowing the exception
+     * correctly?
+     * <p>
+     * We use a custom codec to enforce the thrown exception.
+     * 
+     * @throws Exception
+     */
+    @Test(timeout = 300000)
+    public void testQuickFailure() throws Exception {
+        // incorrectly setup indexing for the primary table - target index table doesn't exist, which
+        // should quickly return to the client
+        byte[] family = Bytes.toBytes("family");
+        ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
+        // values are [col1]
+        fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
+        CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
+        // add the index family
+        builder.addIndexGroup(fam1);
+        // usually, we would create the index table here, but we don't for the sake of the test.
+
+        // setup the primary table
+        String primaryTable = Bytes.toString(table.getTableName());
+        @SuppressWarnings("deprecation")
+        HTableDescriptor pTable = new HTableDescriptor(primaryTable);
+        pTable.addFamily(new HColumnDescriptor(family));
+        // override the codec so we can use our test one
+        builder.build(pTable, FailingTestCodec.class);
+
+        // create the primary table
+        HBaseAdmin admin = UTIL.getHBaseAdmin();
+        admin.createTable(pTable);
+        Configuration conf = new Configuration(UTIL.getConfiguration());
+        // up the number of retries/wait time to make it obvious that we are failing with retries here
+        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
+        conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
+        HTable primary = new HTable(conf, primaryTable);
+        primary.setAutoFlush(false, true);
+
+        // do a simple put that should be indexed
+        Put p = new Put(Bytes.toBytes("row"));
+        p.add(family, null, Bytes.toBytes("value"));
+        primary.put(p);
+        try {
+            primary.flushCommits();
+            fail("Shouldn't have gotten a successful write to the primary table");
+        } catch (RetriesExhaustedWithDetailsException e) {
+            LOG.info("Correclty got a failure of the put!");
+        }
+        primary.close();
     }
-    primary.close();
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
index a6e36cb..bcadc2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
@@ -19,8 +19,8 @@ package org.apache.phoenix.hbase.index;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 public interface ValueGetter {
 
@@ -32,7 +32,7 @@ public interface ValueGetter {
    *         present.
    * @throws IOException if there is an error accessing the underlying data storage
    */
-  public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException;
+  public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException;
   
   public byte[] getRowKey();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index f9df296..dfb9ad4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -1,95 +1,127 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.builder;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 
 /**
  * Basic implementation of the {@link IndexBuilder} that doesn't do any actual work of indexing.
  * <p>
- * You should extend this class, rather than implementing IndexBuilder directly to maintain
- * compatability going forward.
+ * You should extend this class, rather than implementing IndexBuilder directly to maintain compatability going forward.
  * <p>
- * Generally, you should consider using one of the implemented IndexBuilders (e.g
- * {@link CoveredColumnsIndexBuilder}) as there is a lot of work required to keep an index table
- * up-to-date.
+ * Generally, you should consider using one of the implemented IndexBuilders (e.g {@link NonTxIndexBuilder}) as there is
+ * a lot of work required to keep an index table up-to-date.
  */
 public abstract class BaseIndexBuilder implements IndexBuilder {
+    public static final String CODEC_CLASS_NAME_KEY = "org.apache.hadoop.hbase.index.codec.class";
+    private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class);
 
-  private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class);
-  protected boolean stopped;
+    protected boolean stopped;
+    protected RegionCoprocessorEnvironment env;
+    protected IndexCodec codec;
 
-  @Override
-  public void extendBaseIndexBuilderInstead() { }
-  
-  @Override
-  public void setup(RegionCoprocessorEnvironment conf) throws IOException {
-    // noop
-  }
+    abstract protected boolean useRawScanToPrimeBlockCache();
+    
+    @Override
+    public void extendBaseIndexBuilderInstead() {}
 
-  @Override
-  public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-    // noop
-  }
+    @Override
+    public void setup(RegionCoprocessorEnvironment env) throws IOException {
+        this.env = env;
+        // setup the phoenix codec. Generally, this will just be in standard one, but abstracting here
+        // so we can use it later when generalizing covered indexes
+        Configuration conf = env.getConfiguration();
+        Class<? extends IndexCodec> codecClass = conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class);
+        try {
+            Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
+            meth.setAccessible(true);
+            this.codec = meth.newInstance();
+            this.codec.initialize(env);
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
 
-  @Override
-  public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
-    // noop
-  }
-  
-  /**
-   * By default, we always attempt to index the mutation. Commonly this can be slow (because the
-   * framework spends the time to do the indexing, only to realize that you don't need it) or not
-   * ideal (if you want to turn on/off indexing on a table without completely reloading it).
- * @throws IOException 
-   */
-  @Override
-  public boolean isEnabled(Mutation m) throws IOException {
-    return true; 
-  }
+    @Override
+    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+        // noop
+    }
 
-  /**
-   * {@inheritDoc}
-   * <p>
-   * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each
-   * mutation always applies to different rows, even if they are in the same batch, or are
-   * independent updates.
-   */
-  @Override
-  public byte[] getBatchId(Mutation m) {
-    return null;
-  }
+    @Override
+    public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+        // noop
+    }
 
-  @Override
-  public void stop(String why) {
-    LOG.debug("Stopping because: " + why);
-    this.stopped = true;
-  }
+    /**
+     * By default, we always attempt to index the mutation. Commonly this can be slow (because the framework spends the
+     * time to do the indexing, only to realize that you don't need it) or not ideal (if you want to turn on/off
+     * indexing on a table without completely reloading it).
+     * 
+     * @throws IOException
+     */
+    @Override
+    public boolean isEnabled(Mutation m) throws IOException {
+        // ask the codec to see if we should even attempt indexing
+        return this.codec.isEnabled(m);
+    }
 
-  @Override
-  public boolean isStopped() {
-    return this.stopped;
-  }
+    /**
+     * Exposed for testing!
+     * 
+     * @param codec
+     *            codec to use for this instance of the builder
+     */
+    public void setIndexCodecForTesting(IndexCodec codec) {
+        this.codec = codec;
+    }
+
+    @Override
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each mutation always applies
+     * to different rows, even if they are in the same batch, or are independent updates.
+     */
+    @Override
+    public byte[] getBatchId(Mutation m) {
+        return this.codec.getBatchId(m);
+    }
+
+    @Override
+    public void stop(String why) {
+        LOG.debug("Stopping because: " + why);
+        this.stopped = true;
+    }
+
+    @Override
+    public boolean isStopped() {
+        return this.stopped;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index ba9534c..d5fd34d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -156,19 +155,6 @@ public class IndexBuildManager implements Stoppable {
     return results;
   }
 
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException {
-    // all we get is a single update, so it would probably just go slower if we needed to queue it
-    // up. It will increase underlying resource contention a little bit, but the mutation case is
-    // far more common, so let's not worry about it for now.
-    // short circuit so we don't waste time.
-    if (!this.delegate.isEnabled(delete)) {
-      return null;
-    }
-
-    return delegate.getIndexUpdate(delete);
-
-  }
-
   public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
       Collection<KeyValue> filtered) throws IOException {
     // this is run async, so we can take our time here

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index b91a52a..194fdcc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -79,32 +78,26 @@ public interface IndexBuilder extends Stoppable {
   
   public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException;
 
-  /**
-   * The counter-part to {@link #getIndexUpdate(Mutation)} - your opportunity to update any/all
-   * index tables based on the delete of the primary table row. This is only called for cases where
-   * the client sends a single delete ({@link HTable#delete}). We separate this method from
-   * {@link #getIndexUpdate(Mutation)} only for the ease of implementation as the delete path has
-   * subtly different semantics for updating the families/timestamps from the generic batch path.
-   * <p>
-   * Its up to your implementation to ensure that timestamps match between the primary and index
-   * tables.
-   * <p>
-   * Implementers must ensure that this method is thread-safe - it could (and probably will) be
-   * called concurrently for different mutations, which may or may not be part of the same batch.
-   * @param delete {@link Delete} to the primary table that may be indexed
-   * @return a {@link Map} of the mutations to make -> target index table name
-   * @throws IOException on failure
-   */
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException;
-
-  /**
-   * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal
-   * flush or compaction mechanisms.
-   * @param filtered {@link KeyValue}s that previously existed, but won't be included in further
-   *          output from HBase.
-   * @return a {@link Map} of the mutations to make -> target index table name
-   * @throws IOException on failure
-   */
+    /**
+     * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal flush or compaction
+     * mechanisms. Currently not implemented by any implementors nor called, but left here to be implemented if we
+     * ever need it. In Jesse's words:
+     * 
+     * Arguably, this is a correctness piece that should be used, but isn't. Basically, it *could* be that
+     * if a compaction/flush were to remove a key (too old, too many versions) you might want to cleanup the index table
+     * as well, if it were to get out of sync with the primary table. For instance, you might get multiple versions of
+     * the same row, which should eventually age of the oldest version. However, in the index table there would only
+     * ever be two entries for that row - the first one, matching the original row, and the delete marker for the index
+     * update, set when we got a newer version of the primary row. So, a basic HBase scan wouldn't show the index update
+     * b/c its covered by the delete marker, but an older timestamp based read would actually show the index row, even
+     * after the primary table row is gone due to MAX_VERSIONS requirement.
+     *  
+     * @param filtered {@link KeyValue}s that previously existed, but won't be included
+     * in further output from HBase.
+     * 
+     * @return a {@link Map} of the mutations to make -> target index table name
+     * @throws IOException on failure
+     */
   public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
       Collection<KeyValue> filtered)
       throws IOException;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
index e707ea2..5e0da3c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.hbase.index.covered;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 
 /**
@@ -27,9 +28,9 @@ import org.apache.hadoop.hbase.KeyValue;
  */
 public class Batch {
 
-  private static final long pointDeleteCode = KeyValue.Type.Delete.getCode();
+  private static final byte pointDeleteCode = KeyValue.Type.Delete.getCode();
   private final long timestamp;
-  private List<KeyValue> batch = new ArrayList<KeyValue>();
+  private List<Cell> batch = new ArrayList<Cell>();
   private boolean allPointDeletes = true;
 
   /**
@@ -39,8 +40,8 @@ public class Batch {
     this.timestamp = ts;
   }
 
-  public void add(KeyValue kv){
-    if (pointDeleteCode != kv.getType()) {
+  public void add(Cell kv){
+    if (pointDeleteCode != kv.getTypeByte()) {
       allPointDeletes = false;
     }
     batch.add(kv);
@@ -54,7 +55,7 @@ public class Batch {
     return this.timestamp;
   }
 
-  public List<KeyValue> getKvs() {
+  public List<Cell> getKvs() {
     return this.batch;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
deleted file mode 100644
index 6524fd4..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
-import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
-import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
-import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-
-/**
- * Build covered indexes for phoenix updates.
- * <p>
- * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't
- * need to do any extra synchronization in the IndexBuilder.
- * <p>
- * NOTE: This implementation doesn't cleanup the index when we remove a key-value on compaction or
- * flush, leading to a bloated index that needs to be cleaned up by a background process.
- */
-public class CoveredColumnsIndexBuilder extends BaseIndexBuilder {
-
-  private static final Log LOG = LogFactory.getLog(CoveredColumnsIndexBuilder.class);
-  public static final String CODEC_CLASS_NAME_KEY = "org.apache.hadoop.hbase.index.codec.class";
-
-  protected RegionCoprocessorEnvironment env;
-  protected IndexCodec codec;
-  protected LocalHBaseState localTable;
-
-  @Override
-  public void setup(RegionCoprocessorEnvironment env) throws IOException {
-    this.env = env;
-    // setup the phoenix codec. Generally, this will just be in standard one, but abstracting here
-    // so we can use it later when generalizing covered indexes
-    Configuration conf = env.getConfiguration();
-    Class<? extends IndexCodec> codecClass =
-        conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class);
-    try {
-      Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
-      meth.setAccessible(true);
-      this.codec = meth.newInstance();
-      this.codec.initialize(env);
-    } catch (IOException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-    
-    this.localTable = new LocalTable(env);
-  }
-
-  @Override
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
-    // build the index updates for each group
-    IndexUpdateManager updateMap = new IndexUpdateManager();
-
-    batchMutationAndAddUpdates(updateMap, mutation);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Found index updates for Mutation: " + mutation + "\n" + updateMap);
-    }
-
-    return updateMap.toMap();
-  }
-
-  /**
-   * Split the mutation into batches based on the timestamps of each keyvalue. We need to check each
-   * key-value in the update to see if it matches the others. Generally, this will be the case, but
-   * you can add kvs to a mutation that don't all have the timestamp, so we need to manage
-   * everything in batches based on timestamp.
-   * <p>
-   * Adds all the updates in the {@link Mutation} to the state, as a side-effect.
-   * @param updateMap index updates into which to add new updates. Modified as a side-effect.
-   * @param state current state of the row for the mutation.
-   * @param m mutation to batch
- * @throws IOException 
-   */
-  private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException {
-    // split the mutation into timestamp-based batches
-    Collection<Batch> batches = createTimestampBatchesFromMutation(m);
-
-    // create a state manager, so we can manage each batch
-    LocalTableState state = new LocalTableState(env, localTable, m);
-
-    // go through each batch of keyvalues and build separate index entries for each
-    boolean cleanupCurrentState = true;
-    for (Batch batch : batches) {
-      /*
-       * We have to split the work between the cleanup and the update for each group because when we
-       * update the current state of the row for the current batch (appending the mutations for the
-       * current batch) the next group will see that as the current state, which will can cause the
-       * a delete and a put to be created for the next group.
-       */
-      if (addMutationsForBatch(manager, batch, state, cleanupCurrentState)) {
-        cleanupCurrentState = false;
-      }
-    }
-  }
-
-  /**
-   * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any
-   * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
-   * the time the method is called.
-   * @param m {@link Mutation} from which to extract the {@link KeyValue}s
-   * @return the mutation, broken into batches and sorted in ascending order (smallest first)
-   */
-  protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
-    Map<Long, Batch> batches = new HashMap<Long, Batch>();
-    for (List<Cell> family : m.getFamilyCellMap().values()) {
-      List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
-      createTimestampBatchesFromKeyValues(familyKVs, batches);
-    }
-    // sort the batches
-    List<Batch> sorted = new ArrayList<Batch>(batches.values());
-    Collections.sort(sorted, new Comparator<Batch>() {
-      @Override
-      public int compare(Batch o1, Batch o2) {
-        return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
-      }
-    });
-    return sorted;
-  }
-
-  /**
-   * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any
-   * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
-   * the time the method is called.
-   * @param kvs {@link KeyValue}s to break into batches
-   * @param batches to update with the given kvs
-   */
-  protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs,
-      Map<Long, Batch> batches) {
-    long now = EnvironmentEdgeManager.currentTimeMillis();
-    byte[] nowBytes = Bytes.toBytes(now);
-
-    // batch kvs by timestamp
-    for (KeyValue kv : kvs) {
-      long ts = kv.getTimestamp();
-      // override the timestamp to the current time, so the index and primary tables match
-      // all the keys with LATEST_TIMESTAMP will then be put into the same batch
-      if (kv.updateLatestStamp(nowBytes)) {
-        ts = now;
-      }
-      Batch batch = batches.get(ts);
-      if (batch == null) {
-        batch = new Batch(ts);
-        batches.put(ts, batch);
-      }
-      batch.add(kv);
-    }
-  }
-
-  /**
-   * For a single batch, get all the index updates and add them to the updateMap
-   * <p>
-   * This method manages cleaning up the entire history of the row from the given timestamp forward
-   * for out-of-order (e.g. 'back in time') updates.
-   * <p>
-   * If things arrive out of order (client is using custom timestamps) we should still see the index
-   * in the correct order (assuming we scan after the out-of-order update in finished). Therefore,
-   * we when we aren't the most recent update to the index, we need to delete the state at the
-   * current timestamp (similar to above), but also issue a delete for the added index updates at
-   * the next newest timestamp of any of the columns in the update; we need to cleanup the insert so
-   * it looks like it was also deleted at that next newest timestamp. However, its not enough to
-   * just update the one in front of us - that column will likely be applied to index entries up the
-   * entire history in front of us, which also needs to be fixed up.
-   * <p>
-   * However, the current update usually will be the most recent thing to be added. In that case,
-   * all we need to is issue a delete for the previous index row (the state of the row, without the
-   * update applied) at the current timestamp. This gets rid of anything currently in the index for
-   * the current state of the row (at the timestamp). Then we can just follow that by applying the
-   * pending update and building the index update based on the new row state.
-   * @param updateMap map to update with new index elements
-   * @param batch timestamp-based batch of edits
-   * @param state local state to update and pass to the codec
-   * @param requireCurrentStateCleanup <tt>true</tt> if we should should attempt to cleanup the
-   *          current state of the table, in the event of a 'back in time' batch. <tt>false</tt>
-   *          indicates we should not attempt the cleanup, e.g. an earlier batch already did the
-   *          cleanup.
-   * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put),
-   *         <tt>false</tt> otherwise
- * @throws IOException 
-   */
-  private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch,
-      LocalTableState state, boolean requireCurrentStateCleanup) throws IOException {
-
-    // need a temporary manager for the current batch. It should resolve any conflicts for the
-    // current batch. Essentially, we can get the case where a batch doesn't change the current
-    // state of the index (all Puts are covered by deletes), in which case we don't want to add
-    // anything
-    // A. Get the correct values for the pending state in the batch
-    // A.1 start by cleaning up the current state - as long as there are key-values in the batch
-    // that are indexed, we need to change the current state of the index. Its up to the codec to
-    // determine if we need to make any cleanup given the pending update.
-    long batchTs = batch.getTimestamp();
-    state.setPendingUpdates(batch.getKvs());
-    addCleanupForCurrentBatch(updateMap, batchTs, state);
-
-    // A.2 do a single pass first for the updates to the current state
-    state.applyPendingUpdates();
-    long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap);
-    // if all the updates are the latest thing in the index, we are done - don't go and fix history
-    if (ColumnTracker.isNewestTime(minTs)) {
-      return false;
-    }
-
-    // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
-    // index. after this, we have the correct view of the index, from the batch up to the index
-    while(!ColumnTracker.isNewestTime(minTs) ){
-      minTs = addUpdateForGivenTimestamp(minTs, state, updateMap);
-    }
-
-    // B. only cleanup the current state if we need to - its a huge waste of effort otherwise.
-   if (requireCurrentStateCleanup) {
-      // roll back the pending update. This is needed so we can remove all the 'old' index entries.
-      // We don't need to do the puts here, but just the deletes at the given timestamps since we
-      // just want to completely hide the incorrect entries.
-      state.rollback(batch.getKvs());
-      // setup state
-      state.setPendingUpdates(batch.getKvs());
-
-      // cleanup the pending batch. If anything in the correct history is covered by Deletes used to
-      // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
-      // because the update may have a different set of columns or value based on the update).
-      cleanupIndexStateFromBatchOnward(updateMap, batchTs, state);
-
-      // have to roll the state forward again, so the current state is correct
-      state.applyPendingUpdates();
-      return true;
-    }
-    return false;
-  }
-
-  private long addUpdateForGivenTimestamp(long ts, LocalTableState state,
-      IndexUpdateManager updateMap) throws IOException {
-    state.setCurrentTimestamp(ts);
-    ts = addCurrentStateMutationsForBatch(updateMap, state);
-    return ts;
-  }
-
-  private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs,
-      LocalTableState state) throws IOException {
-    // get the cleanup for the current state
-    state.setCurrentTimestamp(batchTs);
-    addDeleteUpdatesToMap(updateMap, state, batchTs);
-    // ignore any index tracking from the delete
-    state.resetTrackedColumns();
-  }
-  
-  /**
-   * Add the necessary mutations for the pending batch on the local state. Handles rolling up
-   * through history to determine the index changes after applying the batch (for the case where the
-   * batch is back in time).
-   * @param updateMap to update with index mutations
-   * @param batch to apply to the current state
-   * @param state current state of the table
-   * @return the minimum timestamp across all index columns requested. If
-   *         {@link ColumnTracker#isNewestTime(long)} returns <tt>true</tt> on the returned
-   *         timestamp, we know that this <i>was not a back-in-time update</i>.
- * @throws IOException 
-   */
-  private long
-      addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state) throws IOException {
-
-    // get the index updates for this current batch
-    Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
-    state.resetTrackedColumns();
-
-    /*
-     * go through all the pending updates. If we are sure that all the entries are the latest
-     * timestamp, we can just add the index updates and move on. However, if there are columns that
-     * we skip past (based on the timestamp of the batch), we need to roll back up the history.
-     * Regardless of whether or not they are the latest timestamp, the entries here are going to be
-     * correct for the current batch timestamp, so we add them to the updates. The only thing we
-     * really care about it if we need to roll up the history and fix it as we go.
-     */
-    // timestamp of the next update we need to track
-    long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
-    List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>();
-    for (IndexUpdate update : upserts) {
-      // this is the one bit where we check the timestamps
-      final ColumnTracker tracker = update.getIndexedColumns();
-      long trackerTs = tracker.getTS();
-      // update the next min TS we need to track
-      if (trackerTs < minTs) {
-        minTs = tracker.getTS();
-      }
-      // track index hints for the next round. Hint if we need an update for that column for the
-      // next timestamp. These columns clearly won't need to update as we go through time as they
-      // already match the most recent possible thing.
-      boolean needsCleanup = false;
-      if (tracker.hasNewerTimestamps()) {
-        columnHints.add(tracker);
-        // this update also needs to be cleaned up at the next timestamp because it not the latest.
-        needsCleanup = true;
-      }
-
-
-      // only make the put if the index update has been setup
-      if (update.isValid()) {
-        byte[] table = update.getTableName();
-        Mutation mutation = update.getUpdate();
-        updateMap.addIndexUpdate(table, mutation);
-
-        // only make the cleanup if we made a put and need cleanup
-        if (needsCleanup) {
-          // there is a TS for the interested columns that is greater than the columns in the
-          // put. Therefore, we need to issue a delete at the same timestamp
-          Delete d = new Delete(mutation.getRow());
-          d.setTimestamp(tracker.getTS());
-          updateMap.addIndexUpdate(table, d);
-        }
-      }
-    }
-    return minTs;
-  }
-
-  /**
-   * Cleanup the index based on the current state from the given batch. Iterates over each timestamp
-   * (for the indexed rows) for the current state of the table and cleans up all the existing
-   * entries generated by the codec.
-   * <p>
-   * Adds all pending updates to the updateMap
-   * @param updateMap updated with the pending index updates from the codec
-   * @param batchTs timestamp from which we should cleanup
-   * @param state current state of the primary table. Should already by setup to the correct state
-   *          from which we want to cleanup.
- * @throws IOException 
-   */
-  private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap,
-      long batchTs, LocalTableState state) throws IOException {
-    // get the cleanup for the current state
-    state.setCurrentTimestamp(batchTs);
-    addDeleteUpdatesToMap(updateMap, state, batchTs);
-    Set<ColumnTracker> trackers = state.getTrackedColumns();
-    long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
-    for (ColumnTracker tracker : trackers) {
-      if (tracker.getTS() < minTs) {
-        minTs = tracker.getTS();
-      }
-    }
-    state.resetTrackedColumns();
-    if (!ColumnTracker.isNewestTime(minTs)) {
-      state.setHints(Lists.newArrayList(trackers));
-      cleanupIndexStateFromBatchOnward(updateMap, minTs, state);
-    }
-  }
-
-
-  /**
-   * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then
-   * add them to the update map.
-   * <p>
-   * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates
-   * applied, etc).
- * @throws IOException 
-   */
-  protected void
-      addDeleteUpdatesToMap(IndexUpdateManager updateMap,
-      LocalTableState state, long ts) throws IOException {
-    Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state);
-    if (cleanup != null) {
-      for (IndexUpdate d : cleanup) {
-        if (!d.isValid()) {
-          continue;
-        }
-        // override the timestamps in the delete to match the current batch.
-        Delete remove = (Delete)d.getUpdate();
-        remove.setTimestamp(ts);
-        updateMap.addIndexUpdate(d.getTableName(), remove);
-      }
-    }
-  }
-
-  @Override
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete d) throws IOException {
-    // stores all the return values
-    IndexUpdateManager updateMap = new IndexUpdateManager();
-
-    // We have to figure out which kind of delete it is, since we need to do different things if its
-    // a general (row) delete, versus a delete of just a single column or family
-    Map<byte[], List<Cell>> families = d.getFamilyCellMap();
-
-    /*
-     * Option 1: its a row delete marker, so we just need to delete the most recent state for each
-     * group, as of the specified timestamp in the delete. This can happen if we have a single row
-     * update and it is part of a batch mutation (prepare doesn't happen until later... maybe a
-     * bug?). In a single delete, this delete gets all the column families appended, so the family
-     * map won't be empty by the time it gets here.
-     */
-    if (families.size() == 0) {
-      LocalTableState state = new LocalTableState(env, localTable, d);
-      // get a consistent view of name
-      long now = d.getTimeStamp();
-      if (now == HConstants.LATEST_TIMESTAMP) {
-        now = EnvironmentEdgeManager.currentTimeMillis();
-        // update the delete's idea of 'now' to be consistent with the index
-        d.setTimestamp(now);
-      }
-      // get deletes from the codec
-      // we only need to get deletes and not add puts because this delete covers all columns
-      addDeleteUpdatesToMap(updateMap, state, now);
-
-      /*
-       * Update the current state for all the kvs in the delete. Generally, we would just iterate
-       * the family map, but since we go here, the family map is empty! Therefore, we need to fake a
-       * bunch of family deletes (just like hos HRegion#prepareDelete works). This is just needed
-       * for current version of HBase that has an issue where the batch update doesn't update the
-       * deletes before calling the hook.
-       */
-      byte[] deleteRow = d.getRow();
-      for (byte[] family : this.env.getRegion().getTableDesc().getFamiliesKeys()) {
-        state.addPendingUpdates(new KeyValue(deleteRow, family, null, now,
-            KeyValue.Type.DeleteFamily));
-      }
-    } else {
-      // Option 2: Its actually a bunch single updates, which can have different timestamps.
-      // Therefore, we need to do something similar to the put case and batch by timestamp
-      batchMutationAndAddUpdates(updateMap, d);
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Found index updates for Delete: " + d + "\n" + updateMap);
-    }
-
-    return updateMap.toMap();
-  }
-
-  @Override
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
-      Collection<KeyValue> filtered) throws IOException {
-    // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
-    return null;
-  }
-
-  /**
-   * Exposed for testing!
-   * @param codec codec to use for this instance of the builder
-   */
-  public void setIndexCodecForTesting(IndexCodec codec) {
-    this.codec = codec;
-  }
-
-  @Override
-  public boolean isEnabled(Mutation m) throws IOException {
-    // ask the codec to see if we should even attempt indexing
-    return this.codec.isEnabled(m);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index daa631b..e3ef831 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.covered;
 
@@ -22,89 +14,93 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
 import org.apache.phoenix.index.BaseIndexCodec;
 
-
 /**
  * Codec for creating index updates from the current state of a table.
  * <p>
- * Generally, you should extend {@link BaseIndexCodec} instead, so help maintain compatibility as
- * features need to be added to the codec, as well as potentially not haivng to implement some
- * methods.
+ * Generally, you should extend {@link BaseIndexCodec} instead, so help maintain compatibility as features need to be
+ * added to the codec, as well as potentially not haivng to implement some methods.
  */
 public interface IndexCodec {
 
-  /**
-   * Do any code initialization necessary
-   * @param env environment in which the codec is operating
-   * @throws IOException if the codec cannot be initalized correctly
-   */
-  public void initialize(RegionCoprocessorEnvironment env) throws IOException;
+    /**
+     * Do any code initialization necessary
+     * 
+     * @param env
+     *            environment in which the codec is operating
+     * @throws IOException
+     *             if the codec cannot be initalized correctly
+     */
+    public void initialize(RegionCoprocessorEnvironment env) throws IOException;
+
+    /**
+     * Get the index cleanup entries. Currently, this must return just single row deletes (where just the row-key is
+     * specified and no columns are returned) mapped to the table name. For instance, to you have an index 'myIndex'
+     * with row :
+     * 
+     * <pre>
+     * v1,v2,v3 | CF:CQ0  | rowkey
+     *          | CF:CQ1  | rowkey
+     * </pre>
+     * 
+     * To then cleanup this entry, you would just return 'v1,v2,v3', 'myIndex'.
+     * 
+     * @param state
+     *            the current state of the table that needs to be cleaned up. Generally, you only care about the latest
+     *            column values, for each column you are indexing for each index table.
+     * @return the pairs of (deletes, index table name) that should be applied.
+     * @throws IOException
+     */
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
 
-  /**
-   * Get the index cleanup entries. Currently, this must return just single row deletes (where just
-   * the row-key is specified and no columns are returned) mapped to the table name. For instance,
-   * to you have an index 'myIndex' with row :
-   * 
-   * <pre>
-   * v1,v2,v3 | CF:CQ0  | rowkey
-   *          | CF:CQ1  | rowkey
-   * </pre>
-   * 
-   * To then cleanup this entry, you would just return 'v1,v2,v3', 'myIndex'.
-   * @param state the current state of the table that needs to be cleaned up. Generally, you only
-   *          care about the latest column values, for each column you are indexing for each index
-   *          table.
-   * @return the pairs of (deletes, index table name) that should be applied.
- * @throws IOException 
-   */
-  public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
+    // table state has the pending update already applied, before calling
+    // get the new index entries
+    /**
+     * Get the index updates for the primary table state, for each index table. The returned {@link Put}s need to be
+     * fully specified (including timestamp) to minimize passes over the same key-values multiple times.
+     * <p>
+     * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so the index entries
+     * match the primary table row. This could be managed at a higher level, but would require iterating all the kvs in
+     * the Put again - very inefficient when compared to the current interface where you must provide a timestamp
+     * anyways (so you might as well provide the right one).
+     * 
+     * @param state
+     *            the current state of the table that needs to an index update Generally, you only care about the latest
+     *            column values, for each column you are indexing for each index table.
+     * @return the pairs of (updates,index table name) that should be applied.
+     * @throws IOException
+     */
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
 
-  // table state has the pending update already applied, before calling
-  // get the new index entries
-  /**
-   * Get the index updates for the primary table state, for each index table. The returned
-   * {@link Put}s need to be fully specified (including timestamp) to minimize passes over the same
-   * key-values multiple times.
-   * <p>
-   * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so
-   * the index entries match the primary table row. This could be managed at a higher level, but
-   * would require iterating all the kvs in the Put again - very inefficient when compared to the
-   * current interface where you must provide a timestamp anyways (so you might as well provide the
-   * right one).
-   * @param state the current state of the table that needs to an index update Generally, you only
-   *          care about the latest column values, for each column you are indexing for each index
-   *          table.
-   * @return the pairs of (updates,index table name) that should be applied.
- * @throws IOException 
-   */
-  public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
+    /**
+     * This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't
+     * take place, we can save a lot of time on the regular Put patch. By making it dynamic, we can save offlining and
+     * then onlining a table just to turn indexing on.
+     * <p>
+     * We can also be smart about even indexing a given update here too - if the update doesn't contain any columns that
+     * we care about indexing, we can save the effort of analyzing the put and further.
+     * 
+     * @param m
+     *            mutation that should be indexed.
+     * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table basis, as each
+     *         codec is instantiated per-region.
+     * @throws IOException
+     */
+    public boolean isEnabled(Mutation m) throws IOException;
 
-  /**
-   * This allows the codec to dynamically change whether or not indexing should take place for a
-   * table. If it doesn't take place, we can save a lot of time on the regular Put patch. By making
-   * it dynamic, we can save offlining and then onlining a table just to turn indexing on.
-   * <p>
-   * We can also be smart about even indexing a given update here too - if the update doesn't
-   * contain any columns that we care about indexing, we can save the effort of analyzing the put
-   * and further.
-   * @param m mutation that should be indexed.
-   * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table
-   *         basis, as each codec is instantiated per-region.
- * @throws IOException 
-   */
-  public boolean isEnabled(Mutation m) throws IOException;
+    /**
+     * Get the batch identifier of the given mutation. Generally, updates to the table will take place in a batch of
+     * updates; if we know that the mutation is part of a batch, we can build the state much more intelligently.
+     * <p>
+     * <b>If you have batches that have multiple updates to the same row state, you must specify a batch id for each
+     * batch. Otherwise, we cannot guarantee index correctness</b>
+     * 
+     * @param m
+     *            mutation that may or may not be part of the batch
+     * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch.
+     */
+    public byte[] getBatchId(Mutation m);
 
-  /**
-   * Get the batch identifier of the given mutation. Generally, updates to the table will take place
-   * in a batch of updates; if we know that the mutation is part of a batch, we can build the state
-   * much more intelligently.
-   * <p>
-   * <b>If you have batches that have multiple updates to the same row state, you must specify a
-   * batch id for each batch. Otherwise, we cannot guarantee index correctness</b>
-   * @param m mutation that may or may not be part of the batch
-   * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch.
-   */
-  public byte[] getBatchId(Mutation m);
+    public void setContext(TableState state, Mutation mutation) throws IOException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index e4bc193..f47a71a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.covered;
 
@@ -34,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.util.Pair;
-
+import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.data.IndexMemStore;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -42,203 +34,250 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+import com.google.common.collect.Maps;
 
 /**
  * Manage the state of the HRegion's view of the table, for the single row.
  * <p>
- * Currently, this is a single-use object - you need to create a new one for each row that you need
- * to manage. In the future, we could make this object reusable, but for the moment its easier to
- * manage as a throw-away object.
+ * Currently, this is a single-use object - you need to create a new one for each row that you need to manage. In the
+ * future, we could make this object reusable, but for the moment its easier to manage as a throw-away object.
  * <p>
- * This class is <b>not</b> thread-safe - it requires external synchronization is access
- * concurrently.
+ * This class is <b>not</b> thread-safe - it requires external synchronization is access concurrently.
  */
 public class LocalTableState implements TableState {
 
-  private long ts;
-  private RegionCoprocessorEnvironment env;
-  private KeyValueStore memstore;
-  private LocalHBaseState table;
-  private Mutation update;
-  private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
-  private ScannerBuilder scannerBuilder;
-  private List<KeyValue> kvs = new ArrayList<KeyValue>();
-  private List<? extends IndexedColumnGroup> hints;
-  private CoveredColumns columnSet;
-
-  public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
-    this.env = environment;
-    this.table = table;
-    this.update = update;
-    this.memstore = new IndexMemStore();
-    this.scannerBuilder = new ScannerBuilder(memstore, update);
-    this.columnSet = new CoveredColumns();
-  }
-
-  public void addPendingUpdates(KeyValue... kvs) {
-    if (kvs == null) return;
-    addPendingUpdates(Arrays.asList(kvs));
-  }
-
-  public void addPendingUpdates(List<KeyValue> kvs) {
-    if(kvs == null) return;
-    setPendingUpdates(kvs);
-    addUpdate(kvs);
-  }
-
-  private void addUpdate(List<KeyValue> list) {
-    addUpdate(list, true);
-  }
-
-  private void addUpdate(List<KeyValue> list, boolean overwrite) {
-    if (list == null) return;
-    for (KeyValue kv : list) {
-      this.memstore.add(kv, overwrite);
-    }
-  }
-
-  @Override
-  public RegionCoprocessorEnvironment getEnvironment() {
-    return this.env;
-  }
-
-  @Override
-  public long getCurrentTimestamp() {
-    return this.ts;
-  }
-
-  @Override
-  public void setCurrentTimestamp(long timestamp) {
-    this.ts = timestamp;
-  }
-
-  public void resetTrackedColumns() {
-    this.trackedColumns.clear();
-  }
-
-  public Set<ColumnTracker> getTrackedColumns() {
-    return this.trackedColumns;
-  }
-
-  @Override
-  public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
-      Collection<? extends ColumnReference> indexedColumns) throws IOException {
-    ensureLocalStateInitialized(indexedColumns);
-    // filter out things with a newer timestamp and track the column references to which it applies
-    ColumnTracker tracker = new ColumnTracker(indexedColumns);
-    synchronized (this.trackedColumns) {
-      // we haven't seen this set of columns before, so we need to create a new tracker
-      if (!this.trackedColumns.contains(tracker)) {
-        this.trackedColumns.add(tracker);
-      }
-    }
-
-    Scanner scanner =
-        this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
-
-    return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
-  }
-
-  /**
-   * Initialize the managed local state. Generally, this will only be called by
-   * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside.
-   * Even then, there is still fairly low contention as each new Put/Delete will have its own table
-   * state.
-   */
-  private synchronized void ensureLocalStateInitialized(
-      Collection<? extends ColumnReference> columns) throws IOException {
-    // check to see if we haven't initialized any columns yet
-    Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
-    // we have all the columns loaded, so we are good to go.
-    if (toCover.isEmpty()) {
-      return;
-    }
-
-    // add the current state of the row
-    this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false);
-
-    // add the covered columns to the set
-    for (ColumnReference ref : toCover) {
-      this.columnSet.addColumn(ref);
-    }
-  }
-
-  @Override
-  public Map<String, byte[]> getUpdateAttributes() {
-    return this.update.getAttributesMap();
-  }
-
-  @Override
-  public byte[] getCurrentRowKey() {
-    return this.update.getRow();
-  }
-
-  public Result getCurrentRowState() {
-    KeyValueScanner scanner = this.memstore.getScanner();
-    List<Cell> kvs = new ArrayList<Cell>();
-    while (scanner.peek() != null) {
-      try {
-        kvs.add(scanner.next());
-      } catch (IOException e) {
-        // this should never happen - something has gone terribly arwy if it has
-        throw new RuntimeException("Local MemStore threw IOException!");
-      }
-    }
-    return Result.create(kvs);
-  }
-
-  /**
-   * Helper to add a {@link Mutation} to the values stored for the current row
-   * @param pendingUpdate update to apply
-   */
-  public void addUpdateForTesting(Mutation pendingUpdate) {
-    for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
-      List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue());
-      addUpdate(edits);
-    }
-  }
-
-  /**
-   * @param hints
-   */
-  public void setHints(List<? extends IndexedColumnGroup> hints) {
-    this.hints = hints;
-  }
-
-  @Override
-  public List<? extends IndexedColumnGroup> getIndexColumnHints() {
-    return this.hints;
-  }
-
-  @Override
-  public Collection<KeyValue> getPendingUpdate() {
-    return this.kvs;
-  }
-
-  /**
-   * Set the {@link KeyValue}s in the update for which we are currently building an index update,
-   * but don't actually apply them.
-   * @param update pending {@link KeyValue}s
-   */
-  public void setPendingUpdates(Collection<KeyValue> update) {
-    this.kvs.clear();
-    this.kvs.addAll(update);
-  }
-
-  /**
-   * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
-   */
-  public void applyPendingUpdates() {
-    this.addUpdate(kvs);
-  }
-
-  /**
-   * Rollback all the given values from the underlying state.
-   * @param values
-   */
-  public void rollback(Collection<KeyValue> values) {
-    for (KeyValue kv : values) {
-      this.memstore.rollback(kv);
-    }
-  }
+    private long ts;
+    private RegionCoprocessorEnvironment env;
+    private KeyValueStore memstore;
+    private LocalHBaseState table;
+    private Mutation update;
+    private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
+    private ScannerBuilder scannerBuilder;
+    private List<Cell> kvs = new ArrayList<Cell>();
+    private List<? extends IndexedColumnGroup> hints;
+    private CoveredColumns columnSet;
+    private final Map<String,Object> context = Maps.newHashMap();
+
+    public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
+        this.env = environment;
+        this.table = table;
+        this.update = update;
+        this.memstore = new IndexMemStore();
+        this.scannerBuilder = new ScannerBuilder(memstore, update);
+        this.columnSet = new CoveredColumns();
+    }
+
+    public void addPendingUpdates(Cell... kvs) {
+        if (kvs == null) return;
+        addPendingUpdates(Arrays.asList(kvs));
+    }
+
+    public void addPendingUpdates(List<Cell> kvs) {
+        if (kvs == null) return;
+        setPendingUpdates(kvs);
+        addUpdate(kvs);
+    }
+
+    private void addUpdate(List<Cell> list) {
+        addUpdate(list, true);
+    }
+
+    private void addUpdate(List<Cell> list, boolean overwrite) {
+        if (list == null) return;
+        for (Cell kv : list) {
+            this.memstore.add(KeyValueUtil.ensureKeyValue(kv), overwrite);
+        }
+    }
+
+    @Override
+    public RegionCoprocessorEnvironment getEnvironment() {
+        return this.env;
+    }
+
+    @Override
+    public long getCurrentTimestamp() {
+        return this.ts;
+    }
+
+    /**
+     * Set the current timestamp up to which the table should allow access to the underlying table.
+     * This overrides the timestamp view provided by the indexer - use with care!
+     * @param timestamp timestamp up to which the table should allow access.
+     */
+    public void setCurrentTimestamp(long timestamp) {
+        this.ts = timestamp;
+    }
+
+    public void resetTrackedColumns() {
+        this.trackedColumns.clear();
+    }
+
+    public Set<ColumnTracker> getTrackedColumns() {
+        return this.trackedColumns;
+    }
+
+    /**
+     * Get a scanner on the columns that are needed by the index.
+     * <p>
+     * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given
+     * columns with a timestamp earlier than the timestamp to which the table is currently set (the
+     * current state of the table for which we need to build an update).
+     * <p>
+     * If none of the passed columns matches any of the columns in the pending update (as determined
+     * by {@link ColumnReference#matchesFamily(byte[])} and
+     * {@link ColumnReference#matchesQualifier(byte[])}, then an empty scanner will be returned. This
+     * is because it doesn't make sense to build index updates when there is no change in the table
+     * state for any of the columns you are indexing.
+     * <p>
+     * <i>NOTE:</i> This method should <b>not</b> be used during
+     * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been
+     * applied - you are merely attempting to cleanup the current state and therefore do <i>not</i>
+     * need to track the indexed columns.
+     * <p>
+     * As a side-effect, we update a timestamp for the next-most-recent timestamp for the columns you
+     * request - you will never see a column with the timestamp we are tracking, but the next oldest
+     * timestamp for that column.
+     * @param indexedColumns the columns to that will be indexed
+     * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
+     *         the builder. Even if no update is necessary for the requested columns, you still need
+     *         to return the {@link IndexUpdate}, just don't set the update for the
+     *         {@link IndexUpdate}.
+     * @throws IOException
+     */
+    public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+        Collection<? extends ColumnReference> indexedColumns) throws IOException {
+        ensureLocalStateInitialized(indexedColumns);
+        // filter out things with a newer timestamp and track the column references to which it applies
+        ColumnTracker tracker = new ColumnTracker(indexedColumns);
+        synchronized (this.trackedColumns) {
+            // we haven't seen this set of columns before, so we need to create a new tracker
+            if (!this.trackedColumns.contains(tracker)) {
+                this.trackedColumns.add(tracker);
+            }
+        }
+
+        Scanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
+
+        return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
+    }
+
+    /**
+     * Initialize the managed local state. Generally, this will only be called by
+     * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even
+     * then, there is still fairly low contention as each new Put/Delete will have its own table state.
+     */
+    private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns)
+            throws IOException {
+        // check to see if we haven't initialized any columns yet
+        Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
+        // we have all the columns loaded, so we are good to go.
+        if (toCover.isEmpty()) { return; }
+
+        // add the current state of the row
+        this.addUpdate(this.table.getCurrentRowState(update, toCover).listCells(), false);
+
+        // add the covered columns to the set
+        for (ColumnReference ref : toCover) {
+            this.columnSet.addColumn(ref);
+        }
+    }
+
+    @Override
+    public Map<String, byte[]> getUpdateAttributes() {
+        return this.update.getAttributesMap();
+    }
+
+    @Override
+    public byte[] getCurrentRowKey() {
+        return this.update.getRow();
+    }
+
+    public Result getCurrentRowState() {
+        KeyValueScanner scanner = this.memstore.getScanner();
+        List<Cell> kvs = new ArrayList<Cell>();
+        while (scanner.peek() != null) {
+            try {
+                kvs.add(scanner.next());
+            } catch (IOException e) {
+                // this should never happen - something has gone terribly arwy if it has
+                throw new RuntimeException("Local MemStore threw IOException!");
+            }
+        }
+        return Result.create(kvs);
+    }
+
+    /**
+     * Helper to add a {@link Mutation} to the values stored for the current row
+     * 
+     * @param pendingUpdate
+     *            update to apply
+     */
+    public void addUpdateForTesting(Mutation pendingUpdate) {
+        for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
+            List<Cell> edits = e.getValue();
+            addUpdate(edits);
+        }
+    }
+
+    /**
+     * @param hints
+     */
+    public void setHints(List<? extends IndexedColumnGroup> hints) {
+        this.hints = hints;
+    }
+
+    @Override
+    public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+        return this.hints;
+    }
+
+    @Override
+    public Collection<Cell> getPendingUpdate() {
+        return this.kvs;
+    }
+
+    /**
+     * Set the {@link KeyValue}s in the update for which we are currently building an index update, but don't actually
+     * apply them.
+     * 
+     * @param update
+     *            pending {@link KeyValue}s
+     */
+    public void setPendingUpdates(Collection<Cell> update) {
+        this.kvs.clear();
+        this.kvs.addAll(update);
+    }
+
+    /**
+     * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
+     */
+    public void applyPendingUpdates() {
+        this.addUpdate(kvs);
+    }
+
+    /**
+     * Rollback all the given values from the underlying state.
+     * 
+     * @param values
+     */
+    public void rollback(Collection<Cell> values) {
+        for (Cell kv : values) {
+            this.memstore.rollback(KeyValueUtil.ensureKeyValue(kv));
+        }
+    }
+
+    @Override
+    public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+            throws IOException {
+        Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns);
+        ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
+        return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
+    }
+
+    @Override
+    public Map<String, Object> getContext() {
+        return context;
+    }
 }
\ No newline at end of file