Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/07/01 02:12:36 UTC

[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5211 Consistent Immutable Global Indexes for Non-Transactional Tables

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
     new 74e24a0  PHOENIX-5211 Consistent Immutable Global Indexes for Non-Transactional Tables
74e24a0 is described below

commit 74e24a0a0dd0e1923c85d3b5d9c7e70ba61badb4
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Tue Jun 11 16:13:25 2019 -0700

    PHOENIX-5211 Consistent Immutable Global Indexes for Non-Transactional Tables
---
 .../end2end/IndexToolForPartialBuildIT.java        |  90 ++--
 .../org/apache/phoenix/end2end/IndexToolIT.java    |   4 +-
 .../phoenix/end2end/index/ImmutableIndexIT.java    | 194 +++++++-
 .../org/apache/phoenix/execute/MutationState.java  | 498 +++++++++++++--------
 .../phoenix/hbase/index/IndexRegionObserver.java   |   2 +-
 .../org/apache/phoenix/index/IndexMaintainer.java  |   3 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |   3 +-
 .../apache/phoenix/query/QueryServicesOptions.java |   1 +
 .../java/org/apache/phoenix/util/IndexUtil.java    |  62 ++-
 .../java/org/apache/phoenix/util/TestUtil.java     |  15 +-
 10 files changed, 585 insertions(+), 287 deletions(-)
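
At a high level, this patch makes immutable global index writes consistent for
non-transactional tables by splitting a commit into three phases: index rows are
written first with their empty column set to "unverified", then the data table
rows are written, and finally the index rows are flipped to "verified" (or
deleted). A reader that encounters an unverified index row knows the matching
data table write may never have happened. The sketch below is illustrative only
(hypothetical names, plain HBase 1.x client calls), not code from the patch:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Row;
    import org.apache.hadoop.hbase.client.Table;

    // Simplified shape of the three-phase write implemented in MutationState below.
    class ConsistentIndexWriteSketch {
        void commit(Table indexTable, Table dataTable,
                List<Row> unverifiedIndexPuts, List<Row> dataMutations,
                List<Row> verifiedOrDeletedIndexMutations)
                throws IOException, InterruptedException {
            // Phase 1: index rows land first, marked "unverified" in the empty column
            indexTable.batch(unverifiedIndexPuts, new Object[unverifiedIndexPuts.size()]);
            // Phase 2: data table rows; if this fails, only unverified index rows
            // remain, and readers treat those as potentially invalid
            dataTable.batch(dataMutations, new Object[dataMutations.size()]);
            // Phase 3: flip index rows to "verified" and apply index deletes; a
            // failure here is logged and ignored (see the catch in the patch below)
            indexTable.batch(verifiedOrDeletedIndexMutations,
                    new Object[verifiedOrDeletedIndexMutations.size()]);
        }
    }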

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
index 19ffe1a..a0956f2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -17,13 +17,10 @@
  */
 package org.apache.phoenix.end2end;
 
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -34,21 +31,18 @@ import java.sql.Statement;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
@@ -60,6 +54,7 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -83,7 +78,6 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
     public static Map<String, String> getServerProperties() {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
         serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-        serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
         serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
         serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
         serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
@@ -105,14 +99,14 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         String schemaName = generateUniqueName();
         String dataTableName = generateUniqueName();
         String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
-        final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
+        final String indxTable = String.format("%s_IDX", dataTableName);
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
         props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
         props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
         final Connection conn = DriverManager.getConnection(getUrl(), props);
         Statement stmt = conn.createStatement();
-        try {
+        try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
             if (isNamespaceEnabled) {
                 conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
             }
@@ -121,25 +115,36 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
                             fullTableName, tableDDLOptions));
             String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
             PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-            FailingRegionObserver.FAIL_WRITE = false;
             // insert two rows
             upsertRow(stmt1, 1000);
             upsertRow(stmt1, 2000);
 
             conn.commit();
             stmt.execute(String.format("CREATE INDEX %s ON %s  (LPAD(UPPER(NAME),11,'x')||'_xyz') ", indxTable, fullTableName));
-            FailingRegionObserver.FAIL_WRITE = true;
             upsertRow(stmt1, 3000);
             upsertRow(stmt1, 4000);
             upsertRow(stmt1, 5000);
-            try {
-                conn.commit();
-                fail();
-            } catch (SQLException e) {} catch (Exception e) {}
+            conn.commit();
+
+            // delete some of the index rows directly through the HBase API
+            PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
+            try (Table hTable = admin.getConnection()
+                    .getTable(TableName.valueOf(pindexTable.getPhysicalName().toString()))) {
+                Scan scan = new Scan();
+                ResultScanner scanner = hTable.getScanner(scan);
+                int cnt = 0;
+                for (Result res = scanner.next(); res != null; res = scanner.next()) {
+                    cnt++;
+                    if (cnt > 2) {
+                        hTable.delete(new Delete(res.getRow()));
+                    }
+                }
+            }
+            TestUtil.doMajorCompaction(conn, pindexTable.getPhysicalName().toString());
+
             conn.createStatement()
                     .execute(String.format("ALTER INDEX %s on %s REBUILD ASYNC", indxTable, fullTableName));
             
-            FailingRegionObserver.FAIL_WRITE = false;
             ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
                     new String[] { PTableType.INDEX.toString() });
             assertTrue(rs.next());
@@ -150,21 +155,6 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
             upsertRow(stmt1, 6000);
             upsertRow(stmt1, 7000);
             conn.commit();
-            
-			rs = conn.createStatement()
-					.executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + ","
-							+ PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
-							+"\""+ SYSTEM_CATALOG_SCHEMA + "\"." + SYSTEM_CATALOG_TABLE + " ("
-							+ PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
-							+ PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
-							+ PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
-			rs.next();
-            PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
-            assertEquals(PIndexState.BUILDING, pindexTable.getIndexState());
-            assertEquals(rs.getLong(1), pindexTable.getTimeStamp());
-
-            //assert disabled timestamp
-            assertEquals(0, rs.getLong(2));
 
             String selectSql = String.format("SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName);
             rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
@@ -173,7 +163,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
             // assert we are pulling from data table.
 			assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, isNamespaceEnabled);
 
-            rs = stmt1.executeQuery(selectSql);
+			rs = stmt1.executeQuery(selectSql);
             for (int i = 1; i <= 7; i++) {
                 assertTrue(rs.next());
                 assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
@@ -260,24 +250,4 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         stmt.executeUpdate();
     }
     
-
-    public static class FailingRegionObserver extends SimpleRegionObserver {
-        public static volatile boolean FAIL_WRITE = false;
-        public static final String INDEX_NAME = "IDX";
-        @Override
-        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
-                throw new DoNotRetryIOException();
-            }
-            Mutation operation = miniBatchOp.getOperation(0);
-            Set<byte[]> keySet = operation.getFamilyMap().keySet();
-            for(byte[] family: keySet) {
-                if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
-                    throw new DoNotRetryIOException();
-                }
-            }
-        }
-
-    }
-    
 }
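
With FailingRegionObserver gone, this test no longer simulates an index write
failure; instead it lets all writes succeed, corrupts the index by deleting rows
through the raw HBase client, major-compacts, and then relies on ALTER INDEX ...
REBUILD ASYNC plus the IndexTool to repopulate the index. A condensed sketch of
that pattern (variable names illustrative, mirroring the hunks above):

    // Delete every index row past the first two directly in HBase, bypassing Phoenix.
    try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
            Table hTable = admin.getConnection()
                    .getTable(TableName.valueOf(physicalIndexName));
            ResultScanner scanner = hTable.getScanner(new Scan())) {
        int cnt = 0;
        for (Result res : scanner) {
            if (++cnt > 2) {
                hTable.delete(new Delete(res.getRow()));
            }
        }
    }
    // Compact away the deleted rows, then rebuild the index asynchronously.
    TestUtil.doMajorCompaction(conn, physicalIndexName);
    conn.createStatement().execute(
            String.format("ALTER INDEX %s ON %s REBUILD ASYNC", indxTable, fullTableName));
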
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 4efb15b..7819374 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -144,7 +144,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
                             }
                         }
                         else {
-                            // Due to PHOENIX-5376, the bulk load option is ignored for global indexes
+                            // Due to PHOENIX-5375 and PHOENIX-5376, the snapshot and bulk load options are ignored for global indexes
                             list.add(new Object[]{transactionProvider, mutable, localIndex,
                                     true, false, false});
                         }
@@ -154,8 +154,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         }
         // Add the usetenantId
         list.add(new Object[] { null, false, false, true, false, true});
-        // do one run over snapshots
-        list.add(new Object[] { null, false, false, true, true, false});
         return TestUtil.filterTxParamData(list,0);
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index d676051..1877e9e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -17,15 +17,22 @@
  */
 package org.apache.phoenix.end2end.index;
 
+import static org.apache.phoenix.end2end.IndexToolIT.assertExplainPlan;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
+import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.getRowCount;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
@@ -40,26 +47,44 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider;
 import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
 import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -107,18 +132,21 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
         serverProps.put("hbase.coprocessor.region.classes", CreateIndexRegionObserver.class.getName());
-        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5);
         clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
         clientProps.put(QueryServices.INDEX_POPULATION_SLEEP_TIME, "15000");
+        clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, "true");
+        clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "1");
+        clientProps.put(HConstants.HBASE_CLIENT_PAUSE, "1");
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
     @Parameters(name="ImmutableIndexIT_localIndex={0},transactional={1},transactionProvider={2},columnEncoded={3}") // name is used by failsafe as file name in reports
     public static Collection<Object[]> data() {
 		return TestUtil.filterTxParamData(
-		        Arrays.asList(new Object[][] { 
+		        Arrays.asList(new Object[][] {
     				{ false, false, null, false }, { false, false, null, true },
-    				{ false, true, "OMID", false }, 
+    				{ false, true, "OMID", false },
                     { false, true, "TEPHRA", false }, { false, true, "TEPHRA", true },
     				{ true, false, null, false }, { true, false, null, true },
                     { true, true, "TEPHRA", false }, { true, true, "TEPHRA", true },
@@ -262,7 +290,165 @@ public class ImmutableIndexIT extends BaseUniqueNamesOwnClusterIT {
                 (transactionProvider != null && 
                  transactionProvider.isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)), iterator.hasNext());
     }
-    
+
+    private void createAndPopulateTableAndIndexForConsistentIndex(Connection conn, String tableName, String indexName,
+            int numOfRowsToInsert, String storageProps)
+            throws Exception {
+        String tableOptions = tableDDLOptions;
+        if (storageProps != null) {
+            tableOptions += " ,IMMUTABLE_STORAGE_SCHEME=" + storageProps;
+        }
+        String ddl = "CREATE TABLE " + tableName + TestUtil.TEST_TABLE_SCHEMA + tableOptions;
+        INDEX_DDL =
+                "CREATE " + " INDEX IF NOT EXISTS " + SchemaUtil.getTableNameFromFullName(indexName)
+                        + " ON " + tableName + " (long_pk, varchar_pk)"
+                        + " INCLUDE (long_col1, long_col2) ";
+
+        conn.createStatement().execute(ddl);
+        conn.createStatement().execute(INDEX_DDL);
+        upsertRows(conn, tableName, numOfRowsToInsert);
+        conn.commit();
+
+        TestUtil.waitForIndexState(conn, indexName, PIndexState.ACTIVE);
+    }
+
+    @Test
+    public void testGlobalImmutableIndexCreate() throws Exception {
+        if (localIndex || transactionProvider != null) {
+            return;
+        }
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        ArrayList<String> immutableStorageProps = new ArrayList<String>();
+        immutableStorageProps.add(null);
+        if (!tableDDLOptions.contains(IMMUTABLE_STORAGE_SCHEME)) {
+           immutableStorageProps.add(SINGLE_CELL_ARRAY_WITH_OFFSETS.toString());
+        }
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(true);
+            for (String storageProp : immutableStorageProps) {
+                String tableName = "TBL_" + generateUniqueName();
+                String indexName = "IND_" + generateUniqueName();
+                String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+                String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+                TABLE_NAME = fullTableName;
+                int numRows = 1;
+                createAndPopulateTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName,
+                        numRows, storageProp);
+
+                ResultSet rs;
+                rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
+                assertTrue(rs.next());
+                assertEquals(numRows, rs.getInt(1));
+                rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+                assertTrue(rs.next());
+                assertEquals(numRows, rs.getInt(1));
+                assertEquals(true, verifyRowsForEmptyColValue(conn, fullIndexName,
+                        IndexRegionObserver.VERIFIED_BYTES));
+
+                // Now try to fail Phase 1 and observe that the index state is not DISABLED
+                try (Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+                    admin.disableTable(TableName.valueOf(fullIndexName));
+                    boolean isWriteOnDisabledIndexFailed = false;
+                    try {
+                        upsertRows(conn, fullTableName, numRows);
+                    } catch (SQLException ex) {
+                        isWriteOnDisabledIndexFailed = true;
+                    }
+                    assertEquals(true, isWriteOnDisabledIndexFailed);
+                    PIndexState indexState = TestUtil.getIndexState(conn, fullIndexName);
+                    assertEquals(PIndexState.ACTIVE, indexState);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testGlobalImmutableIndexDelete() throws Exception {
+        if (localIndex || transactionProvider != null) {
+            return;
+        }
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IND_" + generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+        TABLE_NAME = fullTableName;
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
+            conn.setAutoCommit(true);
+            int numRows = 2;
+            createAndPopulateTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName, numRows, null);
+
+            String dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar1'";
+            conn.createStatement().execute(dml);
+            conn.commit();
+            ResultSet rs;
+            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME);
+            assertTrue(rs.next());
+            assertEquals(numRows - 1, rs.getInt(1));
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
+            assertTrue(rs.next());
+            assertEquals(numRows - 1, rs.getInt(1));
+            assertEquals(true, verifyRowsForEmptyColValue(conn, fullIndexName, IndexRegionObserver.VERIFIED_BYTES));
+
+            // Force the data table delete to fail (rows removed but the operation reported as failed) and check that the index table rows remain unverified
+            TestUtil.addCoprocessor(conn, fullTableName, DeleteFailingRegionObserver.class);
+            dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar2'";
+            boolean isDeleteFailed = false;
+            try {
+                conn.createStatement().execute(dml);
+            } catch (Exception ex) {
+                isDeleteFailed = true;
+            }
+            assertEquals(true, isDeleteFailed);
+            TestUtil.removeCoprocessor(conn, fullTableName, DeleteFailingRegionObserver.class);
+            assertEquals(numRows - 1, getRowCount(conn.unwrap(PhoenixConnection.class).getQueryServices()
+                    .getTable(Bytes.toBytes(fullIndexName)), false));
+            assertEquals(true, verifyRowsForEmptyColValue(conn, fullIndexName, IndexRegionObserver.UNVERIFIED_BYTES));
+
+            // Now delete the data table contents via HBase, read through the unverified index, and verify that no rows come back
+            admin.disableTable(TableName.valueOf(fullTableName));
+            admin.truncateTable(TableName.valueOf(fullTableName), true);
+            String selectFromIndex = "SELECT long_pk, varchar_pk, long_col1 FROM " + TABLE_NAME + " WHERE varchar_pk='varchar2' AND long_pk=2";
+            rs =
+                    conn.createStatement().executeQuery(
+                            "EXPLAIN " + selectFromIndex);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertExplainPlan(false, actualExplainPlan, fullTableName, fullIndexName);
+
+            rs = conn.createStatement().executeQuery(selectFromIndex);
+            assertFalse(rs.next());
+        }
+    }
+
+    public static class DeleteFailingRegionObserver extends SimpleRegionObserver {
+        @Override
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws
+                IOException {
+            throw new DoNotRetryIOException();
+        }
+    }
+
+    public static boolean verifyRowsForEmptyColValue(Connection conn, String tableName, byte[] valueBytes)
+            throws IOException, SQLException {
+        PTable table = PhoenixRuntime.getTable(conn, tableName);
+        byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+        byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+        HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(table.getPhysicalName().getBytes());
+        Scan scan = new Scan();
+        scan.addColumn(emptyCF, emptyCQ);
+        ResultScanner resultScanner = htable.getScanner(scan);
+
+        for (Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
+            if (Bytes.compareTo(result.getValue(emptyCF, emptyCQ), 0, valueBytes.length,
+                    valueBytes, 0, valueBytes.length) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
 
     // This test is known to flap. We need PHOENIX-2582 to be fixed before re-enabling it.
     @Ignore
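
The verifyRowsForEmptyColValue helper added above scans only the index table's
empty key-value column and compares every row against the expected marker bytes.
The new tests use it in both directions; a condensed usage sketch (the constants
are the real IndexRegionObserver ones, the assertions paraphrase the hunks above):

    // After a clean commit, every index row should carry the "verified" marker.
    assertTrue(verifyRowsForEmptyColValue(conn, fullIndexName,
            IndexRegionObserver.VERIFIED_BYTES));
    // After a failed data table write (Phase 2), surviving index rows stay
    // "unverified" until they are repaired or rebuilt.
    assertTrue(verifyRowsForEmptyColValue(conn, fullIndexName,
            IndexRegionObserver.UNVERIFIED_BYTES));
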
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index eef36fa..6b3f1d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -32,6 +32,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -41,7 +42,10 @@ import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -54,6 +58,7 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.hbase.index.exception.IndexWriteException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -93,10 +98,14 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
 import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
 import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TransactionUtil;
@@ -897,13 +906,20 @@ public class MutationState implements SQLCloseable {
                     continue;
                 }
                 // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
-                long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef,
-                        multiRowMutationState) : serverTimeStamps[i++];
-                Long scn = connection.getSCN();
-                long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+                long serverTimestamp = serverTimeStamps == null
+                        ? validateAndGetServerTimestamp(tableRef, multiRowMutationState)
+                        : serverTimeStamps[i++];
                 final PTable table = tableRef.getTable();
-                Iterator<Pair<PName, List<Mutation>>> mutationsIterator = addRowMutations(tableRef,
-                        multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll);
+                Long scn = connection.getSCN();
+                long mutationTimestamp = scn == null
+                        ? (table.isTransactional() ? HConstants.LATEST_TIMESTAMP
+                                : EnvironmentEdgeManager.currentTimeMillis())
+                        : scn;
+                Iterator<Pair<PName, List<Mutation>>> mutationsIterator = addRowMutations(
+                        tableRef, multiRowMutationState, mutationTimestamp,
+                        serverTimestamp, false, sendAll);
                 // build map from physical table to mutation list
                 boolean isDataTable = true;
                 while (mutationsIterator.hasNext()) {
@@ -911,7 +927,9 @@ public class MutationState implements SQLCloseable {
                     PName hTableName = pair.getFirst();
                     List<Mutation> mutationList = pair.getSecond();
                     TableInfo tableInfo = new TableInfo(isDataTable, hTableName, tableRef);
-                    List<Mutation> oldMutationList = physicalTableMutationMap.put(tableInfo, mutationList);
+                    List<Mutation> oldMutationList =
+                            physicalTableMutationMap.put(tableInfo, mutationList);
                     if (oldMutationList != null) mutationList.addAll(0, oldMutationList);
                     isDataTable = false;
                 }
@@ -932,220 +950,304 @@ public class MutationState implements SQLCloseable {
                     joinMutationState(new TableRef(tableRef), multiRowMutationState, txMutations);
                 }
             }
-            long serverTimestamp = HConstants.LATEST_TIMESTAMP;
-            Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator = physicalTableMutationMap.entrySet()
-                    .iterator();
-            while (mutationsIterator.hasNext()) {
-                Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
-                TableInfo tableInfo = pair.getKey();
-                byte[] htableName = tableInfo.getHTableName().getBytes();
-                List<Mutation> mutationList = pair.getValue();
-                List<List<Mutation>> mutationBatchList =
-                        getMutationBatchList(batchSize, batchSizeBytes, mutationList);
-
-                // create a span per target table
-                // TODO maybe we can be smarter about the table name to string here?
-                Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName));
-
-                int retryCount = 0;
-                boolean shouldRetry = false;
-                long numMutations = 0;
-                long mutationSizeBytes = 0;
-                long mutationCommitTime = 0;
-                long numFailedMutations = 0;
-                ;
-                long startTime = 0;
-                boolean shouldRetryIndexedMutation = false;
-                IndexWriteException iwe = null;
-                do {
-                    TableRef origTableRef = tableInfo.getOrigTableRef();
-                    PTable table = origTableRef.getTable();
-                    table.getIndexMaintainers(indexMetaDataPtr, connection);
-                    final ServerCache cache = tableInfo.isDataTable() ? 
-                            IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table,
-                                    mutationList, indexMetaDataPtr) : null;
-                    // If we haven't retried yet, retry for this case only, as it's possible that
-                    // a split will occur after we send the index metadata cache to all known
-                    // region servers.
-                    shouldRetry = cache != null;
-                    SQLException sqlE = null;
-                    Table hTable = connection.getQueryServices().getTable(htableName);
-                    try {
-                        if (table.isTransactional()) {
-                            // Track tables to which we've sent uncommitted data
-                            if (tableInfo.isDataTable()) {
-                                uncommittedPhysicalNames.add(table.getPhysicalName().getString());
-                                phoenixTransactionContext.markDMLFence(table);
-                            }
-                            // Only pass true for last argument if the index is being written to on it's own (i.e. initial
-                            // index population), not if it's being written to for normal maintenance due to writes to
-                            // the data table. This case is different because the initial index population does not need
-                            // to be done transactionally since the index is only made active after all writes have
-                            // occurred successfully.
-                            hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX);
+
+            Map<TableInfo, List<Mutation>> unverifiedIndexMutations = new LinkedHashMap<>();
+            Map<TableInfo, List<Mutation>> verifiedOrDeletedIndexMutations = new LinkedHashMap<>();
+            filterIndexCheckerMutations(physicalTableMutationMap, unverifiedIndexMutations,
+                    verifiedOrDeletedIndexMutations);
+
+            // Phase 1: Send index mutations with the empty column value = "unverified"
+            sendMutations(unverifiedIndexMutations.entrySet().iterator(), span, indexMetaDataPtr);
+
+            // Phase 2: Send data table and other indexes
+            sendMutations(physicalTableMutationMap.entrySet().iterator(), span, indexMetaDataPtr);
+
+            // Phase 3: Send put index mutations with the empty column value = "verified" and/or delete index mutations
+            try {
+                sendMutations(verifiedOrDeletedIndexMutations.entrySet().iterator(), span, indexMetaDataPtr);
+            } catch (SQLException ex) {
+                // TODO: add a metric here
+                logger.warn(
+                        "Ignoring exception that happened during setting index verified value to verified=TRUE "
+                                + verifiedOrDeletedIndexMutations.toString(),
+                        ex);
+            }
+
+        }
+    }
+
+    private void sendMutations(Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator, Span span, ImmutableBytesWritable indexMetaDataPtr)
+            throws SQLException {
+        while (mutationsIterator.hasNext()) {
+            Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
+            TableInfo tableInfo = pair.getKey();
+            byte[] htableName = tableInfo.getHTableName().getBytes();
+            List<Mutation> mutationList = pair.getValue();
+            List<List<Mutation>> mutationBatchList =
+                    getMutationBatchList(batchSize, batchSizeBytes, mutationList);
+
+            // create a span per target table
+            // TODO maybe we can be smarter about the table name to string here?
+            Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName));
+
+            int retryCount = 0;
+            boolean shouldRetry = false;
+            long numMutations = 0;
+            long mutationSizeBytes = 0;
+            long mutationCommitTime = 0;
+            long numFailedMutations = 0;
+
+            long startTime = 0;
+            boolean shouldRetryIndexedMutation = false;
+            IndexWriteException iwe = null;
+            do {
+                TableRef origTableRef = tableInfo.getOrigTableRef();
+                PTable table = origTableRef.getTable();
+                table.getIndexMaintainers(indexMetaDataPtr, connection);
+                final ServerCache cache = tableInfo.isDataTable() ?
+                        IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table,
+                                mutationList, indexMetaDataPtr) : null;
+                // If we haven't retried yet, retry for this case only, as it's possible that
+                // a split will occur after we send the index metadata cache to all known
+                // region servers.
+                shouldRetry = cache != null;
+                SQLException sqlE = null;
+                Table hTable = connection.getQueryServices().getTable(htableName);
+                try {
+                    if (table.isTransactional()) {
+                        // Track tables to which we've sent uncommitted data
+                        if (tableInfo.isDataTable()) {
+                            uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+                            phoenixTransactionContext.markDMLFence(table);
                         }
-                        numMutations = mutationList.size();
-                        GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
-                        mutationSizeBytes = calculateMutationSize(mutationList);
-
-                        startTime = System.currentTimeMillis();
-                        child.addTimelineAnnotation("Attempt " + retryCount);
-                        Iterator<List<Mutation>> itrListMutation = mutationBatchList.iterator();
-                        while (itrListMutation.hasNext()) {
-                            final List<Mutation> mutationBatch = itrListMutation.next();
-                            if (shouldRetryIndexedMutation) {
-                                // if there was an index write failure, retry the mutation in a loop
-                                final Table finalHTable = hTable;
-                                final ImmutableBytesWritable finalindexMetaDataPtr =
-                                        indexMetaDataPtr;
-                                final PTable finalPTable = table;
-                                PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() {
-                                    @Override
-                                    public void doMutation() throws IOException {
-                                        try {
-                                            finalHTable.batch(mutationBatch);
-                                        } catch (InterruptedException e) {
-                                            Thread.currentThread().interrupt();
-                                            throw new IOException(e);
-                                        } catch (IOException e) {
-                                            e = updateTableRegionCacheIfNecessary(e);
-                                            throw e;
-                                        }
+                        // Only pass true for last argument if the index is being written to on its own (i.e. initial
+                        // index population), not if it's being written to for normal maintenance due to writes to
+                        // the data table. This case is different because the initial index population does not need
+                        // to be done transactionally since the index is only made active after all writes have
+                        // occurred successfully.
+                        hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX);
+                    }
+                    numMutations = mutationList.size();
+                    GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
+                    mutationSizeBytes = calculateMutationSize(mutationList);
+
+                    startTime = System.currentTimeMillis();
+                    child.addTimelineAnnotation("Attempt " + retryCount);
+                    Iterator<List<Mutation>> itrListMutation = mutationBatchList.iterator();
+                    while (itrListMutation.hasNext()) {
+                        final List<Mutation> mutationBatch = itrListMutation.next();
+                        if (shouldRetryIndexedMutation) {
+                            // if there was an index write failure, retry the mutation in a loop
+                            final Table finalHTable = hTable;
+                            final ImmutableBytesWritable finalindexMetaDataPtr =
+                                    indexMetaDataPtr;
+                            final PTable finalPTable = table;
+                            PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() {
+                                @Override
+                                public void doMutation() throws IOException {
+                                    try {
+                                        finalHTable.batch(mutationBatch);
+                                    } catch (InterruptedException e) {
+                                        Thread.currentThread().interrupt();
+                                        throw new IOException(e);
+                                    } catch (IOException e) {
+                                        e = updateTableRegionCacheIfNecessary(e);
+                                        throw e;
                                     }
+                                }
 
-                                    @Override
-                                    public List<Mutation> getMutationList() {
-                                        return mutationBatch;
-                                    }
+                                @Override
+                                public List<Mutation> getMutationList() {
+                                    return mutationBatch;
+                                }
 
-                                    private IOException
-                                            updateTableRegionCacheIfNecessary(IOException ioe) {
-                                        SQLException sqlE =
-                                                ServerUtil.parseLocalOrRemoteServerException(ioe);
-                                        if (sqlE != null
-                                                && sqlE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND
-                                                        .getErrorCode()) {
-                                            try {
-                                                connection.getQueryServices().clearTableRegionCache(
+                                private IOException
+                                updateTableRegionCacheIfNecessary(IOException ioe) {
+                                    SQLException sqlE =
+                                            ServerUtil.parseLocalOrRemoteServerException(ioe);
+                                    if (sqlE != null
+                                            && sqlE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND
+                                            .getErrorCode()) {
+                                        try {
+                                            connection.getQueryServices().clearTableRegionCache(
                                                     finalHTable.getName().getName());
-                                                IndexMetaDataCacheClient.setMetaDataOnMutations(
+                                            IndexMetaDataCacheClient.setMetaDataOnMutations(
                                                     connection, finalPTable, mutationBatch,
                                                     finalindexMetaDataPtr);
-                                            } catch (SQLException e) {
-                                                return ServerUtil.createIOException(
+                                        } catch (SQLException e) {
+                                            return ServerUtil.createIOException(
                                                     "Exception during updating index meta data cache",
                                                     ioe);
-                                            }
                                         }
-                                        return ioe;
                                     }
-                                }, iwe, connection, connection.getQueryServices().getProps());
-                                shouldRetryIndexedMutation = false;
-                            } else {
-                                hTable.batch(mutationBatch);
-                            }
-                            // remove each batch from the list once it gets applied
-                            // so when failures happens for any batch we only start
-                            // from that batch only instead of doing duplicate reply of already
-                            // applied batches from entire list, also we can set
-                            // REPLAY_ONLY_INDEX_WRITES for first batch
-                            // only in case of 1121 SQLException
-                            itrListMutation.remove();
-
-                            batchCount++;
-                            if (logger.isDebugEnabled())
-                                logger.debug("Sent batch of " + mutationBatch.size() + " for "
-                                        + Bytes.toString(htableName));
-                        }
-                        child.stop();
-                        child.stop();
-                        shouldRetry = false;
-                        mutationCommitTime = System.currentTimeMillis() - startTime;
-                        GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
-                        numFailedMutations = 0;
-
-                        // Remove batches as we process them
-                        mutations.remove(origTableRef);
-                        if (tableInfo.isDataTable()) {
-                            numRows -= numMutations;
-                            // recalculate the estimated size
-                            estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(mutations);
+                                    return ioe;
+                                }
+                            }, iwe, connection, connection.getQueryServices().getProps());
+                            shouldRetryIndexedMutation = false;
+                        } else {
+                            hTable.batch(mutationBatch);
                         }
-                    } catch (Exception e) {
-                        mutationCommitTime = System.currentTimeMillis() - startTime;
-                        serverTimestamp = ServerUtil.parseServerTimestamp(e);
-                        SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
-                        if (inferredE != null) {
-                            if (shouldRetry
-                                    && retryCount == 0
-                                    && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND
-                                            .getErrorCode()) {
-                                // Swallow this exception once, as it's possible that we split after sending the index
-                                // metadata
-                                // and one of the region servers doesn't have it. This will cause it to have it the next
-                                // go around.
-                                // If it fails again, we don't retry.
-                                String msg = "Swallowing exception and retrying after clearing meta cache on connection. "
-                                        + inferredE;
-                                logger.warn(LogUtil.addCustomAnnotations(msg, connection));
-                                connection.getQueryServices().clearTableRegionCache(htableName);
-
-                                // add a new child span as this one failed
-                                child.addTimelineAnnotation(msg);
-                                child.stop();
-                                child = Tracing.child(span, "Failed batch, attempting retry");
-
-                                continue;
-                            } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
-                                iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE);
-                                if (iwe != null && !shouldRetryIndexedMutation) {
-                                    // For an index write failure, the data table write succeeded,
-                                    // so when we retry we need to set REPLAY_WRITES
-                                    // for first batch in list only.
-                                    for (Mutation m : mutationBatchList.get(0)) {
-                                        if (!PhoenixIndexMetaData.
-                                                isIndexRebuild(m.getAttributesMap())) {
-                                            m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                        // remove each batch from the list once it gets applied,
+                        // so that when a failure happens for any batch we restart
+                        // from that batch only instead of replaying already applied
+                        // batches from the entire list; we can also set
+                        // REPLAY_ONLY_INDEX_WRITES for the first batch
+                        // only in the case of an 1121 SQLException
+                        itrListMutation.remove();
+
+                        batchCount++;
+                        if (logger.isDebugEnabled())
+                            logger.debug("Sent batch of " + mutationBatch.size() + " for "
+                                    + Bytes.toString(htableName));
+                    }
+                    child.stop();
+                    child.stop();
+                    shouldRetry = false;
+                    mutationCommitTime = System.currentTimeMillis() - startTime;
+                    GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
+                    numFailedMutations = 0;
+
+                    // Remove batches as we process them
+                    mutations.remove(origTableRef);
+                    if (tableInfo.isDataTable()) {
+                        numRows -= numMutations;
+                        // recalculate the estimated size
+                        estimatedSize = KeyValueUtil.getEstimatedRowMutationSize(mutations);
+                    }
+                } catch (Exception e) {
+                    mutationCommitTime = System.currentTimeMillis() - startTime;
+                    long serverTimestamp = ServerUtil.parseServerTimestamp(e);
+                    SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+                    if (inferredE != null) {
+                        if (shouldRetry
+                                && retryCount == 0
+                                && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND
+                                .getErrorCode()) {
+                            // Swallow this exception once, as it's possible that we
+                            // split after sending the index metadata and one of the
+                            // region servers doesn't have it. This will cause it to
+                            // have it the next go around.
+                            // If it fails again, we don't retry.
+                            String msg = "Swallowing exception and retrying after clearing meta cache on connection. "
+                                    + inferredE;
+                            logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+                            connection.getQueryServices().clearTableRegionCache(htableName);
+
+                            // add a new child span as this one failed
+                            child.addTimelineAnnotation(msg);
+                            child.stop();
+                            child = Tracing.child(span, "Failed batch, attempting retry");
+
+                            continue;
+                        } else if (inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
+                            iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE);
+                            if (iwe != null && !shouldRetryIndexedMutation) {
+                                // For an index write failure, the data table write succeeded,
+                                // so when we retry we need to set REPLAY_WRITES
+                                // for first batch in list only.
+                                for (Mutation m : mutationBatchList.get(0)) {
+                                    if (!PhoenixIndexMetaData.isIndexRebuild(
+                                            m.getAttributesMap())){
+                                        m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
                                                 BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES
-                                                );
-                                        }
-                                        KeyValueUtil.setTimestamp(m, serverTimestamp);
+                                        );
                                     }
-                                    shouldRetry = true;
-                                    shouldRetryIndexedMutation = true;
-                                    continue;
+                                    KeyValueUtil.setTimestamp(m, serverTimestamp);
                                 }
+                                shouldRetry = true;
+                                shouldRetryIndexedMutation = true;
+                                continue;
                             }
-                            e = inferredE;
                         }
-                        // Throw to client an exception that indicates the statements that
-                        // were not committed successfully.
-                        int[] uncommittedStatementIndexes = getUncommittedStatementIndexes();
-                        sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp);
-                        numFailedMutations = uncommittedStatementIndexes.length;
-                        GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);
+                        e = inferredE;
+                    }
+                    // Throw to client an exception that indicates the statements that
+                    // were not committed successfully.
+                    int[] uncommittedStatementIndexes = getUncommittedStatementIndexes();
+                    sqlE = new CommitException(e, uncommittedStatementIndexes, serverTimestamp);
+                    numFailedMutations = uncommittedStatementIndexes.length;
+                    GLOBAL_MUTATION_BATCH_FAILED_COUNT.update(numFailedMutations);
+                } finally {
+                    MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes,
+                            mutationCommitTime, numFailedMutations);
+                    mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
+                    try {
+                        if (cache != null) cache.close();
                     } finally {
-                        MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes,
-                                mutationCommitTime, numFailedMutations);
-                        mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
                         try {
-                            if (cache != null) cache.close();
-                        } finally {
-                            try {
-                                hTable.close();
-                            } catch (IOException e) {
-                                if (sqlE != null) {
-                                    sqlE.setNextException(ServerUtil.parseServerException(e));
-                                } else {
-                                    sqlE = ServerUtil.parseServerException(e);
-                                }
+                            hTable.close();
+                        } catch (IOException e) {
+                            if (sqlE != null) {
+                                sqlE.setNextException(ServerUtil.parseServerException(e));
+                            } else {
+                                sqlE = ServerUtil.parseServerException(e);
                             }
-                            if (sqlE != null) { throw sqlE; }
                         }
+                        if (sqlE != null) { throw sqlE; }
                     }
-                } while (shouldRetry && retryCount++ < 1);
+                }
+            } while (shouldRetry && retryCount++ < 1);
+        }
+    }
+
+    private void filterIndexCheckerMutations(Map<TableInfo, List<Mutation>> mutationMap,
+            Map<TableInfo, List<Mutation>> unverifiedIndexMutations,
+            Map<TableInfo, List<Mutation>> verifiedOrDeletedIndexMutations) throws SQLException {
+        Iterator<Entry<TableInfo, List<Mutation>>> mapIter = mutationMap.entrySet().iterator();
+
+        while (mapIter.hasNext()) {
+            Entry<TableInfo, List<Mutation>> pair = mapIter.next();
+            TableInfo tableInfo = pair.getKey();
+            if (tableInfo.getOrigTableRef().getTable().isImmutableRows() && IndexUtil.isGlobalIndexCheckerEnabled(connection, tableInfo.getHTableName())) {
+                PTable table = PhoenixRuntime.getTable(connection, tableInfo.getHTableName().getString());
+                byte[] emptyCF = SchemaUtil.getEmptyColumnFamily(table);
+                byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst();
+
+                List<Mutation> mutations = pair.getValue();
+
+                for (Mutation m : mutations) {
+                    if (m == null) {
+                        continue;
+                    }
+                    if (m instanceof Delete) {
+                        Put put = new Put(m.getRow());
+                        put.addColumn(emptyCF, emptyCQ, IndexRegionObserver.getMaxTimestamp(m),
+                                IndexRegionObserver.UNVERIFIED_BYTES);
+                        // Phase 1 marks the index row unverified via this Put;
+                        // the actual Delete is applied in Phase 3.
+                        addToMap(unverifiedIndexMutations, tableInfo, put);
+                        addToMap(verifiedOrDeletedIndexMutations, tableInfo, m);
+                    } else if (m instanceof Put) {
+                        long timestamp = IndexRegionObserver.getMaxTimestamp(m);
+
+                        // Phase 1 index mutations are set to unverified
+                        ((Put) m).addColumn(emptyCF, emptyCQ, timestamp, IndexRegionObserver.UNVERIFIED_BYTES);
+                        addToMap(unverifiedIndexMutations, tableInfo, m);
+
+                        // Phase 3 flips the row's status column to verified
+                        Put verifiedPut = new Put(m.getRow());
+                        verifiedPut.addColumn(emptyCF, emptyCQ, timestamp,
+                                IndexRegionObserver.VERIFIED_BYTES);
+                        addToMap(verifiedOrDeletedIndexMutations, tableInfo, verifiedPut);
+                    } else {
+                        addToMap(unverifiedIndexMutations, tableInfo, m);
+                    }
+                }
+
+                mapIter.remove();
             }
+
+        }
+    }
+
+    private void addToMap(Map<TableInfo, List<Mutation>> map, TableInfo tableInfo, Mutation mutation) {
+        List<Mutation> mutations = null;
+        if (map.containsKey(tableInfo)) {
+            mutations = map.get(tableInfo);
+        } else {
+            mutations = Lists.newArrayList();
         }
+        mutations.add(mutation);
+        map.put(tableInfo, mutations);
     }
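
The two maps built by filterIndexCheckerMutations above feed a three-phase commit: index rows are
written first with the empty column set to UNVERIFIED, then the data rows, and only then are the
index rows flipped to VERIFIED (or the deferred deletes applied). Below is a minimal sketch of that
ordering under stated assumptions: a hypothetical standalone class keyed by plain table names
rather than Phoenix's TableInfo; the actual commit path is this class's send logic above.

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Table;

// Hypothetical illustration of the write ordering; not the Phoenix commit path.
public class ConsistentIndexCommitSketch {
    private final Connection hbaseConnection;

    public ConsistentIndexCommitSketch(Connection hbaseConnection) {
        this.hbaseConnection = hbaseConnection;
    }

    public void commit(Map<String, List<Mutation>> dataMutations,
            Map<String, List<Mutation>> unverifiedIndexMutations,
            Map<String, List<Mutation>> verifiedOrDeletedIndexMutations) throws Exception {
        // Phase 1: index rows land first, marked UNVERIFIED, so a concurrent
        // reader never trusts them before the data row exists.
        apply(unverifiedIndexMutations);
        // Phase 2: data table rows. If this fails, readers repair or skip the
        // unverified index rows instead of returning wrong results.
        apply(dataMutations);
        // Phase 3: flip index rows to VERIFIED, or apply the deferred deletes.
        apply(verifiedOrDeletedIndexMutations);
    }

    private void apply(Map<String, List<Mutation>> mutationsByTable) throws Exception {
        for (Map.Entry<String, List<Mutation>> entry : mutationsByTable.entrySet()) {
            try (Table table = hbaseConnection.getTable(TableName.valueOf(entry.getKey()))) {
                table.batch(entry.getValue(), new Object[entry.getValue().size()]);
            }
        }
    }
}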
 
     /**
@@ -1270,7 +1372,7 @@ public class MutationState implements SQLCloseable {
                             logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer()
                                     + " with retry count of " + retryCount);
                         retryCommit = (e.getErrorCode() == SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION
-                                .getErrorCode() && retryCount < MAX_COMMIT_RETRIES);
+                            .getErrorCode() && retryCount < MAX_COMMIT_RETRIES);
                         if (sqlE == null) {
                             sqlE = e;
                         } else {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 520aff3..516a088 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -376,7 +376,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
         "Somehow didn't return an index update but also didn't propagate the failure to the client!");
   }
 
-  private long getMaxTimestamp(Mutation m) {
+  public static long getMaxTimestamp(Mutation m) {
       long maxTs = 0;
       long ts = 0;
       Iterator iterator = m.getFamilyCellMap().entrySet().iterator();
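
The hunk ends before the method body; a plausible completion (a sketch, not the committed code)
scans every Cell in the mutation and keeps the largest timestamp, which filterIndexCheckerMutations
uses to stamp the verified/unverified status column:

import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;

// Sketch only: maximum timestamp across all cells of a mutation, written as a
// typed loop over the family map's values rather than the raw Iterator above.
public static long getMaxTimestamp(Mutation m) {
    long maxTs = 0;
    for (List<Cell> cells : m.getFamilyCellMap().values()) {
        for (Cell cell : cells) {
            if (cell.getTimestamp() > maxTs) {
                maxTs = cell.getTimestamp();
            }
        }
    }
    return maxTs;
}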
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 2e62745..6ba147f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -974,10 +974,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
             put = new Put(indexRowKey);
             // add the keyvalue for the empty row
             put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey),
-                this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts,
+                    this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts,
                     QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR));
             put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
         }
+
         ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey);
         if (immutableStorageScheme != ImmutableStorageScheme.ONE_CELL_PER_COLUMN) {
             // map from index column family to list of pair of index column and data column (for covered columns)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index dd06593..30bbc46 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -906,7 +906,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             if (tableType == PTableType.INDEX && !isTransactional) {
                 if (!indexRegionObserverEnabled && descriptor.hasCoprocessor(GlobalIndexChecker.class.getName())) {
                     descriptor.removeCoprocessor(GlobalIndexChecker.class.getName());
-                } else if (indexRegionObserverEnabled && !descriptor.hasCoprocessor(GlobalIndexChecker.class.getName())) {
+                } else if (indexRegionObserverEnabled && !descriptor.hasCoprocessor(GlobalIndexChecker.class.getName()) &&
+                        !isLocalIndexTable(descriptor.getFamiliesKeys())) {
                     descriptor.addCoprocessor(GlobalIndexChecker.class.getName(), null, priority - 1, null);
                 }
             }
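
isLocalIndexTable is referenced here but not shown in the hunk; a plausible sketch of the check,
assuming local index column families are identified by Phoenix's local-index family prefix
(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX):

import java.util.Collection;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.query.QueryConstants;

// Plausible sketch only. Local index data lives in prefixed column families
// inside the data table itself, so GlobalIndexChecker must not be attached there.
private boolean isLocalIndexTable(Collection<byte[]> familyNames) {
    for (byte[] family : familyNames) {
        if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
            return true;
        }
    }
    return false;
}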
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 9df3f74..d67cb11 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -194,6 +194,7 @@ public class QueryServicesOptions {
     public static final long DEFAULT_GROUPBY_MAX_CACHE_MAX = 1024L*1024L*100L;  // 100 Mb
 
     public static final long DEFAULT_SEQUENCE_CACHE_SIZE = 100;  // reserve 100 sequences at a time
+    public static final int GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN = 10;
     public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS =  60000 * 30; // 30 mins
     public static final long DEFAULT_MAX_SERVER_METADATA_CACHE_SIZE =  1024L*1024L*20L; // 20 Mb
     public static final long DEFAULT_MAX_CLIENT_METADATA_CACHE_SIZE =  1024L*1024L*10L; // 10 Mb
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 79a0dfe..aebe9fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -40,7 +40,10 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -96,6 +99,7 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -106,6 +110,7 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.ColumnRef;
@@ -138,7 +143,12 @@ import com.google.common.collect.Lists;
 public class IndexUtil {
     public static final String INDEX_COLUMN_NAME_SEP = ":";
     public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES = Bytes.toBytes(INDEX_COLUMN_NAME_SEP);
-    
+
+    private static Cache<String, Boolean> indexNameGlobalIndexCheckerEnabledMap = CacheBuilder.newBuilder()
+            .expireAfterWrite(QueryServicesOptions.GLOBAL_INDEX_CHECKER_ENABLED_MAP_EXPIRATION_MIN,
+                    TimeUnit.MINUTES)
+            .build();
+
     private IndexUtil() {
     }
 
@@ -295,11 +305,37 @@ public class IndexUtil {
                             .getLength()) == 0);
     }
 
+
+    public static boolean isGlobalIndexCheckerEnabled(PhoenixConnection connection, PName index)
+            throws SQLException {
+        String indexName = index.getString();
+        Boolean entry = indexNameGlobalIndexCheckerEnabledMap.getIfPresent(indexName);
+        if (entry != null) {
+            return entry;
+        }
+
+        boolean result = false;
+        try {
+            HTableDescriptor desc = connection.getQueryServices().getTableDescriptor(index.getBytes());
+
+            if (desc != null && desc.hasCoprocessor(GlobalIndexChecker.class.getName())) {
+                result = true;
+            }
+            indexNameGlobalIndexCheckerEnabledMap.put(indexName, result);
+        } catch (TableNotFoundException ex) {
+            // Swallow this: some index types, such as local indexes, don't have a separate physical table
+        }
+
+        return result;
+    }
+
     public static List<Mutation> generateIndexData(final PTable table, PTable index,
             final MultiRowMutationState multiRowMutationState, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection)
             throws SQLException {
         try {
-        	final ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+            final ImmutableBytesPtr ptr = new ImmutableBytesPtr();
             IndexMaintainer maintainer = index.getIndexMaintainer(table, connection);
             List<Mutation> indexMutations = Lists.newArrayListWithExpectedSize(dataMutations.size());
             for (final Mutation dataMutation : dataMutations) {
@@ -313,12 +349,12 @@ public class IndexUtil {
                  */
                 if (dataMutation instanceof Put) {
                     ValueGetter valueGetter = new ValueGetter() {
-                    	
-                    	@Override
+
+                        @Override
                         public byte[] getRowKey() {
-                    		return dataMutation.getRow();
-                    	}
-        
+                            return dataMutation.getRow();
+                        }
+
                         @Override
                         public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {
                             // Always return null for our empty key value, as this will cause the index
@@ -335,15 +371,15 @@ public class IndexUtil {
                             }
                             for (Cell kv : kvs) {
                                 if (Bytes.compareTo(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), family, 0, family.length) == 0 &&
-                                    Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), qualifier, 0, qualifier.length) == 0) {
-                                  ImmutableBytesPtr ptr = new ImmutableBytesPtr();
-                                  kvBuilder.getValueAsPtr(kv, ptr);
-                                  return ptr;
+                                        Bytes.compareTo(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), qualifier, 0, qualifier.length) == 0) {
+                                    ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+                                    kvBuilder.getValueAsPtr(kv, ptr);
+                                    return ptr;
                                 }
                             }
                             return null;
                         }
-                        
+
                     };
                     byte[] regionStartKey = null;
                     byte[] regionEndkey = null;
@@ -929,7 +965,7 @@ public class IndexUtil {
     }
 
     public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException {
-        if (table.isTransactional() || table.isImmutableRows() || table.getType() != PTableType.INDEX) {
+        if (table.isTransactional() || table.getType() != PTableType.INDEX) {
             return;
         }
         PTable indexTable = table;
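
A design note on the cache in isGlobalIndexCheckerEnabled: the getIfPresent/put pattern lets two
threads race to fetch the same table descriptor. That is harmless here because the computed value
is idempotent, but Guava's loading form collapses concurrent lookups into one. A sketch of that
alternative, with exception handling simplified (the TableNotFoundException swallow above would
still be needed):

import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PName;

// Sketch of a loading-cache variant; not the committed implementation.
public static boolean isGlobalIndexCheckerEnabledViaLoader(final PhoenixConnection connection,
        final PName index) throws SQLException {
    try {
        return indexNameGlobalIndexCheckerEnabledMap.get(index.getString(), new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                HTableDescriptor desc =
                        connection.getQueryServices().getTableDescriptor(index.getBytes());
                return desc != null && desc.hasCoprocessor(GlobalIndexChecker.class.getName());
            }
        });
    } catch (ExecutionException e) {
        throw new SQLException(e.getCause());
    }
}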
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 4a628d8..ae5a84e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -51,7 +51,6 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -72,7 +71,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.phoenix.compile.AggregationManager;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.SequenceManager;
@@ -875,9 +873,13 @@ public class TestUtil {
         System.out.println("-----------------------------------------------");
     }
 
-    public static int getRawRowCount(HTableInterface table) throws IOException {
+    public static int getRawRowCount(Table table) throws IOException {
+        return getRowCount(table, true);
+    }
+
+    public static int getRowCount(Table table, boolean isRaw) throws IOException {
         Scan s = new Scan();
-        s.setRaw(true);;
+        s.setRaw(isRaw);
         s.setMaxVersions();
         int rows = 0;
         try (ResultScanner scanner = table.getScanner(s)) {
@@ -938,6 +940,7 @@ public class TestUtil {
 
     public static void waitForIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException {
         int maxTries = 60, nTries = 0;
+        PIndexState actualIndexState = null;
         do {
             String schema = SchemaUtil.getSchemaNameFromFullName(fullIndexName);
             String index = SchemaUtil.getTableNameFromFullName(fullIndexName);
@@ -947,7 +950,6 @@ public class TestUtil {
                     + ") = (" + "'" + schema + "','" + index + "') "
                     + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
             ResultSet rs = conn.createStatement().executeQuery(query);
-            PIndexState actualIndexState = null;
             if (rs.next()) {
                 actualIndexState = PIndexState.fromSerializedValue(rs.getString(1));
                 boolean matchesExpected = (actualIndexState == expectedIndexState);
@@ -956,7 +958,8 @@ public class TestUtil {
                 }
             }
         } while (++nTries < maxTries);
-        fail("Ran out of time waiting for index state to become " + expectedIndexState);
+        fail("Ran out of time waiting for index state to become " + expectedIndexState + " last seen actual state is " +
+                (actualIndexState == null ? "Unknown" : actualIndexState.toString()));
     }
 
     public static void waitForIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws InterruptedException, SQLException {
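
A usage sketch for the new getRowCount overload above: a raw scan also returns delete markers and
all cell versions, so comparing the two counts lets index tests see rows a normal scan would hide.
"MY_INDEX" and the freshly created configuration are assumptions; a real IT would use the
mini-cluster's configuration and its own table name.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.util.TestUtil;

// Hypothetical snippet; table name and configuration are assumptions.
public class RawRowCountExample {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        try (Connection hconn = ConnectionFactory.createConnection(config);
                Table htable = hconn.getTable(TableName.valueOf("MY_INDEX"))) {
            int visibleRows = TestUtil.getRowCount(htable, false); // normal scan
            int rawRows = TestUtil.getRawRowCount(htable);         // raw scan, all versions
            System.out.println("raw=" + rawRows + " visible=" + visibleRows);
        }
    }
}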