Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/24 22:13:44 UTC

[26/50] [abbrv] phoenix git commit: PHOENIX-2635 Partial index rebuild doesn't work for mutable data

PHOENIX-2635 Partial index rebuild doesn't work for mutable data


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/046bda34
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/046bda34
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/046bda34

Branch: refs/heads/calcite
Commit: 046bda34771aaec3befd4ad17024afc5af9b83ed
Parents: e2a6386
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Feb 15 00:33:05 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Feb 15 10:14:54 2016 -0800

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    | 379 +++++++------------
 .../end2end/index/ReadOnlyIndexFailureIT.java   |  75 ++--
 .../EndToEndCoveredColumnsIndexBuilderIT.java   |   2 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   5 +-
 .../coprocessor/MetaDataRegionObserver.java     | 120 +++++-
 .../hbase/index/covered/LocalTableState.java    |  19 +-
 .../phoenix/hbase/index/covered/TableState.java |   7 +-
 .../index/covered/data/LocalHBaseState.java     |   6 +-
 .../hbase/index/covered/data/LocalTable.java    |   9 +-
 .../example/CoveredColumnIndexCodec.java        |   4 +-
 .../hbase/index/scanner/ScannerBuilder.java     |   1 -
 .../apache/phoenix/index/IndexMaintainer.java   |   4 +-
 .../apache/phoenix/index/PhoenixIndexCodec.java |  12 +-
 .../phoenix/index/PhoenixIndexMetaData.java     |  10 +-
 .../index/PhoenixTransactionalIndexer.java      |   2 +-
 .../org/apache/phoenix/jdbc/PhoenixDriver.java  |  32 +-
 .../apache/phoenix/parse/NamedTableNode.java    |   8 +
 .../phoenix/query/QueryServicesOptions.java     |   2 +-
 .../apache/phoenix/schema/MetaDataClient.java   |  34 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java |   8 +-
 .../index/covered/TestLocalTableState.java      |  10 +-
 .../example/TestCoveredColumnIndexCodec.java    |   4 +-
 22 files changed, 368 insertions(+), 385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 176c5a0..ebc6988 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -30,24 +30,17 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseCluster;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.query.QueryServices;
@@ -61,7 +54,6 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -75,28 +67,29 @@ import com.google.common.collect.Maps;
  * For some reason dropping tables after running this test
  * fails unless it runs its own mini cluster. 
  * 
- * 
- * @since 2.1
  */
 
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
-    private Timer scheduleTimer;
-
+    public static volatile boolean FAIL_WRITE = false;
+    public static final String INDEX_NAME = "IDX";
+    
     private String tableName;
     private String indexName;
     private String fullTableName;
     private String fullIndexName;
 
-    private boolean transactional;
+    private final boolean transactional;
+    private final boolean localIndex;
     private final String tableDDLOptions;
 
-    public MutableIndexFailureIT(boolean transactional) {
+    public MutableIndexFailureIT(boolean transactional, boolean localIndex) {
         this.transactional = transactional;
+        this.localIndex = localIndex;
         this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : "";
-        this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "");
-        this.indexName = "IDX";
+        this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "");
+        this.indexName = INDEX_NAME;
         this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
         this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
     }
@@ -104,31 +97,28 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
-        serverProps.put("hbase.client.retries.number", "2");
+        serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
         serverProps.put("hbase.client.pause", "5000");
+        serverProps.put("data.tx.snapshot.dir", "/tmp");
         serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, "0");
         Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true");
         NUM_SLAVES_BASE = 4;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
-    @Parameters(name = "transactional = {0}")
+    @Parameters(name = "transactional = {0}, localIndex = {1}")
     public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] { { false }, { true } });
-    }
-
-    @Test
-    public void testWriteFailureDisablesLocalIndex() throws Exception {
-        helpTestWriteFailureDisablesIndex(true);
+        return Arrays.asList(new Boolean[][] { { false, false }, { false, true }, { true, false }, { true, true } });
     }
 
     @Test
     public void testWriteFailureDisablesIndex() throws Exception {
-        helpTestWriteFailureDisablesIndex(false);
+        helpTestWriteFailureDisablesIndex();
     }
 
-    public void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Exception {
+    public void helpTestWriteFailureDisablesIndex() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = driver.connect(url, props)) {
             String query;
@@ -140,15 +130,9 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
-            if(localIndex) {
-                conn.createStatement().execute(
-                        "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
-                conn.createStatement().execute(
-                        "CREATE LOCAL INDEX " + indexName+ "_2" + " ON " + fullTableName + " (v2) INCLUDE (v1)");
-            } else {
-                conn.createStatement().execute(
-                        "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
-            }
+            FAIL_WRITE = false;
+            conn.createStatement().execute(
+                    "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
 
             query = "SELECT * FROM " + fullIndexName;
             rs = conn.createStatement().executeQuery(query);
@@ -167,23 +151,50 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             stmt.setString(2, "x");
             stmt.setString(3, "1");
             stmt.execute();
+            stmt.setString(1, "b");
+            stmt.setString(2, "y");
+            stmt.setString(3, "2");
+            stmt.execute();
+            stmt.setString(1, "c");
+            stmt.setString(2, "z");
+            stmt.setString(3, "3");
+            stmt.execute();
             conn.commit();
 
-            TableName indexTable =
-                    TableName.valueOf(localIndex ? MetaDataUtil
-                            .getLocalIndexTableName(fullTableName) : fullIndexName);
-            HBaseAdmin admin = getUtility().getHBaseAdmin();
-            HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
-            try{
-                admin.disableTable(indexTable);
-                admin.deleteTable(indexTable);
-            } catch (TableNotFoundException ignore) {}
+            query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            String expectedPlan =
+                    "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName;
+            assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("x", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(1));
+            assertEquals("z", rs.getString(2));
+            assertFalse(rs.next());
+
+            FAIL_WRITE = true;
 
             stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "a2");
+            // Insert new row
+            stmt.setString(1, "d");
+            stmt.setString(2, "d");
+            stmt.setString(3, "4");
+            stmt.execute();
+            // Update existing row
+            stmt.setString(1, "a");
             stmt.setString(2, "x2");
             stmt.setString(3, "2");
             stmt.execute();
+            // Delete existing row
+            stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?");
+            stmt.setString(1, "b");
+            stmt.execute();
             try {
                 conn.commit();
                 fail();
@@ -196,20 +207,17 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             assertTrue(rs.next());
             assertEquals(indexName, rs.getString(3));
             // the index is only disabled for non-txn tables upon index table write failure
-            PIndexState indexState =  transactional ? PIndexState.ACTIVE : PIndexState.DISABLE;
-            assertEquals(indexState.toString(), rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-            if(localIndex) {
-                rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2",
-                        new String[] { PTableType.INDEX.toString() });
-                assertTrue(rs.next());
-                assertEquals(indexName + "_2", rs.getString(3));
-                assertEquals(indexState.toString(), rs.getString("INDEX_STATE"));
-                assertFalse(rs.next());
+            if (transactional) {
+                assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+            } else {
+                String indexState = rs.getString("INDEX_STATE");
+                assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState));
             }
+            assertFalse(rs.next());
 
-            // if the table is transactional the write to the index table will fail because the
-            // index has not been disabled
+            // If the table is transactional the write to both the data and index table will fail 
+            // in an all or none manner. If the table is not transactional, then the data writes
+            // would have succeeded while the index writes would have failed.
             if (!transactional) {
                 // Verify UPSERT on data table still work after index is disabled
                 stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
@@ -218,210 +226,101 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
                 stmt.setString(3, "3");
                 stmt.execute();
                 conn.commit();
-            }
 
-            if (transactional) {
-                // if the table was transactional there should be 1 row (written before the index
-                // was disabled)
-                query = "SELECT /*+ NO_INDEX */ v2 FROM " + fullTableName;
+                // Verify previous writes succeeded to data table
+                query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
                 rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-                String expectedPlan =
+                expectedPlan =
                         "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName;
                 assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
                 rs = conn.createStatement().executeQuery(query);
                 assertTrue(rs.next());
-                assertEquals("1", rs.getString(1));
-                assertFalse(rs.next());
-            } else {
-                // if the table was not transactional there should be three rows (all writes to data
-                // table should succeed)
-                query = "SELECT v2 FROM " + fullTableName;
-                rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-                String expectedPlan =
-                        "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName;
-                assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
-                rs = conn.createStatement().executeQuery(query);
+                assertEquals("a", rs.getString(1));
+                assertEquals("x2", rs.getString(2));
                 assertTrue(rs.next());
-                assertEquals("1", rs.getString(1));
+                assertEquals("a3", rs.getString(1));
+                assertEquals("x3", rs.getString(2));
                 assertTrue(rs.next());
-                assertEquals("2", rs.getString(1));
+                assertEquals("c", rs.getString(1));
+                assertEquals("z", rs.getString(2));
                 assertTrue(rs.next());
-                assertEquals("3", rs.getString(1));
+                assertEquals("d", rs.getString(1));
+                assertEquals("d", rs.getString(2));
                 assertFalse(rs.next());
             }
 
-            // recreate index table
-            admin.createTable(indexTableDesc);
-            do {
-                Thread.sleep(15 * 1000); // sleep 15 secs
-                rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
-                        new String[] { PTableType.INDEX.toString() });
-                assertTrue(rs.next());
-                if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
-                    break;
-                }
-                if(localIndex) {
-                    rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2",
+            // re-enable index table
+            FAIL_WRITE = false;
+            
+            boolean isActive = false;
+            if (!transactional) {
+                int maxTries = 3, nTries = 0;
+                do {
+                    Thread.sleep(15 * 1000); // sleep 15 secs
+                    rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
                             new String[] { PTableType.INDEX.toString() });
                     assertTrue(rs.next());
                     if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+                        isActive = true;
                         break;
                     }
-                }
-            } while(true);
+                } while(++nTries < maxTries);
+                assertTrue(isActive);
+            }
 
             // Verify UPSERT on data table still work after index table is recreated
             stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "a4");
+            stmt.setString(1, "a3");
             stmt.setString(2, "x4");
             stmt.setString(3, "4");
             stmt.execute();
             conn.commit();
 
-            // verify index table has data
-            query = "SELECT count(1) FROM " + fullIndexName;
+            // verify index table has correct data
+            query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + fullTableName;
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            expectedPlan =
+                    " OVER " + (localIndex ? MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + tableName : fullIndexName);
+            String explainPlan = QueryUtil.getExplainPlan(rs);
+            assertTrue(explainPlan.contains(expectedPlan));
             rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-
-            // for txn tables there will be only one row in the index (a4)
-            // for non txn tables there will be three rows because we only partially build index
-            // from where we failed and the oldest
-            // index row has been deleted when we dropped the index table during test
-            assertEquals(transactional ? 1 : 3, rs.getInt(1));
-        }
-    }
-
-
-    @Ignore("See PHOENIX-2332")
-    @Test
-    public void testWriteFailureWithRegionServerDown() throws Exception {
-        String query;
-        ResultSet rs;
-
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = driver.connect(url, props);) {
-            conn.setAutoCommit(false);
-            conn.createStatement().execute(
-                    "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "+tableDDLOptions);
-            query = "SELECT * FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
-
-            conn.createStatement().execute(
-                    "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
-            query = "SELECT * FROM " + fullIndexName;
-            rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
-
-            // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
-                    new String[] { PTableType.INDEX.toString() });
-            assertTrue(rs.next());
-            assertEquals(indexName, rs.getString(3));
-            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "a");
-            stmt.setString(2, "x");
-            stmt.setString(3, "1");
-            stmt.execute();
-            conn.commit();
-
-            // find a RS which doesn't has CATALOG table
-            TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
-            TableName indexTable = TableName.valueOf(fullIndexName);
-            final HBaseCluster cluster = getUtility().getHBaseCluster();
-            Collection<ServerName> rss = cluster.getClusterStatus().getServers();
-            HBaseAdmin admin = getUtility().getHBaseAdmin();
-            List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
-            ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getTable(),
-                    regions.get(0).getRegionName());
-            ServerName metaRS = cluster.getServerHoldingMeta();
-            ServerName rsToBeKilled = null;
-
-            // find first RS isn't holding META or CATALOG table
-            for(ServerName curRS : rss) {
-                if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
-                    rsToBeKilled = curRS;
-                    break;
-                }
-            }
-            assertTrue(rsToBeKilled != null);
-
-            regions = admin.getTableRegions(indexTable);
-            final HRegionInfo indexRegion = regions.get(0);
-            final ServerName dstRS = rsToBeKilled;
-            admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName()));
-            getUtility().waitFor(30000, 200, new Waiter.Predicate<Exception>() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    ServerName sn = cluster.getServerHoldingRegion(indexRegion.getTable(),
-                            indexRegion.getRegionName());
-                    return (sn != null && sn.equals(dstRS));
-                }
-            });
-
-            // use timer sending updates in every 10ms
-            this.scheduleTimer = new Timer(true);
-            this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn, fullTableName), 0, 10);
-            // let timer sending some updates
-            Thread.sleep(100);
-
-            // kill RS hosting index table
-            getUtility().getHBaseCluster().killRegionServer(rsToBeKilled);
-
-            // wait for index table completes recovery
-            getUtility().waitUntilAllRegionsAssigned(indexTable);
-
-            // Verify the metadata for index is correct.       
-            do {
-                Thread.sleep(15 * 1000); // sleep 15 secs
-                rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
-                        new String[] { PTableType.INDEX.toString() });
+            if (transactional) { // failed commit does not get retried
+                assertTrue(rs.next());
+                assertEquals("a", rs.getString(1));
+                assertEquals("x", rs.getString(2));
+                assertTrue(rs.next());
+                assertEquals("a3", rs.getString(1));
+                assertEquals("x4", rs.getString(2));
+                assertTrue(rs.next());
+                assertEquals("b", rs.getString(1));
+                assertEquals("y", rs.getString(2));
+                assertTrue(rs.next());
+                assertEquals("c", rs.getString(1));
+                assertEquals("z", rs.getString(2));
+                assertFalse(rs.next());
+            } else { // failed commit eventually succeeds
+                assertTrue(rs.next());
+                assertEquals("d", rs.getString(1));
+                assertEquals("d", rs.getString(2));
+                assertTrue(rs.next());
+                assertEquals("a", rs.getString(1));
+                assertEquals("x2", rs.getString(2));
+                assertTrue(rs.next());
+                assertEquals("a3", rs.getString(1));
+                assertEquals("x4", rs.getString(2));
                 assertTrue(rs.next());
-                if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
-                    break;
-                }
-            } while(true);
-            this.scheduleTimer.cancel();
+                assertEquals("c", rs.getString(1));
+                assertEquals("z", rs.getString(2));
+                assertFalse(rs.next());
+            }
         }
     }
-
-    static class SendingUpdatesScheduleTask extends TimerTask {
-        private static final Log LOG = LogFactory.getLog(SendingUpdatesScheduleTask.class);
-
-        // inProgress is to prevent timer from invoking a new task while previous one is still
-        // running
-        private final static AtomicInteger inProgress = new AtomicInteger(0);
-        private final Connection conn;
-        private final String fullTableName;
-        private int inserts = 0;
-
-        public SendingUpdatesScheduleTask(Connection conn, String fullTableName) {
-            this.conn = conn;
-            this.fullTableName = fullTableName;
-        }
-
+    
+    public static class FailingRegionObserver extends SimpleRegionObserver {
         @Override
-        public void run() {
-            if(inProgress.get() > 0){
-                return;
-            }
-
-            try {
-                inProgress.incrementAndGet();
-                inserts++;
-                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-                stmt.setString(1, "a" + inserts);
-                stmt.setString(2, "x" + inserts);
-                stmt.setString(3, String.valueOf(inserts));
-                stmt.execute();
-                conn.commit();
-            } catch (Throwable t) {
-                LOG.warn("ScheduledBuildIndexTask failed!", t);
-            } finally {
-                inProgress.decrementAndGet();
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+                throw new DoNotRetryIOException();
             }
         }
     }

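The rewritten test replaces the old choreography of dropping and recreating the index table with deterministic fault injection: a SimpleRegionObserver registered through hbase.coprocessor.region.classes that fails index-table writes on demand. A minimal standalone sketch of the same pattern follows (the class name and target-table substring are illustrative, not part of the patch; the volatile flag only works because the mini cluster runs in the same JVM as the test):

    import org.apache.hadoop.hbase.DoNotRetryIOException;
    import org.apache.hadoop.hbase.HBaseIOException;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
    import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

    public class FaultInjectingObserver extends SimpleRegionObserver {
        // Toggled by the test; visible to the coprocessor because the
        // mini cluster shares the test JVM.
        public static volatile boolean FAIL_WRITE = false;
        public static final String TARGET = "IDX"; // substring of the index table name

        @Override
        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
            String table = c.getEnvironment().getRegionInfo().getTable().getNameAsString();
            // DoNotRetryIOException makes HBase fail the batch immediately
            // rather than retrying, so the client sees the error right away.
            if (FAIL_WRITE && table.contains(TARGET)) {
                throw new DoNotRetryIOException("injected index write failure for " + table);
            }
        }
    }

Wiring it up is one server property in doSetup(), exactly as the diff above does: serverProps.put("hbase.coprocessor.region.classes", FaultInjectingObserver.class.getName()).
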
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
index 8df82ce..931fcae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
@@ -27,19 +27,20 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -56,6 +57,9 @@ import org.apache.phoenix.util.TestUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Maps;
 /**
@@ -69,28 +73,37 @@ import com.google.common.collect.Maps;
  */
 
 @Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
 public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
-    private static final String FAIL_ON_FIRST_PUT = "bbb";
+    public static volatile boolean FAIL_WRITE = false;
+    public static final String INDEX_NAME = "IDX";
 
     private String tableName;
     private String indexName;
     private String fullTableName;
     private String fullIndexName;
+    private final boolean localIndex;
 
-    public ReadOnlyIndexFailureIT() {
-        this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME;
-        this.indexName = "IDX";
+    public ReadOnlyIndexFailureIT(boolean localIndex) {
+        this.localIndex = localIndex;
+        this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME;
+        this.indexName = INDEX_NAME;
         this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
         this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
     }
 
+    @Parameters(name = "localIndex = {0}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] { { false }, { true } });
+    }
+
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
-        serverProps.put("hbase.client.retries.number", "2");
+        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
         serverProps.put("hbase.client.pause", "5000");
         serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, "0");
         serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true");
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, "true");
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
@@ -105,16 +118,11 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
     }
 
     @Test
-    public void testWriteFailureReadOnlyLocalIndex() throws Exception {
-        helpTestWriteFailureReadOnlyIndex(true);
-    }
-
-    @Test
     public void testWriteFailureReadOnlyIndex() throws Exception {
-        helpTestWriteFailureReadOnlyIndex(false);
+        helpTestWriteFailureReadOnlyIndex();
     }
 
-    public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Exception {
+    public void helpTestWriteFailureReadOnlyIndex() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = driver.connect(url, props)) {
             String query;
@@ -126,6 +134,7 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
+            FAIL_WRITE = false;
             if(localIndex) {
                 conn.createStatement().execute(
                         "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName 
@@ -157,9 +166,10 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             stmt.execute();
             conn.commit();
 
+            FAIL_WRITE = true;
             stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
             stmt.setString(1, "2");
-            stmt.setString(2, FAIL_ON_FIRST_PUT);
+            stmt.setString(2, "bbb");
             stmt.setString(3, "b2");
             stmt.execute();
             try {
@@ -201,6 +211,7 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
                 assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode());
             }
 
+            FAIL_WRITE = false;
             // Second attempt at writing will succeed
             int retries = 0;
             do {
@@ -222,12 +233,12 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             conn.commit();
 
             // verify index table has data
-            query = "SELECT count(1) FROM " + indexName;
+            query = "SELECT count(1) FROM " + fullIndexName;
             rs = conn.createStatement().executeQuery(query);
             assertTrue(rs.next());
             assertEquals(3, rs.getInt(1));
             
-            query = "SELECT v1 FROM " + fullTableName;
+            query = "SELECT /*+ INDEX(" + indexName + ") */ v1 FROM " + fullTableName;
             rs = conn.createStatement().executeQuery(query);
             assertTrue(rs.next());
             assertEquals("aaa", rs.getString(1));
@@ -261,29 +272,13 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
         return (!rs.wasNull() && ts > 0);
     }
 
+    
     public static class FailingRegionObserver extends SimpleRegionObserver {
-        private Integer failCount = new Integer(0);
-        
         @Override
-        public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
-                final Durability durability) throws HBaseIOException {
-            if (shouldFailUpsert(c, put)) {
-                synchronized (failCount) {
-                    failCount++;
-                    if (failCount.intValue() == 1) {
-                        // throwing anything other than instances of IOException result
-                        // in this coprocessor being unloaded
-                        // DoNotRetryIOException tells HBase not to retry this mutation
-                        // multiple times
-                        throw new DoNotRetryIOException();
-                    }
-                }
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+                throw new DoNotRetryIOException();
             }
         }
-        
-        private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
-            return Bytes.contains(put.getRow(), Bytes.toBytes(FAIL_ON_FIRST_PUT));
-        }
-        
     }
 }

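With QueryServices.INDEX_FAILURE_BLOCK_WRITE set, upserts against the data table are rejected with SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE until the background task rebuilds the index. A client can ride out that window by retrying on exactly this error code, roughly as the test's retry loop does; a sketch, with illustrative retry bounds and sleep:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    import org.apache.phoenix.exception.SQLExceptionCode;

    static void upsertWithRetry(Connection conn, String fullTableName,
            String k, String v1, String v2) throws SQLException, InterruptedException {
        int maxRetries = 10; // illustrative bound
        for (int attempt = 0;; attempt++) {
            try (PreparedStatement stmt = conn.prepareStatement(
                    "UPSERT INTO " + fullTableName + " VALUES(?,?,?)")) {
                stmt.setString(1, k);
                stmt.setString(2, v1);
                stmt.setString(3, v2);
                stmt.execute();
                conn.commit();
                return; // index is active again; the write went through
            } catch (SQLException e) {
                if (e.getErrorCode() != SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode()
                        || attempt >= maxRetries) {
                    throw e; // some other failure, or out of retries
                }
                Thread.sleep(1000); // give the rebuild task time to re-enable the index
            }
        }
    }
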
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index 2396719..fe2f1b4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -148,7 +148,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
     public void verify(TableState state) {
       try {
         Scanner kvs =
-            ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns)).getFirst();
+            ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns), false).getFirst();
 
         int count = 0;
         Cell kv;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 9487b36..a5533af 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -58,10 +58,10 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
-import co.cask.tephra.Transaction;
-
 import com.google.common.collect.ImmutableList;
 
+import co.cask.tephra.Transaction;
+
 
 abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
 
@@ -98,6 +98,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK";
     public static final String TX_SCN = "_TxScn";
     public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
+    public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS";
     
     /**
      * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 4e019cd..0cce4d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.TimerTask;
@@ -31,37 +32,53 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.parse.AlterIndexStatement;
+import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpgradeUtil;
 
+import com.google.common.collect.Lists;
+
 
 /**
  * Coprocessor for metadata related operations. This coprocessor would only be registered
@@ -190,8 +207,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
             // separately, all updating the same data.
             RegionScanner scanner = null;
             PhoenixConnection conn = null;
-            boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE, 
-                    QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
             if (inProgress.get() > 0) {
                 LOG.debug("New ScheduledBuildIndexTask skipped as there is already one running");
                 return;
@@ -213,9 +228,13 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
 
+                PTable dataPTable = null;
+                MetaDataClient client = null;
                 boolean hasMore = false;
                 List<Cell> results = new ArrayList<Cell>();
+                List<PTable> indexesToPartiallyRebuild = Collections.emptyList();
                 scanner = this.env.getRegion().getScanner(scan);
+                long earliestDisableTimestamp = Long.MAX_VALUE;
 
                 do {
                     results.clear();
@@ -226,16 +245,18 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
 
-                    Long disabledTimeStampVal = 0L;
                     if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
                         continue;
                     }
 
                     // disableTimeStamp has to be a positive value
-                    disabledTimeStampVal = (Long) PLong.INSTANCE.toObject(disabledTimeStamp);
+                    long disabledTimeStampVal = PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0, SortOrder.getDefault());
                     if (disabledTimeStampVal <= 0) {
                         continue;
                     }
+                    if (disabledTimeStampVal < earliestDisableTimestamp) {
+                        earliestDisableTimestamp = disabledTimeStampVal;
+                    }
 
                     byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
@@ -247,12 +268,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                         continue;
                     }
 
-                    if (!blockWriteRebuildIndex && ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0)
-                                    && (Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexStat) != 0))) {
-                        // index has to be either in disable or inactive state
-                        continue;
-                    }
-
                     byte[][] rowKeyMetaData = new byte[3][];
                     SchemaUtil.getVarChars(r.getRow(), 3, rowKeyMetaData);
                     byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
@@ -266,34 +281,101 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 
                     if (conn == null) {
                     	final Properties props = new Properties();
+                    	props.setProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB, Boolean.TRUE.toString());
                     	// Set SCN so that we don't ping server and have the upper bound set back to
                     	// the timestamp when the failure occurred.
                     	props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
                    	// don't run a second index population upsert select 
                         props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0"); 
                         conn = DriverManager.getConnection(getJdbcUrl(env), props).unwrap(PhoenixConnection.class);
+                        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
+                        dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+                        indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
+                        client = new MetaDataClient(conn);
                     }
 
-                    String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
                     String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable);
-                    PTable dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
                     PTable indexPTable = PhoenixRuntime.getTable(conn, indexTableFullName);
                     if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
                         LOG.debug("Index rebuild has been skipped because not all regions of index table="
                                 + indexPTable.getName() + " are online.");
                         continue;
                     }
+                    // Allow index to begin incremental maintenance as index is back online and we
+                    // cannot transition directly from DISABLED -> ACTIVE
+                    if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) == 0) {
+                        AlterIndexStatement statement = new AlterIndexStatement(
+                                NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()),
+                                dataPTable.getTableName().getString(),
+                                false, PIndexState.INACTIVE);
+                        client.alterIndex(statement);
+                    }
+                    indexesToPartiallyRebuild.add(indexPTable);
+                } while (hasMore);
 
-                    MetaDataClient client = new MetaDataClient(conn);
+                if (!indexesToPartiallyRebuild.isEmpty()) {
                     long overlapTime = env.getConfiguration().getLong(
                         QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
                         QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
-                    long timeStamp = Math.max(0, disabledTimeStampVal - overlapTime);
+                    long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
                     
-                    LOG.info("Starting to build index=" + indexPTable.getName() + " from timestamp=" + timeStamp);
-                    client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp), blockWriteRebuildIndex);
-
-                } while (hasMore);
+                    LOG.info("Starting to build indexes=" + indexesToPartiallyRebuild + " from timestamp=" + timeStamp);
+                    Scan dataTableScan = new Scan();
+                    dataTableScan.setRaw(true);
+                    dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
+                    byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
+                    try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName)) {
+                        Result result;
+                        try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan)) {
+                            int batchSize = conn.getMutateBatchSize();
+                            List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
+                            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+                            IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, conn);
+                            byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+                            byte[] uuidValue = ServerCacheClient.generateId();
+        
+                            while ((result = dataTableScanner.next()) != null && !result.isEmpty()) {
+                                Put put = null;
+                                Delete del = null;
+                                for (Cell cell : result.rawCells()) {
+                                    if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                        if (put == null) {
+                                            put = new Put(CellUtil.cloneRow(cell));
+                                            put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                            put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                                            put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                                            mutations.add(put);
+                                        }
+                                        put.add(cell);
+                                    } else {
+                                        if (del == null) {
+                                            del = new Delete(CellUtil.cloneRow(cell));
+                                            del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                            del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                                            del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                                            mutations.add(del);
+                                        }
+                                        del.addDeleteMarker(cell);
+                                    }
+                                }
+                                if (mutations.size() == batchSize) {
+                                    dataHTable.batch(mutations);
+                                    uuidValue = ServerCacheClient.generateId();
+                                }
+                            }
+                            if (!mutations.isEmpty()) {
+                                dataHTable.batch(mutations);
+                            }
+                        }
+                    }
+                    for (PTable indexPTable : indexesToPartiallyRebuild) {
+                        AlterIndexStatement statement = new AlterIndexStatement(
+                                NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()),
+                                dataPTable.getTableName().getString(),
+                                false, PIndexState.ACTIVE);
+                        client.alterIndex(statement);
+                    }
+                }
             } catch (Throwable t) {
                 LOG.warn("ScheduledBuildIndexTask failed!", t);
             } finally {

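The heart of the fix is the replay loop above: a raw, time-ranged scan over the data table picks up every cell (including delete markers) written since the earliest disable timestamp, and each cell is re-submitted to the data table tagged so that the indexing coprocessor regenerates the matching index rows while ignoring anything newer. A condensed sketch of that loop (imports as added in the diff above; assumes conn, dataPTable, timeStamp, batchSize, and the serialized index metadata attribValue are in scope as in the patch; unlike the patch, the sketch emits one mutation per cell and clears the batch after each flush):

    Scan dataScan = new Scan();
    dataScan.setRaw(true); // a raw scan returns delete markers too
    dataScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
    byte[] physicalName = dataPTable.getPhysicalName().getBytes();
    try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalName);
         ResultScanner scanner = dataHTable.getScanner(dataScan)) {
        List<Mutation> batch = Lists.newArrayListWithExpectedSize(batchSize);
        byte[] uuidValue = ServerCacheClient.generateId();
        Result result;
        while ((result = scanner.next()) != null && !result.isEmpty()) {
            for (Cell cell : result.rawCells()) {
                Mutation m;
                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
                    Put put = new Put(CellUtil.cloneRow(cell));
                    put.add(cell);
                    m = put;
                } else {
                    Delete del = new Delete(CellUtil.cloneRow(cell));
                    del.addDeleteMarker(cell);
                    m = del;
                }
                // Route the replayed mutation through index maintenance and
                // tell it to ignore cells newer than the one being replayed.
                m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                m.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
                m.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
                batch.add(m);
            }
            if (batch.size() >= batchSize) {
                dataHTable.batch(batch);
                batch.clear(); // start a fresh batch under a new cache id
                uuidValue = ServerCacheClient.generateId();
            }
        }
        if (!batch.isEmpty()) {
            dataHTable.batch(batch);
        }
    }
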
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index c4ed7a0..2739cc2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -36,8 +36,6 @@ import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 
-import com.google.inject.Key;
-
 /**
  * Manage the state of the HRegion's view of the table, for the single row.
  * <p>
@@ -108,7 +106,7 @@ public class LocalTableState implements TableState {
     public void setCurrentTimestamp(long timestamp) {
         this.ts = timestamp;
     }
-
+    
     public void resetTrackedColumns() {
         this.trackedColumns.clear();
     }
@@ -139,6 +137,9 @@ public class LocalTableState implements TableState {
      * request - you will never see a column with the timestamp we are tracking, but the next oldest
      * timestamp for that column.
     * @param indexedColumns the columns that will be indexed
+     * @param ignoreNewerMutations ignore mutations newer than the mutation being replayed when determining current state. Useful
+     *        when replaying mutation state for partial index rebuild where writes succeeded to the data
+     *        table, but not to the index table.
      * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
      *         the builder. Even if no update is necessary for the requested columns, you still need
      *         to return the {@link IndexUpdate}, just don't set the update for the
@@ -146,8 +147,8 @@ public class LocalTableState implements TableState {
      * @throws IOException
      */
     public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
-        Collection<? extends ColumnReference> indexedColumns) throws IOException {
-        ensureLocalStateInitialized(indexedColumns);
+        Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations) throws IOException {
+        ensureLocalStateInitialized(indexedColumns, ignoreNewerMutations);
         // filter out things with a newer timestamp and track the column references to which it applies
         ColumnTracker tracker = new ColumnTracker(indexedColumns);
         synchronized (this.trackedColumns) {
@@ -167,7 +168,7 @@ public class LocalTableState implements TableState {
      * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even
      * then, there is still fairly low contention as each new Put/Delete will have its own table state.
      */
-    private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns)
+    private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
             throws IOException {
         // check to see if we haven't initialized any columns yet
         Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
@@ -175,7 +176,7 @@ public class LocalTableState implements TableState {
         if (toCover.isEmpty()) { return; }
 
         // add the current state of the row
-        this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false);
+        this.addUpdate(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).list(), false);
 
         // add the covered columns to the set
         for (ColumnReference ref : toCover) {
@@ -268,9 +269,9 @@ public class LocalTableState implements TableState {
     }
 
     @Override
-    public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+    public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations)
             throws IOException {
-        Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns);
+        Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations);
         ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
         return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
     }

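A minimal sketch of the new two-argument call, assuming a LocalTableState already wired to a region and a pending mutation (as in TestLocalTableState further down); column names are illustrative and exception handling is elided:

    Collection<ColumnReference> cols =
        Arrays.asList(new ColumnReference(Bytes.toBytes("FAM"), Bytes.toBytes("COL")));
    // Passing true yields the row as it looked before the pending mutation's
    // timestamp, which is what a replayed write needs during a partial rebuild.
    Pair<Scanner, IndexUpdate> state = table.getIndexedColumnsTableState(cols, true);
    Scanner kvs = state.getFirst();
    try {
        Cell preMutationCell = kvs.next(); // null when the row had no prior state
    } finally {
        kvs.close();
    }
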
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index 0e961db..bd4bdfb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -58,9 +57,13 @@ public interface TableState {
 
   /**
    * Get a getter interface for the state of the index row
+   * @param indexedColumns list of indexed columns.
+   * @param ignoreNewerMutations ignore mutations newer than the pending mutation when determining
+   *        current state. Useful when replaying mutation state for a partial index rebuild, where
+   *        writes succeeded to the data table but not to the index table.
    */
   Pair<ValueGetter, IndexUpdate> getIndexUpdateState(
-      Collection<? extends ColumnReference> indexedColumns) throws IOException;
+      Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations) throws IOException;
 
   /**
    * @return the row key for the current row for which we are building an index update.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
index 6d20c18..9968627 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
-
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 
 /**
@@ -35,13 +34,16 @@ public interface LocalHBaseState {
    * @param m mutation for which we should get the current table state
    * @param toCover all the columns the current row state needs to cover; hint the underlying lookup
    *          to save getting all the columns for the row
+   * @param ignoreNewerMutations ignore mutations newer than <tt>m</tt> when determining current
+   *        state. Useful when replaying mutation state for a partial index rebuild, where writes
+   *        succeeded to the data table but not to the index table.
    * @return the full state of the given row. Includes all current versions (even if they are not
    *         usually visible to the client (unless they are also doing a raw scan)). Never returns a
   *         <tt>null</tt> {@link Result} - instead, when there is no data for the row, returns a
    *         {@link Result} with no stored {@link KeyValue}s.
    * @throws IOException if there is an issue reading the row
    */
-  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover)
+  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
       throws IOException;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
index 549fe8c..003df2a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 
@@ -52,13 +51,19 @@ public class LocalTable implements LocalHBaseState {
   }
 
   @Override
-  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns)
+  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
       throws IOException {
     byte[] row = m.getRow();
     // need to use a scan here so we can get raw state, which Get doesn't provide.
     Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns));
     s.setStartRow(row);
     s.setStopRow(row);
+    if (ignoreNewerMutations) {
+        // Provides a means for the client to indicate that newer cells should not be considered,
+        // enabling mutations to be replayed to partially rebuild the index when a write fails.
+        long ts = m.getFamilyCellMap().firstEntry().getValue().get(0).getTimestamp();
+        s.setTimeRange(0, ts);
+    }
     Region region = this.env.getRegion();
     RegionScanner scanner = region.getScanner(s);
     List<Cell> kvs = new ArrayList<Cell>(1);

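HBase scan time ranges are half-open, [minStamp, maxStamp), which is what makes the replay trick work: using the mutation's own timestamp as the upper bound hides the pending cells as well as anything newer. A standalone sketch of the equivalent scan setup (method name illustrative; assumes the raw, all-versions configuration the comment above describes):

    Scan buildPreMutationScan(byte[] row, long mutationTs) throws IOException {
        Scan s = new Scan();
        s.setRaw(true);        // raw scan: include delete markers and all versions
        s.setMaxVersions();
        s.setStartRow(row);
        s.setStopRow(row);     // start == stop is treated as a single-row get scan
        s.setTimeRange(0, mutationTs); // cells at mutationTs or later are excluded
        return s;
    }
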
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 4efca9f..0f960e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -77,7 +77,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
     private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) {
         List<CoveredColumn> refs = group.getColumns();
         try {
-            Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+            Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs, false);
             Scanner kvs = stateInfo.getFirst();
             Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey());
             // make sure we close the scanner
@@ -132,7 +132,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
     private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
         List<CoveredColumn> refs = group.getColumns();
         try {
-            Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+            Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs, false);
             Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
             // make sure we close the scanner reference
             kvs.getFirst().close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index e120268..b4282ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -59,7 +59,6 @@ public class ScannerBuilder {
 
   public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) {
 
-    // TODO: This needs to use some form of the filter that Tephra has when transactional
     Filter columnFilters = getColumnFilters(indexedColumns);
     FilterList filters = new FilterList(Lists.newArrayList(columnFilters));
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 4d545a2..13ad7e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -89,14 +89,14 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
-import co.cask.tephra.TxConstants;
-
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import co.cask.tephra.TxConstants;
+
 /**
  * 
  * Class that builds index row key from data row key and current state of

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 7acc90c..8ad4d3e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -15,8 +15,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
@@ -59,7 +59,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
 
     @Override
     public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
-        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+        PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
+        List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
         if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
             return Collections.emptyList();
         }
@@ -67,7 +68,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
         ptr.set(state.getCurrentRowKey());
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
         for (IndexMaintainer maintainer : indexMaintainers) {
-            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();
             indexUpdate.setTable(maintainer.getIndexTableName());
@@ -81,7 +82,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
 
     @Override
     public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
-        List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+        PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
+        List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         ptr.set(state.getCurrentRowKey());
         List<IndexUpdate> indexUpdates = Lists.newArrayList();
@@ -90,7 +92,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
             // to aid in rollback if there's a KeyValue column in the index. The alternative would be
             // to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
             // client side.
-            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+            Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
             ValueGetter valueGetter = statePair.getFirst();
             IndexUpdate indexUpdate = statePair.getSecond();
             indexUpdate.setTable(maintainer.getIndexTableName());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 60ae915..4fab674 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -22,8 +22,6 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 
-import co.cask.tephra.Transaction;
-
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.GlobalCache;
@@ -39,9 +37,12 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ServerUtil;
 
+import co.cask.tephra.Transaction;
+
 public class PhoenixIndexMetaData implements IndexMetaData {
     private final Map<String, byte[]> attributes;
     private final IndexMetaDataCache indexMetaDataCache;
+    private final boolean ignoreNewerMutations;
     
     private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
         if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
@@ -87,6 +88,7 @@ public class PhoenixIndexMetaData implements IndexMetaData {
     public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException {
         this.indexMetaDataCache = getIndexMetaData(env, attributes);
         this.attributes = attributes;
+        this.ignoreNewerMutations = attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
     }
     
     public Transaction getTransaction() {
@@ -100,4 +102,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
     public Map<String, byte[]> getAttributes() {
         return attributes;
     }
+    
+    public boolean ignoreNewerMutations() {
+        return ignoreNewerMutations;
+    }
 }

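Since only attribute presence is checked, any non-null value serves as the marker. A sketch of tagging a replayed mutation on the rebuild path (row key and marker value are illustrative):

    Put put = new Put(Bytes.toBytes("row1"));
    // Makes PhoenixIndexMetaData.ignoreNewerMutations() return true when the
    // index codec processes this Put during a partial index rebuild.
    put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, Bytes.toBytes(true));
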
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 26f9725..e4c106e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -486,7 +486,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         }
 
         @Override
-        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+        public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations)
                 throws IOException {
             // TODO: creating these objects over and over again is wasteful
             ColumnTracker tracker = new ColumnTracker(indexedColumns);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index 22b02c5..2c33d21 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesImpl;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -224,20 +225,23 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
                     connectionQueryServices = prevValue;
                 }
             }
-            boolean success = false;
-            SQLException sqlE = null;
-            try {
-                connectionQueryServices.init(url, info);
-                success = true;
-            } catch (SQLException e) {
-                sqlE = e;
-            }
-            finally {
-                if (!success) {
-                    // Remove from map, as initialization failed
-                    connectionQueryServicesMap.remove(normalizedConnInfo);
-                    if (sqlE != null) {
-                        throw sqlE;
+            String noUpgradeProp = info.getProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB);
+            if (!Boolean.parseBoolean(noUpgradeProp)) { // property values arrive as Strings
+                boolean success = false;
+                SQLException sqlE = null;
+                try {
+                    connectionQueryServices.init(url, info);
+                    success = true;
+                } catch (SQLException e) {
+                    sqlE = e;
+                }
+                finally {
+                    if (!success) {
+                        // Remove from map, as initialization failed
+                        connectionQueryServicesMap.remove(normalizedConnInfo);
+                        if (sqlE != null) {
+                            throw sqlE;
+                        }
                     }
                 }
             }

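A minimal usage sketch for the new NoUpgrade connection property (URL is illustrative):

    Properties props = new Properties();
    // Skips ConnectionQueryServices.init(), and with it any automatic metadata
    // upgrade, for this connection.
    props.setProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB, "true");
    Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181", props);
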
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
index 4e0906f..d3b4505 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
@@ -39,6 +39,14 @@ public class NamedTableNode extends ConcreteTableNode {
         return new NamedTableNode(alias, name, dynColumns);
     }
     
+    public static NamedTableNode create(TableName name) {
+        return new NamedTableNode(null, name, Collections.<ColumnDef>emptyList());
+    }
+    
+    public static NamedTableNode create(String schemaName, String tableName) {
+        return new NamedTableNode(null, TableName.create(schemaName, tableName), Collections.<ColumnDef>emptyList());
+    }
+    
     NamedTableNode(String alias, TableName name) {
         super(alias, name);
         dynColumns = Collections.<ColumnDef> emptyList();

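Usage, for callers that need neither an alias nor dynamic columns (names illustrative):

    NamedTableNode node = NamedTableNode.create("MY_SCHEMA", "MY_INDEX");
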
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 62297ee..27c5693 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -154,7 +154,7 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; // auto rebuild on
     public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false; 
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs
-    public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 300000; // 5 mins
+    public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 1; // 1 ms
 
     /**
      * HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate

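The overlap time is subtracted from the index-disable timestamp when the rebuild task picks the point to replay from; shrinking the default from five minutes to one millisecond leans on the new ignore-newer-mutations replay being safe to run with almost no overlap. Deployments can still widen it; a sketch, assuming the standard QueryServices key name:

    Configuration conf = HBaseConfiguration.create();
    // Key name assumed: phoenix.index.failure.handling.rebuild.overlap.time (ms)
    conf.setLong("phoenix.index.failure.handling.rebuild.overlap.time", 60000L);
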
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 6409dcd..7f3f850 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -204,8 +204,6 @@ import org.apache.phoenix.util.UpgradeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import co.cask.tephra.TxConstants;
-
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
@@ -214,6 +212,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 
+import co.cask.tephra.TxConstants;
+
 public class MetaDataClient {
     private static final Logger logger = LoggerFactory.getLogger(MetaDataClient.class);
 
@@ -1096,36 +1096,6 @@ public class MetaDataClient {
     }
 
     /**
-     * Rebuild indexes from a timestamp which is the value from hbase row key timestamp field
-     */
-    public void buildPartialIndexFromTimeStamp(PTable index, TableRef dataTableRef, boolean blockWriteRebuildIndex) throws SQLException {
-        boolean needRestoreIndexState = true;
-        AlterIndexStatement indexStatement = null;
-        if (!blockWriteRebuildIndex) {
-            // Need to change index state from Disable to InActive when build index partially so that
-            // new changes will be indexed during index rebuilding
-            indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
-                    TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
-                    dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
-            alterIndex(indexStatement); 
-        } 
-        try {
-            buildIndex(index, dataTableRef);
-            needRestoreIndexState = false;
-        } finally {
-            if(needRestoreIndexState) {
-                if (!blockWriteRebuildIndex) {
-                    // reset index state to disable
-                    indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
-                            TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
-                            dataTableRef.getTable().getTableName().getString(), false, PIndexState.DISABLE);
-                    alterIndex(indexStatement);
-                }
-            }
-        }
-    }
-    
-    /**
      * Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
      * MetaDataClient.createTable. In doing so, we perform the following translations:
      * 1) Change the type of any columns being indexed to types that support null if the column is nullable.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index bebdd8c..05ba6d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -130,6 +130,11 @@ public class PhoenixRuntime {
     public static final String TENANT_ID_ATTRIB = "TenantId";
 
     /**
+     * Use this connection property to prevent an upgrade from occurring when
+     * connecting to a newer server version.
+     */
+    public static final String NO_UPGRADE_ATTRIB = "NoUpgrade";
+    /**
      * Use this connection property to control the number of rows that are
      * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
      * It's only used when autoCommit is true and your source table is
@@ -163,7 +168,8 @@ public class PhoenixRuntime {
             UPSERT_BATCH_SIZE_ATTRIB,
             AUTO_COMMIT_ATTRIB,
             CONSISTENCY_ATTRIB,
-            REQUEST_METRIC_ATTRIB
+            REQUEST_METRIC_ATTRIB,
+            NO_UPGRADE_ATTRIB
             };
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
index fa8bd85..a2e45af 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
@@ -91,7 +91,7 @@ public class TestLocalTableState {
     ColumnReference col = new ColumnReference(fam, qual);
     table.setCurrentTimestamp(ts);
     //check that our value still shows up first on scan, even though this is a lazy load
-    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
     Scanner s = p.getFirst();
     assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0), s.next());
   }
@@ -135,13 +135,13 @@ public class TestLocalTableState {
     ColumnReference col = new ColumnReference(fam, qual);
     table.setCurrentTimestamp(ts);
     // check that the value is there
-    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
     Scanner s = p.getFirst();
     assertEquals("Didn't get the pending mutation's value first", kv, s.next());
 
     // rollback that value
     table.rollback(Arrays.asList(kv));
-    p = table.getIndexedColumnsTableState(Arrays.asList(col));
+    p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
     s = p.getFirst();
     assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
     Mockito.verify(env, Mockito.times(1)).getRegion();
@@ -179,14 +179,14 @@ public class TestLocalTableState {
     ColumnReference col = new ColumnReference(fam, qual);
     table.setCurrentTimestamp(ts);
     // check that the value is there
-    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
     Scanner s = p.getFirst();
     // make sure it read the table the one time
     assertEquals("Didn't get the stored keyvalue!", storedKv, s.next());
 
     // on the second lookup it shouldn't access the underlying table again - the cached columns
     // should know they are done
-    p = table.getIndexedColumnsTableState(Arrays.asList(col));
+    p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
     s = p.getFirst();
     assertEquals("Lost already loaded update!", storedKv, s.next());
     Mockito.verify(env, Mockito.times(1)).getRegion();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
index fc3a976..b8fa72d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.covered.IndexUpdate;
 import org.apache.phoenix.hbase.index.covered.LocalTableState;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
@@ -138,7 +138,7 @@ public class TestCoveredColumnIndexCodec {
     }
 
     @Override
-    public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover)
+    public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
         throws IOException {
       return r;
     }