Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/23 04:20:32 UTC

[2/3] phoenix git commit: PHOENIX-4178 Detect failed index write while rebuilder is running with index staying active

PHOENIX-4178 Detect failed index write while rebuilder is running with index staying active


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/944bed73
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/944bed73
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/944bed73

Branch: refs/heads/master
Commit: 944bed73585a5ff826997895c2da43720b229d8a
Parents: 0a1b33a
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Sep 21 18:24:39 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Sep 22 21:18:17 2017 -0700

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    |  12 +-
 .../end2end/index/PartialIndexRebuilderIT.java  | 459 +++++++++++--------
 .../coprocessor/MetaDataEndpointImpl.java       | 179 ++++----
 .../phoenix/coprocessor/MetaDataProtocol.java   |   6 +-
 .../coprocessor/MetaDataRegionObserver.java     |  74 +--
 .../apache/phoenix/execute/MutationState.java   |  10 +-
 .../apache/phoenix/index/IndexMaintainer.java   |  25 +-
 .../index/PhoenixIndexFailurePolicy.java        |  12 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   1 +
 .../apache/phoenix/optimize/QueryOptimizer.java |   6 +-
 .../phoenix/query/QueryServicesOptions.java     |  16 +-
 .../org/apache/phoenix/schema/PIndexState.java  |   3 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |  14 +-
 .../phoenix/query/QueryServicesTestImpl.java    |  10 +-
 .../java/org/apache/phoenix/util/TestUtil.java  |  42 +-
 15 files changed, 496 insertions(+), 373 deletions(-)
----------------------------------------------------------------------
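
At a high level, this patch adds a PENDING_ACTIVE index state: when a table opts out of disabling its index on a write failure (DISABLE_INDEX_ON_WRITE_FAILURE = false), a failed index write is recorded but the index stays usable for queries while the rebuilder catches it up. A minimal sketch, not code from this commit, of the planner-visible effect (the same disjunction appears in the MetaDataEndpointImpl hunk below):

    import org.apache.phoenix.schema.PIndexState;

    final class IndexUsabilitySketch {
        // PENDING_ACTIVE behaves like ACTIVE for reads: the index is still
        // chosen by the optimizer even though a recent write to it failed.
        static boolean usableForQueries(PIndexState state) {
            return state == PIndexState.ACTIVE || state == PIndexState.PENDING_ACTIVE;
        }
    }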


http://git-wip-us.apache.org/repos/asf/phoenix/blob/944bed73/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index dbac5a9..6617466 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -35,7 +36,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -137,12 +137,12 @@ public class MutableIndexFailureIT extends BaseTest {
         // need to override rpc retries otherwise test doesn't pass
         serverProps.put(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, Long.toString(numRpcRetries));
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(forwardOverlapMs));
+        serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, Long.toString(disableTimestampThresholdMs));
         /*
          * Effectively disable running the index rebuild task by having an infinite delay
          * because we want to control it's execution ourselves
          */
         serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, Long.toString(Long.MAX_VALUE));
-        serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, Long.toString(disableTimestampThresholdMs));
         Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
         NUM_SLAVES_BASE = 4;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
@@ -296,10 +296,10 @@ public class MutableIndexFailureIT extends BaseTest {
             assertTrue(rs.next());
             assertEquals(indexName, rs.getString(3));
             // the index is only disabled for non-txn tables upon index table write failure
+            String indexState = rs.getString("INDEX_STATE");
             if (transactional || leaveIndexActiveOnFailure || localIndex) {
-                assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+                assertTrue(PIndexState.ACTIVE.toString().equalsIgnoreCase(indexState) || PIndexState.PENDING_ACTIVE.toString().equalsIgnoreCase(indexState));
             } else {
-                String indexState = rs.getString("INDEX_STATE");
                 assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState));
             }
             assertFalse(rs.next());
@@ -308,7 +308,7 @@ public class MutableIndexFailureIT extends BaseTest {
             // in an all or none manner. If the table is not transactional, then the data writes
             // would have succeeded while the index writes would have failed.
             if (!transactional) {
-                updateTableAgain(conn, leaveIndexActiveOnFailure);
+                updateTableAgain(conn, false);
                 // Verify previous writes succeeded to data table
                 query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
                 rs = conn.createStatement().executeQuery("EXPLAIN " + query);
@@ -496,7 +496,7 @@ public class MutableIndexFailureIT extends BaseTest {
         public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
 
         @Override
-        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
             boolean throwException = false;
             if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
                     && FAIL_WRITE) {
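
Both test suites in this commit inject index write failures through a region observer installed on the index table. A self-contained sketch of that pattern (class name hypothetical; the coprocessor hook and the DoNotRetryIOException trick are exactly what the diffs here use):

    import java.io.IOException;

    import org.apache.hadoop.hbase.DoNotRetryIOException;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
    import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

    public class FailEveryWriteObserver extends SimpleRegionObserver {
        @Override
        public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
            // A DoNotRetryIOException stops the HBase client from retrying, so the
            // failure surfaces immediately as a CommitException on the Phoenix side.
            throw new DoNotRetryIOException("Simulated index write failure on "
                    + c.getEnvironment().getRegionInfo().getTable().getNameAsString());
        }
    }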

http://git-wip-us.apache.org/repos/asf/phoenix/blob/944bed73/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index dfe5a28..9095dbe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Map;
 import java.util.Random;
@@ -32,7 +33,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -40,11 +41,13 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.execute.CommitException;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
@@ -64,23 +67,68 @@ import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
 
+@SuppressWarnings("deprecation")
 @RunWith(RunUntilFailure.class)
 public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
+    private static final Logger LOG = LoggerFactory.getLogger(PartialIndexRebuilderIT.class);
     private static final Random RAND = new Random(5);
     private static final int WAIT_AFTER_DISABLED = 5000;
-    private static final int REBUILD_INTERVAL = 2000;
+    private static final long REBUILD_PERIOD = 50000;
+    private static final long REBUILD_INTERVAL = 2000;
+    private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
 
+    
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString());
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, Long.toString(REBUILD_INTERVAL));
-        serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "120000"); // give up rebuilding after 2 minutes
+        serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "300000"); // give up rebuilding after 5 minutes
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED));
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
+        indexRebuildTaskRegionEnvironment =
+                (RegionCoprocessorEnvironment) getUtility()
+                        .getRSForFirstRegionInTable(
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        .get(0).getCoprocessorHost()
+                        .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
+        MetaDataRegionObserver.initRebuildIndexConnectionProps(
+            indexRebuildTaskRegionEnvironment.getConfiguration());
+    }
+
+    private static void runIndexRebuilder() throws InterruptedException, SQLException {
+        BuildIndexScheduleTask task =
+                new MetaDataRegionObserver.BuildIndexScheduleTask(
+                        indexRebuildTaskRegionEnvironment);
+        task.run();
+    }
+    
+    private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel) {
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (!cancel[0]) {
+                    try {
+                        runIndexRebuilder();
+                        Thread.sleep(interval);
+                    } catch (InterruptedException e) {
+                        Thread.interrupted();
+                        throw new RuntimeException(e);
+                    } catch (SQLException e) {
+                        LOG.error(e.getMessage(),e);
+                    }
+                }
+            }
+        });
+        thread.setDaemon(true);
+        thread.start();
     }
 
     private static void mutateRandomly(final String fullTableName, final int nThreads, final int nRows, final int nIndexValues, final int batchSize, final CountDownLatch doneSignal) {
@@ -134,7 +182,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
     @Test
-    @Repeat(5)
     public void testConcurrentUpsertsWithRebuild() throws Throwable {
         int nThreads = 5;
         final int batchSize = 200;
@@ -155,12 +202,17 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         assertTrue("Ran out of time", doneSignal1.await(120, TimeUnit.SECONDS));
         
         IndexUtil.updateIndexState(fullIndexName, EnvironmentEdgeManager.currentTimeMillis(), metaTable, PIndexState.DISABLE);
-        do {
-            final CountDownLatch doneSignal2 = new CountDownLatch(nThreads);
-            mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal2);
-            assertTrue("Ran out of time", doneSignal2.await(500, TimeUnit.SECONDS));
-        } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
-        
+        boolean[] cancel = new boolean[1];
+        try {
+            do {
+                final CountDownLatch doneSignal2 = new CountDownLatch(nThreads);
+                runIndexRebuilderAsync(500,cancel);
+                mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal2);
+                assertTrue("Ran out of time", doneSignal2.await(500, TimeUnit.SECONDS));
+            } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
+        } finally {
+            cancel[0] = true;
+        }
         long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         assertEquals(nRows, actualRowCount);
     }
@@ -172,17 +224,21 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     private static boolean hasInactiveIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException {
         PTable table = metaCache.getTableRef(key).getTable();
         for (PTable index : table.getIndexes()) {
-            if (index.getIndexState() == PIndexState.ACTIVE) {
+            if (index.getIndexState() == PIndexState.INACTIVE) {
                 return true;
             }
         }
         return false;
     }
-    
+
     private static boolean hasDisabledIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException {
+        return hasIndexWithState(metaCache, key, PIndexState.DISABLE);
+    }
+
+    private static boolean hasIndexWithState(PMetaData metaCache, PTableKey key, PIndexState expectedState) throws TableNotFoundException {
         PTable table = metaCache.getTableRef(key).getTable();
         for (PTable index : table.getIndexes()) {
-            if (index.getIndexState() == PIndexState.DISABLE) {
+            if (index.getIndexState() == expectedState) {
                 return true;
             }
         }
@@ -304,8 +360,14 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             long disableTS = EnvironmentEdgeManager.currentTimeMillis();
             HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
             IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
-            mutateRandomly(conn, fullTableName, nRows);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            boolean[] cancel = new boolean[1];
+            try {
+                runIndexRebuilderAsync(500,cancel);
+                mutateRandomly(conn, fullTableName, nRows);
+                TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            } finally {
+                cancel[0] = true;
+            }
             
             long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
             assertEquals(nRows,actualRowCount);
@@ -346,12 +408,16 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             t.start();
             long disableTS = EnvironmentEdgeManager.currentTimeMillis();
             IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            doneSignal.await(60, TimeUnit.SECONDS);
+            boolean[] cancel = new boolean[1];
+            try {
+                runIndexRebuilderAsync(500,cancel);
+                TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+                doneSignal.await(60, TimeUnit.SECONDS);
+            } finally {
+                cancel[0] = true;
+            }
             assertTrue(hasInactiveIndex[0]);
             
-            TestUtil.dumpIndexStatus(conn, fullIndexName);
-
             long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
             assertEquals(nRows,actualRowCount);
             
@@ -379,7 +445,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee')");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            runIndexRebuilder();
+            Thread.sleep(WAIT_AFTER_DISABLED);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -406,7 +475,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            runIndexRebuilder();
+            Thread.sleep(WAIT_AFTER_DISABLED);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -433,7 +505,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            runIndexRebuilder();
+            Thread.sleep(WAIT_AFTER_DISABLED);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -458,7 +533,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.commit();
             conn.createStatement().execute("DELETE FROM " + fullTableName);
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            runIndexRebuilder();
+            Thread.sleep(WAIT_AFTER_DISABLED);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
        }
@@ -483,7 +561,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')");
             conn.commit();
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            runIndexRebuilder();
+            Thread.sleep(WAIT_AFTER_DISABLED);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
 
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         }
@@ -526,9 +607,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
             conn.commit();
             clock.time += 1000;
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            clock.time += 100;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
@@ -536,7 +619,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
-    public void testSeparateTimeBatchesRequired() throws Throwable {
+    public void testTimeBatchesInCoprocessorRequired() throws Throwable {
         String schemaName = generateUniqueName();
         String tableName = generateUniqueName();
         String indexName = generateUniqueName();
@@ -572,9 +655,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             clock.time += 100;
             IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
             clock.time += 100;
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            clock.time += 100;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
@@ -582,7 +667,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
-    public void testMultiValuesWhenDisableAndInactive() throws Throwable {
+    public void testBatchingDuringRebuild() throws Throwable {
         String schemaName = generateUniqueName();
         String tableName = generateUniqueName();
         String indexName = generateUniqueName();
@@ -593,55 +678,45 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         EnvironmentEdgeManager.injectEdge(clock);
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
-            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
             clock.time += 100;
-            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2) INCLUDE (v3)");
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
             clock.time += 100;
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0','x')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')");
             conn.commit();
             clock.time += 100;
-            try (HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
-                // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering
-                IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
-                clock.time += 100;
-                long disableTime = clock.currentTime();
-                // Set some values while index disabled
-                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb', '11','yy')");
-                conn.commit();
-                clock.time += 100;
-                assertTrue(hasDisabledIndex(metaCache, key));
-                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','222','zzz')");
-                conn.commit();
-                clock.time += 100;
-                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd','3333','zzzz')");
-                conn.commit();
-                clock.time += 100;
-                // Will cause partial index rebuilder to be triggered
-                IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
-            }
-            final CountDownLatch doneSignal = new CountDownLatch(1);
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal);
-            doneSignal.await(30, TimeUnit.SECONDS);
-            // Set some values while index is in INACTIVE state
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+            long disableTime = clock.currentTime();
+            IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
             clock.time += 100;
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')");
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('bb','bb', '11')");
             conn.commit();
-            clock.time += 100;
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')");
+            clock.time += REBUILD_PERIOD;
+            assertTrue(hasDisabledIndex(metaCache, key));
+            assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('ccc','ccc','222')");
             conn.commit();
-            clock.time += WAIT_AFTER_DISABLED;
-            // Enough time has passed, so rebuild will start now
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            assertEquals(3,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
             clock.time += 100;
+            
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertEquals(2,TestUtil.getRowCount(conn, fullIndexName));
+            
+            clock.time += REBUILD_PERIOD;
+            runIndexRebuilder();
+            // Verify that other batches were processed
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
         }
     }
-
-    private final static CountDownLatch WAIT_FOR_REBUILD_TO_START = new CountDownLatch(1);
-    private final static CountDownLatch WAIT_FOR_INDEX_WRITE = new CountDownLatch(1);
-
+    
     @Test
     public void testUpperBoundSetOnRebuild() throws Throwable {
         String schemaName = generateUniqueName();
@@ -665,7 +740,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a', '0')");
             conn.commit();
             // Set clock forward in time past the "overlap" amount we wait for index maintenance to kick in
-            clock.time += 10 * WAIT_AFTER_DISABLED;
+            clock.time += 2 * WAIT_AFTER_DISABLED;
             assertTrue(hasDisabledIndex(metaCache, key));
             assertEquals(1,TestUtil.getRowCount(conn, fullTableName));
             assertEquals(0,TestUtil.getRowCount(conn, fullIndexName));
@@ -676,9 +751,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             // Set clock back in time and start rebuild
             clock.time = disableTime + 100;
             IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            clock.time += REBUILD_INTERVAL;
-            waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE, clock, REBUILD_INTERVAL);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
             assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
             // If an upper bound was set on the rebuilder, we should only have found one row
             assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
@@ -687,21 +764,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
-    private static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState, MyClock clock, long increment) throws InterruptedException, SQLException {
-        int maxTries = 60, nTries = 0;
-        do {
-            Thread.sleep(1000); // sleep 1 sec
-            clock.time += increment;
-            if (TestUtil.checkIndexState(conn, fullIndexName, expectedIndexState, 0L)) {
-                return;
-            }
-        } while (++nTries < maxTries);
-        fail("Ran out of time waiting for index state to become " + expectedIndexState);
-    }
-
-
     @Test
-    public void testDisableIndexDuringRebuild() throws Throwable {
+    public void testMultiValuesWhenDisableAndInactive() throws Throwable {
         String schemaName = generateUniqueName();
         String tableName = generateUniqueName();
         String indexName = generateUniqueName();
@@ -737,47 +801,21 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
                 clock.time += 100;
                 // Will cause partial index rebuilder to be triggered
                 IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
-                final CountDownLatch doneSignal = new CountDownLatch(1);
-                advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal);
-                // Set some values while index is in INACTIVE state
-                clock.time += 100;
-                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')");
-                conn.commit();
-                clock.time += 100;
-                conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')");
-                conn.commit();
-                doneSignal.await(30, TimeUnit.SECONDS);
-                // Install coprocessor that will simulate an index write failure during index rebuild
-                TestUtil.addCoprocessor(conn,fullIndexName,WriteFailingRegionObserver.class);
-                clock.time += WAIT_AFTER_DISABLED;
-                doneSignal.await(30, TimeUnit.SECONDS);
-                WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS);
-                // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering
-                IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
-	            clock.time += 100;
-	            disableTime = clock.currentTime();
-	            // Set some values while index disabled
-	            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bbbbb', '11','yy')");
-	            conn.commit();
-	            clock.time += 100;
-	            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','cccccc','222','zzz')");
-	            conn.commit();
-	            clock.time += 100;
-	            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ddddddd','3333','zzzz')");
-	            conn.commit();
-	            clock.time += 100;
-	            // Simulates another write failure. Should cause current run of rebuilder to fail and retry again later
-	            IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
-                removeWriteFailingCoprocessor(conn,fullIndexName);
-	            WAIT_FOR_INDEX_WRITE.countDown();
             }
-            // Original rebuilder should have failed
-            
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            clock.time += WAIT_AFTER_DISABLED * 2;
-            // Enough time has passed, so rebuild will start now
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+
+            // Set some values while index is in INACTIVE state
             clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')");
+            conn.commit();
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')");
+            conn.commit();
+            clock.time += WAIT_AFTER_DISABLED;
+            // Enough time has passed, so rebuild will start now
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
@@ -785,6 +823,100 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
+    public void testIndexWriteFailureDisablingIndex() throws Throwable {
+        testIndexWriteFailureDuringRebuild(PIndexState.DISABLE);
+    }
+    
+    @Test
+    public void testIndexWriteFailureLeavingIndexActive() throws Throwable {
+        testIndexWriteFailureDuringRebuild(PIndexState.PENDING_ACTIVE);
+    }
+    
+    private void testIndexWriteFailureDuringRebuild(PIndexState indexStateOnFailure) throws Throwable {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        PTableKey key = new PTableKey(null,fullTableName);
+        final MyClock clock = new MyClock(1000);
+        EnvironmentEdgeManager.injectEdge(clock);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, DISABLE_INDEX_ON_WRITE_FAILURE = " + (indexStateOnFailure == PIndexState.DISABLE));
+            clock.time += 100;
+            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')");
+            conn.commit();
+            clock.time += 100;
+            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+
+            long disableTime = clock.currentTime();
+            // Simulates an index write failure
+            IndexUtil.updateIndexState(fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? disableTime : -disableTime, metaTable, indexStateOnFailure);
+            
+            clock.time += 100;
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('bb','bb', '11')");
+            conn.commit();
+            
+            // Large enough to be in separate time batch
+            clock.time += 2 * REBUILD_PERIOD;
+            assertTrue(hasIndexWithState(metaCache, key, indexStateOnFailure));
+            assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('ccc','ccc','222')");
+            conn.commit();
+            assertEquals(3,TestUtil.getRowCount(conn, fullTableName));
+            assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+            clock.time += 100;
+
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? PIndexState.INACTIVE : PIndexState.ACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            
+            // First batch should have been processed
+            runIndexRebuilder();
+            assertEquals(2,TestUtil.getRowCount(conn, fullIndexName));
+
+            // Simulate write failure
+            TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class);
+            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('dddd','dddd','3333')");
+            try {
+                conn.commit();
+                fail();
+            } catch (CommitException e) {
+                // Expected
+            }
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, indexStateOnFailure, null));
+            PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+            ResultSet rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'");
+            assertTrue(rs.next());
+            assertEquals("0", rs.getString(1));
+            assertEquals(indexStateOnFailure == PIndexState.DISABLE ? fullTableName : fullIndexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString());
+            TestUtil.removeCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class);
+            
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? PIndexState.INACTIVE : PIndexState.ACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            
+            // First batch should have been processed again because we started over
+            runIndexRebuilder();
+            assertEquals(3,TestUtil.getRowCount(conn, fullIndexName));
+
+            clock.time += 2 * REBUILD_PERIOD;
+            // Second batch should have been processed now
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
+            
+            // Verify that other batches were processed
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+        } finally {
+            EnvironmentEdgeManager.injectEdge(null);
+        }
+    }
+    
+    @Test
     public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable {
         String schemaName = generateUniqueName();
         String tableName = generateUniqueName();
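
One subtlety in testIndexWriteFailureDuringRebuild above: the PENDING_ACTIVE path records the disable time negated (-disableTime) in updateIndexState. A hedged reading of that sign convention, offered as interpretation only (names invented, not code from this commit):

    long stored = indexDisableTimestamp;       // as stored in SYSTEM.CATALOG
    boolean keptActiveOnFailure = stored < 0;  // negative: index stayed PENDING_ACTIVE
    long failureTime = Math.abs(stored);       // when the failed index write happened
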
@@ -808,9 +940,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
             conn.commit();
             clock.time += 1000;
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            clock.time += 100;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
@@ -841,9 +975,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
             conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k='a'");
             conn.commit();
             clock.time += 1000;
-            advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
-            TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
-            clock.time += 100;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+            clock.time += WAIT_AFTER_DISABLED;
+            runIndexRebuilder();
+            assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
             IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
         } finally {
             EnvironmentEdgeManager.injectEdge(null);
@@ -873,65 +1009,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
     
-    private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName, final MyClock clock) throws InterruptedException {
-        final CountDownLatch doneSignal = new CountDownLatch(1);
-        advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal);
-        clock.time += WAIT_AFTER_DISABLED + 1000;
-        doneSignal.await(30, TimeUnit.SECONDS);
-    }
-    
-    private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName, final MyClock clock, final CountDownLatch doneSignal) {
-        Runnable r = new Runnable() {
-            @Override
-            public void run() {
-                try (Connection conn = DriverManager.getConnection(getUrl())) {
-                  int nTries = 10;
-                    while (--nTries >0 && !TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null)) {
-                        Thread.sleep(1000);
-                        clock.time += 1000;
-                    }
-                    doneSignal.countDown();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-        Thread t = new Thread(r);
-        t.setDaemon(true);
-        t.start();
-    }
-    
-    private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
-        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
-        HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
-        descriptor.removeCoprocessor(WriteFailingRegionObserver.class.getName());
-        int numTries = 10;
-        try (HBaseAdmin admin = services.getAdmin()) {
-            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
-            while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
-                    && numTries > 0) {
-                numTries--;
-                if (numTries == 0) {
-                    throw new Exception(
-                            "Check to detect if delaying co-processor was removed failed after "
-                                    + numTries + " retries.");
-                }
-                Thread.sleep(1000);
-            }
-        }
-    }
-    
     public static class WriteFailingRegionObserver extends SimpleRegionObserver {
         @Override
         public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-        	WAIT_FOR_REBUILD_TO_START.countDown();
-        	try {
-				WAIT_FOR_INDEX_WRITE.await(30, TimeUnit.SECONDS);
-			} catch (InterruptedException e) {
-				Thread.interrupted();
-				throw new IOException(e);
-			}
+            throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString());
         }
     }
-
 }
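
The recurring pattern in the rewritten tests above, condensed: instead of advancing a clock until the background rebuild thread notices the disabled index, each test now drives the rebuilder synchronously. A sketch of one round trip (fullTableName and fullIndexName are placeholders; runIndexRebuilder(), WAIT_AFTER_DISABLED, and the IndexUtil/TestUtil helpers are the ones defined in this diff; imports as in the test class):

    try (Connection conn = DriverManager.getConnection(getUrl())) {
        HTableInterface metaTable = conn.unwrap(PhoenixConnection.class)
                .getQueryServices()
                .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
        // Mark the index disabled as of now, then write while it is out of sync.
        IndexUtil.updateIndexState(fullIndexName,
                EnvironmentEdgeManager.currentTimeMillis(), metaTable, PIndexState.DISABLE);
        conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')");
        conn.commit();
        runIndexRebuilder();               // first run: DISABLE -> INACTIVE
        Thread.sleep(WAIT_AFTER_DISABLED); // let the forward-overlap window pass
        runIndexRebuilder();               // second run: replay missed writes -> ACTIVE
        // 0L asserts INDEX_DISABLE_TIMESTAMP was cleared along with the state change.
        assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
    }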

http://git-wip-us.apache.org/repos/asf/phoenix/blob/944bed73/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 5131a77..80b0785 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -515,7 +515,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             }
 
             long currentTime = EnvironmentEdgeManager.currentTimeMillis();
-            PTable table = doGetTable(key, request.getClientTimestamp());
+            PTable table = doGetTable(key, request.getClientTimestamp(), request.getClientVersion());
             if (table == null) {
                 builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
                 builder.setMutationTime(currentTime);
@@ -527,7 +527,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE;
             for (PTable index : table.getIndexes()) {
                 disableIndexTimestamp = index.getIndexDisableTimestamp();
-                if (disableIndexTimestamp > 0 && index.getIndexState() == PIndexState.ACTIVE && disableIndexTimestamp < minNonZerodisableIndexTimestamp) {
+                if (disableIndexTimestamp > 0 && (index.getIndexState() == PIndexState.ACTIVE || index.getIndexState() == PIndexState.PENDING_ACTIVE) && disableIndexTimestamp < minNonZerodisableIndexTimestamp) {
                     minNonZerodisableIndexTimestamp = disableIndexTimestamp;
                 }
             }
@@ -554,7 +554,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     }
 
     private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
-            long clientTimeStamp) throws IOException, SQLException {
+            long clientTimeStamp, int clientVersion) throws IOException, SQLException {
         Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
         Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
         try (RegionScanner scanner = region.getScanner(scan)) {
@@ -563,7 +563,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             PTable newTable;
             boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, 
                     QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
-            newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
+            newTable = getTable(scanner, clientTimeStamp, tableTimeStamp, clientVersion);
             if (newTable == null) {
                 return null;
             }
@@ -646,9 +646,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
 
-    private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes) throws IOException, SQLException {
+    private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes, int clientVersion) throws IOException, SQLException {
         byte[] key = SchemaUtil.getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), indexName.getBytes());
-        PTable indexTable = doGetTable(key, clientTimeStamp);
+        PTable indexTable = doGetTable(key, clientTimeStamp, clientVersion);
         if (indexTable == null) {
             ServerUtil.throwIOException("Index not found", new TableNotFoundException(schemaName.getString(), indexName.getString()));
             return;
@@ -795,7 +795,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         arguments.add(arg);
     }
 
-    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp)
+    private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp, int clientVersion)
         throws IOException, SQLException {
         List<Cell> results = Lists.newArrayList();
         scanner.next(results);
@@ -896,6 +896,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         PIndexState indexState =
                 indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv
                         .getValueArray()[indexStateKv.getValueOffset()]);
+        // If client is not yet up to 4.12, then translate PENDING_ACTIVE to ACTIVE (as would have been
+        // the value in those versions) since the client won't have this index state in its enum.
+        if (indexState == PIndexState.PENDING_ACTIVE && clientVersion < PhoenixDatabaseMetaData.MIN_PENDING_ACTIVE_INDEX) {
+            indexState = PIndexState.ACTIVE;
+        }
         Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
         boolean isImmutableRows =
                 immutableRowsKv == null ? false : (Boolean) PBoolean.INSTANCE.toObject(
@@ -981,7 +986,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
           } else if (Bytes.compareTo(LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length, colKv.getQualifierArray(), colKv.getQualifierOffset(), colKv.getQualifierLength())==0) {    
               LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
               if (linkType == LinkType.INDEX_TABLE) {
-                  addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
+                  addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes, clientVersion);
               } else if (linkType == LinkType.PHYSICAL_TABLE) {
                   physicalTables.add(famName);
               } else if (linkType == LinkType.PARENT_TABLE) {
@@ -1247,13 +1252,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     }
 
     private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
-        ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
+        ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp, int clientVersion)
         throws IOException, SQLException {
         Region region = env.getRegion();
         Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
         PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
         // We always cache the latest version - fault in if not in cache
-        if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) {
+        if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp, clientVersion)) != null) {
             return table;
         }
         // if not found then check if newer table already exists and add delete marker for timestamp
@@ -1358,6 +1363,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         byte[] schemaName = null;
         byte[] tableName = null;
         try {
+            int clientVersion = request.getClientVersion();
             List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
             MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
             byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
@@ -1419,7 +1425,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                         acquireLock(region, parentTableKey, locks);
                         parentCacheKey = new ImmutableBytesPtr(parentTableKey);
                         parentTable = loadTable(env, parentTableKey, parentCacheKey, clientTimeStamp,
-                                clientTimeStamp);
+                                clientTimeStamp, clientVersion);
                         if (parentTable == null || isTableDeleted(parentTable)) {
                             builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
                             builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
@@ -1452,7 +1458,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 // Get as of latest timestamp so we can detect if we have a newer table that already
                 // exists without making an additional query
                 PTable table =
-                        loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
+                        loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP, clientVersion);
                 if (table != null) {
                     if (table.getTimeStamp() < clientTimeStamp) {
                         // If the table is older than the client time stamp and it's deleted,
@@ -1641,8 +1647,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
     
     private void findAllChildViews(Region region, byte[] tenantId, PTable table,
-            TableViewFinder result, long clientTimeStamp) throws IOException, SQLException {
-        TableViewFinder currResult = findChildViews(region, tenantId, table);
+            TableViewFinder result, long clientTimeStamp, int clientVersion) throws IOException, SQLException {
+        TableViewFinder currResult = findChildViews(region, tenantId, table, clientVersion);
         result.addResult(currResult);
         for (ViewInfo viewInfo : currResult.getViewInfoList()) {
             byte[] viewtenantId = viewInfo.getTenantId();
@@ -1650,8 +1656,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             byte[] viewTable = viewInfo.getViewName();
             byte[] tableKey = SchemaUtil.getTableKey(viewtenantId, viewSchema, viewTable);
             ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
-            PTable view = loadTable(env, tableKey, cacheKey, clientTimeStamp, clientTimeStamp);
-            findAllChildViews(region, viewtenantId, view, result, clientTimeStamp);
+            PTable view = loadTable(env, tableKey, cacheKey, clientTimeStamp, clientTimeStamp, clientVersion);
+            findAllChildViews(region, viewtenantId, view, result, clientTimeStamp, clientVersion);
         }
     }
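The recursion above flattens the whole subtree of views-of-views into a single result. A toy model of the same traversal shape, with hypothetical types rather than Phoenix's TableViewFinder:

    import java.util.ArrayList;
    import java.util.List;

    // Toy model of findAllChildViews: direct children are found per node,
    // then the same search recurses into each child.
    class ViewNode {
        final String name;
        final List<ViewNode> children = new ArrayList<>();
        ViewNode(String name) { this.name = name; }
    }

    class ViewTraversal {
        static void findAllChildViews(ViewNode table, List<String> result) {
            for (ViewNode view : table.children) {  // findChildViews(...)
                result.add(view.name);              // result.addResult(...)
                findAllChildViews(view, result);    // recurse one level down
            }
        }
    }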
         
@@ -1775,7 +1781,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final byte[] PHYSICAL_TABLE_BYTES =
             new byte[] { PTable.LinkType.PHYSICAL_TABLE.getSerializedValue() };
 
-    private TableViewFinder findChildViews(Region region, byte[] tenantId, PTable table)
+    private TableViewFinder findChildViews(Region region, byte[] tenantId, PTable table, int clientVersion)
             throws IOException, SQLException {
         byte[] tableKey =
                 SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
@@ -1784,7 +1790,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
         PTable systemCatalog =
                 loadTable(env, tableKey, cacheKey, MIN_SYSTEM_TABLE_TIMESTAMP,
-                    HConstants.LATEST_TIMESTAMP);
+                    HConstants.LATEST_TIMESTAMP, clientVersion);
         if (systemCatalog.getTimeStamp() < MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0) {
             return findChildViews_deprecated(region, tenantId, table, PHYSICAL_TABLE_BYTES);
         } else {
@@ -1842,7 +1848,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 result =
                         doDropTable(key, tenantIdBytes, schemaName, tableName, parentTableName,
                             PTableType.fromSerializedValue(tableType), tableMetadata,
-                            invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, isCascade);
+                            invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, isCascade, request.getClientVersion());
                 if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                     done.run(MetaDataMutationResult.toProto(result));
                     return;
@@ -1874,7 +1880,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
         byte[] tableName, byte[] parentTableName, PTableType tableType, List<Mutation> rowsToDelete,
         List<ImmutableBytesPtr> invalidateList, List<RowLock> locks,
-        List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, boolean isCascade) throws IOException, SQLException {
+        List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, boolean isCascade, int clientVersion) throws IOException, SQLException {
 
 
         long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete);
@@ -1887,7 +1893,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
         // We always cache the latest version - fault in if not in cache
         if (table != null
-                || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) {
+                || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion)) != null) {
             if (table.getTimeStamp() < clientTimeStamp) {
                 if (isTableDeleted(table) || tableType != table.getType()) {
                     return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
@@ -1924,7 +1930,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
 
             if (tableType == PTableType.TABLE || tableType == PTableType.SYSTEM) {
                 // Handle any child views that exist
-                TableViewFinder tableViewFinderResult = findChildViews(region, tenantId, table);
+                TableViewFinder tableViewFinderResult = findChildViews(region, tenantId, table, clientVersion);
                 if (tableViewFinderResult.hasViews()) {
                     if (isCascade) {
                         if (tableViewFinderResult.allViewsInMultipleRegions()) {
@@ -1944,7 +1950,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                 acquireLock(region, viewKey, locks);
                                 MetaDataMutationResult result = doDropTable(viewKey, viewTenantId, viewSchemaName,
                                         viewName, null, PTableType.VIEW, rowsToDelete, invalidateList, locks,
-                                        tableNamesToDelete, sharedTablesToDelete, false);
+                                        tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
                                 if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { return result; }
                             }
                         }
@@ -2007,7 +2013,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             acquireLock(region, indexKey, locks);
             MetaDataMutationResult result =
                     doDropTable(indexKey, tenantId, schemaName, indexName, tableName, PTableType.INDEX,
-                        rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false);
+                        rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
             if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
                 return result;
             }
@@ -2025,7 +2031,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     }
 
     private MetaDataMutationResult
-    mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException {
+    mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator, int clientVersion) throws IOException {
         byte[][] rowKeyMetaData = new byte[5][];
         MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
         byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
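mutateColumn centralizes the locking and cache handling shared by addColumn and dropColumn, each of which supplies its specific edits through a ColumnMutator callback; since the shared path resolves tables, it now needs the client version as well. A schematic sketch, with the interface deliberately simplified from the real signature:

    import java.util.List;

    // Simplified shape of the shared mutate path; the real ColumnMutator has
    // a richer signature. This only shows why clientVersion travels alongside
    // the callback instead of inside it.
    interface ColumnMutator {
        Object updateMutation(Object table, List<Object> tableMetadata) throws Exception;
    }

    class MutateColumnSketch {
        Object mutateColumn(List<Object> tableMetadata, ColumnMutator mutator,
                int clientVersion) throws Exception {
            // The shared path resolves the table, so it needs the version.
            Object table = buildTable(clientVersion);
            return mutator.updateMutation(table, tableMetadata);
        }

        Object buildTable(int clientVersion) {
            return "PTable for client v" + clientVersion;
        }
    }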
@@ -2059,7 +2065,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 // Get client timeStamp from mutations
                 long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
                 if (table == null
-                        && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
+                        && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion)) == null) {
                     // if not found then call newerTableExists and add delete marker for timestamp
                     // found
                     table = buildDeletedTable(key, cacheKey, region, clientTimeStamp);
@@ -2130,7 +2136,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 if (result !=null) {
                     return result;
                 } else {
-                    table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
+                    table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion);
                     return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table);
                 }
             } finally {
@@ -2289,7 +2295,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     
     private MetaDataMutationResult addColumnsAndTablePropertiesToChildViews(PTable basePhysicalTable, List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
             List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, TableViewFinder childViewsResult,
-            Region region, List<RowLock> locks) throws IOException, SQLException {
+            Region region, List<RowLock> locks, int clientVersion) throws IOException, SQLException {
         List<PutWithOrdinalPosition> columnPutsForBaseTable = Lists.newArrayListWithExpectedSize(tableMetadata.size());
         Map<TableProperty, Cell> tablePropertyCellMap = Maps.newHashMapWithExpectedSize(tableMetadata.size());
         // Isolate the puts relevant to adding columns. Also figure out what kind of columns are being added.
@@ -2336,7 +2342,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             
             // lock the rows corresponding to views so that no other thread can modify the view meta-data
             RowLock viewRowLock = acquireLock(region, viewKey, locks);
-            PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock);
+            PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion);
             
             ColumnOrdinalPositionUpdateList ordinalPositionList = new ColumnOrdinalPositionUpdateList();
             List<PColumn> viewPkCols = new ArrayList<>(view.getPKColumns());
@@ -2635,7 +2641,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             PTable basePhysicalTable, List<RowLock> locks, List<Mutation> tableMetadata,
             List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
             List<ImmutableBytesPtr> invalidateList, long clientTimeStamp,
-            TableViewFinder childViewsResult, List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete)
+            TableViewFinder childViewsResult, List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, int clientVersion)
             throws IOException, SQLException {
         List<Delete> columnDeletesForBaseTable = new ArrayList<>(tableMetadata.size());
         // Isolate the deletes relevant to dropping columns. Also figure out what kind of columns
@@ -2661,7 +2667,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             // lock the rows corresponding to views so that no other thread can modify the view
             // meta-data
             RowLock viewRowLock = acquireLock(region, viewKey, locks);
-            PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock);
+            PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion);
 
             ColumnOrdinalPositionUpdateList ordinalPositionList =
                     new ColumnOrdinalPositionUpdateList();
@@ -2732,7 +2738,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     dropIndexes(view, region, invalidateList, locks, clientTimeStamp,
                         schemaName, view.getName().getBytes(),
                         mutationsForAddingColumnsToViews, existingViewColumn,
-                        tableNamesToDelete, sharedTablesToDelete);
+                        tableNamesToDelete, sharedTablesToDelete, clientVersion);
                 }
             }
 
@@ -2954,7 +2960,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     List<Mutation> mutationsForAddingColumnsToViews = Lists.newArrayListWithExpectedSize(tableMetaData.size() * ( 1 + table.getIndexes().size()));
                     if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
                         TableViewFinder childViewsResult = new TableViewFinder();
-                        findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp);
+                        findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp, request.getClientVersion());
                         if (childViewsResult.hasViews()) {
                             /* 
                              * Dis-allow if:
@@ -2979,7 +2985,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                             } else {
                                 mutationsForAddingColumnsToViews = new ArrayList<>(childViewsResult.getViewInfoList().size() * tableMetaData.size());
                                 MetaDataMutationResult mutationResult = addColumnsAndTablePropertiesToChildViews(table, tableMetaData, mutationsForAddingColumnsToViews, schemaName, tableName, invalidateList, clientTimeStamp,
-                                        childViewsResult, region, locks);
+                                        childViewsResult, region, locks, request.getClientVersion());
                                 // return if we were not able to add the column successfully
                                 if (mutationResult!=null)
                                     return mutationResult;
@@ -3070,7 +3076,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     tableMetaData.addAll(mutationsForAddingColumnsToViews);
                     return null;
                 }
-            });
+            }, request.getClientVersion());
             if (result != null) {
                 done.run(MetaDataMutationResult.toProto(result));
             }
@@ -3081,11 +3087,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         }
     }
 
-    private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
-        return doGetTable(key, clientTimeStamp, null);
+    private PTable doGetTable(byte[] key, long clientTimeStamp, int clientVersion) throws IOException, SQLException {
+        return doGetTable(key, clientTimeStamp, null, clientVersion);
     }
 
-    private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock rowLock) throws IOException, SQLException {
+    private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock rowLock, int clientVersion) throws IOException, SQLException {
         ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
         Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                 GlobalCache.getInstance(this.env).getMetaDataCache();
@@ -3137,13 +3143,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 return table;
             }
             // Query for the latest table first, since it's not cached
-            table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
+            table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion);
             if ((table != null && table.getTimeStamp() < clientTimeStamp) || 
                     (blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) {
                 return table;
             }
             // Otherwise, query for an older version of the table - it won't be cached
-            return buildTable(key, cacheKey, region, clientTimeStamp);
+            return buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
         } finally {
             if (!wasLocked) rowLock.release();
         }
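The fallback order in doGetTable is worth spelling out: the cache is consulted first, then the newest on-disk version (which is what gets cached), and only then an older build as of the client's timestamp. A compact sketch of that cascade, with a stand-in cache and builder:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Stand-in for the doGetTable lookup cascade; buildTable here is a
    // placeholder for the SYSTEM.CATALOG scan in the real code.
    class TableLookupSketch {
        static class Table { long timestamp; }
        final Map<String, Table> cache = new ConcurrentHashMap<>();

        Table getTable(String key, long clientTimeStamp, int clientVersion) {
            Table table = cache.get(key);
            if (table != null && table.timestamp < clientTimeStamp) return table;
            // Query for the latest table first, since it's not cached.
            table = buildTable(key, Long.MAX_VALUE, clientVersion);
            if (table != null && table.timestamp < clientTimeStamp) return table;
            // Otherwise, query for an older version; it won't be cached.
            return buildTable(key, clientTimeStamp, clientVersion);
        }

        Table buildTable(String key, long asOfTimestamp, int clientVersion) {
            return null; // placeholder for the actual catalog scan
        }
    }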
@@ -3210,7 +3216,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     }
 
     @Override
-    public void dropColumn(RpcController controller, DropColumnRequest request,
+    public void dropColumn(RpcController controller, final DropColumnRequest request,
             RpcCallback<MetaDataResponse> done) {
         List<Mutation> tableMetaData = null;
         final List<byte[]> tableNamesToDelete = Lists.newArrayList();
@@ -3232,13 +3238,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     PTableType type = table.getType();
                     if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
                         TableViewFinder childViewsResult = new TableViewFinder();
-                        findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp);
+                        findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp, request.getClientVersion());
                         if (childViewsResult.hasViews()) {
                             MetaDataMutationResult mutationResult =
                                     dropColumnsFromChildViews(region, table,
                                         locks, tableMetaData, additionalTableMetaData,
                                         schemaName, tableName, invalidateList,
-                                        clientTimeStamp, childViewsResult, tableNamesToDelete, sharedTablesToDelete);
+                                        clientTimeStamp, childViewsResult, tableNamesToDelete, sharedTablesToDelete, request.getClientVersion());
                             // return if we were not able to drop the column successfully
                             if (mutationResult != null) return mutationResult;
                         }
@@ -3296,7 +3302,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                     dropIndexes(table, region, invalidateList, locks,
                                         clientTimeStamp, schemaName, tableName,
                                         additionalTableMetaData, columnToDelete,
-                                        tableNamesToDelete, sharedTablesToDelete);
+                                        tableNamesToDelete, sharedTablesToDelete, request.getClientVersion());
                                 } catch (ColumnFamilyNotFoundException e) {
                                     return new MetaDataMutationResult(
                                             MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager
@@ -3319,7 +3325,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     long currentTime = MetaDataUtil.getClientTimeStamp(tableMetaData);
                     return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null, tableNamesToDelete, sharedTablesToDelete);
                 }
-            });
+            }, request.getClientVersion());
             if (result != null) {
                 done.run(MetaDataMutationResult.toProto(result));
             }
@@ -3333,7 +3339,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private void dropIndexes(PTable table, Region region, List<ImmutableBytesPtr> invalidateList,
             List<RowLock> locks, long clientTimeStamp, byte[] schemaName,
             byte[] tableName, List<Mutation> additionalTableMetaData, PColumn columnToDelete, 
-            List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete)
+            List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, int clientVersion)
             throws IOException, SQLException {
         // Look for columnToDelete in any indexes. If found as PK column, get lock and drop the
         // index and then invalidate it
@@ -3372,7 +3378,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 additionalTableMetaData.add(new Delete(linkKey, clientTimeStamp));
                 doDropTable(indexKey, tenantId, index.getSchemaName().getBytes(), index
                         .getTableName().getBytes(), tableName, index.getType(),
-                    additionalTableMetaData, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false);
+                    additionalTableMetaData, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
                 invalidateList.add(new ImmutableBytesPtr(indexKey));
             }
             // If the dropped column is a covered index column, invalidate the index
@@ -3438,7 +3444,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 done.run(MetaDataMutationResult.toProto(result));
                 return;
             }
-            long timeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+            long timeStamp = HConstants.LATEST_TIMESTAMP;
             ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
             List<Cell> newKVs = tableMetadata.get(0).getFamilyCellMap().get(TABLE_FAMILY_BYTES);
             Cell newKV = null;
@@ -3450,6 +3456,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                       INDEX_STATE_BYTES, 0, INDEX_STATE_BYTES.length) == 0){
                   newKV = cell;
                   indexStateKVIndex = index;
+                  timeStamp = cell.getTimestamp();
                 } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                   INDEX_DISABLE_TIMESTAMP_BYTES, 0, INDEX_DISABLE_TIMESTAMP_BYTES.length) == 0) {
                   disableTimeStampKVIndex = index;
@@ -3464,7 +3471,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             }
             try {
                 Get get = new Get(key);
-                get.setTimeRange(PTable.INITIAL_SEQ_NUM, timeStamp);
                 get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
                 get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
                 get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
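Dropping the setTimeRange call means the Get now returns the newest cells unconditionally, and the INDEX_STATE cell's own timestamp, rather than the client-supplied one, becomes the value the server trusts. A sketch of that read, using the standard HBase client API with illustrative family and qualifier names:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    // Sketch only: reads the latest INDEX_STATE cell and reports when the
    // state last changed. Column names are illustrative.
    class LatestIndexStateRead {
        static long readIndexStateTimestamp(Table catalog, byte[] rowKey) throws Exception {
            byte[] family = Bytes.toBytes("0");
            byte[] qualifier = Bytes.toBytes("INDEX_STATE");
            Get get = new Get(rowKey);        // no setTimeRange: newest wins
            get.addColumn(family, qualifier);
            Result result = catalog.get(get);
            Cell current = result.getColumnLatestCell(family, qualifier);
            return current == null ? 0L : current.getTimestamp();
        }
    }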
@@ -3484,6 +3490,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 PIndexState currentState =
                         PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV
                                 .getValueOffset()]);
+                // Timestamp of INDEX_STATE gets updated with each call
+                long actualTimestamp = currentStateKV.getTimestamp();
                 long curTimeStampVal = 0;
                 if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0)) {
                     curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(),
@@ -3491,32 +3499,28 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     // new DisableTimeStamp is passed in
                     if (disableTimeStampKVIndex >= 0) {
                         Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex);
+                        long expectedTimestamp = newDisableTimeStampCell.getTimestamp();
+                        // If the index status has been updated after the upper bound of the scan we use
+                        // to partially rebuild the index, then we need to fail the rebuild because an
+                        // index write failed before the rebuild was complete.
+                        if (actualTimestamp > expectedTimestamp) {
+                            builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
+                            builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+                            done.run(builder.build());
+                            return;
+                        }
                         long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
                                 newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
-                        // We never set the INDEX_DISABLE_TIMESTAMP to a positive value when we're setting the state to ACTIVE.
-                        // Instead, we're passing in what we expect the INDEX_DISABLE_TIMESTAMP to be currently. If it's
-                        // changed, it means that a data table row failed to write while we were partially rebuilding it
-                        // and we must rerun it.
-                        if (newState == PIndexState.ACTIVE && newDisableTimeStamp > 0) {
-                            // Don't allow setting to ACTIVE if the INDEX_DISABLE_TIMESTAMP doesn't match
-                            // what we expect.
-                            if (newDisableTimeStamp != Math.abs(curTimeStampVal)) {
-                                builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
-                                builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
-                                done.run(builder.build());
-                                return;
-                           }
-                            // Reset INDEX_DISABLE_TIMESTAMP_BYTES to zero as we're good to go.
-                            newKVs.set(disableTimeStampKVIndex, 
-                                    CellUtil.createCell(key, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES, 
-                                            timeStamp, KeyValue.Type.Put.getCode(), PLong.INSTANCE.toBytes(0L)));
-                        }
                         // We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active (negative)
                         // from block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to
                         // drive the partial index rebuild rather than update it with each attempt to update the index
                         // when a new data table write occurs.
-                        if (curTimeStampVal != 0 && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) {
-                            // not reset disable timestamp
+                        // We do legitimately move the INDEX_DISABLE_TIMESTAMP to be newer when we're rebuilding the
+                        // index in which case the state will be INACTIVE or PENDING_ACTIVE.
+                        if (curTimeStampVal != 0 
+                                && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE) 
+                                && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) {
+                            // do not reset disable timestamp as we want to keep the min
                             newKVs.remove(disableTimeStampKVIndex);
                             disableTimeStampKVIndex = -1;
                         }
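This hunk is the heart of PHOENIX-4178: the rebuilder passes along the INDEX_DISABLE_TIMESTAMP cell timestamp it observed, and the server refuses the transition if the INDEX_STATE row has since moved past it, because that means an index write failed while the rebuild was still running. The two checks, distilled into plain Java with a stand-in PIndexState enum:

    // Distilled from the hunk above; not the actual Phoenix method.
    class RebuildChecks {
        enum PIndexState { DISABLE, INACTIVE, PENDING_ACTIVE, ACTIVE }

        // An INDEX_STATE cell newer than the timestamp the rebuilder expected
        // means an index write failed after the rebuild scan's upper bound,
        // so the attempt to mark the index caught up is rejected.
        static boolean rejectStaleRebuild(long actualStateTs, long expectedStateTs) {
            return actualStateTs > expectedStateTs;
        }

        // INDEX_DISABLE_TIMESTAMP sign convention: negative keeps the index
        // active while it rebuilds, positive blocks data-table writes. When a
        // new failure arrives while the state is moving to DISABLE or
        // PENDING_ACTIVE, the oldest absolute value wins so the partial
        // rebuild replays from the first failure.
        static boolean keepExistingDisableTs(long current, long proposed, PIndexState newState) {
            return current != 0
                    && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE)
                    && Math.abs(current) < Math.abs(proposed);
        }
    }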
@@ -3548,29 +3552,42 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE) {
                     timeStamp = currentStateKV.getTimestamp();
                 }
-                if ((currentState == PIndexState.UNUSABLE && newState == PIndexState.ACTIVE)
-                        || (currentState == PIndexState.ACTIVE && newState == PIndexState.UNUSABLE)) {
+                if ((currentState == PIndexState.ACTIVE || currentState == PIndexState.PENDING_ACTIVE) && newState == PIndexState.UNUSABLE) {
                     newState = PIndexState.INACTIVE;
                     newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                         INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
-                } else if (currentState == PIndexState.INACTIVE && newState == PIndexState.USABLE) {
-                    newState = PIndexState.ACTIVE;
+                } else if ((currentState == PIndexState.INACTIVE || currentState == PIndexState.PENDING_ACTIVE) && newState == PIndexState.USABLE) {
+                    // Don't allow a manual state change to USABLE (i.e. ACTIVE) if the INDEX_DISABLE_TIMESTAMP is non-zero
+                    if (curTimeStampVal != 0) {
+                        newState = currentState;
+                    } else {
+                        newState = PIndexState.ACTIVE;
+                    }
                     newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                         INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
                 }
 
                 PTable returnTable = null;
                 if (currentState != newState || disableTimeStampKVIndex != -1) {
+                    // make a copy of tableMetadata so we can add to it
+                    tableMetadata = new ArrayList<Mutation>(tableMetadata);
+                    // Always include the empty column value at the latest timestamp so
+                    // that clients pick up the update.
+                    Put emptyValue = new Put(key);
+                    emptyValue.addColumn(TABLE_FAMILY_BYTES, 
+                            QueryConstants.EMPTY_COLUMN_BYTES, 
+                            HConstants.LATEST_TIMESTAMP, 
+                            QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                    tableMetadata.add(emptyValue);
                     byte[] dataTableKey = null;
-                    if(dataTableKV != null) {
-                        dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, dataTableKV.getValue());
-                    }
-                    if(dataTableKey != null) {
-                        // make a copy of tableMetadata
-                        tableMetadata = new ArrayList<Mutation>(tableMetadata);
+                    if (dataTableKV != null) {
+                        dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, CellUtil.cloneValue(dataTableKV));
                         // insert an empty KV to trigger time stamp update on data table row
                         Put p = new Put(dataTableKey);
-                        p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+                        p.addColumn(TABLE_FAMILY_BYTES,
+                                QueryConstants.EMPTY_COLUMN_BYTES,
+                                HConstants.LATEST_TIMESTAMP,
+                                QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
                         tableMetadata.add(p);
                     }
                     boolean setRowKeyOrderOptimizableCell = newState == PIndexState.BUILDING && !rowKeyOrderOptimizable;
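Whenever the state or disable timestamp actually changes, both the index row and its data table row receive an empty-column Put at HConstants.LATEST_TIMESTAMP, which advances the rows' last-modified time so clients notice the change and refresh their cached metadata. A minimal sketch of that touch, with illustrative column constants in place of QueryConstants:

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    // Sketch of the cache-invalidation touch applied to both rows.
    class EmptyColumnTouch {
        static Put touchRow(byte[] rowKey) {
            byte[] family = Bytes.toBytes("0");       // illustrative
            byte[] emptyColumn = Bytes.toBytes("_0"); // illustrative
            Put touch = new Put(rowKey);
            // LATEST_TIMESTAMP lets the region server stamp the current time,
            // so the row moves forward and clients re-read the metadata.
            touch.addColumn(family, emptyColumn, HConstants.LATEST_TIMESTAMP,
                    Bytes.toBytes("x")); // illustrative empty-column value
            return touch;
        }
    }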
@@ -3589,7 +3606,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     }
                     if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1
                             || currentState == PIndexState.DISABLE || newState == PIndexState.BUILDING) {
-                        returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock);
+                        returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock, request.getClientVersion());
                     }
                 }
                 // Get client timeStamp from mutations, since it may get updated by the