You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/11/15 18:48:03 UTC

[08/37] phoenix git commit: PHOENIX-4280 Delete doesn't work when immutable indexes are in building state

PHOENIX-4280 Delete doesn't work when immutable indexes are in building state


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a49aed8e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a49aed8e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a49aed8e

Branch: refs/heads/4.x-HBase-1.1
Commit: a49aed8e755ccf35e1938754ebba982ca456ab3c
Parents: b1fa6b5
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Oct 19 17:52:29 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 15 10:41:23 2017 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/index/DropColumnIT.java     |   3 +-
 .../phoenix/end2end/index/ImmutableIndexIT.java | 105 ++++++++++++++++-
 .../end2end/index/IndexMaintenanceIT.java       |   7 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |  18 ++-
 .../hbase/index/builder/BaseIndexCodec.java     |  33 +++---
 .../hbase/index/covered/IndexMetaData.java      |  13 ++-
 .../hbase/index/covered/LocalTableState.java    |  69 ++++++-----
 .../hbase/index/covered/NonTxIndexBuilder.java  | 115 +------------------
 .../hbase/index/scanner/ScannerBuilder.java     |   2 +-
 .../hbase/index/util/IndexManagementUtil.java   |   2 -
 .../apache/phoenix/index/IndexMaintainer.java   |  29 ++++-
 .../phoenix/index/PhoenixIndexMetaData.java     |  14 ++-
 .../index/PhoenixTransactionalIndexer.java      |  34 ++----
 .../index/covered/LocalTableStateTest.java      |  31 ++---
 .../index/covered/NonTxIndexBuilderTest.java    |   2 +-
 15 files changed, 255 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
index 4f6c37e..badb2a6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
@@ -294,7 +295,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT {
         if (!mutable && columnEncoded) {
             KeyValueColumnExpression colExpression = new SingleCellColumnExpression(localIndexCol, "0:V2", localIndexTable.getEncodingScheme());
             ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-            colExpression.evaluate(new ResultTuple(result), ptr);
+            assertTrue(colExpression.evaluate(new ResultTuple(result), ptr));
             colValue = ptr.copyBytesIfNecessary();
         }
         else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 4c43068..9eb5440 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -29,6 +29,7 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -40,6 +41,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -47,12 +49,15 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
@@ -146,8 +151,13 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
             String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4";
             try {
                 conn.createStatement().execute(dml);
-                fail();
+                if (!localIndex) {
+                    fail();
+                }
             } catch (SQLException e) {
+                if (localIndex) {
+                    throw e;
+                }
                 assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(),
                     e.getErrorCode());
             }
@@ -156,6 +166,99 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    @Test
+    public void testDeleteFromPartialPK() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IND_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String ddl =
+                    "CREATE TABLE " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
+            Statement stmt = conn.createStatement();
+            stmt.execute(ddl);
+            populateTestTable(fullTableName);
+            ddl =
+                    "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON "
+                            + fullTableName + " (char_pk, varchar_pk)";
+            stmt.execute(ddl);
+
+            ResultSet rs;
+
+            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX*/ COUNT(*) FROM " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+
+            String dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar1'";
+            conn.createStatement().execute(dml);
+            assertIndexMutations(conn);
+            conn.commit();
+            
+            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX*/ COUNT(*) FROM " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+        }
+    }
+
+    @Test
+    public void testDeleteFromNonPK() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IND_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String ddl =
+                    "CREATE TABLE " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
+            Statement stmt = conn.createStatement();
+            stmt.execute(ddl);
+            populateTestTable(fullTableName);
+            ddl =
+                    "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON "
+                            + fullTableName + " (varchar_col1, varchar_pk)";
+            stmt.execute(ddl);
+
+            ResultSet rs;
+
+            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX*/ COUNT(*) FROM " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+
+            String dml = "DELETE from " + fullTableName + " WHERE varchar_col1='varchar_a' AND varchar_pk='varchar1'";
+            conn.createStatement().execute(dml);
+            assertIndexMutations(conn);
+            conn.commit();
+            
+            TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)));
+            
+            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX*/ COUNT(*) FROM " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+        }
+    }
+
+    private void assertIndexMutations(Connection conn) throws SQLException {
+        Iterator<Pair<byte[], List<KeyValue>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn);
+        assertTrue(iterator.hasNext());
+        iterator.next();
+        assertEquals(!localIndex, iterator.hasNext());
+    }
+
     // This test is know to flap. We need PHOENIX-2582 to be fixed before enabling this back.
     @Ignore
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
index 7d02e80..d5895ae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMaintenanceIT.java
@@ -344,13 +344,14 @@ public class IndexMaintenanceIT extends ParallelStatsDisabledIT {
             String dml = "DELETE from " + fullDataTableName + " WHERE long_col2 = 2";
             try {
                 conn.createStatement().execute(dml);
-                if (!mutable) {
+                if (!mutable && !localIndex) {
                     fail();
                 }
             } catch (SQLException e) {
-                if (!mutable) {
-                    assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode());
+                if (mutable || localIndex) {
+                    throw e;
                 }
+                assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode());
             }
 
             if (!mutable) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index be07cf4..eb252d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.ServerCacheClient;
@@ -47,6 +48,7 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -62,7 +64,6 @@ import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
@@ -75,13 +76,13 @@ import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
@@ -167,6 +168,11 @@ public class DeleteCompiler {
                 for (int i = 0; i < indexTableRefs.size(); i++) {
                     ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
                     rs.getCurrentRow().getKey(indexPtr);
+                    // Translate the data table row to the index table row
+                    if (sourceTableRef.getTable().getType() != PTableType.INDEX) {
+                        IndexMaintainer maintainer = indexTableRefs.get(i).getTable().getIndexMaintainer(table, connection);
+                        indexPtr.set(maintainer.buildRowKey(null, indexPtr, null, null, HConstants.LATEST_TIMESTAMP));
+                    }
                     indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                 }
                 if (mutations.size() > maxSize) {
@@ -242,12 +248,12 @@ public class DeleteCompiler {
         
     }
     
-    private Map<PTableKey, PTable> getNonDisabledImmutableIndexes(TableRef tableRef) {
+    private Map<PTableKey, PTable> getNonDisabledGlobalImmutableIndexes(TableRef tableRef) {
         PTable table = tableRef.getTable();
         if (table.isImmutableRows() && !table.getIndexes().isEmpty()) {
             Map<PTableKey, PTable> nonDisabledIndexes = new HashMap<PTableKey, PTable>(table.getIndexes().size());
             for (PTable index : table.getIndexes()) {
-                if (index.getIndexState() != PIndexState.DISABLE) {
+                if (index.getIndexState() != PIndexState.DISABLE && index.getIndexType() == IndexType.GLOBAL) {
                     nonDisabledIndexes.put(index.getKey(), index);
                 }
             }
@@ -401,7 +407,7 @@ public class DeleteCompiler {
                    .setTableName(tableName).build().buildException();
                 }
                 
-                immutableIndex = getNonDisabledImmutableIndexes(tableRefToBe);
+                immutableIndex = getNonDisabledGlobalImmutableIndexes(tableRefToBe);
                 boolean mayHaveImmutableIndexes = !immutableIndex.isEmpty();
                 noQueryReqd = !hasLimit;
                 // Can't run on same server for transactional data, as we need the row keys for the data
@@ -444,7 +450,7 @@ public class DeleteCompiler {
                     // of immutable indexes.
                     table = connection.getTable(new PTableKey(table.getTenantId(), table.getName().getString()));
                     tableRefToBe.setTable(table);
-                    immutableIndex = getNonDisabledImmutableIndexes(tableRefToBe);
+                    immutableIndex = getNonDisabledGlobalImmutableIndexes(tableRefToBe);
                 }
             } catch (MetaDataEntityNotFoundException e) {
                 // Catch column/column family not found exception, as our meta data may

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
index 1ce4e2e..cf6e95e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
@@ -23,25 +23,22 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
 
-/**
- *
- */
 public abstract class BaseIndexCodec implements IndexCodec {
 
-  @Override
-  public void initialize(RegionCoprocessorEnvironment env) throws IOException {
-    // noop
-  }
+    @Override
+    public void initialize(RegionCoprocessorEnvironment env) throws IOException {
+        // noop
+    }
 
-  /**
-   * {@inheritDoc}
-   * <p>
-   * By default, the codec is always enabled. Subclasses should override this method if they want do
-   * decide to index on a per-mutation basis.
- * @throws IOException 
-   */
-  @Override
-  public boolean isEnabled(Mutation m) throws IOException {
-    return true;
-  }
+    /**
+     * {@inheritDoc}
+     * <p>
+     * By default, the codec is always enabled. Subclasses should override this method if they want to
+     * decide to index on a per-mutation basis.
+     * @throws IOException 
+     */
+    @Override
+    public boolean isEnabled(Mutation m) throws IOException {
+        return true;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
index 5314631..20ed855 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
@@ -17,14 +17,15 @@
  */
 package org.apache.phoenix.hbase.index.covered;
 
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 
 public interface IndexMetaData {
     public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() {
 
         @Override
-        public boolean isImmutableRows() {
-            return false;
+        public boolean requiresPriorRowState(Mutation m) {
+            return true;
         }
 
         @Override
@@ -32,7 +33,13 @@ public interface IndexMetaData {
           return null;
         }};
 
-    public boolean isImmutableRows();
+        
+    /**
+     * Determines whether or not we need to look up the old row to retrieve old row values for maintaining the index.
+     * @param m mutation being performed on the data table
+     * @return true if prior row state is required and false otherwise
+     */
+    public boolean requiresPriorRowState(Mutation m);
 
     public ReplayWrite getReplayWrite();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 0f5a9f9..f7784e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -164,8 +164,27 @@ public class LocalTableState implements TableState {
      * @throws IOException
      */
     public Pair<CoveredDeleteScanner, IndexUpdate> getIndexedColumnsTableState(
-        Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData) throws IOException {
-        ensureLocalStateInitialized(indexedColumns, ignoreNewerMutations, indexMetaData);
+        Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean isStateForDeletes, IndexMetaData indexMetaData) throws IOException {
+        // check to see if we haven't initialized any columns yet
+        Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(indexedColumns);
+        
+        // add the covered columns to the set
+        for (ColumnReference ref : toCover) {
+            this.columnSet.addColumn(ref);
+        }
+        boolean requiresPriorRowState = indexMetaData.requiresPriorRowState(update);
+        if (!toCover.isEmpty()) {
+            // no need to perform scan to find prior row values when the indexed columns are immutable, as
+            // by definition, there won't be any. If we have indexed non row key columns, then we need to
+            // look up the row so that we can formulate the delete of the index row correctly. We'll always
+            // have our "empty" key value column, so we check if we have more than that as a basis for
+            // needing to lookup the prior row values.
+            if (requiresPriorRowState) {
+                // add the current state of the row. Uses listCells() to avoid a new array creation.
+                this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
+            }
+        }
+
         // filter out things with a newer timestamp and track the column references to which it applies
         ColumnTracker tracker = new ColumnTracker(indexedColumns);
         synchronized (this.trackedColumns) {
@@ -175,35 +194,27 @@ public class LocalTableState implements TableState {
             }
         }
 
-        CoveredDeleteScanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts, returnNullScannerIfRowNotFound);
-
+        CoveredDeleteScanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts,
+                // If we're determining the index state for deletes and either
+                // a) we've looked up the prior row state or
+                // b) we're inserting immutable data
+                // then allow a null scanner to be returned.
+                // FIXME: this is crappy code - we need to simplify the global mutable secondary index implementation
+                // TODO: use mutable transactional secondary index implementation instead (PhoenixTransactionalIndexer)
+                isStateForDeletes && (requiresPriorRowState || insertingData(update)));
         return new Pair<CoveredDeleteScanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
     }
 
-    /**
-     * Initialize the managed local state. Generally, this will only be called by
-     * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even
-     * then, there is still fairly low contention as each new Put/Delete will have its own table state.
-     * @param indexMetaData TODO
-     */
-    private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations, IndexMetaData indexMetaData)
-            throws IOException {
-        // check to see if we haven't initialized any columns yet
-        Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
-        // we have all the columns loaded, so we are good to go.
-        if (toCover.isEmpty()) { return; }
-
-        // no need to perform scan to find prior row values when the indexed columns are immutable, as
-        // by definition, there won't be any.
-        if (!indexMetaData.isImmutableRows()) {
-            // add the current state of the row. Uses listCells() to avoid a new array creation.
-            this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
-        }
-
-        // add the covered columns to the set
-        for (ColumnReference ref : toCover) {
-            this.columnSet.addColumn(ref);
+ 
+    private static boolean insertingData(Mutation m) {
+        for (Collection<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (KeyValue.Type.codeToType(cell.getTypeByte()) != KeyValue.Type.Put) {
+                    return false;
+                }
+            }
         }
+        return true;
     }
 
     @Override
@@ -264,9 +275,9 @@ public class LocalTableState implements TableState {
     }
 
     @Override
-    public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean returnNullScannerIfRowNotFound, IndexMetaData indexMetaData)
+    public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations, boolean isStateForDeletes, IndexMetaData indexMetaData)
             throws IOException {
-        Pair<CoveredDeleteScanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations, returnNullScannerIfRowNotFound, indexMetaData);
+        Pair<CoveredDeleteScanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations, isStateForDeletes, indexMetaData);
         ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
         return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
index 50e2c3f..8dd57c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilder.java
@@ -10,10 +10,8 @@
 package org.apache.phoenix.hbase.index.covered;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,9 +27,6 @@ import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
-import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-
-import com.google.common.collect.Lists;
 
 /**
  * Build covered indexes for phoenix updates.
@@ -99,17 +94,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
             }
         }
 
-        // go through each batch of keyvalues and build separate index entries for each
-        boolean cleanupCurrentState = !indexMetaData.isImmutableRows();
-        /*
-         * We have to split the work between the cleanup and the update for each group because when we update the
-         * current state of the row for the current batch (appending the mutations for the current batch) the next
-         * group will see that as the current state, which will can cause the a delete and a put to be created for
-         * the next group.
-         */
-        if (addMutationsForBatch(manager, batch, state, cleanupCurrentState, indexMetaData)) {
-            cleanupCurrentState = false;
-        }
+        addMutationsForBatch(manager, batch, state, indexMetaData);
     }
 
     /**
@@ -138,17 +123,13 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
      *            timestamp-based batch of edits
      * @param state
      *            local state to update and pass to the codec
-     * @param requireCurrentStateCleanup
-     *            <tt>true</tt> if we should should attempt to cleanup the current state of the table, in the event of a
-     *            'back in time' batch. <tt>false</tt> indicates we should not attempt the cleanup, e.g. an earlier
-     *            batch already did the cleanup.
      * @param indexMetaData TODO
      * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put), <tt>false</tt>
      *         otherwise
      * @throws IOException
      */
     private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch, LocalTableState state,
-            boolean requireCurrentStateCleanup, IndexMetaData indexMetaData) throws IOException {
+            IndexMetaData indexMetaData) throws IOException {
 
         // need a temporary manager for the current batch. It should resolve any conflicts for the
         // current batch. Essentially, we can get the case where a batch doesn't change the current
@@ -160,9 +141,7 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
         // determine if we need to make any cleanup given the pending update.
         long batchTs = batch.getTimestamp();
         state.setPendingUpdates(batch.getKvs());
-        if (!indexMetaData.isImmutableRows()) {
-            addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
-        }
+        addCleanupForCurrentBatch(updateMap, batchTs, state, indexMetaData);
 
         // A.2 do a single pass first for the updates to the current state
         state.applyPendingUpdates();
@@ -170,36 +149,6 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
         // FIXME: PHOENIX-4057 do not attempt to issue index updates
         // for out-of-order mutations since it corrupts the index.
         return false;
-        
-//        long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap, indexMetaData);
-//        // if all the updates are the latest thing in the index, we are done - don't go and fix history
-//        if (ColumnTracker.isNewestTime(minTs)) { return false; }
-//
-//        // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
-//        // index. after this, we have the correct view of the index, from the batch up to the index
-//        while (!ColumnTracker.isNewestTime(minTs)) {
-//            minTs = addUpdateForGivenTimestamp(minTs, state, updateMap, indexMetaData);
-//        }
-//
-//        // B. only cleanup the current state if we need to - its a huge waste of effort otherwise.
-//        if (requireCurrentStateCleanup) {
-//            // roll back the pending update. This is needed so we can remove all the 'old' index entries.
-//            // We don't need to do the puts here, but just the deletes at the given timestamps since we
-//            // just want to completely hide the incorrect entries.
-//            state.rollback(batch.getKvs());
-//            // setup state
-//            state.setPendingUpdates(batch.getKvs());
-//
-//            // cleanup the pending batch. If anything in the correct history is covered by Deletes used to
-//            // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
-//            // because the update may have a different set of columns or value based on the update).
-//            cleanupIndexStateFromBatchOnward(updateMap, batchTs, state, indexMetaData);
-//
-//            // have to roll the state forward again, so the current state is correct
-//            state.applyPendingUpdates();
-//            return true;
-//        }
-//        return false;
     }
 
     private long addUpdateForGivenTimestamp(long ts, LocalTableState state, IndexUpdateManager updateMap, IndexMetaData indexMetaData)
@@ -249,7 +198,6 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
          */
         // timestamp of the next update we need to track
         long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
-        List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>();
         for (IndexUpdate update : upserts) {
             // this is the one bit where we check the timestamps
             final ColumnTracker tracker = update.getIndexedColumns();
@@ -265,71 +213,17 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
                 continue;
             }
             
-            // track index hints for the next round. Hint if we need an update for that column for the
-            // next timestamp. These columns clearly won't need to update as we go through time as they
-            // already match the most recent possible thing.
-            boolean needsCleanup = false;
-            if (tracker.hasNewerTimestamps()) {
-                columnHints.add(tracker);
-                // this update also needs to be cleaned up at the next timestamp because it not the latest.
-                needsCleanup = true;
-            }
-
             // only make the put if the index update has been setup
             if (update.isValid()) {
                 byte[] table = update.getTableName();
                 Mutation mutation = update.getUpdate();
                 updateMap.addIndexUpdate(table, mutation);
-
-                // only make the cleanup if we made a put and need cleanup
-                if (needsCleanup) {
-                    // there is a TS for the interested columns that is greater than the columns in the
-                    // put. Therefore, we need to issue a delete at the same timestamp
-                    Delete d = new Delete(mutation.getRow());
-                    d.setTimestamp(tracker.getTS());
-                    updateMap.addIndexUpdate(table, d);
-                }
             }
         }
         return minTs;
     }
 
     /**
-     * Cleanup the index based on the current state from the given batch. Iterates over each timestamp (for the indexed
-     * rows) for the current state of the table and cleans up all the existing entries generated by the codec.
-     * <p>
-     * Adds all pending updates to the updateMap
-     * 
-     * @param updateMap
-     *            updated with the pending index updates from the codec
-     * @param batchTs
-     *            timestamp from which we should cleanup
-     * @param state
-     *            current state of the primary table. Should already by setup to the correct state from which we want to
-     *            cleanup.
-     * @param indexMetaData TODO
-     * @throws IOException
-     */
-    private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap, long batchTs, LocalTableState state, IndexMetaData indexMetaData)
-            throws IOException {
-        // get the cleanup for the current state
-        state.setCurrentTimestamp(batchTs);
-        addDeleteUpdatesToMap(updateMap, state, batchTs, indexMetaData);
-        Set<ColumnTracker> trackers = state.getTrackedColumns();
-        long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
-        for (ColumnTracker tracker : trackers) {
-            if (tracker.getTS() < minTs) {
-                minTs = tracker.getTS();
-            }
-        }
-        state.resetTrackedColumns();
-        if (!ColumnTracker.isNewestTime(minTs)) {
-            state.setHints(Lists.newArrayList(trackers));
-            cleanupIndexStateFromBatchOnward(updateMap, minTs, state, indexMetaData);
-        }
-    }
-
-    /**
      * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState, IndexMetaData)} and then add them to the
      * update map.
      * <p>
@@ -340,9 +234,6 @@ public class NonTxIndexBuilder extends BaseIndexBuilder {
      */
     protected void addDeleteUpdatesToMap(IndexUpdateManager updateMap, LocalTableState state, long ts, IndexMetaData indexMetaData)
             throws IOException {
-        if (indexMetaData.isImmutableRows()) {
-            return;
-        }
         Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state, indexMetaData);
         if (cleanup != null) {
             for (IndexUpdate d : cleanup) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index 5547958..ad09c0c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -71,7 +71,7 @@ public class ScannerBuilder {
     // filter out kvs based on deletes
     ApplyAndFilterDeletesFilter deleteFilter = new ApplyAndFilterDeletesFilter(getAllFamilies(indexedColumns));
     filters.addFilter(deleteFilter);
-
+    
     // combine the family filters and the rest of the filters as a
     return getFilteredScanner(filters, returnNullIfRowNotFound, deleteFilter.getDeleteTracker());
   }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 697caef..a4a34a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -25,7 +25,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,7 +33,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 83b1d58..b4566a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.ByteStringer;
@@ -1048,10 +1049,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
     }
 
     private enum DeleteType {SINGLE_VERSION, ALL_VERSIONS};
-    private DeleteType getDeleteTypeOrNull(Collection<KeyValue> pendingUpdates) {
+    private DeleteType getDeleteTypeOrNull(Collection<? extends Cell> pendingUpdates) {
+        return getDeleteTypeOrNull(pendingUpdates, this.nDataCFs);
+    }
+   
+    private DeleteType getDeleteTypeOrNull(Collection<? extends Cell> pendingUpdates, int nCFs) {
         int nDeleteCF = 0;
         int nDeleteVersionCF = 0;
-        for (KeyValue kv : pendingUpdates) {
+        for (Cell kv : pendingUpdates) {
         	if (kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode()) {
                 nDeleteVersionCF++;
             }
@@ -1064,22 +1069,34 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         // This is what a delete looks like on the server side for mutable indexing...
         // Should all be one or the other for DeleteFamily versus DeleteFamilyVersion, but just in case not
         DeleteType deleteType = null; 
-        if (nDeleteVersionCF > 0 && nDeleteVersionCF >= this.nDataCFs) {
+        if (nDeleteVersionCF > 0 && nDeleteVersionCF >= nCFs) {
         	deleteType = DeleteType.SINGLE_VERSION;
         } else {
 			int nDelete = nDeleteCF + nDeleteVersionCF;
-			if (nDelete>0 && nDelete >= this.nDataCFs) {
+			if (nDelete>0 && nDelete >= nCFs) {
 				deleteType = DeleteType.ALL_VERSIONS;
 			}
 		}
         return deleteType;
     }
     
-    public boolean isRowDeleted(Collection<KeyValue> pendingUpdates) {
+    public boolean isRowDeleted(Collection<? extends Cell> pendingUpdates) {
         return getDeleteTypeOrNull(pendingUpdates) != null;
     }
     
-    private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<KeyValue> pendingUpdates, long ts) throws IOException {
+    public boolean isRowDeleted(Mutation m) {
+        if (m.getFamilyCellMap().size() < this.nDataCFs) {
+            return false;
+        }
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            if (getDeleteTypeOrNull(cells, 1) == null) { // Checking CFs one by one
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean hasIndexedColumnChanged(ValueGetter oldState, Collection<? extends Cell> pendingUpdates, long ts) throws IOException {
         if (pendingUpdates.isEmpty()) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 7908103..05371a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.IndexMetaDataCache;
@@ -43,6 +44,7 @@ public class PhoenixIndexMetaData implements IndexMetaData {
     private final IndexMetaDataCache indexMetaDataCache;
     private final ReplayWrite replayWrite;
     private final boolean isImmutable;
+    private final boolean hasNonPkColumns;
     
     private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
         if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
@@ -102,10 +104,13 @@ public class PhoenixIndexMetaData implements IndexMetaData {
     public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException {
         this.indexMetaDataCache = getIndexMetaData(env, attributes);
         boolean isImmutable = true;
+        boolean hasNonPkColumns = false;
         for (IndexMaintainer maintainer : indexMetaDataCache.getIndexMaintainers()) {
             isImmutable &= maintainer.isImmutableRows();
+            hasNonPkColumns |= !maintainer.getIndexedColumns().isEmpty();
         }
         this.isImmutable = isImmutable;
+        this.hasNonPkColumns = hasNonPkColumns;
         this.attributes = attributes;
         this.replayWrite = getReplayWrite(attributes);
     }
@@ -122,12 +127,17 @@ public class PhoenixIndexMetaData implements IndexMetaData {
         return attributes;
     }
     
+    @Override
     public ReplayWrite getReplayWrite() {
         return replayWrite;
     }
-
-    @Override
+    
     public boolean isImmutableRows() {
         return isImmutable;
     }
+
+    @Override
+    public boolean requiresPriorRowState(Mutation m) {
+        return !isImmutable || (indexMetaDataCache.getIndexMaintainers().get(0).isRowDeleted(m) && hasNonPkColumns);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index bc53b6b..3495267 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -89,7 +89,6 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
-import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -284,16 +283,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         } else {
             findPriorValueMutations = mutations;
         }
-        while(mutationIterator.hasNext()) {
-            Mutation m = mutationIterator.next();
-            // add the mutation to the batch set
-            ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-            if (mutations != findPriorValueMutations && isDeleteMutation(m)) {
-                addMutation(findPriorValueMutations, row, m);
-            }
-            addMutation(mutations, row, m);
-        }
-        
         // Collect the set of mutable ColumnReferences so that we can first
         // run a scan to get the current state. We'll need this to delete
         // the existing index rows.
@@ -309,6 +298,17 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             mutableColumns.addAll(allColumns);
         }
 
+        while(mutationIterator.hasNext()) {
+            Mutation m = mutationIterator.next();
+            // add the mutation to the batch set
+            ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+            // if we have no non PK columns, no need to find the prior values
+            if (mutations != findPriorValueMutations && indexMetaData.requiresPriorRowState(m)) {
+                addMutation(findPriorValueMutations, row, m);
+            }
+            addMutation(mutations, row, m);
+        }
+        
         Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * indexMaintainers.size());
         try {
             // Track if we have row keys with Delete mutations (or Puts that are
@@ -363,17 +363,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         return indexUpdates;
     }
 
-    private static boolean isDeleteMutation(Mutation m) {
-        for (Map.Entry<byte[],List<Cell>> cellMap : m.getFamilyCellMap().entrySet()) {
-            for (Cell cell : cellMap.getValue()) {
-                if (cell.getTypeByte() != KeyValue.Type.Put.getCode() || TransactionUtil.isDelete(cell)) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
-
     private void processMutation(RegionCoprocessorEnvironment env,
             PhoenixIndexMetaData indexMetaData, byte[] txRollbackAttribute,
             ResultScanner scanner,
@@ -398,6 +387,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         for (Mutation m : mutations.values()) {
             TxTableState state = new TxTableState(env, upsertColumns, indexMetaData.getAttributes(), txnContext.getWritePointer(), m);
             generatePuts(indexMetaData, indexUpdates, state);
+            generateDeletes(indexMetaData, indexUpdates, txRollbackAttribute, state);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
index 0efb63a..052930d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -57,13 +58,13 @@ public class LocalTableStateTest {
   private static final IndexMetaData indexMetaData = new IndexMetaData() {
 
     @Override
-    public boolean isImmutableRows() {
-        return false;
+    public ReplayWrite getReplayWrite() {
+        return null;
     }
 
     @Override
-    public ReplayWrite getReplayWrite() {
-        return null;
+    public boolean requiresPriorRowState(Mutation m) {
+        return true;
     }
       
   };
@@ -120,14 +121,14 @@ public class LocalTableStateTest {
       IndexMetaData indexMetaData = new IndexMetaData() {
 
           @Override
-          public boolean isImmutableRows() {
-              return false;
-          }
-
-          @Override
           public ReplayWrite getReplayWrite() {
               return null;
           }
+
+        @Override
+        public boolean requiresPriorRowState(Mutation m) {
+            return true;
+        }
             
         };
     Put m = new Put(row);
@@ -157,16 +158,16 @@ public class LocalTableStateTest {
       IndexMetaData indexMetaData = new IndexMetaData() {
 
           @Override
-          public boolean isImmutableRows() {
-              return true;
-          }
-
-          @Override
           public ReplayWrite getReplayWrite() {
               return null;
           }
+
+        @Override
+        public boolean requiresPriorRowState(Mutation m) {
+            return false;
+        }
             
-        };
+    };
     Put m = new Put(row);
     m.add(fam, qual, ts, val);
     // setup mocks

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a49aed8e/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
index d06967d..d94cce0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
@@ -145,7 +145,7 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest {
         Mockito.when(mockRegionInfo.getEndKey()).thenReturn(Bytes.toBytes("z"));
 
         mockIndexMetaData = Mockito.mock(PhoenixIndexMetaData.class);
-        Mockito.when(mockIndexMetaData.isImmutableRows()).thenReturn(false);
+        Mockito.when(mockIndexMetaData.requiresPriorRowState((Mutation)Mockito.any())).thenReturn(true);
         Mockito.when(mockIndexMetaData.getIndexMaintainers())
                 .thenReturn(Collections.singletonList(getTestIndexMaintainer()));