Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/06/11 19:24:22 UTC

[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5156 Consistent Mutable Global Indexes for Non-Transactional Tables

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
     new e435e50  PHOENIX-5156 Consistent Mutable Global Indexes for Non-Transactional Tables
e435e50 is described below

commit e435e505ae5c7e5a174bde4d5fb56c53add723e7
Author: Kadir <ko...@salesforce.com>
AuthorDate: Thu Jun 6 23:10:47 2019 -0700

    PHOENIX-5156 Consistent Mutable Global Indexes for Non-Transactional Tables
---
 ...WALReplayWithIndexWritesAndCompressedWALIT.java |   2 +
 .../end2end/ExplainPlanWithStatsEnabledIT.java     |  19 +-
 .../phoenix/end2end/IndexBuildTimestampIT.java     |   2 +-
 .../phoenix/end2end/ParallelStatsDisabledIT.java   |   1 -
 .../end2end/index/GlobalIndexCheckerIT.java        | 253 ++++++
 .../end2end/index/GlobalMutableNonTxIndexIT.java   |  12 +-
 ...MutableNonTxIndexWithLazyPostBatchWriteIT.java} |  25 +-
 .../end2end/index/MutableIndexFailureIT.java       |   1 +
 .../index/MutableIndexFailureWithNamespaceIT.java  |   3 +-
 .../end2end/index/PartialIndexRebuilderIT.java     |   3 +-
 .../org/apache/phoenix/rpc/PhoenixServerRpcIT.java |   3 +-
 .../org/apache/phoenix/util/IndexScrutinyIT.java   |   6 +-
 .../coprocessor/BaseScannerRegionObserver.java     |   5 +
 .../UngroupedAggregateRegionObserver.java          |   4 +
 .../org/apache/phoenix/execute/BaseQueryPlan.java  |   4 +-
 .../phoenix/filter/ColumnProjectionFilter.java     |  10 +
 .../EncodedQualifiersColumnProjectionFilter.java   |   4 +
 .../MultiEncodedCQKeyValueComparisonFilter.java    |   6 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   | 882 +++++++++++++++++++++
 .../apache/phoenix/hbase/index/LockManager.java    |  20 +-
 .../hbase/index/builder/BaseIndexBuilder.java      |   4 +-
 .../hbase/index/builder/BaseIndexCodec.java        |   3 +-
 .../hbase/index/builder/IndexBuildManager.java     |  32 +-
 .../phoenix/hbase/index/builder/IndexBuilder.java  |   4 +-
 .../phoenix/hbase/index/covered/IndexCodec.java    |   2 +-
 ...a => AbstractParallelWriterIndexCommitter.java} | 125 ++-
 .../phoenix/hbase/index/write/IndexCommitter.java  |   2 +-
 .../phoenix/hbase/index/write/IndexWriter.java     |  77 +-
 .../LazyParallelWriterIndexCommitter.java}         |  34 +-
 .../index/write/ParallelWriterIndexCommitter.java  | 194 +----
 .../TrackingParallelWriterIndexCommitter.java      |  14 +-
 .../apache/phoenix/index/GlobalIndexChecker.java   | 378 +++++++++
 .../org/apache/phoenix/index/IndexMaintainer.java  |  11 +
 .../apache/phoenix/index/PhoenixIndexBuilder.java  |   2 +-
 .../apache/phoenix/index/PhoenixIndexCodec.java    |   2 +-
 .../phoenix/iterate/TableResultIterator.java       |   2 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  43 +-
 .../org/apache/phoenix/query/QueryConstants.java   |   3 +
 .../org/apache/phoenix/query/QueryServices.java    |   7 +
 .../apache/phoenix/query/QueryServicesOptions.java |   4 +
 .../org/apache/phoenix/schema/MetaDataClient.java  |   3 +
 .../java/org/apache/phoenix/util/IndexUtil.java    |  94 ++-
 .../java/org/apache/phoenix/query/BaseTest.java    |   3 +
 43 files changed, 1960 insertions(+), 348 deletions(-)
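
For context, the patch implements the consistent-index write protocol described in the
comments below: index rows are first written with a verify flag set to "unverified"
(the pre index update phase), then the data table rows are written, and finally the
index rows are marked "verified" or deleted (the post index update phase). A minimal
conceptual sketch of the flag handling, assuming placeholder names EMPTY_CF/EMPTY_CQ
for the index table's empty column family/qualifier (this is not the patch's actual
code):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ConsistentIndexWriteSketch {
        static final byte[] EMPTY_CF = Bytes.toBytes("0");   // placeholder empty column family
        static final byte[] EMPTY_CQ = Bytes.toBytes("_0");  // placeholder empty column qualifier
        static final byte[] UNVERIFIED = new byte[] { 2 };   // matches UNVERIFIED_BYTES in the patch
        static final byte[] VERIFIED = new byte[] { 1 };     // matches VERIFIED_BYTES in the patch

        // Phase 1 (pre index update): write the index row with the verify flag cleared
        static Put preIndexUpdate(byte[] indexRowKey, long ts) {
            Put put = new Put(indexRowKey);
            put.addColumn(EMPTY_CF, EMPTY_CQ, ts, UNVERIFIED);
            return put;
        }

        // Phase 2: the data table region applies the row mutations.

        // Phase 3 (post index update): mark the index row verified once phase 2 succeeds
        static Put postIndexUpdate(byte[] indexRowKey, long ts) {
            Put put = new Put(indexRowKey);
            put.addColumn(EMPTY_CF, EMPTY_CQ, ts, VERIFIED);
            return put;
        }
    }

If a server fails between phases, index rows are left unverified; the new
GlobalIndexChecker repairs or skips such rows at read time, which is what
GlobalIndexCheckerIT below exercises.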

diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
index 72ef074..fa248a5 100644
--- a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
+++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ConfigUtil;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -81,6 +82,7 @@ import org.slf4j.LoggerFactory;
  * This test should only have a single test - otherwise we will start/stop the minicluster multiple
 * times, which is probably not what you want to do (mostly because it's so much effort).
  */
+@Ignore
 @Category(NeedsOwnMiniClusterTest.class)
 public class WALReplayWithIndexWritesAndCompressedWALIT {
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
index bd95dc1..072b39d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java
@@ -160,10 +160,13 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
         String sql = "SELECT * FROM " + tableA + " where (c1.a > c2.b) limit 1";
         List<Object> binds = Lists.newArrayList();
         try (Connection conn = DriverManager.getConnection(getUrl())) {
+            ResultSet rs = conn.createStatement().executeQuery(sql);
+            assertFalse(rs.next());
             Estimate info = getByteRowEstimates(conn, sql, binds);
-            assertEquals((Long) 691L, info.estimatedBytes);
+            assertEquals((Long) 390L, info.estimatedBytes);
             assertEquals((Long) 10L, info.estimatedRows);
             assertTrue(info.estimateInfoTs > 0);
+
         }
     }
 
@@ -186,8 +189,20 @@ public class ExplainPlanWithStatsEnabledIT extends ParallelStatsEnabledIT {
         List<Object> binds = Lists.newArrayList();
         binds.add(0);
         try (Connection conn = DriverManager.getConnection(getUrl())) {
+            try (PreparedStatement statement = conn.prepareStatement(sql)) {
+                int paramIdx = 1;
+                for (Object bind : binds) {
+                    statement.setObject(paramIdx++, bind);
+                }
+                ResultSet rs = statement.executeQuery();
+                assertTrue(rs.next());
+                assertEquals(100, rs.getInt(1));
+                assertEquals(1, rs.getInt(2));
+                assertEquals(3, rs.getInt(3));
+                assertTrue(rs.next());
+            }
             Estimate info = getByteRowEstimates(conn, sql, binds);
-            assertEquals((Long) 691L, info.estimatedBytes);
+            assertEquals((Long) 390L, info.estimatedBytes);
             assertEquals((Long) 10L, info.estimatedRows);
             assertTrue(info.estimateInfoTs > 0);
         }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
index 2d166a5..e39a81f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexBuildTimestampIT.java
@@ -76,7 +76,7 @@ public class IndexBuildTimestampIT extends BaseUniqueNamesOwnClusterIT {
     @Parameters(
             name = "mutable={0},localIndex={1},async={2},view={3}")
     public static Collection<Object[]> data() {
-        List<Object[]> list = Lists.newArrayListWithExpectedSize(8);
+        List<Object[]> list = Lists.newArrayListWithExpectedSize(16);
         boolean[] Booleans = new boolean[]{false, true};
         for (boolean mutable : Booleans) {
             for (boolean localIndex : Booleans) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
index a46de49..5b1532b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelStatsDisabledIT.java
@@ -21,7 +21,6 @@ package org.apache.phoenix.end2end;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.QueryBuilder;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
new file mode 100644
index 0000000..6a28e71
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import com.google.common.collect.Lists;
+
+@RunWith(Parameterized.class)
+public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
+    private final boolean async;
+    private final String tableDDLOptions;
+
+    public GlobalIndexCheckerIT(boolean async, boolean encoded) {
+        this.async = async;
+        StringBuilder optionBuilder = new StringBuilder();
+        if (!encoded) {
+            optionBuilder.append(" COLUMN_ENCODED_BYTES=0 ");
+        }
+        this.tableDDLOptions = optionBuilder.toString();
+    }
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Parameters(
+            name = "async={0},encoded={1}")
+    public static Collection<Object[]> data() {
+        List<Object[]> list = Lists.newArrayListWithExpectedSize(4);
+        boolean[] Booleans = new boolean[]{true, false};
+        for (boolean async : Booleans) {
+            for (boolean encoded : Booleans) {
+                list.add(new Object[]{async, encoded});
+            }
+        }
+        return list;
+    }
+
+    public static void assertExplainPlan(Connection conn, String selectSql,
+                                         String dataTableFullName, String indexTableFullName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+        String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+        IndexToolIT.assertExplainPlan(false, actualExplainPlan, dataTableFullName, indexTableFullName);
+    }
+
+    private void populateTable(String tableName) throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("create table " + tableName +
+                " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
+        conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
+        conn.commit();
+        conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')");
+        conn.commit();
+        conn.close();
+    }
+
+    @Test
+    public void testSkipPostIndexDeleteUpdate() throws Exception {
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName);
+        Connection conn = DriverManager.getConnection(getUrl());
+        String indexName = generateUniqueName();
+        conn.createStatement().execute("CREATE INDEX " + indexName + " on " +
+                dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+        if (async) {
+            // run the index MR job.
+            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName);
+        }
+        String selectSql =  "SELECT id from " + dataTableName + " WHERE val1  = 'ab'";
+        ResultSet rs = conn.createStatement().executeQuery(selectSql);
+        assertTrue(rs.next());
+        assertEquals("a", rs.getString(1));
+        assertFalse(rs.next());
+
+        // Configure the indexer to skip the last write phase (i.e., the post index update phase), where the verify
+        // flag is set to true and/or index rows are deleted, and check that this does not impact correctness
+        IndexRegionObserver.setSkipPostIndexUpdatesForTesting(true);
+        String dml = "DELETE from " + dataTableName + " WHERE id  = 'a'";
+        assertEquals(1, conn.createStatement().executeUpdate(dml));
+        conn.commit();
+
+        // The index rows have not actually been deleted yet because the indexer skipped the delete operation.
+        // However, they were marked unverified in the pre index update phase (i.e., the first write phase)
+        dml = "DELETE from " + dataTableName + " WHERE val1  = 'ab'";
+        // This DML will scan the index table and detect unverified index rows. This triggers read repair, which
+        // results in deleting these rows since the corresponding data table rows were already deleted. So, the
+        // number of rows deleted by the "DELETE" DML will be zero, since the rows deleted by read repair are not
+        // visible to the DML
+        assertEquals(0, conn.createStatement().executeUpdate(dml));
+
+        // Count the number of index rows
+        String query = "SELECT COUNT(*) from " + indexName;
+        // There should be one row in the index table
+        rs = conn.createStatement().executeQuery(query);
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        conn.close();
+    }
+
+    @Test
+    public void testPartialRowUpdate() throws Exception {
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName);
+        Connection conn = DriverManager.getConnection(getUrl());
+        String indexName = generateUniqueName();
+        conn.createStatement().execute("CREATE INDEX " + indexName + " on " +
+                dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+        if (async) {
+            // run the index MR job.
+            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName);
+        }
+        conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+        conn.commit();
+        conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('c', 'cde')");
+        conn.commit();
+        String selectSql =  "SELECT * from " + dataTableName + " WHERE val1  = 'ab'";
+        // Verify that we will read from the index table
+        assertExplainPlan(conn, selectSql, dataTableName, indexName);
+        ResultSet rs = conn.createStatement().executeQuery(selectSql);
+        assertTrue(rs.next());
+        assertEquals("a", rs.getString(1));
+        assertEquals("ab", rs.getString(2));
+        assertEquals("abcc", rs.getString(3));
+        assertEquals("abcd", rs.getString(4));
+        assertFalse(rs.next());
+
+        conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', 'ab', 'abcdd')");
+        conn.commit();
+        // Verify that we will read from the index table
+        assertExplainPlan(conn, selectSql, dataTableName, indexName);
+        rs = conn.createStatement().executeQuery(selectSql);
+        assertTrue(rs.next());
+        assertEquals("a", rs.getString(1));
+        assertEquals("ab", rs.getString(2));
+        assertEquals("abcc", rs.getString(3));
+        assertEquals("abcdd", rs.getString(4));
+        assertFalse(rs.next());
+        conn.close();
+    }
+
+    @Test
+    public void testSkipPostIndexPartialRowUpdate() throws Exception {
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName);
+        Connection conn = DriverManager.getConnection(getUrl());
+        String indexName = generateUniqueName();
+        conn.createStatement().execute("CREATE INDEX " + indexName + " on " +
+                dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+        if (async) {
+            // run the index MR job.
+            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName);
+        }
+        // Configure the indexer to skip the last write phase (i.e., the post index update phase), where the verify
+        // flag is set to true and/or index rows are deleted, and check that this does not impact correctness
+        IndexRegionObserver.setSkipPostIndexUpdatesForTesting(true);
+        conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+        conn.commit();
+        conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('c', 'cd','cde')");
+        conn.commit();
+        String selectSql =  "SELECT val2, val3 from " + dataTableName + " WHERE val1  = 'ab'";
+        // Verify that we will read from the index table
+        assertExplainPlan(conn, selectSql, dataTableName, indexName);
+        ResultSet rs = conn.createStatement().executeQuery(selectSql);
+        assertTrue(rs.next());
+        assertEquals("abcc", rs.getString(1));
+        assertEquals("abcd", rs.getString(2));
+        assertFalse(rs.next());
+        conn.close();
+    }
+
+    @Test
+    public void testSkipDataTableAndPostIndexPartialRowUpdate() throws Exception {
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName);
+        Connection conn = DriverManager.getConnection(getUrl());
+        String indexName = generateUniqueName();
+        conn.createStatement().execute("CREATE INDEX " + indexName + "1 on " +
+                dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : ""));
+        conn.createStatement().execute("CREATE INDEX " + indexName + "2 on " +
+                dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : ""));
+        if (async) {
+            // run the index MR job.
+            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "1");
+            IndexToolIT.runIndexTool(true, false, null, dataTableName, indexName + "2");
+        }
+        // Configure the indexer to skip the last two write phases (i.e., the data table update and post index
+        // update phases) and check that this does not impact correctness
+        IndexRegionObserver.setSkipDataTableUpdatesForTesting(true);
+        IndexRegionObserver.setSkipPostIndexUpdatesForTesting(true);
+        conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+        conn.commit();
+        IndexRegionObserver.setSkipDataTableUpdatesForTesting(false);
+        IndexRegionObserver.setSkipPostIndexUpdatesForTesting(false);
+        conn.createStatement().execute("upsert into " + dataTableName + " (id, val3) values ('a', 'abcdd')");
+        conn.commit();
+        String selectSql =  "SELECT val2, val3 from " + dataTableName + " WHERE val1  = 'ab'";
+        // Verify that we will read from the first index table
+        assertExplainPlan(conn, selectSql, dataTableName, indexName + "1");
+        ResultSet rs = conn.createStatement().executeQuery(selectSql);
+        assertTrue(rs.next());
+        assertEquals("abc", rs.getString(1));
+        assertEquals("abcdd", rs.getString(2));
+        assertFalse(rs.next());
+        selectSql =  "SELECT val2, val3 from " + dataTableName + " WHERE val2  = 'abc'";
+        // Verify that we will read from the second index table
+        assertExplainPlan(conn, selectSql, dataTableName, indexName + "2");
+        rs = conn.createStatement().executeQuery(selectSql);
+        assertTrue(rs.next());
+        assertEquals("abc", rs.getString(1));
+        assertEquals("abcdd", rs.getString(2));
+        assertFalse(rs.next());
+        conn.close();
+    }
+}
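
The tests above depend on read repair: when a scan over the index encounters an
unverified row, the row is rebuilt from the data table, or deleted if the data table
row no longer exists, before anything is returned to the client. A simplified sketch
of the per-row check, reusing the placeholder names from the earlier sketch (the real
logic is in GlobalIndexChecker, further down in this patch):

    import java.util.Arrays;
    import org.apache.hadoop.hbase.client.Result;

    // Returns true if the index row can be served as-is; unverified rows must be
    // repaired against the data table before (or instead of) being returned.
    static boolean isVerified(Result indexRow, byte[] emptyCf, byte[] emptyCq) {
        byte[] verify = indexRow.getValue(emptyCf, emptyCq);
        return verify != null && Arrays.equals(verify, new byte[] { 1 });  // VERIFIED_BYTE
    }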
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
index 023bc1e..35d5049 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
@@ -20,19 +20,23 @@ package org.apache.phoenix.end2end.index;
 import java.util.Arrays;
 import java.util.Collection;
 
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.junit.runners.Parameterized.Parameters;
 
 public class GlobalMutableNonTxIndexIT extends BaseIndexIT {
 
-    public GlobalMutableNonTxIndexIT(boolean localIndex, boolean mutable, String transactionProvider, boolean columnEncoded) {
+    public GlobalMutableNonTxIndexIT(boolean localIndex, boolean mutable, String transactionProvider, boolean columnEncoded, boolean skipPostIndexUpdates) {
         super(localIndex, mutable, transactionProvider, columnEncoded);
+        IndexRegionObserver.setSkipPostIndexUpdatesForTesting(skipPostIndexUpdates);
     }
 
-    @Parameters(name="GlobalMutableNonTxIndexIT_localIndex={0},mutable={1},transactionProvider={2},columnEncoded={3}") // name is used by failsafe as file name in reports
+    @Parameters(name="GlobalMutableNonTxIndexIT_localIndex={0},mutable={1},transactionProvider={2},columnEncoded={3},skipPostIndexUpdates={4}") // name is used by failsafe as file name in reports
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
-                { false, true, null, false }, { false, true, null, true }
+                {false, true, null, false, false},
+                {false, true, null, false, true},
+                {false, true, null, true, false},
+                {false, true, null, true, true}
            });
     }
-
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexWithLazyPostBatchWriteIT.java
similarity index 53%
copy from phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
copy to phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexWithLazyPostBatchWriteIT.java
index 023bc1e..d296d0d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexWithLazyPostBatchWriteIT.java
@@ -17,22 +17,23 @@
  */
 package org.apache.phoenix.end2end.index;
 
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.Map;
 
-import org.junit.runners.Parameterized.Parameters;
+import com.google.common.collect.Maps;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 
-public class GlobalMutableNonTxIndexIT extends BaseIndexIT {
+public class GlobalMutableNonTxIndexWithLazyPostBatchWriteIT extends GlobalMutableNonTxIndexIT {
 
-    public GlobalMutableNonTxIndexIT(boolean localIndex, boolean mutable, String transactionProvider, boolean columnEncoded) {
-        super(localIndex, mutable, transactionProvider, columnEncoded);
+    public GlobalMutableNonTxIndexWithLazyPostBatchWriteIT(boolean localIndex, boolean mutable, String transactionProvider, boolean columnEncoded, boolean skipPostIndexUpdates) {
+        super(localIndex, mutable, transactionProvider, columnEncoded, skipPostIndexUpdates);
     }
 
-    @Parameters(name="GlobalMutableNonTxIndexIT_localIndex={0},mutable={1},transactionProvider={2},columnEncoded={3}") // name is used by failsafe as file name in reports
-    public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] {
-                { false, true, null, false }, { false, true, null, true }
-           });
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true");
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
-
 }
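
This subclass reruns the GlobalMutableNonTxIndexIT matrix with post-batch index writes
performed lazily. Outside of tests, the switch is the server-side property that
IndexRegionObserver.start() reads; enabling it would look roughly like this (a sketch
using the key defined in the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE
    conf.setBoolean("org.apache.hadoop.hbase.index.lazy.post_batch.write", true);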
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 8be32fc..adc5ae9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -142,6 +142,7 @@ public class MutableIndexFailureIT extends BaseTest {
         Map<String, String> serverProps = getServerProps();
         Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
         clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString());
         NUM_SLAVES_BASE = 4;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
         indexRebuildTaskRegionEnvironment =
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
index b6ec49a..fb6e555 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
@@ -50,9 +50,10 @@ public class MutableIndexFailureWithNamespaceIT extends MutableIndexFailureIT {
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = getServerProps();
         serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
-        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(3);
         clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.TRUE.toString());
         clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString());
         NUM_SLAVES_BASE = 4;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
         TableName systemTable = SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
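
The mutable index failure and rebuild tests exercise the pre-existing Indexer path, so
they disable the new coprocessor through the INDEX_REGION_OBSERVER_ENABLED_ATTRIB
client property this patch adds to QueryServices. A client could opt out the same way
(a sketch; the JDBC URL is a placeholder):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;
    import org.apache.phoenix.query.QueryServices;

    Properties props = new Properties();
    props.setProperty(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString());
    Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props);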
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 6b21815..6ec3e2f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -96,8 +96,9 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000");
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED));
-        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
         clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, Boolean.FALSE.toString());
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
         indexRebuildTaskRegionEnvironment =
                 (RegionCoprocessorEnvironment) getUtility()
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index ab05c16..c49a822 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -99,7 +99,8 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
             ensureTablesOnDifferentRegionServers(dataTableFullName, indexTableFullName);
             TestPhoenixIndexRpcSchedulerFactory.reset();
             upsertRow(conn, dataTableFullName);
-            Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor())
+            // An index row is updated twice, once before the data table row, and once after it. Thus, the factory is invoked twice
+            Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.times(2))
                     .dispatch(Mockito.any(CallRunner.class));
             TestPhoenixIndexRpcSchedulerFactory.reset();
             // run select query that should use the index
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
index 3277e32..e7e27a9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -48,7 +48,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
                 IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
                 fail();
             } catch (AssertionError e) {
-                assertEquals(e.getMessage(),"Expected data table row count to match expected:<2> but was:<1>");
+                assertEquals("Expected data table row count to match expected:<2> but was:<1>", e.getMessage());
             }
         }
     }
@@ -72,7 +72,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
                 IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
                 fail();
             } catch (AssertionError e) {
-                assertEquals(e.getMessage(),"Expected to find PK in data table: ('x')");
+                assertEquals("Expected to find PK in data table: ('x')", e.getMessage());
             }
         }
     }
@@ -97,7 +97,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
                 IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
                 fail();
             } catch (AssertionError e) {
-                assertEquals(e.getMessage(),"Expected equality for V2, but '2'!='1'");
+                assertEquals("Expected equality for V2, but '2'!='1'", e.getMessage());
             }
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b02d5b4..24374b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -108,6 +108,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public final static String IMMUTABLE_STORAGE_ENCODING_SCHEME = "_ImmutableStorageEncodingScheme";
     public final static String USE_ENCODED_COLUMN_QUALIFIER_LIST = "_UseEncodedColumnQualifierList";
     public static final String CLIENT_VERSION = "_ClientVersion";
+    public static final String CHECK_VERIFY_COLUMN = "_CheckVerifyColumn";
+    public static final String PHYSICAL_DATA_TABLE_NAME = "_PhysicalDataTableName";
+    public static final String EMPTY_COLUMN_FAMILY_NAME = "_EmptyCFName";
+    public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
+    public static final String SCAN_LIMIT = "_ScanLimit";
     
     public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
     public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
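
These new scan attributes carry everything the server-side read-repair path needs:
whether to check the verify column, the physical data table to repair from, and the
empty column family/qualifier where the verify flag lives. A sketch of the assumed
client-side wiring (dataTableNameBytes, emptyCf and emptyCq are placeholders; the
actual plumbing is in the IndexUtil and BaseQueryPlan changes in this patch):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    Scan scan = new Scan();
    scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, Bytes.toBytes(true));
    scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTableNameBytes);
    scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCf);
    scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCq);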
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index f6569f5..5595dda 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1067,6 +1067,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
         }
         byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        byte[] scanLimitBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_LIMIT);
+        int scanLimit = (scanLimitBytes != null) ? Bytes.toInt(scanLimitBytes) : 0;
         boolean hasMore;
         int rowCount = 0;
         try {
@@ -1121,6 +1123,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             mutations.clear();
                         }
                         rowCount++;
+                        if (rowCount == scanLimit) {
+                            break;
+                        }
                     }
                     
                 } while (hasMore);
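
The SCAN_LIMIT attribute bounds how many rows this server-side loop processes; with no
attribute set, scanLimit is 0 and the loop never breaks early, because rowCount is
incremented before the equality check. Setting the limit from the client side would
look like this (a sketch; 10000 is a placeholder value):

    // Stop the server-side scan after 10,000 rows
    scan.setAttribute(BaseScannerRegionObserver.SCAN_LIMIT, Bytes.toBytes(10000));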
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index 1b5876c..4f1c0fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -394,7 +394,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         }
     }
 
-    private void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) {
+    public static void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) {
         int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 1 : 0);
         int nViewConstants = 0;
         if (dataTable.getType() == PTableType.VIEW) {
@@ -424,7 +424,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         }
     }
 
-    private void serializeViewConstantsIntoScan(byte[][] viewConstants, Scan scan) {
+    private static void serializeViewConstantsIntoScan(byte[][] viewConstants, Scan scan) {
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         try {
             DataOutputStream output = new DataOutputStream(stream);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
index 3d6843d..6e84b5a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java
@@ -189,4 +189,14 @@ public class ColumnProjectionFilter extends FilterBase implements Writable {
     public ReturnCode filterKeyValue(Cell ignored) throws IOException {
       return ReturnCode.INCLUDE_AND_NEXT_COL;
     }
+
+    public void addTrackedColumn(ImmutableBytesPtr cf, ImmutableBytesPtr cq) {
+        NavigableSet<ImmutableBytesPtr> columns = columnsTracker.get(cf);
+
+        if (columns == null) {
+            columns = new TreeSet<>();
+            columnsTracker.put(cf, columns);
+        }
+        columns.add(cq);
+    }
 }
\ No newline at end of file
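
addTrackedColumn lets a server-side caller widen an existing projection; the read-repair
path needs the empty (verify) column included even when the query itself did not project
it. A hypothetical caller (trackVerifyColumn is not part of the patch):

    import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;

    // Ensure the verify column survives projection so its value can be read server-side.
    static void trackVerifyColumn(ColumnProjectionFilter filter, byte[] emptyCf, byte[] emptyCq) {
        filter.addTrackedColumn(new ImmutableBytesPtr(emptyCf), new ImmutableBytesPtr(emptyCq));
    }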
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java
index 92ac128..8d401f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/EncodedQualifiersColumnProjectionFilter.java
@@ -152,6 +152,10 @@ public class EncodedQualifiersColumnProjectionFilter extends FilterBase implemen
     public ReturnCode filterKeyValue(Cell ignored) throws IOException {
       return ReturnCode.INCLUDE_AND_NEXT_COL;
     }
+
+    public void addTrackedColumn(int qualifier) {
+        trackedColumns.set(qualifier);
+    }
     
     interface ColumnTracker {
         
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java
index 4e26515..b14a957 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiEncodedCQKeyValueComparisonFilter.java
@@ -375,7 +375,11 @@ public class MultiEncodedCQKeyValueComparisonFilter extends BooleanExpressionFil
         this.minQualifier = minMaxQualifiers.getFirst();
         this.maxQualifier = minMaxQualifiers.getSecond();
     }
-    
+
+    public void setMinQualifier(int minQualifier) {
+        this.minQualifier = minQualifier;
+    }
+
     public static MultiEncodedCQKeyValueComparisonFilter parseFrom(final byte [] pbBytes) throws DeserializationException {
         try {
             return (MultiEncodedCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new MultiEncodedCQKeyValueComparisonFilter());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
new file mode 100644
index 0000000..520aff3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -0,0 +1,882 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
+import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
+import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.LockManager.RowLock;
+import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
+import org.apache.phoenix.hbase.index.builder.IndexBuilder;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
+import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
+import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
+import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.trace.TracingUtils;
+import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/**
+ * Do all the work of managing index updates from a single coprocessor. All Puts/Deletes are passed
+ * to an {@link IndexBuilder} to determine the actual updates to make.
+ * We don't need to implement {@link #postPut(ObserverContext, Put, WALEdit, Durability)} and
+ * {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} hooks because
+ * Phoenix always does batch mutations.
+ */
+public class IndexRegionObserver extends BaseRegionObserver {
+
+  private static final Log LOG = LogFactory.getLog(IndexRegionObserver.class);
+  private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS);
+  private static final OperationStatus NOWRITE = new OperationStatus(OperationStatusCode.SUCCESS);
+  protected static final byte VERIFIED_BYTE = 1;
+  protected static final byte UNVERIFIED_BYTE = 2;
+  public static final byte[] UNVERIFIED_BYTES = new byte[] { UNVERIFIED_BYTE };
+  public static final byte[] VERIFIED_BYTES = new byte[] { VERIFIED_BYTE };
+
+  /**
+   * Class to represent pending data table rows
+   */
+  private static class PendingRow {
+      private long latestTimestamp;
+      private long count;
+
+      PendingRow(long latestTimestamp) {
+          count = 1;
+          this.latestTimestamp = latestTimestamp;
+      }
+
+      public void add(long timestamp) {
+          count++;
+          if (latestTimestamp < timestamp) {
+              latestTimestamp = timestamp;
+          }
+      }
+
+      public void remove() {
+          count--;
+      }
+
+      public long getCount() {
+          return count;
+      }
+
+      public long getLatestTimestamp() {
+          return latestTimestamp;
+      }
+  }
+
+  private static boolean skipPostIndexUpdatesForTesting = false;
+  private static boolean skipDataTableUpdatesForTesting = false;
+
+  public static void setSkipPostIndexUpdatesForTesting(boolean skip) {
+      skipPostIndexUpdatesForTesting = skip;
+  }
+
+  public static void setSkipDataTableUpdatesForTesting(boolean skip) {
+      skipDataTableUpdatesForTesting = skip;
+  }
+
+  // Hack to get around not being able to save any state between
+  // coprocessor calls. TODO: remove after HBASE-18127 when available
+  private static class BatchMutateContext {
+      private final int clientVersion;
+      // The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
+      // the verified column) will have the value false ("unverified") on these mutations
+      private Collection<Pair<Mutation, byte[]>> preIndexUpdates = Collections.emptyList();
+      // The collection of index mutations that will be applied after the data table mutations. The empty column (i.e.,
+      // the verified column) will have the value true ("verified") on the put mutations
+      private Collection<Pair<Mutation, byte[]>> postIndexUpdates = Collections.emptyList();
+      // The collection of candidate index mutations that will be applied after the data table mutations
+      private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
+      private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+      // The set of data table row keys in this batch for which an earlier in-flight batch (i.e., one with an
+      // earlier timestamp) also has a pending mutation on the same row (i.e., concurrent updates).
+      private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>();
+
+      private BatchMutateContext(int clientVersion) {
+          this.clientVersion = clientVersion;
+      }
+  }
+  
+  private ThreadLocal<BatchMutateContext> batchMutateContext =
+          new ThreadLocal<BatchMutateContext>();
+  
+  /** Configuration key for the {@link IndexBuilder} to use */
+  public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
+
+  /**
+   * Configuration key for whether the indexer should check the version of HBase it is running on. Generally,
+   * you only want to ignore this for testing or for custom versions of HBase.
+   */
+  public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion";
+
+  private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
+
+  public static final String INDEX_LAZY_POST_BATCH_WRITE = "org.apache.hadoop.hbase.index.lazy.post_batch.write";
+  private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false;
+
+  private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold";
+  private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.batch.mutate.threshold";
+  private static final long INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.open.threshold";
+  private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment";
+  private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000;
+
+  // Index writers get invoked before and after data table updates
+  protected IndexWriter preWriter;
+  protected IndexWriter postWriter;
+
+  protected IndexBuildManager builder;
+  private LockManager lockManager;
+
+  // The collection of pending data table rows
+  private Map<ImmutableBytesPtr, PendingRow> pendingRows = new ConcurrentHashMap<>();
+
+  /**
+   * Cache the failed updates to the various regions. Used for making the WAL recovery mechanisms
+   * more robust in the face of recovering index regions that were on the same server as the
+   * primary table region
+   */
+  private PerRegionIndexWriteCache failedIndexEdits = new PerRegionIndexWriteCache();
+
+  private MetricsIndexerSource metricSource;
+
+  private boolean stopped;
+  private boolean disabled;
+  private long slowIndexWriteThreshold;
+  private long slowIndexPrepareThreshold;
+  private long slowPostOpenThreshold;
+  private long slowPreIncrementThreshold;
+  private int rowLockWaitDuration;
+
+  private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+      try {
+          final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+          String serverName = env.getRegionServerServices().getServerName().getServerName();
+          if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) {
+              // make sure the right version <-> combinations are allowed.
+              String errormsg = Indexer.validateVersion(env.getHBaseVersion(), env.getConfiguration());
+              if (errormsg != null) {
+                  IOException ioe = new IOException(errormsg);
+                  env.getRegionServerServices().abort(errormsg, ioe);
+                  throw ioe;
+              }
+          }
+          this.builder = new IndexBuildManager(env);
+          // Clone the config since it is shared
+          DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
+          // set up the actual index writers (pre and post)
+          this.preWriter = new IndexWriter(indexWriterEnv, serverName + "-index-preWriter", false);
+          if (env.getConfiguration().getBoolean(INDEX_LAZY_POST_BATCH_WRITE, INDEX_LAZY_POST_BATCH_WRITE_DEFAULT)) {
+              this.postWriter = new IndexWriter(indexWriterEnv, new LazyParallelWriterIndexCommitter(), serverName + "-index-postWriter", false);
+          } else {
+              this.postWriter = this.preWriter;
+          }
+
+          this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration",
+                DEFAULT_ROWLOCK_WAIT_DURATION);
+          this.lockManager = new LockManager();
+
+          // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
+          this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
+          setSlowThresholds(e.getConfiguration());
+      } catch (NoSuchMethodError ex) {
+          disabled = true;
+          super.start(e);
+          LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex);
+      }
+  }
+
+  /**
+   * Extracts the slow call threshold values from the configuration.
+   */
+  private void setSlowThresholds(Configuration c) {
+      slowIndexPrepareThreshold = c.getLong(INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY,
+          INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_DEFAULT);
+      slowIndexWriteThreshold = c.getLong(INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY,
+          INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT);
+      slowPostOpenThreshold = c.getLong(INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY,
+          INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT);
+      slowPreIncrementThreshold = c.getLong(INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY,
+          INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT);
+  }
+
+  private String getCallTooSlowMessage(String callName, long duration, long threshold) {
+      StringBuilder sb = new StringBuilder(64);
+      sb.append("(callTooSlow) ").append(callName).append(" duration=").append(duration);
+      sb.append("ms, threshold=").append(threshold).append("ms");
+      return sb.toString();
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment e) throws IOException {
+    if (this.stopped) {
+      return;
+    }
+    if (this.disabled) {
+      return;
+    }
+    this.stopped = true;
+    String msg = "Indexer is being stopped";
+    this.builder.stop(msg);
+    this.preWriter.stop(msg);
+    this.postWriter.stop(msg);
+  }
+
+  /**
+   * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
+   * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
+   * real increment, though; it's really more of a Put. We translate the Increment into a
+   * list of mutations, at most a single Put and Delete, that are the changes upon executing
+   * the list of ON DUPLICATE KEY clauses for this row.
+   */
+  @Override
+  public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
+                                           final Increment inc) throws IOException {
+      long start = EnvironmentEdgeManager.currentTimeMillis();
+      try {
+          List<Mutation> mutations = this.builder.executeAtomicOp(inc);
+          if (mutations == null) {
+              return null;
+          }
+          // Causes the Increment to be ignored as we're committing the mutations
+          // ourselves below.
+          e.bypass();
+          e.complete();
+          // ON DUPLICATE KEY IGNORE will return empty list if row already exists
+          // as no action is required in that case.
+          if (!mutations.isEmpty()) {
+              Region region = e.getEnvironment().getRegion();
+              // Otherwise, submit the mutations directly here
+              region.batchMutate(mutations.toArray(new Mutation[0]), HConstants.NO_NONCE,
+                      HConstants.NO_NONCE);
+          }
+          return Result.EMPTY_RESULT;
+      } catch (Throwable t) {
+          throw ServerUtil.createIOException(
+                  "Unable to process ON DUPLICATE IGNORE for " +
+                          e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() +
+                          "(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
+      } finally {
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowPreIncrementThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("preIncrementAfterRowLock", duration, slowPreIncrementThreshold));
+              }
+              metricSource.incrementSlowDuplicateKeyCheckCalls();
+          }
+          metricSource.updateDuplicateKeyCheckTime(duration);
+      }
+  }
+
+  @Override
+  public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      if (this.disabled) {
+          return;
+      }
+      long start = EnvironmentEdgeManager.currentTimeMillis();
+      try {
+          preBatchMutateWithExceptions(c, miniBatchOp);
+          return;
+      } catch (Throwable t) {
+          rethrowIndexingException(t);
+      } finally {
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowIndexPrepareThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold));
+              }
+              metricSource.incrementNumSlowIndexPrepareCalls();
+          }
+          metricSource.updateIndexPrepareTime(duration);
+      }
+      throw new RuntimeException(
+        "Somehow didn't return an index update but also didn't propagate the failure to the client!");
+  }
+
+  private long getMaxTimestamp(Mutation m) {
+      long maxTs = 0;
+      for (List<Cell> cells : m.getFamilyCellMap().values()) {
+          for (Cell cell : cells) {
+              long ts = cell.getTimestamp();
+              if (ts > maxTs) {
+                  maxTs = ts;
+              }
+          }
+      }
+      return maxTs;
+  }
+
+  private void ignoreAtomicOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          Mutation m = miniBatchOp.getOperation(i);
+          if (this.builder.isAtomicOp(m)) {
+              miniBatchOp.setOperationStatus(i, IGNORE);
+          }
+      }
+  }
+
+  private void lockRows(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) throws IOException {
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+              continue;
+          }
+          Mutation m = miniBatchOp.getOperation(i);
+          if (this.builder.isEnabled(m)) {
+              context.rowLocks.add(lockManager.lockRow(m.getRow(), rowLockWaitDuration));
+          }
+      }
+  }
+
+  private void populatePendingRows(BatchMutateContext context, long now) {
+      for (RowLock rowLock : context.rowLocks) {
+          ImmutableBytesPtr rowKey = rowLock.getRowKey();
+          PendingRow pendingRow = pendingRows.get(rowKey);
+          if (pendingRow == null) {
+              pendingRows.put(rowKey, new PendingRow(now));
+          } else {
+              // m is a mutation on a row that already has a pending mutation in progress from another batch
+              pendingRow.add(now);
+              context.pendingRows.add(rowKey);
+          }
+      }
+  }
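+
+  // A rough sketch of the PendingRow bookkeeping assumed above (the real class is
+  // defined elsewhere in this file; the fields shown here are inferred from usage):
+  // a reference count plus the latest timestamp seen, so that concurrent batches
+  // touching the same row can be detected.
+  //
+  //   private static class PendingRow {
+  //       private int count = 1;
+  //       private long latestTimestamp;
+  //       PendingRow(long ts) { this.latestTimestamp = ts; }
+  //       synchronized void add(long ts) { count++; latestTimestamp = Math.max(latestTimestamp, ts); }
+  //       synchronized void remove() { count--; }
+  //       synchronized int getCount() { return count; }
+  //       synchronized long getLatestTimestamp() { return latestTimestamp; }
+  //   }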
+
+  private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                        long now, ReplayWrite replayWrite) throws IOException {
+      Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
+      boolean copyMutations = false;
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+              continue;
+          }
+          Mutation m = miniBatchOp.getOperation(i);
+          if (this.builder.isEnabled(m)) {
+              // Track whether or not we need to copy the mutations: a row that appears
+              // more than once in the batch forces a copy so its updates can be merged
+              ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+              if (mutationsMap.containsKey(row)) {
+                  copyMutations = true;
+              } else {
+                  mutationsMap.put(row, null);
+              }
+          }
+      }
+      // early exit if it turns out we don't have any edits
+      if (mutationsMap.isEmpty()) {
+          return null;
+      }
+      // If we're copying the mutations, use the merged MultiMutations collected in the
+      // map; otherwise gather the original mutations as-is
+      Collection<Mutation> originalMutations;
+      Collection<? extends Mutation> mutations;
+      if (copyMutations) {
+          originalMutations = null;
+          mutations = mutationsMap.values();
+      } else {
+          originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size());
+          mutations = originalMutations;
+      }
+
+      boolean resetTimeStamp = replayWrite == null;
+
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          Mutation m = miniBatchOp.getOperation(i);
+          // skip this mutation if we aren't enabling indexing
+          // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
+          // should be indexed, which means we need to expose another method on the builder. Such is the
+          // way of optimizations, though.
+          if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
+              if (resetTimeStamp) {
+                  // Unless we're replaying edits to rebuild the index, we update the time stamp
+                  // of the data table to prevent overlapping time stamps (which prevents index
+                  // inconsistencies as this case isn't handled correctly currently).
+                  for (List<Cell> cells : m.getFamilyCellMap().values()) {
+                      for (Cell cell : cells) {
+                          CellUtil.setTimestamp(cell, now);
+                      }
+                  }
+              }
+              // No need to write the table mutations when we're rebuilding
+              // the index as they're already written and just being replayed.
+              if (replayWrite == ReplayWrite.INDEX_ONLY
+                      || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY || skipDataTableUpdatesForTesting) {
+                  miniBatchOp.setOperationStatus(i, NOWRITE);
+              }
+
+              // Only copy mutations if we found duplicate rows
+              // which only occurs when we're partially rebuilding
+              // the index (since we'll potentially have both a
+              // Put and a Delete mutation for the same row).
+              if (copyMutations) {
+                  // Add the mutation to the batch set
+
+                  ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+                  MultiMutation stored = mutationsMap.get(row);
+                  // we haven't seen this row before, so add it
+                  if (stored == null) {
+                      stored = new MultiMutation(row);
+                      mutationsMap.put(row, stored);
+                  }
+                  stored.addAll(m);
+              } else {
+                  originalMutations.add(m);
+              }
+          }
+      }
+
+      if (copyMutations || replayWrite != null) {
+          mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations);
+      }
+      return mutations;
+  }
+
+  private void prepareIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+                                     MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context,
+                                     Collection<? extends Mutation> mutations) throws Throwable {
+      IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
+      if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
+          throw new DoNotRetryIOException(
+                  "preBatchMutateWithExceptions: indexMetaData is not an instance of PhoenixIndexMetaData " +
+                          c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+      }
+      List<IndexMaintainer> maintainers = ((PhoenixIndexMetaData)indexMetaData).getIndexMaintainers();
+
+      List<Pair<Mutation, byte[]>> indexUpdatesForDeletes;
+      // get the current span, or just use a null-span to avoid a bunch of if statements
+      try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
+          Span current = scope.getSpan();
+          if (current == null) {
+              current = NullSpan.INSTANCE;
+          }
+          long start = EnvironmentEdgeManager.currentTimeMillis();
+
+          // get the index updates for all elements in this batch
+          Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
+                  this.builder.getIndexUpdates(miniBatchOp, mutations);
+
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowIndexPrepareThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold));
+              }
+              metricSource.incrementNumSlowIndexPrepareCalls();
+          }
+          metricSource.updateIndexPrepareTime(duration);
+          current.addTimelineAnnotation("Built index updates, doing preStep");
+          TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
+          byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator();
+          List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
+          indexUpdatesForDeletes = new ArrayList<>(indexUpdates.size());
+          context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size());
+          while(indexUpdatesItr.hasNext()) {
+              Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
+              if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 0) {
+                  localUpdates.add(next.getFirst().getFirst());
+                  indexUpdatesItr.remove();
+              }
+              else {
+                  // get index maintainer for this index table
+                  IndexMaintainer indexMaintainer = getIndexMaintainer(maintainers, next.getFirst().getSecond());
+                  if (indexMaintainer == null) {
+                      throw new DoNotRetryIOException(
+                              "preBatchMutateWithExceptions: indexMaintainer is null " +
+                                      c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+                  }
+                  byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+                  byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+                  // add the VERIFIED cell, which is the empty cell
+                  Mutation m = next.getFirst().getFirst();
+                  boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
+                  long ts = getMaxTimestamp(m);
+                  if (rebuild) {
+                      if (m instanceof Put) {
+                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
+                      }
+                  } else {
+                      if (m instanceof Put) {
+                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES);
+                          // Ignore post index updates (i.e., the third write phase updates) for this row if it is
+                          // going through concurrent updates
+                          ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
+                          if (!context.pendingRows.contains(rowKey)) {
+                              Put put = new Put(m.getRow());
+                              put.addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
+                              context.intermediatePostIndexUpdates.add(new Pair<>(new Pair<Mutation, byte[]>(put, next.getFirst().getSecond()), next.getSecond()));
+                          }
+                      } else {
+                          // For a delete mutation, first unverify the existing row in the index table and then delete
+                          // the row from the index table after deleting the corresponding row from the data table
+                          indexUpdatesItr.remove();
+                          Put put = new Put(m.getRow());
+                          put.addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES);
+                          indexUpdatesForDeletes.add(new Pair<Mutation, byte[]>(put, next.getFirst().getSecond()));
+                          // Ignore post index updates (i.e., the third write phase updates) for this row if it is
+                          // going through concurrent updates
+                          ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
+                          if (!context.pendingRows.contains(rowKey)) {
+                              context.intermediatePostIndexUpdates.add(next);
+                          }
+                      }
+                  }
+              }
+          }
+          if (!localUpdates.isEmpty()) {
+              miniBatchOp.addOperationsFromCP(0,
+                      localUpdates.toArray(new Mutation[localUpdates.size()]));
+          }
+          if (!indexUpdatesForDeletes.isEmpty()) {
+              context.preIndexUpdates = indexUpdatesForDeletes;
+          }
+
+          if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) {
+              context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
+          }
+          for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) {
+              context.preIndexUpdates.add(update.getFirst());
+          }
+      }
+  }
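+
+  // Illustrative shape of the phase-one index mutation built above (the row key value
+  // is hypothetical): the empty column carries the verification status.
+  //
+  //   Put indexPut = new Put(indexRowKey);
+  //   indexPut.addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES); // flagged until the data write commits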
+
+  public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
+          MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
+      ignoreAtomicOperations(miniBatchOp);
+      BatchMutateContext context = new BatchMutateContext(this.builder.getIndexMetaData(miniBatchOp).getClientVersion());
+      setBatchMutateContext(c, context);
+      Mutation firstMutation = miniBatchOp.getOperation(0);
+      ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
+      /*
+       * Exclusively lock all rows so we get a consistent read
+       * while determining the index updates
+       */
+      if (replayWrite == null) {
+          lockRows(miniBatchOp, context);
+      }
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
+      // concurrent updates
+      if (replayWrite == null) {
+          populatePendingRows(context, now);
+      }
+      // First group all the updates for a single row into a single update to be processed
+      Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
+      // early exit if it turns out we don't have any edits
+      if (mutations == null) {
+          return;
+      }
+      prepareIndexMutations(c, miniBatchOp, context, mutations);
+      // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
+      // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
+      // can be prepared in less than one millisecond
+      if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) {
+          Thread.sleep(1);
+          LOG.debug("slept 1ms for " + c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+      }
+      // Release the locks before making RPC calls for index updates
+      for (RowLock rowLock : context.rowLocks) {
+          rowLock.release();
+      }
+      // Do the index updates
+      doPre(c, context, miniBatchOp);
+      if (replayWrite == null) {
+          // Acquire the locks again before letting the region proceed with data table updates
+          List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
+          for (RowLock rowLock : context.rowLocks) {
+              rowLocks.add(lockManager.lockRow(rowLock.getRowKey(), rowLockWaitDuration));
+          }
+          context.rowLocks.clear();
+          context.rowLocks = rowLocks;
+          // Check if we need to skip the post index updates for any of the rows
+          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> iterator = context.intermediatePostIndexUpdates.iterator();
+          while (iterator.hasNext()) {
+              // Check if this row is going through another mutation which has a newer timestamp. If so,
+              // ignore the pending updates for this row
+              Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next();
+              ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+              PendingRow pendingRow = pendingRows.get(rowKey);
+              // Has any concurrent mutation arrived for the same row? If so, skip the post
+              // index updates and let read repair resolve conflicts
+              if (pendingRow.getLatestTimestamp() > now) {
+                  iterator.remove();
+              }
+          }
+          // We are done with handling concurrent mutations. So we can remove the rows of this batch from
+          // the collection of pending rows
+          removePendingRows(context);
+      }
+      if (context.postIndexUpdates.isEmpty() && !context.intermediatePostIndexUpdates.isEmpty()) {
+          context.postIndexUpdates = new ArrayList<>(context.intermediatePostIndexUpdates.size());
+      }
+      for (Pair<Pair<Mutation, byte[]>, byte[]> update : context.intermediatePostIndexUpdates) {
+          context.postIndexUpdates.add(update.getFirst());
+      }
+  }
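+
+  // In outline, the write path above is: (1) build the index updates under row locks
+  // and, after releasing them, write the index rows with the empty column set to
+  // UNVERIFIED (doPre); (2) re-acquire the locks and let the region commit the data
+  // table mutations; (3) after a successful commit, write the VERIFIED markers and
+  // deferred deletes in postBatchMutateIndispensably. Rows with concurrent in-flight
+  // mutations skip phase 3 and are left to read repair.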
+
+  private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
+      this.batchMutateContext.set(context);
+  }
+  
+  private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+      return this.batchMutateContext.get();
+  }
+  
+  private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+      this.batchMutateContext.remove();
+  }
+
+  @Override
+  public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
+      if (this.disabled) {
+          return;
+      }
+      long start = EnvironmentEdgeManager.currentTimeMillis();
+      BatchMutateContext context = getBatchMutateContext(c);
+      if (context == null) {
+          return;
+      }
+      try {
+          for (RowLock rowLock : context.rowLocks) {
+              rowLock.release();
+          }
+          this.builder.batchCompleted(miniBatchOp);
+
+          if (success) { // The pre-index and data table updates succeeded, so now do the post index updates
+              if (!skipPostIndexUpdatesForTesting) {
+                  doPost(c, context);
+              }
+          }
+       } finally {
+           removeBatchMutateContext(c);
+           long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+           if (duration >= slowIndexWriteThreshold) {
+               if (LOG.isDebugEnabled()) {
+                   LOG.debug(getCallTooSlowMessage("postBatchMutateIndispensably", duration, slowIndexWriteThreshold));
+               }
+               metricSource.incrementNumSlowIndexWriteCalls();
+           }
+           metricSource.updateIndexWriteTime(duration);
+       }
+  }
+
+  private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) throws IOException {
+      try {
+          doIndexWritesWithExceptions(context, true);
+          return;
+      } catch (Throwable e) {
+          rethrowIndexingException(e);
+      }
+      throw new RuntimeException(
+              "Somehow didn't complete the index update, but didn't return successfully either!");
+  }
+
+  private void doIndexWritesWithExceptions(BatchMutateContext context, boolean post)
+            throws IOException {
+      if (context == null) {
+          return;
+      }
+      Collection<Pair<Mutation, byte[]>> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
+      // short circuit if we don't need to do any work
+      if (indexUpdates.isEmpty()) {
+          return;
+      }
+
+      // get the current span, or just use a null-span to avoid a bunch of if statements
+      try (TraceScope scope = Trace.startSpan("Completing " + (post ? "post" : "pre") + " index writes")) {
+          Span current = scope.getSpan();
+          if (current == null) {
+              current = NullSpan.INSTANCE;
+          }
+          long start = EnvironmentEdgeManager.currentTimeMillis();
+          current.addTimelineAnnotation("Actually doing " + (post ? "post" : "pre") + " index update for first time");
+          if (post) {
+              postWriter.writeAndHandleFailure(indexUpdates, false, context.clientVersion);
+          } else {
+              preWriter.writeAndHandleFailure(indexUpdates, false, context.clientVersion);
+          }
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowIndexWriteThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("indexWrite", duration, slowIndexWriteThreshold));
+              }
+              metricSource.incrementNumSlowIndexWriteCalls();
+          }
+          metricSource.updateIndexWriteTime(duration);
+      }
+  }
+
+  private void removePendingRows(BatchMutateContext context) {
+      for (RowLock rowLock : context.rowLocks) {
+          ImmutableBytesPtr rowKey = rowLock.getRowKey();
+          PendingRow pendingRow = pendingRows.get(rowKey);
+          if (pendingRow != null) {
+              pendingRow.remove();
+              if (pendingRow.getCount() == 0) {
+                  pendingRows.remove(rowKey);
+              }
+          }
+      }
+  }
+
+  private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
+                     MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      try {
+          doIndexWritesWithExceptions(context, false);
+          return;
+      } catch (Throwable e) {
+          removePendingRows(context);
+          rethrowIndexingException(e);
+      }
+      throw new RuntimeException(
+              "Somehow didn't complete the index update, but didn't return successfully either!");
+  }
+
+  @Override
+  public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c) {
+    Multimap<HTableInterfaceReference, Mutation> updates = failedIndexEdits.getEdits(c.getEnvironment().getRegion());
+
+    if (this.disabled) {
+        return;
+    }
+
+    long start = EnvironmentEdgeManager.currentTimeMillis();
+    try {
+        // if we have no pending edits to complete, then we are done
+        if (updates == null || updates.isEmpty()) {
+          return;
+        }
+
+        LOG.info("Found some outstanding index updates that didn't succeed during"
+                + " WAL replay - attempting to replay now.");
+
+        // do the usual preWriter stuff
+        try {
+            preWriter.writeAndHandleFailure(updates, true, ScanUtil.UNKNOWN_CLIENT_VERSION);
+        } catch (IOException e) {
+            LOG.error("During WAL replay of outstanding index updates, an exception "
+                    + "was thrown instead of killing the server during index writing", e);
+        }
+    } finally {
+         long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+         if (duration >= slowPostOpenThreshold) {
+             if (LOG.isDebugEnabled()) {
+                 LOG.debug(getCallTooSlowMessage("postOpen", duration, slowPostOpenThreshold));
+             }
+             metricSource.incrementNumSlowPostOpenCalls();
+         }
+         metricSource.updatePostOpenTime(duration);
+    }
+  }
+
+  /**
+   * Exposed for testing!
+   * @return the currently instantiated index builder
+   */
+  public IndexBuilder getBuilderForTesting() {
+    return this.builder.getBuilderForTesting();
+  }
+
+  /**
+   * Enable indexing on the given table
+   * @param desc {@link HTableDescriptor} for the table on which indexing should be enabled
+   * @param builder class to use when building the index for this table
+   * @param properties map of custom configuration options to make available to your
+   *          {@link IndexBuilder} on the server-side
+   * @param priority priority at which the coprocessor should be loaded on the table
+   * @throws IOException if the Indexer coprocessor cannot be added
+   */
+  public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBuilder> builder,
+                                    Map<String, String> properties, int priority) throws IOException {
+      if (properties == null) {
+          properties = new HashMap<String, String>();
+      }
+      properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
+      desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
+  }
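+
+  // Illustrative call site (a sketch; the table name and priority are hypothetical):
+  //   HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("MY_TABLE"));
+  //   IndexRegionObserver.enableIndexing(desc, PhoenixIndexBuilder.class, null,
+  //       Coprocessor.PRIORITY_USER);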
+}
+
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
index c65fc9b..cb5fd22 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
@@ -51,16 +51,13 @@ public class LockManager {
 
     /**
      * Lock the row or throw otherwise
-     * @param row the row key
+     * @param rowKey the row key
      * @return RowLock used to eventually release the lock 
      * @throws TimeoutIOException if the lock could not be acquired within the
      * allowed rowLockWaitDuration and InterruptedException if interrupted while
      * waiting to acquire lock.
      */
-    public RowLock lockRow(byte[] row, int waitDuration) throws IOException {
-        // create an object to use a a key in the row lock map
-        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
-
+    public RowLock lockRow(ImmutableBytesPtr rowKey, int waitDuration) throws IOException {
         RowLockContext rowLockContext = null;
         RowLockImpl result = null;
         TraceScope traceScope = null;
@@ -116,6 +113,12 @@ public class LockManager {
         }
     }
 
+    public RowLock lockRow(byte[] row, int waitDuration) throws IOException {
+        // create an object to use as a key in the row lock map
+        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
+        return lockRow(rowKey, waitDuration);
+    }
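+
+    // Typical usage (sketch): acquire the lock and release it in a finally block:
+    //   RowLock lock = lockManager.lockRow(row, waitDuration);
+    //   try {
+    //       // ... mutate under the lock ...
+    //   } finally {
+    //       lock.release();
+    //   }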
+
     /**
      * Unlock the row. We need this stateless way of unlocking because
      * we have no means of passing the RowLock instances between
@@ -226,6 +229,11 @@ public class LockManager {
         }
 
         @Override
+        public ImmutableBytesPtr getRowKey() {
+            return context.rowKey;
+        }
+
+        @Override
         public String toString() {
             return "RowLockImpl{" +
                     "context=" + context +
@@ -247,6 +255,8 @@ public class LockManager {
          *     thread
          */
         void release();
+
+        ImmutableBytesPtr getRowKey();
     }
 
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index 69d135d..f96bf9d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -87,13 +87,13 @@ public abstract class BaseIndexBuilder implements IndexBuilder {
      * @throws IOException
      */
     @Override
-    public boolean isEnabled(Mutation m) throws IOException {
+    public boolean isEnabled(Mutation m) {
         // ask the codec to see if we should even attempt indexing
         return this.codec.isEnabled(m);
     }
 
     @Override
-    public boolean isAtomicOp(Mutation m) throws IOException {
+    public boolean isAtomicOp(Mutation m) {
         return false;
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
index 7489a8c..89c751e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
@@ -28,10 +28,9 @@ public abstract class BaseIndexCodec implements IndexCodec {
      * <p>
      * By default, the codec is always enabled. Subclasses should override this method if they want
      * to decide whether to index on a per-mutation basis.
-     * @throws IOException 
      */
     @Override
-    public boolean isEnabled(Mutation m) throws IOException {
+    public boolean isEnabled(Mutation m) {
         return true;
     }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index cf5dad2..6b7e416 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -79,7 +79,7 @@ public class IndexBuildManager implements Stoppable {
       return this.delegate.getIndexMetaData(miniBatchOp);
   }
 
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
+  public Collection<Pair<Pair<Mutation, byte[]>, byte[]>> getIndexUpdates(
       MiniBatchOperationInProgress<Mutation> miniBatchOp,
       Collection<? extends Mutation> mutations) throws Throwable {
     // notify the delegate that we have started processing a batch
@@ -87,7 +87,7 @@ public class IndexBuildManager implements Stoppable {
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
 
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
-    ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
+    ArrayList<Pair<Pair<Mutation, byte[]>, byte[]>> results = new ArrayList<>(mutations.size());
     for (Mutation m : mutations) {
       Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
       if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
@@ -96,6 +96,30 @@ public class IndexBuildManager implements Stoppable {
                 BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
           }
       }
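+      // Pair each (index mutation, index table name) with the data row key that produced
+      // it; the caller uses the row key to track concurrent updates per data row.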
+      for (Pair<Mutation, byte[]> update : updates) {
+        results.add(new Pair<>(update, m.getRow()));
+      }
+    }
+    return results;
+  }
+
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
+          MiniBatchOperationInProgress<Mutation> miniBatchOp,
+          Collection<? extends Mutation> mutations) throws Throwable {
+    // notify the delegate that we have started processing a batch
+    final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
+    this.delegate.batchStarted(miniBatchOp, indexMetaData);
+
+    // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
+    ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
+    for (Mutation m : mutations) {
+      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+      if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
+        for (Pair<Mutation, byte[]> update : updates) {
+          update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                  BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+        }
+      }
       results.addAll(updates);
     }
     return results;
@@ -116,11 +140,11 @@ public class IndexBuildManager implements Stoppable {
     delegate.batchStarted(miniBatchOp, indexMetaData);
   }
 
-  public boolean isEnabled(Mutation m) throws IOException {
+  public boolean isEnabled(Mutation m) {
     return delegate.isEnabled(m);
   }
 
-  public boolean isAtomicOp(Mutation m) throws IOException {
+  public boolean isAtomicOp(Mutation m) {
     return delegate.isAtomicOp(m);
   }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index a00294c..d3b9d6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -132,7 +132,7 @@ public interface IndexBuilder extends Stoppable {
    *         basis, as each codec is instantiated per-region.
  * @throws IOException 
    */
-  public boolean isEnabled(Mutation m) throws IOException;
+  public boolean isEnabled(Mutation m);
   
   /**
    * True if mutation has an ON DUPLICATE KEY clause
@@ -140,7 +140,7 @@ public interface IndexBuilder extends Stoppable {
    * @return true if mutation has ON DUPLICATE KEY expression and false otherwise.
    * @throws IOException
    */
-  public boolean isAtomicOp(Mutation m) throws IOException;
+  public boolean isAtomicOp(Mutation m);
 
   /**
    * Calculate the mutations based on the ON DUPLICATE KEY clause
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index 0b70de3..6762a30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -82,7 +82,7 @@ public interface IndexCodec {
      *         codec is instantiated per-region.
      * @throws IOException
      */
-    public boolean isEnabled(Mutation m) throws IOException;
+    public boolean isEnabled(Mutation m);
 
     public void initialize(Configuration conf, byte[] tableName);
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
similarity index 68%
copy from phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
copy to phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
index 3711915..9e94e87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java
@@ -1,11 +1,19 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.phoenix.hbase.index.write;
 
@@ -14,18 +22,17 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
-import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
 import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
 import org.apache.phoenix.hbase.index.parallel.Task;
 import org.apache.phoenix.hbase.index.parallel.TaskBatch;
@@ -36,51 +43,46 @@ import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.util.IndexUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Multimap;
 
 /**
- * Write index updates to the index tables in parallel. We attempt to early exit from the writes if any of the index
- * updates fails. Completion is determined by the following criteria: *
- * <ol>
- * <li>All index writes have returned, OR</li>
- * <li>Any single index write has failed</li>
- * </ol>
- * We attempt to quickly determine if any write has failed and not write to the remaining indexes to ensure a timely
- * recovery of the failed index writes.
+ * Abstract class for writing index updates to the index tables in parallel.
  */
-public class ParallelWriterIndexCommitter implements IndexCommitter {
+public abstract class AbstractParallelWriterIndexCommitter implements IndexCommitter {
 
     public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
     private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
     public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
-    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelWriterIndexCommitter.class);
+    private static final Log LOG = LogFactory.getLog(AbstractParallelWriterIndexCommitter.class);
 
-    private HTableFactory retryingFactory;
-    private HTableFactory noRetriesfactory;
-    private Stoppable stopped;
-    private QuickFailingTaskRunner pool;
-    private KeyValueBuilder kvBuilder;
-    private RegionCoprocessorEnvironment env;
+    protected HTableFactory retryingFactory;
+    protected HTableFactory noRetriesFactory;
+    protected Stoppable stopped;
+    protected QuickFailingTaskRunner pool;
+    protected KeyValueBuilder kvBuilder;
+    protected RegionCoprocessorEnvironment env;
+    protected TaskBatch<Void> tasks;
+    protected boolean disableIndexOnFailure = false;
 
-    public ParallelWriterIndexCommitter() {}
+
+    public AbstractParallelWriterIndexCommitter() {}
 
     // For testing
-    public ParallelWriterIndexCommitter(String hbaseVersion) {
+    public AbstractParallelWriterIndexCommitter(String hbaseVersion) {
         kvBuilder = KeyValueBuilder.get(hbaseVersion);
     }
 
     @Override
-    public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+    public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name, boolean disableIndexOnFailure) {
         this.env = env;
+        this.disableIndexOnFailure = disableIndexOnFailure;
         Configuration conf = env.getConfiguration();
         setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
                 ThreadPoolManager.getExecutor(
                         new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
                                 DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
-                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
+                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), parent, env);
         this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
     }
 
@@ -89,11 +91,12 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
      * <p>
      * Exposed for TESTING
      */
-    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, RegionCoprocessorEnvironment env) {
+    public void setup(HTableFactory factory, ExecutorService pool, Stoppable stop, RegionCoprocessorEnvironment env) {
         this.retryingFactory = factory;
-        this.noRetriesfactory = IndexWriterUtils.getNoRetriesHTableFactory(env);
+        this.noRetriesFactory = IndexWriterUtils.getNoRetriesHTableFactory(env);
         this.pool = new QuickFailingTaskRunner(pool);
         this.stopped = stop;
+        this.env = env;
     }
 
     @Override
@@ -109,7 +112,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
          */
 
         Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
-        TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
+        tasks = new TaskBatch<Void>(entries.size());
         for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
             // get the mutations for each table. We leak the implementation here a little bit to save
             // doing a complete copy over of all the index update for each table.
@@ -118,7 +121,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
 			if (env != null
 					&& !allowLocalUpdates
 					&& tableReference.getTableName().equals(
-							env.getRegion().getTableDesc().getNameAsString())) {
+							env.getRegion().getTableDesc().getTableName().getNameAsString())) {
 				continue;
 			}
             /*
@@ -140,37 +143,42 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
                 @SuppressWarnings("deprecation")
                 @Override
                 public Void call() throws Exception {
+                    Table table = null;
                    // this may have been queued, so another task in front of us may have failed, so we should
                     // early exit, if that's the case
                     throwFailureIfDone();
 
-                    if (LOGGER.isTraceEnabled()) {
-                        LOGGER.trace("Writing index update:" + mutations + " to table: "
-                                + tableReference);
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Writing index update:" + mutations + " to table: " + tableReference);
                     }
-                    HTableInterface table = null;
                     try {
                         if (allowLocalUpdates
                                 && env != null
                                 && tableReference.getTableName().equals(
-                                    env.getRegion().getTableDesc().getNameAsString())) {
+                                    env.getRegion().getTableDesc().getTableName().getNameAsString())) {
                             try {
                                 throwFailureIfDone();
                                 IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true);
                                 return null;
-                            } catch (IOException ignord) {
+                            } catch (IOException ignored) {
                                 // when it's failed we fall back to the standard & slow way
-                                if (LOGGER.isDebugEnabled()) {
-                                    LOGGER.debug("indexRegion.batchMutate failed and fall back " +
-                                            "to HTable.batch(). Got error=" + ignord);
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
+                                            + ignored);
                                 }
                             }
                         }
-                     // if the client can retry index writes, then we don't need to retry here
-                        HTableFactory factory = clientVersion < MetaDataProtocol.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesfactory;
+                        // if the client can retry index writes, then we don't need to retry here
+                        HTableFactory factory;
+                        if (disableIndexOnFailure) {
+                            factory = clientVersion < MetaDataProtocol.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesFactory;
+                        }
+                        else {
+                            factory = retryingFactory;
+                        }
                         table = factory.getTable(tableReference.get());
                         throwFailureIfDone();
-                        table.batch(mutations);
+                        table.batch(mutations, null);
                     } catch (SingleIndexWriteFailureException e) {
                         throw e;
                     } catch (IOException e) {
@@ -195,20 +203,9 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
                 }
             });
         }
-
-        // actually submit the tasks to the pool and wait for them to finish/fail
-        try {
-            pool.submitUninterruptible(tasks);
-        } catch (EarlyExitFailure e) {
-            propagateFailure(e);
-        } catch (ExecutionException e) {
-            LOGGER.error("Found a failed index update!");
-            propagateFailure(e.getCause());
-        }
-
     }
 
-    private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
+    protected void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
         try {
             throw throwable;
         } catch (SingleIndexWriteFailureException e1) {
@@ -230,14 +227,14 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
      */
     @Override
     public void stop(String why) {
-        LOGGER.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
+        LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
         this.pool.stop(why);
         this.retryingFactory.shutdown();
-        this.noRetriesfactory.shutdown();
+        this.noRetriesFactory.shutdown();
     }
 
     @Override
     public boolean isStopped() {
         return this.stopped.isStopped();
     }
-}
\ No newline at end of file
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
index e9dc202..1eb9ff1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
@@ -30,7 +30,7 @@ import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
  */
 public interface IndexCommitter extends Stoppable {
 
-  void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name);
+  void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name, boolean disableIndexOnFailure);
 
   public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates, int clientVersion)
       throws IndexWriteException;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index 86624fa..41ba6bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -39,8 +39,7 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
 /**
- * Do the actual work of writing to the index tables. Ensures that if we do fail to write to the
- * index table that we cleanly kill the region/server to ensure that the region's WAL gets replayed.
+ * Do the actual work of writing to the index tables.
  * <p>
  * We attempt to do the index updates in parallel using a backing threadpool. All threads are daemon
  * threads, so it will not block the region from shutting down.
@@ -59,12 +58,15 @@ public class IndexWriter implements Stoppable {
    *           instantiated
    */
   public IndexWriter(RegionCoprocessorEnvironment env, String name) throws IOException {
-    this(getCommitter(env), getFailurePolicy(env), env, name);
+    this(getCommitter(env), getFailurePolicy(env), env, name, true);
   }
 
-  public IndexWriter(IndexFailurePolicy failurePolicy, RegionCoprocessorEnvironment env, String name) throws IOException {
-      this(getCommitter(env), failurePolicy, env, name);
-    }
+  public IndexWriter(RegionCoprocessorEnvironment env, String name, boolean disableIndexOnFailure) throws IOException {
+    this(getCommitter(env), getFailurePolicy(env), env, name, disableIndexOnFailure);
+  }
+  public IndexWriter(RegionCoprocessorEnvironment env, IndexCommitter indexCommitter, String name, boolean disableIndexOnFailure) throws IOException {
+    this(indexCommitter, getFailurePolicy(env), env, name, disableIndexOnFailure);
+  }
 
   public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
       return getCommitter(env,TrackingParallelWriterIndexCommitter.class);
@@ -99,6 +101,12 @@ public class IndexWriter implements Stoppable {
     }
   }
 
+  public IndexWriter(IndexCommitter committer, IndexFailurePolicy policy,
+                     RegionCoprocessorEnvironment env, String name, boolean disableIndexOnFailure) {
+    this(committer, policy);
+    this.writer.setup(this, env, name, disableIndexOnFailure);
+    this.failurePolicy.setup(this, env);
+  }
   /**
    * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected
    * to be fully setup before calling.
@@ -109,7 +117,7 @@ public class IndexWriter implements Stoppable {
   public IndexWriter(IndexCommitter committer, IndexFailurePolicy policy,
       RegionCoprocessorEnvironment env, String name) {
     this(committer, policy);
-    this.writer.setup(this, env, name);
+    this.writer.setup(this, env, name, true);
     this.failurePolicy.setup(this, env);
   }
 
@@ -131,27 +139,64 @@ public class IndexWriter implements Stoppable {
    * write the index updates. When we return depends on the specified {@link IndexCommitter}.
    * <p>
    * If update fails, we pass along the failure to the installed {@link IndexFailurePolicy}, which
-   * then decides how to handle the failure. By default, we use a {@link KillServerOnFailurePolicy},
-   * which ensures that the server crashes when an index write fails, ensuring that we get WAL
-   * replay of the index edits.
+   * then decides how to handle the failure.
    * @param indexUpdates Updates to write
  * @param clientVersion version of the client
  * @throws IOException 
    */
-    public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates,
-            boolean allowLocalUpdates, int clientVersion) throws IOException {
+    public void writeAndHandleFailure(Collection<Pair<Mutation, byte[]>> indexUpdates,
+                                      boolean allowLocalUpdates, int clientVersion) throws IOException {
     // convert the strings to htableinterfaces to which we can talk and group by TABLE
     Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
-    writeAndKillYourselfOnFailure(toWrite, allowLocalUpdates, clientVersion);
+    writeAndHandleFailure(toWrite, allowLocalUpdates, clientVersion);
+  }
+
+  /**
+   * See {@link #writeAndHandleFailure(Collection, boolean, int)}.
+   * @param toWrite
+   * @throws IOException
+   */
+    public void writeAndHandleFailure(Multimap<HTableInterfaceReference, Mutation> toWrite,
+                                      boolean allowLocalUpdates, int clientVersion) throws IOException {
+    try {
+      write(toWrite, allowLocalUpdates, clientVersion);
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Done writing all index updates!\n\t" + toWrite);
+      }
+    } catch (Exception e) {
+      this.failurePolicy.handleFailure(toWrite, e);
+    }
+  }
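+
+  // Note: how a failure is handled here is entirely up to the installed
+  // IndexFailurePolicy; a policy may kill the server (e.g. KillServerOnFailurePolicy),
+  // disable the index, or simply rethrow to the client.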
+
+  /**
+   * Write the mutations to their respective table.
+   * <p>
+   * This method is blocking and could potentially cause the writer to block for a long time as we
+   * write the index updates. When we return depends on the specified {@link IndexCommitter}.
+   * <p>
+   * If update fails, we pass along the failure to the installed {@link IndexFailurePolicy}, which
+   * then decides how to handle the failure. By default, we use a {@link KillServerOnFailurePolicy},
+   * which ensures that the server crashes when an index write fails, ensuring that we get WAL
+   * replay of the index edits.
+   * @param indexUpdates Updates to write
+   * @param clientVersion version of the client
+   * @throws IOException
+   */
+  public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates,
+                                            boolean allowLocalUpdates, int clientVersion) throws IOException {
+      // convert the strings to htableinterfaces to which we can talk and group by TABLE
+      Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
+      writeAndKillYourselfOnFailure(toWrite, allowLocalUpdates, clientVersion);
   }
 
   /**
    * see {@link #writeAndKillYourselfOnFailure(Collection)}.
    * @param toWrite
- * @throws IOException 
+   * @throws IOException
    */
-    public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite,
-            boolean allowLocalUpdates, int clientVersion) throws IOException {
+  public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite,
+                                            boolean allowLocalUpdates, int clientVersion) throws IOException {
     try {
       write(toWrite, allowLocalUpdates, clientVersion);
       if (LOGGER.isTraceEnabled()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
similarity index 58%
copy from phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
copy to phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
index 7489a8c..f070278 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java
@@ -15,23 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.phoenix.hbase.index.builder;
+package org.apache.phoenix.hbase.index.write;
 
-import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
 
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.phoenix.hbase.index.covered.IndexCodec;
+/**
+ * Like the {@link ParallelWriterIndexCommitter}, but does not block on the
+ * index writes: tasks are submitted without waiting for their completion, so
+ * failures are left to be resolved later (e.g. by read repair).
+ */
+public class LazyParallelWriterIndexCommitter extends AbstractParallelWriterIndexCommitter {
+
+    // for testing
+    public LazyParallelWriterIndexCommitter(String hbaseVersion) {
+        super(hbaseVersion);
+    }
 
-public abstract class BaseIndexCodec implements IndexCodec {
-    /**
-     * {@inheritDoc}
-     * <p>
-     * By default, the codec is always enabled. Subclasses should override this method if they want do
-     * decide to index on a per-mutation basis.
-     * @throws IOException 
-     */
-    @Override
-    public boolean isEnabled(Mutation m) throws IOException {
-        return true;
+    public LazyParallelWriterIndexCommitter() {
+        super();
     }
-}
\ No newline at end of file
+}
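
The class body above adds nothing beyond its constructors: the per-table task
construction lives in AbstractParallelWriterIndexCommitter (introduced
elsewhere in this patch), and per the Javadoc the lazy variant does not block
on the submitted writes, so a slow or failed index write cannot stall the
caller. A rough sketch of the contrast using plain java.util.concurrent types
instead of Phoenix's TaskBatch/TaskRunner plumbing (illustrative only):

    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class LazyVsBlockingSketch {
        private final ExecutorService pool = Executors.newFixedThreadPool(4);

        void writeBlocking(List<Callable<Void>> tasks) throws Exception {
            // like ParallelWriterIndexCommitter: wait, surfacing the first failure
            for (Future<Void> f : pool.invokeAll(tasks)) {
                f.get();
            }
        }

        void writeLazy(List<Callable<Void>> tasks) {
            // like LazyParallelWriterIndexCommitter: queue the writes and return;
            // a failed write leaves the index row unverified until read repair
            for (Callable<Void> task : tasks) {
                pool.submit(task);
            }
        }
    }
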
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index 3711915..e999d5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -9,33 +9,12 @@
  */
 package org.apache.phoenix.hbase.index.write;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
-import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
-import org.apache.phoenix.hbase.index.parallel.Task;
-import org.apache.phoenix.hbase.index.parallel.TaskBatch;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
-import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
-import org.apache.phoenix.util.IndexUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,151 +30,19 @@ import com.google.common.collect.Multimap;
  * We attempt to quickly determine if any write has failed and not write to the remaining indexes to ensure a timely
  * recovery of the failed index writes.
  */
-public class ParallelWriterIndexCommitter implements IndexCommitter {
-
-    public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
-    private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
-    public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
+public class ParallelWriterIndexCommitter extends AbstractParallelWriterIndexCommitter {
     private static final Logger LOGGER = LoggerFactory.getLogger(ParallelWriterIndexCommitter.class);
 
-    private HTableFactory retryingFactory;
-    private HTableFactory noRetriesfactory;
-    private Stoppable stopped;
-    private QuickFailingTaskRunner pool;
-    private KeyValueBuilder kvBuilder;
-    private RegionCoprocessorEnvironment env;
-
     public ParallelWriterIndexCommitter() {}
 
     // For testing
     public ParallelWriterIndexCommitter(String hbaseVersion) {
-        kvBuilder = KeyValueBuilder.get(hbaseVersion);
-    }
-
-    @Override
-    public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
-        this.env = env;
-        Configuration conf = env.getConfiguration();
-        setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
-                ThreadPoolManager.getExecutor(
-                        new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
-                                DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
-                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
-        this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
-    }
-
-    /**
-     * Setup <tt>this</tt>.
-     * <p>
-     * Exposed for TESTING
-     */
-    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, RegionCoprocessorEnvironment env) {
-        this.retryingFactory = factory;
-        this.noRetriesfactory = IndexWriterUtils.getNoRetriesHTableFactory(env);
-        this.pool = new QuickFailingTaskRunner(pool);
-        this.stopped = stop;
+        super(hbaseVersion);
     }
 
     @Override
     public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates, final int clientVersion) throws SingleIndexWriteFailureException {
-        /*
-         * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in
-         * parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets
-         * tricky is that we want to block the calling thread until one of two things happens: (1) all index tables get
-         * successfully updated, or (2) any one of the index table writes fail; in either case, we should return as
-         * quickly as possible. We get a little more complicated in that if we do get a single failure, but any of the
-         * index writes hasn't been started yet (its been queued up, but not submitted to a thread) we want to that task
-         * to fail immediately as we know that write is a waste and will need to be replayed anyways.
-         */
-
-        Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
-        TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
-        for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
-            // get the mutations for each table. We leak the implementation here a little bit to save
-            // doing a complete copy over of all the index update for each table.
-            final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
-            final HTableInterfaceReference tableReference = entry.getKey();
-			if (env != null
-					&& !allowLocalUpdates
-					&& tableReference.getTableName().equals(
-							env.getRegion().getTableDesc().getNameAsString())) {
-				continue;
-			}
-            /*
-             * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
-             * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
-             * The former will only work if we are not in the midst of writing the current batch to the table, though we
-             * do check these status variables before starting and before writing the batch. The latter usage,
-             * interrupting the thread, will work in the previous situations as was at some points while writing the
-             * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
-             * elaborate when is supports an interrupt).
-             */
-            tasks.add(new Task<Void>() {
-
-                /**
-                 * Do the actual write to the primary table.
-                 * 
-                 * @return
-                 */
-                @SuppressWarnings("deprecation")
-                @Override
-                public Void call() throws Exception {
-                    // this may have been queued, so another task infront of us may have failed, so we should
-                    // early exit, if that's the case
-                    throwFailureIfDone();
-
-                    if (LOGGER.isTraceEnabled()) {
-                        LOGGER.trace("Writing index update:" + mutations + " to table: "
-                                + tableReference);
-                    }
-                    HTableInterface table = null;
-                    try {
-                        if (allowLocalUpdates
-                                && env != null
-                                && tableReference.getTableName().equals(
-                                    env.getRegion().getTableDesc().getNameAsString())) {
-                            try {
-                                throwFailureIfDone();
-                                IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true);
-                                return null;
-                            } catch (IOException ignord) {
-                                // when it's failed we fall back to the standard & slow way
-                                if (LOGGER.isDebugEnabled()) {
-                                    LOGGER.debug("indexRegion.batchMutate failed and fall back " +
-                                            "to HTable.batch(). Got error=" + ignord);
-                                }
-                            }
-                        }
-                     // if the client can retry index writes, then we don't need to retry here
-                        HTableFactory factory = clientVersion < MetaDataProtocol.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesfactory;
-                        table = factory.getTable(tableReference.get());
-                        throwFailureIfDone();
-                        table.batch(mutations);
-                    } catch (SingleIndexWriteFailureException e) {
-                        throw e;
-                    } catch (IOException e) {
-                        throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e, PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
-                    } catch (InterruptedException e) {
-                        // reset the interrupt status on the thread
-                        Thread.currentThread().interrupt();
-                        throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e, PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
-                    }
-                    finally{
-                        if (table != null) {
-                            table.close();
-                        }
-                    }
-                    return null;
-                }
-
-                private void throwFailureIfDone() throws SingleIndexWriteFailureException {
-                    if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
-                            "Pool closed, not attempting to write to the index!", null); }
-
-                }
-            });
-        }
-
+        super.write(toWrite, allowLocalUpdates, clientVersion);
         // actually submit the tasks to the pool and wait for them to finish/fail
         try {
             pool.submitUninterruptible(tasks);
@@ -207,37 +54,4 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
         }
 
     }
-
-    private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
-        try {
-            throw throwable;
-        } catch (SingleIndexWriteFailureException e1) {
-            throw e1;
-        } catch (Throwable e1) {
-            throw new SingleIndexWriteFailureException("Got an abort notification while writing to the index!", e1);
-        }
-
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed by the external
-     * {@link Stoppable}. This call does not delegate the stop down to the {@link Stoppable} passed in the constructor.
-     * 
-     * @param why
-     *            the reason for stopping
-     */
-    @Override
-    public void stop(String why) {
-        LOGGER.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
-        this.pool.stop(why);
-        this.retryingFactory.shutdown();
-        this.noRetriesfactory.shutdown();
-    }
-
-    @Override
-    public boolean isStopped() {
-        return this.stopped.isStopped();
-    }
-}
\ No newline at end of file
+}
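
Note the refactored write() above: super.write() builds the per-table
TaskBatch, and the blocking submit that follows references pool and tasks,
which therefore must now be protected fields of the new
AbstractParallelWriterIndexCommitter. A minimal template-method sketch of that
shape, with invented names:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    abstract class BaseCommitterSketch {
        // shared with subclasses, like tasks/pool in the abstract committer
        protected final List<Runnable> tasks = new ArrayList<>();

        public void write(Collection<Runnable> updates) {
            tasks.addAll(updates); // the base class only assembles the batch
        }
    }

    class BlockingCommitterSketch extends BaseCommitterSketch {
        @Override
        public void write(Collection<Runnable> updates) {
            super.write(updates);
            tasks.forEach(Runnable::run); // this variant also runs it and waits
        }
    }
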
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
index fd37402..ca2ad84 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -82,6 +82,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
     private Stoppable stopped;
     private RegionCoprocessorEnvironment env;
     private KeyValueBuilder kvBuilder;
+    protected boolean disableIndexOnFailure = false;
 
     // for testing
     public TrackingParallelWriterIndexCommitter(String hbaseVersion) {
@@ -92,8 +93,9 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
     }
 
     @Override
-    public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+    public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name, boolean disableIndexOnFailure) {
         this.env = env;
+        this.disableIndexOnFailure = disableIndexOnFailure;
         Configuration conf = env.getConfiguration();
         setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
                 ThreadPoolManager.getExecutor(
@@ -181,7 +183,13 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
                                     " to table: " + tableReference);
                         }
                         // if the client can retry index writes, then we don't need to retry here
-                        HTableFactory factory = clientVersion < MetaDataProtocol.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesFactory;
+                        HTableFactory factory;
+                        if (disableIndexOnFailure) {
+                            factory = clientVersion < MetaDataProtocol.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesFactory;
+                        } else {
+                            factory = retryingFactory;
+                        }
                         table = factory.getTable(tableReference.get());
                         throwFailureIfDone();
                         table.batch(mutations);
@@ -235,7 +243,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
         if (failures.size() > 0) {
             // make the list unmodifiable to avoid any more synchronization concerns
             throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures),
-                    PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
+                    disableIndexOnFailure && PhoenixIndexFailurePolicy.getDisableIndexOnFailure(env));
         }
         return;
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
new file mode 100644
index 0000000..7731bb4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CHECK_VERIFY_COLUMN;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
+import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ServerUtil;
+
+/**
+ * Coprocessor that verifies the scanned rows of a non-transactional global index.
+ */
+public class GlobalIndexChecker extends BaseRegionObserver {
+    private static final Log LOG = LogFactory.getLog(GlobalIndexChecker.class);
+    /**
+     * Class that verifies a given row of a non-transactional global index.
+     * An instance of this class is created for each scanner on an index and is used
+     * to verify individual rows and to rebuild them if they are not valid.
+     */
+    private static class GlobalIndexScanner implements RegionScanner {
+        RegionScanner scanner;
+        private long ageThreshold;
+        private int repairCount;
+        private Scan scan;
+        private Scan indexScan;
+        private Scan buildIndexScan = null;
+        private Table dataHTable = null;
+        private byte[] emptyCF;
+        private byte[] emptyCQ;
+        private IndexMaintainer indexMaintainer = null;
+        private byte[][] viewConstants = null;
+        private RegionCoprocessorEnvironment env;
+        private Region region;
+        private long minTimestamp;
+        private long maxTimestamp;
+
+        public GlobalIndexScanner(RegionCoprocessorEnvironment env, Scan scan, RegionScanner scanner) throws IOException {
+            this.env = env;
+            this.scan = scan;
+            region = env.getRegion();
+            this.scanner = scanner;
+            emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+            emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+            ageThreshold = env.getConfiguration().getLong(
+                    QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS);
+            repairCount = env.getConfiguration().getInt(
+                    QueryServices.GLOBAL_INDEX_ROW_REPAIR_COUNT_ATTRIB,
+                    QueryServicesOptions.DEFAULT_GLOBAL_INDEX_REPAIR_COUNT);
+            minTimestamp = scan.getTimeRange().getMin();
+            maxTimestamp = scan.getTimeRange().getMax();
+        }
+
+        @Override
+        public int getBatch() {
+            return scanner.getBatch();
+        }
+
+        @Override
+        public long getMaxResultSize() {
+            return scanner.getMaxResultSize();
+        }
+
+        @Override
+        public boolean next(List<Cell> result) throws IOException {
+            try {
+                boolean hasMore;
+                do {
+                    hasMore = scanner.next(result);
+                    if (result.isEmpty()) {
+                        break;
+                    }
+                    if (verifyRowAndRepairIfNecessary(result)) {
+                        break;
+                    }
+                    // skip this row as it is invalid
+                    // if there are no more rows, result will be an empty list
+                } while (hasMore);
+                return hasMore;
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+                return false; // impossible
+            }
+        }
+
+        @Override
+        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+            throw new IOException("next with scannerContext should not be called in Phoenix environment");
+        }
+
+        @Override
+        public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+            throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
+        }
+
+        @Override
+        public void close() throws IOException {
+            scanner.close();
+            if (dataHTable != null) {
+                dataHTable.close();
+            }
+        }
+
+        @Override
+        public HRegionInfo getRegionInfo() {
+            return scanner.getRegionInfo();
+        }
+
+        @Override
+        public boolean isFilterDone() throws IOException {
+            return scanner.isFilterDone();
+        }
+
+        @Override
+        public boolean reseek(byte[] row) throws IOException {
+            return scanner.reseek(row);
+        }
+
+        @Override
+        public long getMvccReadPoint() {
+            return scanner.getMvccReadPoint();
+        }
+
+        @Override
+        public boolean nextRaw(List<Cell> result) throws IOException {
+            try {
+                boolean hasMore;
+                do {
+                    hasMore = scanner.nextRaw(result);
+                    if (result.isEmpty()) {
+                        break;
+                    }
+                    if (verifyRowAndRepairIfNecessary(result)) {
+                        break;
+                    }
+                    // skip this row as it is invalid
+                    // if there are no more rows, result will be an empty list
+                } while (hasMore);
+                return hasMore;
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+                return false; // impossible
+            }
+        }
+
+        private void deleteRowIfAgedEnough(byte[] indexRowKey, long ts) throws IOException {
+            if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
+                Delete del = new Delete(indexRowKey, ts);
+                Mutation[] mutations = new Mutation[]{del};
+                region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
+            }
+        }
+
+        private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws IOException {
+            // Build the data table row key from the index table row key
+            if (buildIndexScan == null) {
+                buildIndexScan = new Scan();
+                indexScan = new Scan(scan);
+                byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
+                byte[] indexTableName = region.getRegionInfo().getTable().getName();
+                dataHTable = ServerUtil.getHTableForCoprocessorScan(env, dataTableName);
+                if (indexMaintainer == null) {
+                    byte[] md = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+                    List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(md, true);
+                    indexMaintainer = getIndexMaintainer(maintainers, indexTableName);
+                }
+                if (indexMaintainer == null) {
+                    throw new DoNotRetryIOException(
+                            "repairIndexRows: IndexMaintainer is not included in scan attributes for " +
+                                    region.getRegionInfo().getTable().getNameAsString());
+                }
+                if (viewConstants == null) {
+                    viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
+                }
+                // The following attributes are set to instruct UngroupedAggregateRegionObserver to do partial index rebuild
+                // i.e., rebuild a subset of index rows.
+                buildIndexScan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, TRUE_BYTES);
+                buildIndexScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
+                buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+                buildIndexScan.setAttribute(BaseScannerRegionObserver.SCAN_LIMIT, Bytes.toBytes(repairCount));
+                buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+            }
+            // Rebuild the index rows from the corresponding rows in the data table
+            byte[] dataRowKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRowKey), viewConstants);
+            buildIndexScan.withStartRow(dataRowKey, true);
+            buildIndexScan.setTimeRange(ts, maxTimestamp);
+            buildIndexScan.setRaw(true);
+            try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScan)){
+                resultScanner.next();
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(dataHTable.getName().toString(), t);
+            }
+            // Close the current scanner as the newly built row will not be visible to it
+            scanner.close();
+            // Open a new scanner starting from the current row
+            indexScan.withStartRow(indexRowKey, true);
+            scanner = region.getScanner(indexScan);
+            // Scan the newly built index rows
+            scanner.next(row);
+            if (row.isEmpty()) {
+                return;
+            }
+            // Check if the corresponding data table row exists
+            if (Bytes.compareTo(row.get(0).getRowArray(), row.get(0).getRowOffset(), row.get(0).getRowLength(),
+                    indexRowKey, 0, indexRowKey.length) == 0) {
+                if (!verifyRowAndRemoveEmptyColumn(row)) {
+                    // The corresponding row does not exist in the data table.
+                    // Need to delete the row from index if it is old enough
+                    deleteRowIfAgedEnough(indexRowKey, ts);
+                    row.clear();
+                }
+                return;
+            }
+            // This means the current index row was deleted by the rebuild process and we got the next row.
+            // If it is verified, we are good to go; if not, we need to repair the new row
+            if (!verifyRowAndRemoveEmptyColumn(row)) {
+                // Rewind the scanner and let the row be scanned again so that it can be repaired
+                scanner.close();
+                scanner = region.getScanner(indexScan);
+                row.clear();
+            }
+        }
+
+        private boolean isEmptyColumn(Cell cell) {
+            return Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                    emptyCF, 0, emptyCF.length) == 0 &&
+                    Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                            emptyCQ, 0, emptyCQ.length) == 0;
+        }
+
+        private boolean verifyRow(byte[] rowKey) throws IOException {
+            LOG.warn("Scan " + scan + " did not return the empty column for " + region.getRegionInfo().getTable().getNameAsString());
+            Get get = new Get(rowKey);
+            get.setTimeRange(minTimestamp, maxTimestamp);
+            get.addColumn(emptyCF, emptyCQ);
+            Result result = region.get(get);
+            if (result.isEmpty()) {
+                LOG.warn("The empty column does not exist in a row in " + region.getRegionInfo().getTable().getNameAsString());
+                return false;
+            }
+            if (Bytes.compareTo(result.getValue(emptyCF, emptyCQ), 0, UNVERIFIED_BYTES.length,
+                    UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) {
+                return false;
+            }
+            return true;
+        }
+
+        private boolean verifyRowAndRemoveEmptyColumn(List<Cell> cellList) throws IOException {
+            long cellListSize = cellList.size();
+            Cell cell = null;
+            if (cellListSize == 0) {
+                return true;
+            }
+            Iterator<Cell> cellIterator = cellList.iterator();
+            while (cellIterator.hasNext()) {
+                cell = cellIterator.next();
+                if (isEmptyColumn(cell)) {
+                    // Before PHOENIX-5156, the empty column value was set to 'x'. With PHOENIX-5156, it is
+                    // set to VERIFIED (1) or UNVERIFIED (2). So as not to skip index rows written before
+                    // PHOENIX-5156, anything that is not UNVERIFIED is treated as VERIFIED. IndexTool should
+                    // be used to rebuild old rows to ensure their correctness after the PHOENIX-5156 upgrade.
+                    if (Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                            UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) == 0) {
+                        return false;
+                    }
+                    // The empty column is not supposed to be returned to the client unless it is the only
+                    // column included in the scan
+                    if (cellListSize > 1) {
+                        cellIterator.remove();
+                    }
+                    return true;
+                }
+            }
+            byte[] rowKey = new byte[cell.getRowLength()];
+            System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength());
+            return verifyRow(rowKey);
+        }
+
+        private long getMaxTimestamp(List<Cell> cellList) {
+            long maxTs = 0;
+            long ts = 0;
+            Iterator<Cell> cellIterator = cellList.iterator();
+            while (cellIterator.hasNext()) {
+                Cell cell = cellIterator.next();
+                ts = cell.getTimestamp();
+                if (ts > maxTs) {
+                    maxTs = ts;
+                }
+            }
+            return maxTs;
+        }
+
+        /**
+         * @param cellList is an input and output parameter and will either include a valid row or be an empty list
+         * @return true if the row is verified (or successfully repaired), false if it is invalid and should be skipped
+         * @throws IOException
+         */
+        private boolean verifyRowAndRepairIfNecessary(List<Cell> cellList) throws IOException {
+            Cell cell = cellList.get(0);
+            if (verifyRowAndRemoveEmptyColumn(cellList)) {
+                return true;
+            } else {
+                byte[] rowKey = new byte[cell.getRowLength()];
+                System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength());
+                long ts = getMaxTimestamp(cellList);
+                cellList.clear();
+                repairIndexRows(rowKey, ts, cellList);
+                if (cellList.isEmpty()) {
+                    // This means that the index row is invalid. Return false to tell the caller that this row should be skipped
+                    return false;
+                }
+                return true;
+            }
+        }
+    }
+
+    @Override
+    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+                                         Scan scan, RegionScanner s) throws IOException {
+        if (scan.getAttribute(CHECK_VERIFY_COLUMN) == null) {
+            return s;
+        }
+        return new GlobalIndexScanner(c.getEnvironment(), scan, s);
+    }
+
+}
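
The crux of the verification above is a single byte comparison: the empty
column of each scanned index row is compared against the UNVERIFIED marker,
and anything else (including the pre-PHOENIX-5156 'x' value) counts as
verified. A condensed restatement of that check, lifted from
verifyRowAndRemoveEmptyColumn():

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.util.Bytes;

    import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;

    final class VerifyCheckSketch {
        // a row counts as verified unless its empty column explicitly
        // carries the UNVERIFIED marker
        static boolean isVerified(Cell emptyColumnCell) {
            return Bytes.compareTo(
                    emptyColumnCell.getValueArray(), emptyColumnCell.getValueOffset(),
                    emptyColumnCell.getValueLength(),
                    UNVERIFIED_BYTES, 0, UNVERIFIED_BYTES.length) != 0;
        }
    }

Unverified rows are rebuilt from the data table (bounded by the repair count),
and rows that still fail verification are skipped, then deleted once they are
older than the configured age threshold.
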
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index cb09dc4..2e62745 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -316,6 +316,17 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> {
         }
         return maintainers;
     }
+
+    public static IndexMaintainer getIndexMaintainer(List<IndexMaintainer> maintainers, byte[] indexTableName) {
+        Iterator<IndexMaintainer> maintainerIterator = maintainers.iterator();
+        while (maintainerIterator.hasNext()) {
+            IndexMaintainer maintainer = maintainerIterator.next();
+            if (Bytes.compareTo(indexTableName, maintainer.getIndexTableName()) == 0) {
+                return maintainer;
+            }
+        }
+        return null;
+    }
     
     private byte[] viewIndexId;
     private PDataType viewIndexIdType;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
index 585631d..7a706bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java
@@ -109,7 +109,7 @@ public class PhoenixIndexBuilder extends NonTxIndexBuilder {
     }
     
     @Override
-    public boolean isAtomicOp(Mutation m) throws IOException {
+    public boolean isAtomicOp(Mutation m) {
         return m.getAttribute(ATOMIC_OP_ATTRIB) != null;
     }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index eb911e1..51e9de4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -121,7 +121,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
     }
 
     @Override
-    public boolean isEnabled(Mutation m) throws IOException {
+    public boolean isEnabled(Mutation m) {
         return hasIndexMaintainers(m.getAttributesMap());
     }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index cc99ae4..4bf41dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -55,6 +55,7 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.slf4j.Logger;
@@ -132,6 +133,7 @@ public class TableResultIterator implements ResultIterator {
         this.caches = caches;
         this.retry=plan.getContext().getConnection().getQueryServices().getProps()
                 .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
+        IndexUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection());
     }
 
     @Override
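
With this one-line hook, every client-side scan over a mutable global index
passes through IndexUtil.setScanAttributesForIndexReadRepair(), so read repair
requires no application changes. A hypothetical end-to-end example (table and
index names invented); the SELECT below is served from the index, and any
unverified rows it touches are repaired server-side by GlobalIndexChecker:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ReadRepairExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE IF NOT EXISTS T (ID INTEGER PRIMARY KEY, V VARCHAR)");
                stmt.execute("CREATE INDEX IF NOT EXISTS I ON T (V)");
                stmt.execute("UPSERT INTO T VALUES (1, 'a')");
                conn.commit();
                // served from index I; verification happens transparently
                try (ResultSet rs = stmt.executeQuery("SELECT ID FROM T WHERE V = 'a'")) {
                    while (rs.next()) {
                        System.out.println(rs.getInt(1));
                    }
                }
            }
        }
    }
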
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index f299a58..bd2b975 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -71,9 +71,7 @@ import static org.apache.phoenix.util.UpgradeUtil.syncTableAndIndexProperties;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.ref.WeakReference;
-import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
-import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Types;
@@ -187,11 +185,13 @@ import org.apache.phoenix.exception.UpgradeInProgressException;
 import org.apache.phoenix.exception.UpgradeNotRequiredException;
 import org.apache.phoenix.exception.UpgradeRequiredException;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixTransactionalIndexer;
@@ -227,7 +227,6 @@ import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.SequenceAllocation;
-import org.apache.phoenix.schema.SequenceAlreadyExistsException;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.SystemFunctionSplitPolicy;
@@ -812,7 +811,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         .getName());
             } else {
                 // In case this a local index created on a view of a multi-tenant table, the
-                // DATA_TABLE_NAME points to the name of the view instead of the physical base table
+                // PHYSICAL_DATA_TABLE_NAME points to the name of the view instead of the physical base table
                 baseTableDesc = existingDesc;
             }
             dataTableColDescForIndexTablePropSyncing = baseTableDesc.getFamily(defaultFamilyBytes);
@@ -897,6 +896,19 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         // The phoenix jar must be available on HBase classpath
         int priority = props.getInt(QueryServices.COPROCESSOR_PRIORITY_ATTRIB, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
         try {
+            TransactionFactory.Provider provider = getTransactionProvider(tableProps);
+            boolean isTransactional = (provider != null);
+
+            boolean globalIndexerEnabled = config.getBoolean(
+                    QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
+                    QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED);
+
+            if (tableType == PTableType.INDEX && !isTransactional) {
+                if (globalIndexerEnabled && !descriptor.hasCoprocessor(GlobalIndexChecker.class.getName())) {
+                    descriptor.addCoprocessor(GlobalIndexChecker.class.getName(), null, priority - 1, null);
+                }
+            }
+
             if (!descriptor.hasCoprocessor(ScanRegionObserver.class.getName())) {
                 descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, priority, null);
             }
@@ -909,8 +921,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) {
                 descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null);
             }
-            TransactionFactory.Provider provider = getTransactionProvider(tableProps);
-            boolean isTransactional = (provider != null);
+
             // TODO: better encapsulation for this
             // Since indexes can't have indexes, don't install our indexing coprocessor for indexes.
             // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use
@@ -926,15 +937,27 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     if (descriptor.hasCoprocessor(Indexer.class.getName())) {
                         descriptor.removeCoprocessor(Indexer.class.getName());
                     }
+                    if (descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
+                        descriptor.removeCoprocessor(IndexRegionObserver.class.getName());
+                    }
                 } else {
                     // If exception on alter table to transition back to non transactional
                     if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
                         descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
                     }
-                    if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
-                        Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
-                        opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
-                        Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+                    if (globalIndexerEnabled) {
+                        if (!descriptor.hasCoprocessor(IndexRegionObserver.class.getName())) {
+                            Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+                            opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+                            IndexRegionObserver.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+                        }
+
+                    } else {
+                        if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
+                            Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
+                            opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
+                            Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);
+                        }
                     }
                 }
             }
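
The new coprocessor wiring is on by default but reversible: index tables get
GlobalIndexChecker (registered at priority - 1, so it is ordered ahead of the
other Phoenix observers), and data tables get IndexRegionObserver in place of
the old Indexer. Setting the flag read above to false falls back to the
pre-PHOENIX-5156 path. A sketch of flipping it off -- note this only affects
tables created or altered afterwards, since the coprocessors are attached to
the table descriptor:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class DisableIndexRegionObserver {
        public static Configuration conf() {
            Configuration conf = HBaseConfiguration.create();
            // matches INDEX_REGION_OBSERVER_ENABLED_ATTRIB introduced in this patch
            conf.setBoolean("phoenix.index.region.observer.enabled", false);
            return conf;
        }
    }
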
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 2e95483..910ce92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -151,6 +151,9 @@ public interface QueryConstants {
     public static final ImmutableBytesPtr DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr(
                DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES);
 
+    public static final String GLOBAL_INDEX_VERIFIED_COLUMN_QUALIFIER = EMPTY_COLUMN_NAME;
+    public static final byte[] GLOBAL_INDEX_VERIFIED_COLUMN_NAME_BYTES = Bytes.toBytes(GLOBAL_INDEX_VERIFIED_COLUMN_QUALIFIER);
     public static final String ALL_FAMILY_PROPERTIES_KEY = "";
     public static final String SYSTEM_TABLE_PK_NAME = "pk";
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 904ed56..e109293 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -346,6 +346,13 @@ public interface QueryServices extends SQLCloseable {
     public static final String TASK_HANDLING_MAX_INTERVAL_MS_ATTRIB = "phoenix.task.handling.maxInterval.ms";
     // The initial delay before the first task from table SYSTEM.TASK is handled
     public static final String TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB = "phoenix.task.handling.initial.delay.ms";
+    // The minimum age of an unverified global index row to be eligible for deletion
+    public static final String GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB = "phoenix.global.index.row.age.threshold.to.delete.ms";
+    // The maximum number of global index rows to be rebuilt at a time
+    public static final String GLOBAL_INDEX_ROW_REPAIR_COUNT_ATTRIB = "phoenix.global.index.row.repair.count.ms";
+    // Enable the IndexRegionObserver Coprocessor
+    public static final String INDEX_REGION_OBSERVER_ENABLED_ATTRIB = "phoenix.index.region.observer.enabled";
+
 
     // Before 4.15 when we created a view we included the parent table column metadata in the view
     // metadata. After PHOENIX-3534 we allow SYSTEM.CATALOG to split and no longer store the parent
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index a045a3a..2d66217 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -355,6 +355,10 @@ public class QueryServicesOptions {
     public static final long DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS = 30*60*1000; // 30 min
     public static final long DEFAULT_TASK_HANDLING_INITIAL_DELAY_MS = 10*1000; // 10 sec
 
+    public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 10*60*1000; /* 10 min */
+    public static final int DEFAULT_GLOBAL_INDEX_REPAIR_COUNT = DEFAULT_MUTATE_BATCH_SIZE;
+    public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
+
     public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
 
     public static final boolean DEFAULT_PROPERTY_POLICY_PROVIDER_ENABLED = true;
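
The two repair knobs default to a 10 minute delete threshold and a repair
batch equal to DEFAULT_MUTATE_BATCH_SIZE. A tuning sketch (values are examples
only; the second key's ".ms" suffix is kept as-is from the patch even though
it holds a row count):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class ReadRepairTuning {
        public static Configuration conf() {
            Configuration conf = HBaseConfiguration.create();
            // minimum age before an unverified index row may be deleted
            conf.setLong("phoenix.global.index.row.age.threshold.to.delete.ms", 5 * 60 * 1000L);
            // number of rows rebuilt per repair scan
            conf.setInt("phoenix.global.index.row.repair.count.ms", 100);
            return conf;
        }
    }
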
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ab7d881..81f540c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -942,6 +942,9 @@ public class MetaDataClient {
                 } else {
                     indexesToAdd.add(PTableImpl.builderWithColumns(index, getColumnsToClone(index))
                             .setTableName(modifiedIndexName)
+                            .setParentName(view.getName())
+                            .setParentSchemaName(view.getSchemaName())
+                            .setParentTableName(view.getTableName())
                             .setViewStatement(viewStatement)
                             .setUpdateCacheFrequency(view.getUpdateCacheFrequency())
                             .setTenantId(view.getTenantId())
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index f1d9ce4..79a0dfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -21,9 +21,11 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERS
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
 import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME;
 import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 import static org.apache.phoenix.util.PhoenixRuntime.getTable;
 
 import java.io.ByteArrayInputStream;
@@ -37,6 +39,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
+import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
@@ -54,6 +57,8 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -75,6 +80,7 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
@@ -82,12 +88,16 @@ import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor;
+import org.apache.phoenix.filter.ColumnProjectionFilter;
+import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter;
+import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -844,7 +854,7 @@ public class IndexUtil {
                                     Collections.<PTable>emptyIterator();
         return Lists.newArrayList(indexIterator);
     }
-    
+
     public static Result incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable,long amount) throws IOException {
         byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(failedIndexTable);
         Increment incr = new Increment(indexTableKey);
@@ -873,4 +883,86 @@ public class IndexUtil {
             throw new IOException(e);
         }
     }
+
+    private static boolean containsOneOrMoreColumn(Scan scan) {
+        Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
+        if (familyMap == null || familyMap.isEmpty()) {
+            return false;
+        }
+        for (Map.Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
+            NavigableSet<byte[]> family = entry.getValue();
+            if (family != null && !family.isEmpty()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
+        boolean addedEmptyColumn = false;
+        Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan);
+        while (iterator.hasNext()) {
+            Filter filter = iterator.next();
+            if (filter instanceof EncodedQualifiersColumnProjectionFilter) {
+                ((EncodedQualifiersColumnProjectionFilter) filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME);
+                if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                    scan.addColumn(emptyCF, emptyCQ);
+                }
+            }
+            else if (filter instanceof ColumnProjectionFilter) {
+                ((ColumnProjectionFilter) filter).addTrackedColumn(new ImmutableBytesPtr(emptyCF), new ImmutableBytesPtr(emptyCQ));
+                if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+                    scan.addColumn(emptyCF, emptyCQ);
+                }
+            }
+            else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) {
+                ((MultiEncodedCQKeyValueComparisonFilter) filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME);
+            }
+            else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) {
+                scan.addColumn(emptyCF, emptyCQ);
+                addedEmptyColumn = true;
+            }
+        }
+        if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) {
+            scan.addColumn(emptyCF, emptyCQ);
+        }
+    }
+
+    public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException {
+        if (table.isTransactional() || table.isImmutableRows() || table.getType() != PTableType.INDEX) {
+            return;
+        }
+        PTable indexTable = table;
+        if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) {
+            return;
+        }
+        String schemaName = indexTable.getParentSchemaName().getString();
+        String tableName = indexTable.getParentTableName().getString();
+        PTable dataTable;
+        try {
+            dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(schemaName, tableName));
+        } catch (TableNotFoundException e) {
+            // The index table must be getting dropped. No need to set the scan attributes.
+            return;
+        }
+        if (!dataTable.getIndexes().contains(indexTable)) {
+            return;
+        }
+        if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) {
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), phoenixConnection);
+            scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+        }
+        scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES);
+        scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes());
+        IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection);
+        byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+        byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF);
+        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ);
+        if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) {
+            BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
+        }
+        addEmptyColumnToScan(scan, emptyCF, emptyCQ);
+    }
 }
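
addEmptyColumnToScan() exists because verification needs the empty column to
actually come back with each row: whatever projection filters the planner
installed must track it, and count-style scans that use FirstKeyOnlyFilter are
narrowed so their single returned key is the verification column. A simplified
single-filter version of that idea (hypothetical family/qualifier values; the
real ones come from the IndexMaintainer):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class EmptyColumnScanSketch {
        // ensure the verification column is fetched alongside the projection
        static void ensureEmptyColumn(Scan scan, byte[] emptyCF, byte[] emptyCQ) {
            Filter filter = scan.getFilter();
            if (filter instanceof FirstKeyOnlyFilter) {
                scan.addColumn(emptyCF, emptyCQ);
            }
        }

        public static void main(String[] args) {
            Scan scan = new Scan();
            scan.setFilter(new FirstKeyOnlyFilter());
            ensureEmptyColumn(scan, Bytes.toBytes("0"), Bytes.toBytes("_0"));
            System.out.println(scan.getFamilyMap());
        }
    }
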
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 26d2655..dd89f07 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.query;
 import static org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter.NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY;
 import static org.apache.phoenix.query.QueryConstants.MILLIS_IN_DAY;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.GLOBAL_INDEX_ROW_REPAIR_COUNT_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.INDEX_FAILURE_DISABLE_INDEX;
 import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
@@ -624,6 +626,7 @@ public abstract class BaseTest {
         conf.setInt(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 1000);
         conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
         conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
+        conf.setInt(GLOBAL_INDEX_ROW_REPAIR_COUNT_ATTRIB, 5);
         return conf;
     }