You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2018/03/22 16:57:22 UTC

[4/4] phoenix git commit: PHOENIX-4619 Process transactional updates to local index on server-side

PHOENIX-4619 Process transactional updates to local index on server-side


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5814fcbe
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5814fcbe
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5814fcbe

Branch: refs/heads/4.x-HBase-1.3
Commit: 5814fcbe0529efb9e1d308c09870cefbf1872a95
Parents: d2d10ca
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Mar 17 12:52:38 2018 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Mar 22 09:45:35 2018 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/index/BaseIndexIT.java      |  16 +-
 .../phoenix/end2end/index/ImmutableIndexIT.java |   3 +-
 .../end2end/index/MutableIndexFailureIT.java    |   8 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |  36 +-
 .../PhoenixTransactionalProcessor.java          |   2 +-
 .../apache/phoenix/execute/MutationState.java   | 103 +++-
 .../PhoenixTxIndexMutationGenerator.java        | 449 ++++++++++++++++
 .../PhoenixTxnIndexMutationGenerator.java       | 519 -------------------
 .../org/apache/phoenix/hbase/index/Indexer.java |   1 -
 .../hbase/index/builder/BaseIndexBuilder.java   |   4 +-
 .../hbase/index/builder/BaseIndexCodec.java     |   7 -
 .../phoenix/hbase/index/covered/IndexCodec.java |  14 +-
 .../hbase/index/covered/LocalTableState.java    |  10 +-
 .../hbase/index/covered/NonTxIndexBuilder.java  |   2 +-
 .../phoenix/hbase/index/covered/TableState.java |   8 -
 .../apache/phoenix/index/IndexMaintainer.java   |  23 +-
 .../phoenix/index/PhoenixIndexBuilder.java      |  21 +-
 .../apache/phoenix/index/PhoenixIndexCodec.java |  34 +-
 .../phoenix/index/PhoenixIndexMetaData.java     |  78 +--
 .../index/PhoenixIndexMetaDataBuilder.java      | 106 ++++
 .../index/PhoenixTransactionalIndexer.java      | 442 +---------------
 .../query/ConnectionQueryServicesImpl.java      |   8 +
 .../transaction/OmidTransactionContext.java     |   2 +-
 .../transaction/PhoenixTransactionContext.java  |   2 +-
 .../transaction/TephraTransactionContext.java   |   2 +-
 .../index/covered/CoveredColumnIndexCodec.java  |   6 +-
 .../covered/CoveredIndexCodecForTesting.java    |   5 +-
 .../index/covered/LocalTableStateTest.java      |  10 +-
 .../index/covered/NonTxIndexBuilderTest.java    |   3 +
 .../covered/TestCoveredColumnIndexCodec.java    |   6 +-
 30 files changed, 785 insertions(+), 1145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
index 1483c58..f914256 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java
@@ -239,15 +239,17 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT {
     }
 
     private void assertNoClientSideIndexMutations(Connection conn) throws SQLException {
-        if (mutable) {
-            Iterator<Pair<byte[],List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn);
-            if (iterator.hasNext()) {
-                byte[] tableName = iterator.next().getFirst(); // skip data table mutations
-                PTable table = PhoenixRuntime.getTable(conn, Bytes.toString(tableName));
+        Iterator<Pair<byte[],List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn);
+        if (iterator.hasNext()) {
+            byte[] tableName = iterator.next().getFirst(); // skip data table mutations
+            PTable table = PhoenixRuntime.getTable(conn, Bytes.toString(tableName));
+            boolean clientSideUpdate = !localIndex && (!mutable || transactional);
+            if (!clientSideUpdate) {
                 assertTrue(table.getType() == PTableType.TABLE); // should be data table
-                boolean hasIndexData = iterator.hasNext();
-                assertFalse(hasIndexData && !transactional); // should have no index data
             }
+            boolean hasIndexData = iterator.hasNext();
+            // global immutable and global transactional tables are processed client side
+            assertEquals(clientSideUpdate, hasIndexData); 
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index d520824..1db9787 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -252,8 +252,9 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
         Iterator<Pair<byte[], List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn);
         assertTrue(iterator.hasNext());
         iterator.next();
-        assertEquals((!localIndex || transactional), iterator.hasNext());
+        assertEquals(!localIndex, iterator.hasNext());
     }
+    
 
     // This test is know to flap. We need PHOENIX-2582 to be fixed before enabling this back.
     @Ignore

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 7e1367d..38bde7f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -112,7 +112,7 @@ public class MutableIndexFailureIT extends BaseTest {
     public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, boolean failRebuildTask, Boolean throwIndexWriteFailure) {
         this.transactional = transactional;
         this.localIndex = localIndex;
-        this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "") 
+        this.tableDDLOptions = " SALT_BUCKETS=2, COLUMN_ENCODED_BYTES=NONE" + (transactional ? ", TRANSACTIONAL=true " : "") 
                 + (disableIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE + "=" + disableIndexOnWriteFailure))
                 + (throwIndexWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.THROW_INDEX_WRITE_FAILURE + "=" + throwIndexWriteFailure));
         this.tableName = FailingRegionObserver.FAIL_TABLE_NAME;
@@ -289,7 +289,6 @@ public class MutableIndexFailureIT extends BaseTest {
             assertEquals("z", rs.getString(2));
             assertFalse(rs.next());
 
-            FailingRegionObserver.FAIL_WRITE = true;
             updateTable(conn, true);
             // Verify the metadata for index is correct.
             rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), StringUtil.escapeLike(indexName),
@@ -466,9 +465,12 @@ public class MutableIndexFailureIT extends BaseTest {
         stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?");
         stmt.setString(1, "b");
         stmt.execute();
+        // Set to fail after the DELETE, since transactional tables will write
+        // uncommitted data when the DELETE is executed.
+        FailingRegionObserver.FAIL_WRITE = true;
         try {
             conn.commit();
-            if (commitShouldFail && !localIndex && this.throwIndexWriteFailure) {
+            if (commitShouldFail && (!localIndex || transactional) && this.throwIndexWriteFailure) {
                 fail();
             }
         } catch (CommitException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 70043bb..7985314 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -226,7 +226,7 @@ public class DeleteCompiler {
                 // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
                 // row key will already have its value.
                 // Check for otherTableRefs being empty required when deleting directly from the index
-                if (otherTableRefs.isEmpty() || (table.getIndexType() != IndexType.LOCAL && table.isImmutableRows())) {
+                if (otherTableRefs.isEmpty() || isMaintainedOnClient(table)) {
                     mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                 }
                 for (int i = 0; i < otherTableRefs.size(); i++) {
@@ -311,12 +311,12 @@ public class DeleteCompiler {
         }
     }
     
-    private List<PTable> getNonDisabledGlobalImmutableIndexes(TableRef tableRef) {
+    private List<PTable> getClientSideMaintainedIndexes(TableRef tableRef) {
         PTable table = tableRef.getTable();
-        if (table.isImmutableRows() && !table.getIndexes().isEmpty()) {
+        if (!table.getIndexes().isEmpty()) {
             List<PTable> nonDisabledIndexes = Lists.newArrayListWithExpectedSize(table.getIndexes().size());
             for (PTable index : table.getIndexes()) {
-                if (index.getIndexState() != PIndexState.DISABLE && index.getIndexType() == IndexType.GLOBAL) {
+                if (index.getIndexState() != PIndexState.DISABLE && isMaintainedOnClient(index)) {
                     nonDisabledIndexes.add(index);
                 }
             }
@@ -459,8 +459,8 @@ public class DeleteCompiler {
            .setTableName(tableName).build().buildException();
         }
         
-        List<PTable> immutableIndexes = getNonDisabledGlobalImmutableIndexes(targetTableRef);
-        final boolean hasImmutableIndexes = !immutableIndexes.isEmpty();
+        List<PTable> clientSideIndexes = getClientSideMaintainedIndexes(targetTableRef);
+        final boolean hasClientSideIndexes = !clientSideIndexes.isEmpty();
 
         boolean isSalted = table.getBucketNum() != null;
         boolean isMultiTenant = connection.getTenantId() != null && table.isMultiTenant();
@@ -468,7 +468,7 @@ public class DeleteCompiler {
         int pkColumnOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
         final int pkColumnCount = table.getPKColumns().size() - pkColumnOffset;
         int selectColumnCount = pkColumnCount;
-        for (PTable index : immutableIndexes) {
+        for (PTable index : clientSideIndexes) {
             selectColumnCount += index.getPKColumns().size() - pkColumnCount;
         }
         Set<PColumn> projectedColumns = new LinkedHashSet<PColumn>(selectColumnCount + pkColumnOffset);
@@ -518,7 +518,7 @@ public class DeleteCompiler {
         // that is being upserted for conflict detection purposes.
         // If we have immutable indexes, we'd increase the number of bytes scanned by executing
         // separate queries against each index, so better to drive from a single table in that case.
-        boolean runOnServer = isAutoCommit && !hasPreOrPostProcessing && !table.isTransactional() && !hasImmutableIndexes;
+        boolean runOnServer = isAutoCommit && !hasPreOrPostProcessing && !table.isTransactional() && !hasClientSideIndexes;
         HintNode hint = delete.getHint();
         if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
             select = SelectStatement.create(select, HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE));
@@ -529,7 +529,7 @@ public class DeleteCompiler {
         QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe, new SequenceManager(statement));
         final QueryPlan dataPlan = compiler.compile();
         // TODO: the select clause should know that there's a sub query, but doesn't seem to currently
-        queryPlans = Lists.newArrayList(!immutableIndexes.isEmpty()
+        queryPlans = Lists.newArrayList(!clientSideIndexes.isEmpty()
                 ? optimizer.getApplicablePlans(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe)
                 : optimizer.getBestPlan(dataPlan, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactoryToBe));
         // Filter out any local indexes that don't contain all indexed columns.
@@ -559,7 +559,7 @@ public class DeleteCompiler {
         // may have been optimized out. Instead, we check that there's a single SkipScanFilter
         // If we can generate a plan for every index, that means all the required columns are available in every index,
         // hence we can drive the delete from any of the plans.
-        noQueryReqd &= queryPlans.size() == 1 + immutableIndexes.size();
+        noQueryReqd &= queryPlans.size() == 1 + clientSideIndexes.size();
         int queryPlanIndex = 0;
         while (noQueryReqd && queryPlanIndex < queryPlans.size()) {
             QueryPlan plan = queryPlans.get(queryPlanIndex++);
@@ -578,7 +578,6 @@ public class DeleteCompiler {
             // from the data table, while the others will be for deleting rows from immutable indexes.
             List<MutationPlan> mutationPlans = Lists.newArrayListWithExpectedSize(queryPlans.size());
             for (final QueryPlan plan : queryPlans) {
-                final StatementContext context = plan.getContext();
                 mutationPlans.add(new SingleRowDeleteMutationPlan(plan, connection, maxSize, maxSizeBytes));
             }
             return new MultiRowDeleteMutationPlan(dataPlan, mutationPlans);
@@ -628,8 +627,8 @@ public class DeleteCompiler {
                 }
             }
             final QueryPlan bestPlan = bestPlanToBe;
-            final List<TableRef>otherTableRefs = Lists.newArrayListWithExpectedSize(immutableIndexes.size());
-            for (PTable index : immutableIndexes) {
+            final List<TableRef>otherTableRefs = Lists.newArrayListWithExpectedSize(clientSideIndexes.size());
+            for (PTable index : clientSideIndexes) {
                 if (!bestPlan.getTableRef().getTable().equals(index)) {
                     otherTableRefs.add(new TableRef(index, targetTableRef.getLowerBoundTimeStamp(), targetTableRef.getTimeStamp()));
                 }
@@ -917,13 +916,13 @@ public class DeleteCompiler {
                     int totalTablesUpdateClientSide = 1; // data table is always updated
                     PTable bestTable = bestPlan.getTableRef().getTable();
                     // global immutable tables are also updated client side (but don't double count the data table)
-                    if (bestPlan != dataPlan && bestTable.getIndexType() == IndexType.GLOBAL && bestTable.isImmutableRows()) {
+                    if (bestPlan != dataPlan && isMaintainedOnClient(bestTable)) {
                         totalTablesUpdateClientSide++;
                     }
                     for (TableRef otherTableRef : otherTableRefs) {
                         PTable otherTable = otherTableRef.getTable();
                         // Don't double count the data table here (which morphs when it becomes a projected table, hence this check)
-                        if (projectedTableRef != otherTableRef && otherTable.getIndexType() == IndexType.GLOBAL && otherTable.isImmutableRows()) {
+                        if (projectedTableRef != otherTableRef && isMaintainedOnClient(otherTable)) {
                             totalTablesUpdateClientSide++;
                         }
                     }
@@ -972,4 +971,11 @@ public class DeleteCompiler {
             return bestPlan;
         }
     }
+    
+    private static boolean isMaintainedOnClient(PTable table) {
+        // Test for not being local (rather than being GLOBAL) so that this doesn't fail
+        // when tested with our projected table.
+        return table.getIndexType() != IndexType.LOCAL && (table.isImmutableRows() || table.isTransactional());
+    }
+    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
index 37fa2ab..ca0c997 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
@@ -22,7 +22,7 @@ import org.apache.phoenix.transaction.TransactionFactory;
 public class PhoenixTransactionalProcessor extends DelegateRegionObserver {
 
     public PhoenixTransactionalProcessor() {
-        super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoProcessor());
+        super(TransactionFactory.getTransactionFactory().getTransactionContext().getCoprocessor());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d3b3ca0..189bc5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.htrace.Span;
 import org.apache.htrace.TraceScope;
+import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.MutationPlan;
@@ -62,6 +63,7 @@ import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.monitoring.GlobalClientMetrics;
@@ -138,7 +140,6 @@ public class MutationState implements SQLCloseable {
     private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
 
     final PhoenixTransactionContext phoenixTransactionContext;
-    final PhoenixTxnIndexMutationGenerator phoenixTxnIndexMutationGenerator;
 
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
@@ -180,7 +181,7 @@ public class MutationState implements SQLCloseable {
         boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
         this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
                 : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
-        if (subTask == false) {
+        if (!subTask) {
             if (txContext == null) {
                 phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(connection);
             } else {
@@ -192,8 +193,6 @@ public class MutationState implements SQLCloseable {
             // as it is not thread safe, so we use the tx member variable
             phoenixTransactionContext = TransactionFactory.getTransactionFactory().getTransactionContext(txContext, connection, subTask);
         }
-
-        phoenixTxnIndexMutationGenerator = new PhoenixTxnIndexMutationGenerator(connection, phoenixTransactionContext);
     }
 
     public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection)  throws SQLException {
@@ -496,17 +495,20 @@ public class MutationState implements SQLCloseable {
     private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values,
             final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) {
         final PTable table = tableRef.getTable();
-        final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
-                (includeAllIndexes  || table.isTransactional()) ?
+        final Iterator<PTable> indexIterator = // Only maintain tables with immutable rows or transactional tables through this client-side mechanism
+                includeAllIndexes ?
                          IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) :
-                             (table.isImmutableRows()) ?
+                             (table.isImmutableRows() || table.isTransactional()) ?
                                 IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) :
                                     Collections.<PTable>emptyIterator();
+        final List<PTable> indexList = Lists.newArrayList(indexIterator);
+        final Iterator<PTable> indexes = indexList.iterator();
         final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
         generateMutations(tableRef, mutationTimestamp, serverTimestamp, values, mutationList, mutationsPertainingToIndex);
         return new Iterator<Pair<PName,List<Mutation>>>() {
             boolean isFirst = true;
+            Map<byte[],List<Mutation>> indexMutationsMap = null;
 
             @Override
             public boolean hasNext() {
@@ -519,15 +521,34 @@ public class MutationState implements SQLCloseable {
                     isFirst = false;
                     return new Pair<PName,List<Mutation>>(table.getPhysicalName(), mutationList);
                 }
+
                 PTable index = indexes.next();
-                List<Mutation> indexMutations;
+                
+                List<Mutation> indexMutations = null;
                 try {
-                    if ((table.isImmutableRows() && (index.getIndexType() != IndexType.LOCAL)) || !table.isTransactional()) {
-                        indexMutations =
-                            IndexUtil.generateIndexData(table, index, values, mutationsPertainingToIndex,
-                                connection.getKeyValueBuilder(), connection);
-                    } else {
-                        indexMutations = phoenixTxnIndexMutationGenerator.getIndexUpdates(table, index, mutationsPertainingToIndex);
+                    if (!mutationsPertainingToIndex.isEmpty()) {
+                        if (table.isTransactional()) {
+                            if (indexMutationsMap == null) {
+                                PhoenixTxIndexMutationGenerator generator = newTxIndexMutationGenerator(table, indexList, mutationsPertainingToIndex.get(0).getAttributesMap());
+                                try (HTableInterface htable = connection.getQueryServices().getTable(table.getPhysicalName().getBytes())) {
+                                    Collection<Pair<Mutation, byte[]>> allMutations = generator.getIndexUpdates(htable, mutationsPertainingToIndex.iterator());
+                                    indexMutationsMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                                    for (Pair<Mutation, byte[]> mutation : allMutations) {
+                                        List<Mutation> mutations = indexMutationsMap.get(mutation.getSecond());
+                                        if (mutations == null) {
+                                            mutations = Lists.newArrayList();
+                                            indexMutationsMap.put(mutation.getSecond(), mutations);
+                                        }
+                                        mutations.add(mutation.getFirst());
+                                    }
+                                }
+                            }
+                            indexMutations = indexMutationsMap.get(index.getPhysicalName().getBytes());
+                         } else {
+                            indexMutations =
+                                    IndexUtil.generateIndexData(table, index, values, mutationsPertainingToIndex,
+                                        connection.getKeyValueBuilder(), connection);
+                        }
                     }
 
                     // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
@@ -537,13 +558,17 @@ public class MutationState implements SQLCloseable {
                         if (multiRowMutationState!=null) {
                             final List<Mutation> deleteMutations = Lists.newArrayList();
                             generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
-                            indexMutations.addAll(deleteMutations);
+                            if (indexMutations == null) {
+                                indexMutations = deleteMutations;
+                            } else {
+                                indexMutations.addAll(deleteMutations);
+                            }
                         }
                     }
                 } catch (SQLException | IOException e) {
                     throw new IllegalDataException(e);
                 }
-                return new Pair<PName,List<Mutation>>(index.getPhysicalName(),indexMutations);
+                return new Pair<PName,List<Mutation>>(index.getPhysicalName(),indexMutations == null ? Collections.<Mutation>emptyList() : indexMutations);
             }
 
             @Override
@@ -554,6 +579,42 @@ public class MutationState implements SQLCloseable {
         };
     }
 
+    private PhoenixTxIndexMutationGenerator newTxIndexMutationGenerator(PTable table, List<PTable> indexes, Map<String,byte[]> attributes) {
+        final List<IndexMaintainer> indexMaintainers = Lists.newArrayListWithExpectedSize(indexes.size());
+        for (PTable index : indexes) {
+            IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
+            indexMaintainers.add(maintainer);
+        }
+        IndexMetaDataCache indexMetaDataCache = new IndexMetaDataCache() {
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public List<IndexMaintainer> getIndexMaintainers() {
+                return indexMaintainers;
+            }
+
+            @Override
+            public PhoenixTransactionContext getTransactionContext() {
+                return phoenixTransactionContext;
+            }
+
+            @Override
+            public int getClientVersion() {
+                return MetaDataProtocol.PHOENIX_VERSION;
+            }
+            
+        };
+        try {
+            PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(indexMetaDataCache, attributes);
+            return new PhoenixTxIndexMutationGenerator(connection.getQueryServices().getConfiguration(), indexMetaData, table.getPhysicalName().getBytes());
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        }
+    }
+    
     private void generateMutations(final TableRef tableRef, final long mutationTimestamp,
             final long serverTimestamp, final MultiRowMutationState values,
             final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
@@ -585,17 +646,13 @@ public class MutationState implements SQLCloseable {
                     }
                 }
             }
-            PRow row =
-                    tableRef.getTable()
-                            .newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);
+            PRow row = table.newRow(connection.getKeyValueBuilder(), timestampToUse, key, hasOnDupKey);
             List<Mutation> rowMutations, rowMutationsPertainingToIndex;
             if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
                 row.delete();
                 rowMutations = row.toRowMutations();
-                // Row deletes for index tables are processed by running a re-written query
-                // against the index table (as this allows for flexibility in being able to
-                // delete rows).
-                rowMutationsPertainingToIndex = rowMutations;
+                // The DeleteCompiler already generates the deletes for indexes, so no need to do it again
+                rowMutationsPertainingToIndex = Collections.emptyList();
             } else {
                 for (Map.Entry<PColumn, byte[]> valueEntry : rowEntry.getValue().getColumnValues()
                         .entrySet()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
new file mode 100644
index 0000000..b5031af
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.MultiMutation;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.TableState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
+import org.apache.phoenix.transaction.PhoenixTransactionalTable;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Longs;
+
+
+public class PhoenixTxIndexMutationGenerator {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixTxIndexMutationGenerator.class);
+
+    private final PhoenixIndexCodec codec;
+    private final PhoenixIndexMetaData indexMetaData;
+
+    /**
+     * Creates a generator whose {@link PhoenixIndexCodec} is built from the given configuration,
+     * region boundaries and table name.
+     *
+     * @param conf configuration handed to the codec
+     * @param indexMetaData index metadata (index maintainers, transaction context) used when
+     *            generating index updates
+     * @param tableName table name handed to the codec
+     * @param regionStartKey region start key handed to the codec
+     * @param regionEndKey region end key handed to the codec
+     */
+    public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName, byte[] regionStartKey, byte[] regionEndKey) {
+        this.codec = new PhoenixIndexCodec(conf, regionStartKey, regionEndKey, tableName);
+        this.indexMetaData = indexMetaData;
+    }
+
+    /**
+     * Convenience constructor that delegates to the five-argument constructor, passing
+     * {@code null} for both the region start key and the region end key.
+     */
+    public PhoenixTxIndexMutationGenerator(Configuration conf, PhoenixIndexMetaData indexMetaData, byte[] tableName) {
+        this(conf, indexMetaData, tableName, null, null);
+    }
+
+    /**
+     * Merges {@code m} into the {@link MultiMutation} registered under {@code row}. When the map
+     * has no entry for the row yet, a new {@link MultiMutation} is created and registered first.
+     *
+     * @param mutations per-row batches collected so far; updated in place
+     * @param row row key used as the map key
+     * @param m mutation whose contents are added to the row's batch
+     */
+    private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
+        MultiMutation rowBatch = mutations.get(row);
+        if (rowBatch == null) {
+            // first mutation seen for this row: start a new batch for it
+            rowBatch = new MultiMutation(row);
+            mutations.put(row, rowBatch);
+        }
+        rowBatch.addAll(m);
+    }
+
+    /**
+     * Generates the index mutations for a batch of data table mutations.
+     * <p>
+     * The incoming mutations are grouped by row key. Whether the batch is a rollback is decided
+     * solely by the presence of {@link PhoenixTransactionContext#TX_ROLLBACK_ATTRIBUTE_KEY} on the
+     * first mutation. For rollbacks, and whenever the index metadata does not report immutable
+     * rows, every row in the batch is looked up to find its prior state; otherwise only rows for
+     * which {@code indexMetaData.requiresPriorRowState(m)} returns true are looked up. The lookup
+     * is a skip scan over {@code htable} through a transactional table wrapper, projecting all
+     * columns of all index maintainers plus the empty key value column.
+     * <p>
+     * Note that for rollbacks this method switches the transaction context to
+     * {@link PhoenixVisibilityLevel#SNAPSHOT_ALL} and does not restore the previous level.
+     *
+     * @param htable table scanned for the prior row state
+     * @param mutationIterator data table mutations to generate index updates for
+     * @return pairs of index mutation and the table name reported by the corresponding
+     *         {@link IndexUpdate}; an empty collection if there are no mutations
+     * @throws IOException if scanning for the prior row state or generating the updates fails
+     * @throws SQLException declared for callers; propagated from the calls made here
+     */
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdates(HTableInterface htable, Iterator<Mutation> mutationIterator) throws IOException, SQLException {
+
+        if (!mutationIterator.hasNext()) {
+            return Collections.emptyList();
+        }
+
+        List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
+        ResultScanner currentScanner = null;
+        // Collect up all mutations in batch
+        Map<ImmutableBytesPtr, MultiMutation> mutations =
+                new HashMap<ImmutableBytesPtr, MultiMutation>();
+        // Collect the set of mutable ColumnReferences so that we can first
+        // run a scan to get the current state. We'll need this to delete
+        // the existing index rows.
+        int estimatedSize = indexMaintainers.size() * 10;
+        Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize);
+        for (IndexMaintainer indexMaintainer : indexMaintainers) {
+            // For transactional tables, we use an index maintainer
+            // to aid in rollback if there's a KeyValue column in the index. The alternative would be
+            // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
+            // client side.
+            Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
+            mutableColumns.addAll(allColumns);
+        }
+
+        Mutation m = mutationIterator.next();
+        Map<String,byte[]> updateAttributes = m.getAttributesMap();
+        byte[] txRollbackAttribute = updateAttributes.get(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
+        boolean isRollback = txRollbackAttribute!=null;
+
+        boolean isImmutable = indexMetaData.isImmutableRows();
+        // When every row needs its prior state, this is the same map instance as "mutations";
+        // the identity comparison in the loop below relies on that.
+        Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations;
+        if (isImmutable && !isRollback) {
+            findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>();
+        } else {
+            findPriorValueMutations = mutations;
+        }
+
+        while (true) {
+            // add the mutation to the batch set
+            ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+            // if we have no non PK columns, no need to find the prior values
+            if ( mutations != findPriorValueMutations && indexMetaData.requiresPriorRowState(m) ) {
+                addMutation(findPriorValueMutations, row, m);
+            }
+            addMutation(mutations, row, m);
+
+            if (!mutationIterator.hasNext()) {
+                break;
+            }
+            m = mutationIterator.next();
+        }
+
+        Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
+        try {
+            // Track if we have row keys with Delete mutations (or Puts that are
+            // Tephra's Delete marker). If there are none, we don't need to do the scan for
+            // prior versions, if there are, we do. Since rollbacks always have delete mutations,
+            // this logic will work there too.
+            if (!findPriorValueMutations.isEmpty()) {
+                // Sized from the map that is actually iterated to build the key list
+                List<KeyRange> keys = Lists.newArrayListWithExpectedSize(findPriorValueMutations.size());
+                for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) {
+                    keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
+                }
+                Scan scan = new Scan();
+                // Project all mutable columns
+                for (ColumnReference ref : mutableColumns) {
+                    scan.addColumn(ref.getFamily(), ref.getQualifier());
+                }
+                /*
+                 * Indexes inherit the storage scheme of the data table which means all the indexes have the same
+                 * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start
+                 * supporting new indexes over existing data tables to have a different storage scheme than the data
+                 * table.
+                 */
+                byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier();
+
+                // Project empty key value column
+                scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
+                ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
+                scanRanges.initializeScan(scan);
+                PhoenixTransactionalTable txTable = TransactionFactory.getTransactionFactory().getTransactionalTable(indexMetaData.getTransactionContext(), htable);
+                // For rollback, we need to see all versions, including
+                // the last committed version as there may be multiple
+                // checkpointed versions.
+                SkipScanFilter filter = scanRanges.getSkipScanFilter();
+                if (isRollback) {
+                    filter = new SkipScanFilter(filter,true);
+                    indexMetaData.getTransactionContext().setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL);
+                }
+                scan.setFilter(filter);
+                currentScanner = txTable.getScanner(scan);
+            }
+            if (isRollback) {
+                processRollback(indexMetaData, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations);
+            } else {
+                processMutation(indexMetaData, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations, findPriorValueMutations);
+            }
+        } finally {
+            // Release the scanner whether or not processing succeeded; only the scanner is
+            // closed here because htable is owned by the caller.
+            if (currentScanner != null) {
+                currentScanner.close();
+            }
+        }
+
+        return indexUpdates;
+    }
+
+    /**
+     * Generates index updates for a non-rollback batch.
+     * <p>
+     * Each row returned by {@code scanner} is paired with the mutation removed from
+     * {@code mutationsToFindPreviousValue} for that row key; deletes are generated from the
+     * scanned state first, then puts after the mutation has been applied. Afterwards every
+     * mutation still present in {@code mutations} is processed without prior state (puts first,
+     * then deletes). When both maps are the same instance, rows handled in the first phase are no
+     * longer present for the second phase.
+     *
+     * @param scanner scanner over the existing data table rows, or {@code null} if no prior
+     *            state lookup was needed
+     * @param upsertColumns column references tracked by the generated table state
+     * @param indexUpdates collection the generated index updates are appended to
+     */
+    private void processMutation(PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
+            ResultScanner scanner,
+            Set<ColumnReference> upsertColumns,
+            Collection<Pair<Mutation, byte[]>> indexUpdates,
+            Map<ImmutableBytesPtr, MultiMutation> mutations,
+            Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException {
+        List<IndexMaintainer> indexMaintainers = indexMetaData.getIndexMaintainers();
+        if (scanner != null) {
+            IndexMaintainer firstMaintainer = indexMaintainers.get(0);
+            byte[] emptyCF = firstMaintainer.getDataEmptyKeyValueCF();
+            byte[] emptyCQ = indexMaintainers.get(0).getEmptyKeyValueQualifier();
+            ColumnReference emptyColRef = new ColumnReference(emptyCF, emptyCQ);
+            // Existing data table rows: drop the old index row, then add the new index row
+            for (Result existingRow = scanner.next(); existingRow != null; existingRow = scanner.next()) {
+                ImmutableBytesPtr rowKey = new ImmutableBytesPtr(existingRow.getRow());
+                Mutation pending = mutationsToFindPreviousValue.remove(rowKey);
+                long writePointer = indexMetaData.getTransactionContext().getWritePointer();
+                TxTableState priorState = new TxTableState(upsertColumns, writePointer, pending, emptyColRef, existingRow);
+                generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, priorState);
+                generatePuts(indexMetaData, indexUpdates, priorState);
+            }
+        }
+        // Remaining mutations have no scanned prior state: add their new index rows
+        for (Mutation newRow : mutations.values()) {
+            long writePointer = indexMetaData.getTransactionContext().getWritePointer();
+            TxTableState freshState = new TxTableState(upsertColumns, writePointer, newRow);
+            generatePuts(indexMetaData, indexUpdates, freshState);
+            generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, freshState);
+        }
+    }
+
+    /**
+     * Generates the index deletes needed to roll back a transaction.
+     * <p>
+     * For every row returned by {@code scanner}, the row's cells are sorted oldest-first and
+     * replayed in batches: the first batch gathers all cells up to and including the read
+     * pointer, and each later batch holds the cells of a single timestamp. Deletes are generated
+     * from the previous batch's state and, when the batch contains Puts and was written after
+     * the read pointer, from the batch's own state.
+     *
+     * @param txRollbackAttribute attribute value stamped onto every generated delete
+     * @param scanner scanner over all visible versions of the affected rows, or {@code null},
+     *            in which case nothing is generated
+     * @param mutableColumns column references tracked by the generated table state
+     * @param indexUpdates collection the generated deletes are appended to
+     * @param mutations per-row batch; the entry for each scanned row is removed
+     */
+    private void processRollback(PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
+            ResultScanner scanner,
+            Set<ColumnReference> mutableColumns,
+            Collection<Pair<Mutation, byte[]>> indexUpdates,
+            Map<ImmutableBytesPtr, MultiMutation> mutations) throws IOException {
+        if (scanner != null) {
+            long readPtr = indexMetaData.getTransactionContext().getReadPointer();
+            Result result;
+            // Loop through last committed row state plus all new rows associated with current transaction
+            // to generate point delete markers for all index rows that were added. We don't have Tephra
+            // manage index rows in change sets because we don't want to be hit with the additional
+            // memory hit and do not need to do conflict detection on index rows.
+            ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier());
+            while ((result = scanner.next()) != null) {
+                Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
+                // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest
+                // (as if we're "replaying" them in time order).
+                List<Cell> cells = result.listCells();
+                Collections.sort(cells, new Comparator<Cell>() {
+
+                    @Override
+                    public int compare(Cell o1, Cell o2) {
+                        int c = Longs.compare(o1.getTimestamp(), o2.getTimestamp());
+                        if (c != 0) return c;
+                        // Type codes are bytes, so the int subtraction cannot overflow
+                        c = o1.getTypeByte() - o2.getTypeByte();
+                        if (c != 0) return c;
+                        // Compare o1 against o2; comparing a cell with itself would always
+                        // yield 0 and silently drop the cf/cq tie-breakers.
+                        c = Bytes.compareTo(o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength(), o2.getFamilyArray(), o2.getFamilyOffset(), o2.getFamilyLength());
+                        if (c != 0) return c;
+                        return Bytes.compareTo(o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength(), o2.getQualifierArray(), o2.getQualifierOffset(), o2.getQualifierLength());
+                    }
+
+                });
+                int i = 0;
+                int nCells = cells.size();
+                Result oldResult = null, newResult;
+                do {
+                    boolean hasPuts = false;
+                    LinkedList<Cell> singleTimeCells = Lists.newLinkedList();
+                    long writePtr;
+                    Cell cell = cells.get(i);
+                    do {
+                        hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode();
+                        writePtr = cell.getTimestamp();
+                        ListIterator<Cell> it = singleTimeCells.listIterator();
+                        do {
+                            // Add at the beginning of the list to match the expected HBase
+                            // newest to oldest sort order (which TxTableState relies on
+                            // with the Result.getLatestColumnValue() calls). However, we
+                            // still want to add Cells in the expected order for each time
+                            // bound as otherwise we won't find it in our old state.
+                            it.add(cell);
+                        } while (++i < nCells && (cell=cells.get(i)).getTimestamp() == writePtr);
+                    } while (i < nCells && cell.getTimestamp() <= readPtr);
+
+                    // Generate point delete markers for the prior row deletion of the old index value.
+                    // The write timestamp is the next timestamp, not the current timestamp,
+                    // as the earliest cells are the current values for the row (and we don't
+                    // want to delete the current row).
+                    if (oldResult != null) {
+                        TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, oldResult);
+                        generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
+                    }
+                    // Generate point delete markers for the new index value.
+                    // If our time batch doesn't have Puts (i.e. we have only Deletes), then do not
+                    // generate deletes. We would have generated the delete above based on the state
+                    // of the previous row. The delete markers do not give us the state we need to
+                    // delete.
+                    if (hasPuts) {
+                        newResult = Result.create(singleTimeCells);
+                        // First row may represent the current state which we don't want to delete
+                        if (writePtr > readPtr) {
+                            TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, newResult);
+                            generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
+                        }
+                        oldResult = newResult;
+                    } else {
+                        oldResult = null;
+                    }
+                } while (i < nCells);
+            }
+        }
+    }
+
+    /**
+     * Asks the codec for the index deletes implied by {@code state} and appends every valid one
+     * to {@code indexUpdates}, paired with the table name reported by the update.
+     *
+     * @param attribValue value set under
+     *            {@link PhoenixTransactionContext#TX_ROLLBACK_ATTRIBUTE_KEY} on each appended
+     *            delete
+     */
+    private void generateDeletes(PhoenixIndexMetaData indexMetaData,
+            Collection<Pair<Mutation, byte[]>> indexUpdates,
+            byte[] attribValue, TxTableState state) throws IOException {
+        for (IndexUpdate candidate : codec.getIndexDeletes(state, indexMetaData)) {
+            if (!candidate.isValid()) {
+                continue;
+            }
+            candidate.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
+            Pair<Mutation, byte[]> entry = new Pair<Mutation, byte[]>(candidate.getUpdate(), candidate.getTableName());
+            indexUpdates.add(entry);
+        }
+    }
+
+    /**
+     * Applies the pending mutation to {@code state}, asks the codec for the resulting index
+     * upserts and appends every valid one to {@code indexUpdates}, paired with the table name
+     * reported by the update.
+     *
+     * @return {@code true} if at least one valid put was appended
+     */
+    private boolean generatePuts(
+            PhoenixIndexMetaData indexMetaData,
+            Collection<Pair<Mutation, byte[]>> indexUpdates,
+            TxTableState state)
+            throws IOException {
+        // The state must reflect the mutation before the codec computes the upserts
+        state.applyMutation();
+        Iterable<IndexUpdate> candidates = codec.getIndexUpserts(state, indexMetaData);
+        boolean sawValidPut = false;
+        for (IndexUpdate candidate : candidates) {
+            if (!candidate.isValid()) {
+                continue;
+            }
+            indexUpdates.add(new Pair<Mutation, byte[]>(candidate.getUpdate(), candidate.getTableName()));
+            sawValidPut = true;
+        }
+        return sawValidPut;
+    }
+
+
+    /**
+     * {@link TableState} implementation that keeps the row's column values in an in-memory map
+     * instead of reading them from a region. The map starts empty, or is seeded from a
+     * {@link Result}, and is then updated by {@link #applyMutation()} from the cells of the
+     * wrapped mutation.
+     */
+    private static class TxTableState implements TableState {
+        private final Mutation mutation;
+        private final long currentTimestamp;
+        private final List<KeyValue> pendingUpdates;
+        private final Set<ColumnReference> indexedColumns;
+        private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
+
+        /**
+         * Creates a state with no known column values; the cells of {@code mutation} are copied
+         * into the pending update list.
+         */
+        private TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation mutation) {
+            this.currentTimestamp = currentTimestamp;
+            this.indexedColumns = indexedColumns;
+            this.mutation = mutation;
+            int estimatedSize = indexedColumns.size();
+            this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
+            this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
+            try {
+                for (CellScanner cells = mutation.cellScanner(); cells.advance();) {
+                    pendingUpdates.add(KeyValueUtil.ensureKeyValue(cells.current()));
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e); // Impossible
+            }
+        }
+
+        /**
+         * Creates a state whose column values are seeded with the latest cell found in {@code r}
+         * for each of {@code indexedColumns}. {@code emptyColRef} is accepted but not used here.
+         */
+        public TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) {
+            this(indexedColumns, currentTimestamp, m);
+
+            for (ColumnReference ref : indexedColumns) {
+                Cell latest = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+                if (latest == null) {
+                    continue;
+                }
+                valueMap.put(ref, valuePointer(latest));
+            }
+        }
+
+        /** Builds the family/qualifier reference identifying the column of {@code cell}. */
+        private static ColumnReference columnOf(Cell cell) {
+            return new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+        }
+
+        /** Wraps the value bytes of {@code cell} without copying them. */
+        private static ImmutableBytesWritable valuePointer(Cell cell) {
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            return ptr;
+        }
+
+        @Override
+        public long getCurrentTimestamp() {
+            return currentTimestamp;
+        }
+
+        @Override
+        public byte[] getCurrentRowKey() {
+            return mutation.getRow();
+        }
+
+        @Override
+        public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+            return Collections.emptyList();
+        }
+
+        /**
+         * Replays the pending cells onto the value map: column deletes remove that column,
+         * family deletes remove every indexed column of that family, and puts store the new
+         * value for indexed columns only.
+         *
+         * @throws IllegalStateException if a pending cell has any other type
+         */
+        private void applyMutation() {
+            for (Cell cell : pendingUpdates) {
+                byte type = cell.getTypeByte();
+                if (type == KeyValue.Type.Delete.getCode() || type == KeyValue.Type.DeleteColumn.getCode()) {
+                    valueMap.remove(columnOf(cell));
+                } else if (type == KeyValue.Type.DeleteFamily.getCode() || type == KeyValue.Type.DeleteFamilyVersion.getCode()) {
+                    for (ColumnReference ref : indexedColumns) {
+                        if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
+                            valueMap.remove(ref);
+                        }
+                    }
+                } else if (type == KeyValue.Type.Put.getCode()) {
+                    ColumnReference ref = columnOf(cell);
+                    if (indexedColumns.contains(ref)) {
+                        valueMap.put(ref, valuePointer(cell));
+                    }
+                } else {
+                    throw new IllegalStateException("Unexpected mutation type for " + cell);
+                }
+            }
+        }
+
+        @Override
+        public Collection<KeyValue> getPendingUpdate() {
+            return pendingUpdates;
+        }
+
+        /**
+         * Returns a value getter that answers from the in-memory value map (the requested
+         * timestamp is ignored) together with a fresh {@link IndexUpdate} tracking the given
+         * columns. The boolean flags and {@code indexMetaData} are not consulted.
+         */
+        @Override
+        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData)
+                throws IOException {
+            // TODO: creating these objects over and over again is wasteful
+            ColumnTracker tracker = new ColumnTracker(indexedColumns);
+            ValueGetter mapBackedGetter = new ValueGetter() {
+
+                @Override
+                public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+                    return valueMap.get(ref);
+                }
+
+                @Override
+                public byte[] getRowKey() {
+                    return mutation.getRow();
+                }
+
+            };
+            return new Pair<ValueGetter, IndexUpdate>(mapBackedGetter, new IndexUpdate(tracker));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java
deleted file mode 100644
index b596b75..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxnIndexMutationGenerator.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.compile.ScanRanges;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.filter.SkipScanFilter;
-import org.apache.phoenix.hbase.index.MultiMutation;
-import org.apache.phoenix.hbase.index.ValueGetter;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.IndexUpdate;
-import org.apache.phoenix.hbase.index.covered.TableState;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.types.PVarbinary;
-import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
-import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.primitives.Longs;
-
-
-public class PhoenixTxnIndexMutationGenerator {
-
-    private static final Log LOG = LogFactory.getLog(PhoenixTxnIndexMutationGenerator.class);
-
-    private final PhoenixConnection connection;
-    private final PhoenixTransactionContext phoenixTransactionContext;
-
-    PhoenixTxnIndexMutationGenerator(PhoenixConnection connection, PhoenixTransactionContext phoenixTransactionContext) {
-        this.phoenixTransactionContext = phoenixTransactionContext;
-        this.connection = connection;
-    }
-
-    private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
-        MultiMutation stored = mutations.get(row);
-        // we haven't seen this row before, so add it
-        if (stored == null) {
-            stored = new MultiMutation(row);
-            mutations.put(row, stored);
-        }
-        stored.addAll(m);
-    }
-
-    public List<Mutation> getIndexUpdates(final PTable table, PTable index, List<Mutation> dataMutations) throws IOException, SQLException {
-
-        if (dataMutations.isEmpty()) {
-            return new ArrayList<Mutation>();
-        }
-
-        Map<String,byte[]> updateAttributes = dataMutations.get(0).getAttributesMap();
-        boolean replyWrite = (BaseScannerRegionObserver.ReplayWrite.fromBytes(updateAttributes.get(BaseScannerRegionObserver.REPLAY_WRITES)) != null);
-        byte[] txRollbackAttribute = updateAttributes.get(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
-
-        IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
-
-        boolean isRollback = txRollbackAttribute!=null;
-        boolean isImmutable = index.isImmutableRows();
-        ResultScanner currentScanner = null;
-        HTableInterface txTable = null;
-        // Collect up all mutations in batch
-        Map<ImmutableBytesPtr, MultiMutation> mutations =
-                new HashMap<ImmutableBytesPtr, MultiMutation>();
-        Map<ImmutableBytesPtr, MultiMutation> findPriorValueMutations;
-        if (isImmutable && !isRollback) {
-            findPriorValueMutations = new HashMap<ImmutableBytesPtr, MultiMutation>();
-        } else {
-            findPriorValueMutations = mutations;
-        }
-        // Collect the set of mutable ColumnReferences so that we can first
-        // run a scan to get the current state. We'll need this to delete
-        // the existing index rows.
-        int estimatedSize = 10;
-        Set<ColumnReference> mutableColumns = Sets.newHashSetWithExpectedSize(estimatedSize);
-        // For transactional tables, we use an index maintainer
-        // to aid in rollback if there's a KeyValue column in the index. The alternative would be
-        // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
-        // client side.
-        Set<ColumnReference> allColumns = maintainer.getAllColumns();
-        mutableColumns.addAll(allColumns);
-
-        for(final Mutation m : dataMutations) {
-            // add the mutation to the batch set
-            ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-            // if we have no non PK columns, no need to find the prior values
-
-            boolean requiresPriorRowState =  !isImmutable || (maintainer.isRowDeleted(m) && !maintainer.getIndexedColumns().isEmpty());
-            if (mutations != findPriorValueMutations && requiresPriorRowState) {
-                addMutation(findPriorValueMutations, row, m);
-            }
-            addMutation(mutations, row, m);
-        }
-        
-        List<Mutation> indexUpdates = new ArrayList<Mutation>(mutations.size() * 2);
-        try {
-            // Track if we have row keys with Delete mutations (or Puts that are
-            // Tephra's Delete marker). If there are none, we don't need to do the scan for
-            // prior versions, if there are, we do. Since rollbacks always have delete mutations,
-            // this logic will work there too.
-            if (!findPriorValueMutations.isEmpty()) {
-                List<KeyRange> keys = Lists.newArrayListWithExpectedSize(mutations.size());
-                for (ImmutableBytesPtr ptr : findPriorValueMutations.keySet()) {
-                    keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
-                }
-                Scan scan = new Scan();
-                // Project all mutable columns
-                for (ColumnReference ref : mutableColumns) {
-                    scan.addColumn(ref.getFamily(), ref.getQualifier());
-                }
-                /*
-                 * Indexes inherit the storage scheme of the data table which means all the indexes have the same
-                 * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start
-                 * supporting new indexes over existing data tables to have a different storage scheme than the data
-                 * table.
-                 */
-                byte[] emptyKeyValueQualifier = maintainer.getEmptyKeyValueQualifier();
-                
-                // Project empty key value column
-                scan.addColumn(maintainer.getDataEmptyKeyValueCF(), emptyKeyValueQualifier);
-                ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
-                scanRanges.initializeScan(scan);
-                txTable =  connection.getQueryServices().getTable(table.getPhysicalName().getBytes());
-                // For rollback, we need to see all versions, including
-                // the last committed version as there may be multiple
-                // checkpointed versions.
-                SkipScanFilter filter = scanRanges.getSkipScanFilter();
-                if (isRollback) {
-                    filter = new SkipScanFilter(filter,true);
-                    phoenixTransactionContext.setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL);
-                }
-                scan.setFilter(filter);
-                currentScanner = txTable.getScanner(scan);
-            }
-            if (isRollback) {
-                processRollback(maintainer, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations, replyWrite, table);
-            } else {
-                processMutation(maintainer, txRollbackAttribute, currentScanner, mutableColumns, indexUpdates, mutations, findPriorValueMutations, replyWrite, table);
-            }
-        } finally {
-            if (txTable != null) txTable.close();
-        }
-        
-        return indexUpdates;
-    }
-
-    private void processMutation(IndexMaintainer maintainer,
-            byte[] txRollbackAttribute,
-            ResultScanner scanner,
-            Set<ColumnReference> upsertColumns,
-            Collection<Mutation> indexUpdates,
-            Map<ImmutableBytesPtr, MultiMutation> mutations,
-            Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue,
-            boolean replyWrite,
-            final PTable table) throws IOException, SQLException {
-        if (scanner != null) {
-            Result result;
-            ColumnReference emptyColRef = new ColumnReference(maintainer
-                    .getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier());
-            // Process existing data table rows by removing the old index row and adding the new index row
-            while ((result = scanner.next()) != null) {
-                Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow()));
-                TxTableState state = new TxTableState(upsertColumns, phoenixTransactionContext.getWritePointer(), m, emptyColRef, result);
-                generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table);
-                generatePuts(indexUpdates, state, maintainer, replyWrite, table);
-            }
-        }
-        // Process new data table by adding new index rows
-        for (Mutation m : mutations.values()) {
-            TxTableState state = new TxTableState(upsertColumns, phoenixTransactionContext.getWritePointer(), m);
-            generatePuts(indexUpdates, state, maintainer, replyWrite, table);
-            generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table);
-        }
-    }
-
-    private void processRollback(IndexMaintainer maintainer,
-            byte[] txRollbackAttribute,
-            ResultScanner scanner,
-            Set<ColumnReference> mutableColumns,
-            Collection<Mutation> indexUpdates,
-            Map<ImmutableBytesPtr, MultiMutation> mutations,
-            boolean replyWrite,
-            final PTable table) throws IOException, SQLException {
-        if (scanner != null) {
-            Result result;
-            // Loop through last committed row state plus all new rows associated with current transaction
-            // to generate point delete markers for all index rows that were added. We don't have Tephra
-            // manage index rows in change sets because we don't want to be hit with the additional
-            // memory hit and do not need to do conflict detection on index rows.
-            ColumnReference emptyColRef = new ColumnReference(maintainer.getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier());
-            while ((result = scanner.next()) != null) {
-                Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow()));
-                // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest
-                // (as if we're "replaying" them in time order).
-                List<Cell> cells = result.listCells();
-                Collections.sort(cells, new Comparator<Cell>() {
-
-                    @Override
-                    public int compare(Cell o1, Cell o2) {
-                        int c = Longs.compare(o1.getTimestamp(), o2.getTimestamp());
-                        if (c != 0) return c;
-                        c = o1.getTypeByte() - o2.getTypeByte();
-                        if (c != 0) return c;
-                        c = Bytes.compareTo(o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength(), o1.getFamilyArray(), o1.getFamilyOffset(), o1.getFamilyLength());
-                        if (c != 0) return c;
-                        return Bytes.compareTo(o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength(), o1.getQualifierArray(), o1.getQualifierOffset(), o1.getQualifierLength());
-                    }
-
-                });
-                int i = 0;
-                int nCells = cells.size();
-                Result oldResult = null, newResult;
-                long readPtr = phoenixTransactionContext.getReadPointer();
-                do {
-                    boolean hasPuts = false;
-                    LinkedList<Cell> singleTimeCells = Lists.newLinkedList();
-                    long writePtr;
-                    Cell cell = cells.get(i);
-                    do {
-                        hasPuts |= cell.getTypeByte() == KeyValue.Type.Put.getCode();
-                        writePtr = cell.getTimestamp();
-                        ListIterator<Cell> it = singleTimeCells.listIterator();
-                        do {
-                            // Add at the beginning of the list to match the expected HBase
-                            // newest to oldest sort order (which TxTableState relies on
-                            // with the Result.getLatestColumnValue() calls). However, we
-                            // still want to add Cells in the expected order for each time
-                            // bound as otherwise we won't find it in our old state.
-                            it.add(cell);
-                        } while (++i < nCells && (cell = cells.get(i)).getTimestamp() == writePtr);
-                    } while (i < nCells && cell.getTimestamp() <= readPtr);
-
-                    // Generate point delete markers for the prior row deletion of the old index value.
-                    // The write timestamp is the next timestamp, not the current timestamp,
-                    // as the earliest cells are the current values for the row (and we don't
-                    // want to delete the current row).
-                    if (oldResult != null) {
-                        TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, oldResult);
-                        generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table);
-                    }
-                    // Generate point delete markers for the new index value.
-                    // If our time batch doesn't have Puts (i.e. we have only Deletes), then do not
-                    // generate deletes. We would have generated the delete above based on the state
-                    // of the previous row. The delete markers do not give us the state we need to
-                    // delete.
-                    if (hasPuts) {
-                        newResult = Result.create(singleTimeCells);
-                        // First row may represent the current state which we don't want to delete
-                        if (writePtr > readPtr) {
-                            TxTableState state = new TxTableState(mutableColumns, writePtr, m, emptyColRef, newResult);
-                            generateDeletes(indexUpdates, txRollbackAttribute, state, maintainer, replyWrite, table);
-                        }
-                        oldResult = newResult;
-                    } else {
-                        oldResult = null;
-                    }
-                } while (i < nCells);
-            }
-        }
-    }
-
-    private Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMaintainer maintainer, boolean replyWrite, final PTable table) throws IOException, SQLException {
-        if (maintainer.isRowDeleted(state.getPendingUpdate())) {
-            return Collections.emptyList();
-        }
-        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        ptr.set(state.getCurrentRowKey());
-        List<IndexUpdate> indexUpdates = Lists.newArrayList();
-        Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), replyWrite, false, null);
-        ValueGetter valueGetter = statePair.getFirst();
-        IndexUpdate indexUpdate = statePair.getSecond();
-        indexUpdate.setTable(maintainer.isLocalIndex() ? table.getName().getBytes() : maintainer.getIndexTableName());
-
-        byte[] regionStartKey = null;
-        byte[] regionEndkey = null;
-        if(maintainer.isLocalIndex()) {
-            HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getPhysicalName().getBytes(), state.getCurrentRowKey());
-            regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
-            regionEndkey = tableRegionLocation.getRegionInfo().getEndKey();
-        }
-
-        Put put = maintainer.buildUpdateMutation(PhoenixIndexCodec.KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), regionStartKey, regionEndkey);
-        indexUpdate.setUpdate(put);
-        indexUpdates.add(indexUpdate);
-
-        return indexUpdates;
-    }
-
-    private Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMaintainer maintainer, boolean replyWrite, final PTable table) throws IOException, SQLException {
-        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        ptr.set(state.getCurrentRowKey());
-        List<IndexUpdate> indexUpdates = Lists.newArrayList();
-        // For transactional tables, we use an index maintainer
-        // to aid in rollback if there's a KeyValue column in the index. The alternative would be
-        // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
-        // client side.
-        Set<ColumnReference> cols = Sets.newHashSet(maintainer.getAllColumns());
-        cols.add(new ColumnReference(maintainer.getDataEmptyKeyValueCF(), maintainer.getEmptyKeyValueQualifier()));
-        Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(cols, replyWrite, true, null);
-        ValueGetter valueGetter = statePair.getFirst();
-        if (valueGetter!=null) {
-            IndexUpdate indexUpdate = statePair.getSecond();
-            indexUpdate.setTable(maintainer.isLocalIndex() ? table.getName().getBytes() : maintainer.getIndexTableName());
-
-            byte[] regionStartKey = null;
-            byte[] regionEndkey = null;
-            if(maintainer.isLocalIndex()) {
-                HRegionLocation tableRegionLocation = connection.getQueryServices().getTableRegionLocation(table.getPhysicalName().getBytes(), state.getCurrentRowKey());
-                regionStartKey = tableRegionLocation.getRegionInfo().getStartKey();
-                regionEndkey = tableRegionLocation.getRegionInfo().getEndKey();
-            }
-
-            Delete delete = maintainer.buildDeleteMutation(PhoenixIndexCodec.KV_BUILDER, valueGetter, ptr, state.getPendingUpdate(),
-                    state.getCurrentTimestamp(), regionStartKey, regionEndkey);
-            indexUpdate.setUpdate(delete);
-            indexUpdates.add(indexUpdate);
-        }
-        return indexUpdates;
-    }
-
-    private void generateDeletes(Collection<Mutation> indexUpdates,
-            byte[] attribValue,
-            TxTableState state,
-            IndexMaintainer maintainer,
-            boolean replyWrite,
-            final PTable table) throws IOException, SQLException {
-        Iterable<IndexUpdate> deletes = getIndexDeletes(state, maintainer, replyWrite, table);
-        for (IndexUpdate delete : deletes) {
-            if (delete.isValid()) {
-                delete.getUpdate().setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, attribValue);
-                indexUpdates.add(delete.getUpdate());
-            }
-        }
-    }
-
-    private boolean generatePuts(Collection<Mutation> indexUpdates,
-            TxTableState state,
-            IndexMaintainer maintainer,
-            boolean replyWrite,
-            final PTable table) throws IOException, SQLException {
-        state.applyMutation();
-        Iterable<IndexUpdate> puts = getIndexUpserts(state, maintainer, replyWrite, table);
-        boolean validPut = false;
-        for (IndexUpdate put : puts) {
-            if (put.isValid()) {
-                indexUpdates.add(put.getUpdate());
-                validPut = true;
-            }
-        }
-        return validPut;
-    }
-
-
-    private static class TxTableState implements TableState {
-        private final Mutation mutation;
-        private final long currentTimestamp;
-        private final List<KeyValue> pendingUpdates;
-        private final Set<ColumnReference> indexedColumns;
-        private final Map<ColumnReference, ImmutableBytesWritable> valueMap;
-        
-        private TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation mutation) {
-            this.currentTimestamp = currentTimestamp;
-            this.indexedColumns = indexedColumns;
-            this.mutation = mutation;
-            int estimatedSize = indexedColumns.size();
-            this.valueMap = Maps.newHashMapWithExpectedSize(estimatedSize);
-            this.pendingUpdates = Lists.newArrayListWithExpectedSize(estimatedSize);
-            try {
-                CellScanner scanner = mutation.cellScanner();
-                while (scanner.advance()) {
-                    Cell cell = scanner.current();
-                    pendingUpdates.add(KeyValueUtil.ensureKeyValue(cell));
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e); // Impossible
-            }
-        }
-
-        public TxTableState(Set<ColumnReference> indexedColumns, long currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) {
-            this(indexedColumns, currentTimestamp, m);
-
-            for (ColumnReference ref : indexedColumns) {
-                Cell cell = r.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
-                if (cell != null) {
-                    ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                    ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-                    valueMap.put(ref, ptr);
-                }
-            }
-        }
-
-        @Override
-        public RegionCoprocessorEnvironment getEnvironment() {
-            return null;
-        }
-
-        @Override
-        public long getCurrentTimestamp() {
-            return currentTimestamp;
-        }
-
-        @Override
-        public byte[] getCurrentRowKey() {
-            return mutation.getRow();
-        }
-
-        @Override
-        public List<? extends IndexedColumnGroup> getIndexColumnHints() {
-            return Collections.emptyList();
-        }
-
-        private void applyMutation() {
-            for (Cell cell : pendingUpdates) {
-                if (cell.getTypeByte() == KeyValue.Type.Delete.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteColumn.getCode()) {
-                    ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
-                    valueMap.remove(ref);
-                } else if (cell.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() || cell.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
-                    for (ColumnReference ref : indexedColumns) {
-                        if (ref.matchesFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())) {
-                            valueMap.remove(ref);
-                        }
-                    }
-                } else if (cell.getTypeByte() == KeyValue.Type.Put.getCode()){
-                    ColumnReference ref = new ColumnReference(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
-                    if (indexedColumns.contains(ref)) {
-                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-                        ptr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-                        valueMap.put(ref, ptr);
-                    }
-                } else {
-                    throw new IllegalStateException("Unexpected mutation type for " + cell);
-                }
-            }
-        }
-        
-        @Override
-        public Collection<KeyValue> getPendingUpdate() {
-            return pendingUpdates;
-        }
-
-        @Override
-        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData)
-                throws IOException {
-            // TODO: creating these objects over and over again is wasteful
-            ColumnTracker tracker = new ColumnTracker(indexedColumns);
-            ValueGetter getter = new ValueGetter() {
-
-                @Override
-                public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
-                    return valueMap.get(ref);
-                }
-
-                @Override
-                public byte[] getRowKey() {
-                    return mutation.getRow();
-                }
-                
-            };
-            Pair<ValueGetter, IndexUpdate> pair = new Pair<ValueGetter, IndexUpdate>(getter, new IndexUpdate(tracker));
-            return pair;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index f77c162..7b9bc1f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -87,7 +87,6 @@ import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
 import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
 import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
-import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index a2edd45..f13e97a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -58,9 +58,7 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
             Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
             meth.setAccessible(true);
             this.codec = meth.newInstance();
-            this.codec.initialize(env);
-        } catch (IOException e) {
-            throw e;
+            this.codec.initialize(conf, env.getRegionInfo().getStartKey(), env.getRegionInfo().getEndKey(), env.getRegion().getRegionInfo().getTable().getName());
         } catch (Exception e) {
             throw new IOException(e);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
index cf6e95e..7489a8c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
@@ -20,16 +20,9 @@ package org.apache.phoenix.hbase.index.builder;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 
 public abstract class BaseIndexCodec implements IndexCodec {
-
-    @Override
-    public void initialize(RegionCoprocessorEnvironment env) throws IOException {
-        // noop
-    }
-
     /**
      * {@inheritDoc}
      * <p>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5814fcbe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index e6d683e..7dde941 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -11,9 +11,9 @@ package org.apache.phoenix.hbase.index.covered;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
 
 /**
@@ -24,16 +24,6 @@ import org.apache.phoenix.hbase.index.builder.BaseIndexCodec;
  */
 public interface IndexCodec {
     /**
-     * Do any code initialization necessary
-     * 
-     * @param env
-     *            environment in which the codec is operating
-     * @throws IOException
-     *             if the codec cannot be initalized correctly
-     */
-    public void initialize(RegionCoprocessorEnvironment env) throws IOException;
-
-    /**
      * Get the index cleanup entries. Currently, this must return just single row deletes (where just the row-key is
      * specified and no columns are returned) mapped to the table name. For instance, to you have an index 'myIndex'
      * with row :
@@ -89,4 +79,6 @@ public interface IndexCodec {
      * @throws IOException
      */
     public boolean isEnabled(Mutation m) throws IOException;
+
+    public void initialize(Configuration conf, byte[] startKey, byte[] endKey, byte[] tableName);
 }
\ No newline at end of file