You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/05/12 00:09:19 UTC

[2/4] phoenix git commit: PHOENIX-3811 Do not disable index on write failure by default

PHOENIX-3811 Do not disable index on write failure by default


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1f1ecc97
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1f1ecc97
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1f1ecc97

Branch: refs/heads/master
Commit: 1f1ecc97bab1b081a2b18e78ad141385c837ecd3
Parents: f6fbb0d
Author: James Taylor <ja...@apache.org>
Authored: Wed May 10 09:52:23 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu May 11 17:04:09 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/AutomaticRebuildIT.java     | 221 --------------
 .../end2end/IndexToolForPartialBuildIT.java     |  15 +-
 ...olForPartialBuildWithNamespaceEnabledIT.java |  15 +-
 .../end2end/index/MutableIndexFailureIT.java    | 252 ++++++++++------
 .../end2end/index/ReadOnlyIndexFailureIT.java   | 291 -------------------
 .../apache/phoenix/compile/DeleteCompiler.java  |   5 +
 .../apache/phoenix/compile/UpsertCompiler.java  |   4 +
 .../coprocessor/MetaDataEndpointImpl.java       |   6 +-
 .../coprocessor/MetaDataRegionObserver.java     |  44 ++-
 .../UngroupedAggregateRegionObserver.java       |   7 +
 .../phoenix/exception/SQLExceptionCode.java     |   2 +
 .../apache/phoenix/execute/CommitException.java |   8 +-
 .../apache/phoenix/execute/MutationState.java   |  13 +-
 .../phoenix/hbase/index/write/IndexWriter.java  |   4 +
 .../write/LeaveIndexActiveFailurePolicy.java    |  62 ++++
 .../index/PhoenixIndexFailurePolicy.java        |  82 +++++-
 .../index/PhoenixTransactionalIndexer.java      |   5 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  44 ++-
 .../phoenix/mapreduce/index/IndexTool.java      |  12 +-
 .../query/ConnectionQueryServicesImpl.java      |   2 +-
 .../org/apache/phoenix/query/QueryServices.java |   3 +-
 .../phoenix/query/QueryServicesOptions.java     |   3 +-
 .../apache/phoenix/schema/MetaDataClient.java   |   7 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |  12 +-
 .../java/org/apache/phoenix/util/JDBCUtil.java  |   5 +
 .../org/apache/phoenix/util/PhoenixRuntime.java |  14 +
 .../org/apache/phoenix/util/ServerUtil.java     |  37 +++
 .../hbase/index/write/TestIndexWriter.java      |   6 +
 28 files changed, 500 insertions(+), 681 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
deleted file mode 100644
index 25cab35..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Maps;
-
-/**
- * Tests for the {@link AutomaticRebuildIT}
- */
-@RunWith(Parameterized.class)
-public class AutomaticRebuildIT extends BaseOwnClusterIT {
-
-	private final boolean localIndex;
-	protected boolean isNamespaceEnabled = false;
-	protected final String tableDDLOptions;
-
-	public AutomaticRebuildIT(boolean localIndex) {
-		this.localIndex = localIndex;
-		StringBuilder optionBuilder = new StringBuilder();
-		optionBuilder.append(" SPLIT ON(1,2)");
-		this.tableDDLOptions = optionBuilder.toString();
-	}
-
-	@BeforeClass
-	public static void doSetup() throws Exception {
-		Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
-		serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-		serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
-		serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
-		serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
-		serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
-		serverProps.put("hbase.client.pause", "5000");
-		serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "1000");
-		serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, "5");
-		Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
-		setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
-				new ReadOnlyProps(clientProps.entrySet().iterator()));
-	}
-
-	@Parameters(name = "localIndex = {0}")
-	public static Collection<Boolean[]> data() {
-		return Arrays.asList(new Boolean[][] { { false }, { true } });
-	}
-
-	@Test
-	public void testSecondaryAutomaticRebuildIndex() throws Exception {
-		String schemaName = generateUniqueName();
-		String dataTableName = generateUniqueName();
-		String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
-		final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
-		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-		props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
-		props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
-		props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
-		final Connection conn = DriverManager.getConnection(getUrl(), props);
-		Statement stmt = conn.createStatement();
-		try {
-			if (isNamespaceEnabled) {
-				conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
-			}
-			stmt.execute(String.format(
-					"CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
-					fullTableName, tableDDLOptions));
-			String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
-			PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-			FailingRegionObserver.FAIL_WRITE = false;
-			// insert two rows
-			upsertRow(stmt1, 1000);
-			upsertRow(stmt1, 2000);
-
-			conn.commit();
-			stmt.execute(String.format("CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
-					(localIndex ? "LOCAL" : ""), indxTable, fullTableName));
-			FailingRegionObserver.FAIL_WRITE = true;
-			upsertRow(stmt1, 3000);
-			upsertRow(stmt1, 4000);
-			upsertRow(stmt1, 5000);
-			try {
-				conn.commit();
-				fail();
-			} catch (SQLException e) {
-			} catch (Exception e) {
-			}
-			FailingRegionObserver.FAIL_WRITE = false;
-			ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
-					new String[] { PTableType.INDEX.toString() });
-			assertTrue(rs.next());
-			assertEquals(indxTable, rs.getString(3));
-			String indexState = rs.getString("INDEX_STATE");
-			assertEquals(PIndexState.DISABLE.toString(), indexState);
-			assertFalse(rs.next());
-			upsertRow(stmt1, 6000);
-			upsertRow(stmt1, 7000);
-			conn.commit();
-			int maxTries = 4, nTries = 0;
-			boolean isInactive = false;
-			do {
-				rs = conn.createStatement()
-						.executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + ","
-								+ PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
-								+"\""+ SYSTEM_CATALOG_SCHEMA + "\"." + SYSTEM_CATALOG_TABLE + " ("
-								+ PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
-								+ PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
-								+ PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
-				rs.next();
-				if (PIndexState.INACTIVE.getSerializedValue().equals(rs.getString(1)) && rs.getLong(2) > 3000) {
-					isInactive = true;
-					break;
-				}
-				Thread.sleep(10 * 1000); // sleep 10 secs
-			} while (++nTries < maxTries);
-			assertTrue(isInactive);
-			nTries = 0;
-			boolean isActive = false;
-			do {
-				Thread.sleep(15 * 1000); // sleep 15 secs
-				rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
-						new String[] { PTableType.INDEX.toString() });
-				assertTrue(rs.next());
-				if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
-					isActive = true;
-					break;
-				}
-			} while (++nTries < maxTries);
-			assertTrue(isActive);
-
-		} finally {
-			conn.close();
-		}
-	}
-
-	public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
-		// insert row
-		stmt.setInt(1, i);
-		stmt.setString(2, "uname" + String.valueOf(i));
-		stmt.setInt(3, 95050 + i);
-		stmt.executeUpdate();
-	}
-
-	public static class FailingRegionObserver extends SimpleRegionObserver {
-		public static volatile boolean FAIL_WRITE = false;
-		public static final String INDEX_NAME = "IDX";
-
-		@Override
-		public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
-				MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-			if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
-				throw new DoNotRetryIOException();
-			}
-			Mutation operation = miniBatchOp.getOperation(0);
-			Set<byte[]> keySet = operation.getFamilyMap().keySet();
-			for (byte[] family : keySet) {
-				if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
-					throw new DoNotRetryIOException();
-				}
-			}
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
index 599e601..59a9106 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -89,9 +89,8 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         this.tableDDLOptions = optionBuilder.toString();
     }
     
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
+    public static Map<String, String> getServerProperties() {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
         serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
         serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
         serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
@@ -99,8 +98,14 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
         serverProps.put("hbase.client.pause", "5000");
         serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.FALSE.toString());
-        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+        serverProps.put(QueryServices.INDEX_FAILURE_DISABLE_INDEX, Boolean.TRUE.toString());
+        return serverProps;
+    }
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = getServerProperties();
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
     }
     
     @Parameters(name="localIndex = {0}")

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
index 4b2371c..a8c1f1e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
@@ -21,13 +21,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.BeforeClass;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Maps;
@@ -35,7 +31,6 @@ import com.google.common.collect.Maps;
 /**
  * Tests for the {@link IndexToolForPartialBuildWithNamespaceEnabled}
  */
-@RunWith(Parameterized.class)
 public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolForPartialBuildIT {
     
     
@@ -45,15 +40,9 @@ public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolFor
     }
     
     @BeforeClass
+    @Shadower(classBeingShadowed = IndexToolForPartialBuildIT.class)
     public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
-        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-        serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
-        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
-        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
-        serverProps.put("hbase.client.pause", "5000");
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "2000");
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
+        Map<String, String> serverProps = getServerProperties();
         serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
         Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
         clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index e612f49..11573a5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -24,16 +24,17 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -44,18 +45,23 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.execute.CommitException;
+import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TestUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -64,6 +70,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 /**
  * 
@@ -77,6 +84,7 @@ import com.google.common.collect.Maps;
 @RunWith(Parameterized.class)
 public class MutableIndexFailureIT extends BaseTest {
     public static final String INDEX_NAME = "IDX";
+    public static final String TABLE_NAME = "T";
 
     public static volatile boolean FAIL_WRITE = false;
     public static volatile String fullTableName;
@@ -89,23 +97,29 @@ public class MutableIndexFailureIT extends BaseTest {
     private final boolean localIndex;
     private final String tableDDLOptions;
     private final boolean isNamespaceMapped;
+    private final boolean leaveIndexActiveOnFailure;
+    private final boolean rebuildIndexOnWriteFailure;
     private String schema = generateUniqueName();
+    private List<CommitException> exceptions = Lists.newArrayList();
 
     @AfterClass
     public static void doTeardown() throws Exception {
         tearDownMiniCluster();
     }
 
-    public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped) {
+    public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, Boolean rebuildIndexOnWriteFailure) {
         this.transactional = transactional;
         this.localIndex = localIndex;
-        this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "");
-        this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "")
-                + (isNamespaceMapped ? "_NM" : "");
-        this.indexName = FailingRegionObserver.INDEX_NAME;
+        this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "") 
+                + (disableIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.DISABLE_INDEX_ON_WRITE_FAILURE + "=" + disableIndexOnWriteFailure))
+                + (rebuildIndexOnWriteFailure == null ? "" : (", " + PhoenixIndexFailurePolicy.REBUILD_INDEX_ON_WRITE_FAILURE + "=" + rebuildIndexOnWriteFailure));
+        this.tableName = FailingRegionObserver.FAIL_TABLE_NAME;
+        this.indexName = "A_" + FailingRegionObserver.FAIL_INDEX_NAME;
         fullTableName = SchemaUtil.getTableName(schema, tableName);
         this.fullIndexName = SchemaUtil.getTableName(schema, indexName);
         this.isNamespaceMapped = isNamespaceMapped;
+        this.leaveIndexActiveOnFailure = ! (disableIndexOnWriteFailure == null ? QueryServicesOptions.DEFAULT_INDEX_FAILURE_DISABLE_INDEX : disableIndexOnWriteFailure);
+        this.rebuildIndexOnWriteFailure = Boolean.TRUE.equals(rebuildIndexOnWriteFailure);
     }
 
     @BeforeClass
@@ -117,16 +131,30 @@ public class MutableIndexFailureIT extends BaseTest {
         serverProps.put("hbase.client.pause", "5000");
         serverProps.put("data.tx.snapshot.dir", "/tmp");
         serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
-        Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString());
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "4000");
+        Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
         NUM_SLAVES_BASE = 4;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
 
-    @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2}") // name is used by failsafe as file name in reports
-    public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] { { false, false, true }, { false, false, false }, { false, true, true },
-                { false, true, false }, { true, false, true }, { true, true, true }, { true, false, false },
-                { true, true, false } });
+    @Parameters(name = "MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},rebuildIndexOnWriteFailure={4}") // name is used by failsafe as file name in reports
+    public static List<Object[]> data() {
+        return Arrays.asList(new Object[][] { 
+                { false, false, true, true, true }, 
+                { false, false, false, true, true }, 
+                { true, false, false, true, true }, 
+                { true, false, true, true, true },
+                { false, true, true, true, true }, 
+                { false, true, false, true, true }, 
+                { true, true, false, true, true }, 
+                { true, true, true, true, true },
+
+                { false, false, false, null, true }, 
+                { false, true, false, false, true }, 
+                { false, false, false, false, null }, 
+        } 
+        );
     }
 
     @Test
@@ -135,9 +163,9 @@ public class MutableIndexFailureIT extends BaseTest {
     }
 
     public void helpTestWriteFailureDisablesIndex() throws Exception {
-        String secondTableName = fullTableName + "_2";
-        String secondIndexName = indexName + "_2";
-        String secondFullIndexName = fullIndexName + "_2";
+        String secondIndexName = "B_" + FailingRegionObserver.FAIL_INDEX_NAME;
+//        String thirdIndexName = "C_" + INDEX_NAME;
+//        String thirdFullIndexName = SchemaUtil.getTableName(schema, thirdIndexName);
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped));
         try (Connection conn = driver.connect(url, props)) {
@@ -149,29 +177,26 @@ public class MutableIndexFailureIT extends BaseTest {
             }
             conn.createStatement().execute("CREATE TABLE " + fullTableName
                     + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
-            conn.createStatement().execute("CREATE TABLE " + secondTableName
-                    + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
             query = "SELECT * FROM " + fullTableName;
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
             FailingRegionObserver.FAIL_WRITE = false;
             conn.createStatement().execute(
-                    "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+                    "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
             // Create other index which should be local/global if the other index is global/local to
             // check the drop index.
             conn.createStatement().execute(
-                "CREATE " + (!localIndex ? "LOCAL " : "") + "INDEX " + indexName + "_3" + " ON "
-                        + fullTableName + " (v2) INCLUDE (v1)");
-            conn.createStatement().execute(
-                    "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + secondIndexName + " ON " + secondTableName + " (v1) INCLUDE (v2)");
+                    "CREATE "  + (!localIndex ? "LOCAL " : "") + " INDEX " + secondIndexName + " ON " + fullTableName + " (v2) INCLUDE (v1)");
+//            conn.createStatement().execute(
+//                    "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
 
             query = "SELECT * FROM " + fullIndexName;
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
             // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName+"%",
+            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), null,
                     new String[] { PTableType.INDEX.toString() });
             assertTrue(rs.next());
             assertEquals(indexName, rs.getString(3));
@@ -179,11 +204,10 @@ public class MutableIndexFailureIT extends BaseTest {
             assertTrue(rs.next());
             assertEquals(secondIndexName, rs.getString(3));
             assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-            assertTrue(rs.next());
-            assertEquals(indexName+"_3", rs.getString(3));
-            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+//            assertTrue(rs.next());
+//            assertEquals(thirdIndexName, rs.getString(3));
+//            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
             initializeTable(conn, fullTableName);
-            initializeTable(conn, secondTableName);
             
             query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
             rs = conn.createStatement().executeQuery("EXPLAIN " + query);
@@ -203,15 +227,14 @@ public class MutableIndexFailureIT extends BaseTest {
             assertFalse(rs.next());
 
             FailingRegionObserver.FAIL_WRITE = true;
-            updateTable(conn, fullTableName);
-            updateTable(conn, secondTableName);
+            updateTable(conn, true);
             // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName,
+            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), StringUtil.escapeLike(indexName),
                     new String[] { PTableType.INDEX.toString() });
             assertTrue(rs.next());
             assertEquals(indexName, rs.getString(3));
             // the index is only disabled for non-txn tables upon index table write failure
-            if (transactional) {
+            if (transactional || leaveIndexActiveOnFailure) {
                 assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
             } else {
                 String indexState = rs.getString("INDEX_STATE");
@@ -223,19 +246,7 @@ public class MutableIndexFailureIT extends BaseTest {
             // in an all or none manner. If the table is not transactional, then the data writes
             // would have succeeded while the index writes would have failed.
             if (!transactional) {
-                // Verify UPSERT on data table still work after index is disabled
-                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-                stmt.setString(1, "a3");
-                stmt.setString(2, "x3");
-                stmt.setString(3, "3");
-                stmt.execute();
-                conn.commit();
-                stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
-                stmt.setString(1, "a3");
-                stmt.setString(2, "x3");
-                stmt.setString(3, "3");
-                stmt.execute();
-                conn.commit();
+                updateTableAgain(conn, leaveIndexActiveOnFailure);
                 // Verify previous writes succeeded to data table
                 query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
                 rs = conn.createStatement().executeQuery("EXPLAIN " + query);
@@ -257,14 +268,20 @@ public class MutableIndexFailureIT extends BaseTest {
                 assertEquals("d", rs.getString(2));
                 assertFalse(rs.next());
             }
+            // Comment back in when PHOENIX-3815 is fixed
+//            validateDataWithIndex(conn, fullTableName, thirdFullIndexName, false);
 
             // re-enable index table
             FailingRegionObserver.FAIL_WRITE = false;
-            waitForIndexToBeActive(conn,indexName);
-            waitForIndexToBeActive(conn,indexName+"_2");
-            waitForIndexToBeActive(conn,secondIndexName);
+            if (rebuildIndexOnWriteFailure) {
+                // wait for index to be rebuilt automatically
+                waitForIndexToBeRebuilt(conn,indexName);
+            } else {
+                // simulate replaying failed mutation
+                replayMutations();
+            }
 
-            // Verify UPSERT on data table still work after index table is recreated
+            // Verify UPSERT on data table still works after index table is recreated
             PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
             stmt.setString(1, "a3");
             stmt.setString(2, "x4");
@@ -272,33 +289,26 @@ public class MutableIndexFailureIT extends BaseTest {
             stmt.execute();
             conn.commit();
             
-            stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "a3");
-            stmt.setString(2, "x4");
-            stmt.setString(3, "4");
-            stmt.execute();
-            conn.commit();
-            // To clear the index name from connection.
-            PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
-            phoenixConn.getMetaDataCache().removeTable(null, fullTableName, null, HConstants.LATEST_TIMESTAMP);
-            // verify index table has correct data
-            validateDataWithIndex(conn, fullTableName, fullIndexName);
-            validateDataWithIndex(conn, secondTableName, secondFullIndexName);
+            // verify index table has correct data (note that second index has been dropped)
+            validateDataWithIndex(conn, fullTableName, fullIndexName, localIndex);
         } finally {
             FAIL_WRITE = false;
         }
     }
 
-    private void waitForIndexToBeActive(Connection conn, String index) throws InterruptedException, SQLException {
+    private void waitForIndexToBeRebuilt(Connection conn, String index) throws InterruptedException, SQLException {
         boolean isActive = false;
         if (!transactional) {
-            int maxTries = 4, nTries = 0;
+            int maxTries = 12, nTries = 0;
             do {
-                Thread.sleep(15 * 1000); // sleep 15 secs
-                ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), index,
-                        new String[] { PTableType.INDEX.toString() });
+                Thread.sleep(5 * 1000); // sleep 5 secs
+                String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT) FROM " +
+                        PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
+                        + ") = (" + "'" + schema + "','" + index + "') "
+                        + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
+                ResultSet rs = conn.createStatement().executeQuery(query);
                 assertTrue(rs.next());
-                if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
+                if (rs.getLong(1) == 0 && !rs.wasNull()) {
                     isActive = true;
                     break;
                 }
@@ -325,14 +335,14 @@ public class MutableIndexFailureIT extends BaseTest {
 
     }
 
-    private void validateDataWithIndex(Connection conn, String tableName, String indexName) throws SQLException {
-        String query = "SELECT /*+ INDEX(" + indexName + ")  */ k,v1 FROM " + tableName;
+    private void validateDataWithIndex(Connection conn, String fullTableName, String fullIndexName, boolean localIndex) throws SQLException {
+        String query = "SELECT /*+ INDEX(" + fullTableName + " " + SchemaUtil.getTableNameFromFullName(fullIndexName) + ")  */ k,v1 FROM " + fullTableName;
         ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
         String expectedPlan = " OVER "
                 + (localIndex
                         ? Bytes.toString(
-                                SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped).getName())
-                        : SchemaUtil.getPhysicalTableName(indexName.getBytes(), isNamespaceMapped).getNameAsString());
+                                SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped).getName())
+                        : SchemaUtil.getPhysicalTableName(fullIndexName.getBytes(), isNamespaceMapped).getNameAsString());
         String explainPlan = QueryUtil.getExplainPlan(rs);
         assertTrue(explainPlan, explainPlan.contains(expectedPlan));
         rs = conn.createStatement().executeQuery(query);
@@ -367,8 +377,26 @@ public class MutableIndexFailureIT extends BaseTest {
         }
     }
     
-    private void updateTable(Connection conn, String tableName) throws SQLException {
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
+    private void replayMutations() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        for (int i = 0; i < exceptions.size(); i++) {
+            CommitException e = exceptions.get(i);
+            long ts = e.getServerTimestamp();
+            props.setProperty(PhoenixRuntime.REPLAY_AT_ATTRIB, Long.toString(ts));
+            try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+                if (i == 0) {
+                    updateTable(conn, false);
+                } else if (i == 1) {
+                    updateTableAgain(conn, false);
+                } else {
+                    fail();
+                }
+            }
+        }
+    }
+    
+    private void updateTable(Connection conn, boolean commitShouldFail) throws SQLException {
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
         // Insert new row
         stmt.setString(1, "d");
         stmt.setString(2, "d");
@@ -380,35 +408,79 @@ public class MutableIndexFailureIT extends BaseTest {
         stmt.setString(3, "2");
         stmt.execute();
         // Delete existing row
-        stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE k=?");
+        stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?");
         stmt.setString(1, "b");
         stmt.execute();
         try {
             conn.commit();
-            fail();
-        } catch (SQLException e) {
-        } catch (Exception e) {
+            if (commitShouldFail) {
+                fail();
+            }
+        } catch (CommitException e) {
+            if (!commitShouldFail) {
+                throw e;
+            }
+            exceptions.add(e);
         }
 
     }
 
+    private void updateTableAgain(Connection conn, boolean commitShouldFail) throws SQLException {
+        // Verify UPSERT on data table still works after index is disabled
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+        stmt.setString(1, "a3");
+        stmt.setString(2, "x3");
+        stmt.setString(3, "3");
+        stmt.execute();
+        try {
+            conn.commit();
+            if (commitShouldFail) {
+                fail();
+            }
+        } catch (CommitException e) {
+            if (!commitShouldFail) {
+                throw e;
+            }
+            exceptions.add(e);
+        }
+    }
+
     public static class FailingRegionObserver extends SimpleRegionObserver {
         public static volatile boolean FAIL_WRITE = false;
-        public static final String INDEX_NAME = "IDX";
+        public static final String FAIL_INDEX_NAME = "FAIL_IDX";
+        public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
+
         @Override
         public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+            boolean throwException = false;
+            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
+                    && FAIL_WRITE) {
+                throwException = true;
+            } else {
+                // When local index updates are atomic with data updates, testing a write failure to a local
+                // index won't make sense.
+                Mutation operation = miniBatchOp.getOperation(0);
+                if (FAIL_WRITE) {
+                    Map<byte[],List<Cell>>cellMap = operation.getFamilyCellMap();
+                    for (Map.Entry<byte[],List<Cell>> entry : cellMap.entrySet()) {
+                        byte[] family = entry.getKey();
+                        if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) {
+                            int regionStartKeyLen = c.getEnvironment().getRegionInfo().getStartKey().length;
+                            Cell firstCell = entry.getValue().get(0);
+                            short indexId = MetaDataUtil.getViewIndexIdDataType().getCodec().decodeShort(firstCell.getRowArray(), firstCell.getRowOffset() + regionStartKeyLen, SortOrder.getDefault());
+                            // Only throw for first local index as the test may have multiple local indexes
+                            if (indexId == Short.MIN_VALUE) {
+                                throwException = true;
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+            if (throwException) {
                 dropIndex(c);
                 throw new DoNotRetryIOException();
             }
-            Mutation operation = miniBatchOp.getOperation(0);
-            Set<byte[]> keySet = operation.getFamilyMap().keySet();
-            for(byte[] family: keySet) {
-                if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
-                    dropIndex(c);
-                    throw new DoNotRetryIOException();
-                }
-            }
         }
 
          private void dropIndex(ObserverContext<RegionCoprocessorEnvironment> c) {
@@ -416,7 +488,7 @@ public class MutableIndexFailureIT extends BaseTest {
                  Connection connection =
                          QueryUtil.getConnection(c.getEnvironment().getConfiguration());
                  connection.createStatement().execute(
-                     "DROP INDEX IF EXISTS " + INDEX_NAME + "_3" + " ON "
+                        "DROP INDEX IF EXISTS " + "B_" + FAIL_INDEX_NAME + " ON "
                              + fullTableName);
              } catch (ClassNotFoundException e) {
              } catch (SQLException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
deleted file mode 100644
index cf3cb29..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.BaseOwnClusterIT;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.google.common.collect.Maps;
-/**
- * 
- * Test for failure of region server to write to index table.
- * For some reason dropping tables after running this test
- * fails unless it runs its own mini cluster. 
- * 
- * 
- * @since 2.1
- */
-
-@Category(NeedsOwnMiniClusterTest.class)
-@RunWith(Parameterized.class)
-public class ReadOnlyIndexFailureIT extends BaseOwnClusterIT {
-    public static volatile boolean FAIL_WRITE = false;
-    public static final String INDEX_NAME = "IDX";
-
-    private String tableName;
-    private String indexName;
-    private String fullTableName;
-    private String fullIndexName;
-    private final boolean localIndex;
-
-    public ReadOnlyIndexFailureIT(boolean localIndex) {
-        this.localIndex = localIndex;
-        this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME;
-        this.indexName = INDEX_NAME;
-        this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-        this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-    }
-
-    @Parameters(name = "ReadOnlyIndexFailureIT_localIndex={0}") // name is used by failsafe as file name in reports
-    public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] { { false }, { true } });
-    }
-
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
-        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
-        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
-        serverProps.put("hbase.client.pause", "5000");
-        serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
-        serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true");
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, "true");
-        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
-        serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
-        serverProps.put("hbase.coprocessor.abortonerror", "false");
-        serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false");
-        NUM_SLAVES_BASE = 4;
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
-                ReadOnlyProps.EMPTY_PROPS);
-    }
-
-    @Test
-    public void testWriteFailureReadOnlyIndex() throws Exception {
-        helpTestWriteFailureReadOnlyIndex();
-    }
-
-    public void helpTestWriteFailureReadOnlyIndex() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = driver.connect(url, props)) {
-            String query;
-            ResultSet rs;
-            conn.setAutoCommit(false);
-            conn.createStatement().execute(
-                    "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-            query = "SELECT * FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
-
-            FAIL_WRITE = false;
-            if(localIndex) {
-                conn.createStatement().execute(
-                        "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName 
-                        + " (v1) INCLUDE (v2)");
-            } else {
-                conn.createStatement().execute(
-                        "CREATE INDEX " + indexName + " ON " + fullTableName 
-                        + " (v1) INCLUDE (v2)");
-            }
-
-            query = "SELECT * FROM " + fullIndexName;
-            rs = conn.createStatement().executeQuery(query);
-            assertFalse(rs.next());
-
-            // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, 
-                    StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
-                    new String[] { PTableType.INDEX.toString() });
-            assertTrue(rs.next());
-            assertEquals(indexName, rs.getString(3));
-            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName 
-                    + " VALUES(?,?,?)");
-            stmt.setString(1, "1");
-            stmt.setString(2, "aaa");
-            stmt.setString(3, "a1");
-            stmt.execute();
-            conn.commit();
-
-            FAIL_WRITE = true;
-            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "2");
-            stmt.setString(2, "bbb");
-            stmt.setString(3, "b2");
-            stmt.execute();
-            try {
-                conn.commit();
-                fail();
-            } catch (SQLException e) {
-            }
-
-            // Only successfully committed row should be seen
-            query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("aaa", rs.getString(1));
-            assertFalse(rs.next());
-            
-            // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, 
-                    StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
-                    new String[] { PTableType.INDEX.toString() });
-            assertTrue(rs.next());
-            assertEquals(indexName, rs.getString(3));
-            // the index is always active for tables upon index table write failure
-            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-            assertFalse(rs.next());
-
-            // if the table is transactional the write to the index table will fail because the
-            // index has not been disabled
-            // Verify UPSERT on data table is blocked  after index write failed
-            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "3");
-            stmt.setString(2, "ccc");
-            stmt.setString(3, "3c");
-            try {
-                stmt.execute();
-                /* Writes would be blocked */
-                conn.commit();
-                fail();
-            } catch (SQLException e) {
-                assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode());
-            }
-
-            FAIL_WRITE = false;
-            // Second attempt at writing will succeed
-            int retries = 0;
-            do {
-                Thread.sleep(5 * 1000); // sleep 5 secs
-                if(!hasIndexDisableTimestamp(conn, indexName)){
-                    break;
-                }
-                if (++retries == 5) {
-                    fail("Failed to rebuild index with allowed time");
-                }
-            } while(true);
-
-            // Verify UPSERT on data table still work after index table is recreated
-            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "4");
-            stmt.setString(2, "ddd");
-            stmt.setString(3, "4d");
-            stmt.execute();
-            conn.commit();
-
-            // verify index table has data
-            query = "SELECT count(1) FROM " + fullIndexName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals(3, rs.getInt(1));
-            
-            query = "SELECT /*+ INDEX(" + indexName + ") */ v1 FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("aaa", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("bbb", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("ddd", rs.getString(1));
-            assertFalse(rs.next());
-
-            query = "SELECT /*+ NO_INDEX*/ v1 FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery(query);
-            assertTrue(rs.next());
-            assertEquals("aaa", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("bbb", rs.getString(1));
-            assertTrue(rs.next());
-            assertEquals("ddd", rs.getString(1));
-            assertFalse(rs.next());
-        }
-    }
-    
-    private static boolean hasIndexDisableTimestamp(Connection conn, String indexName) throws SQLException {
-        ResultSet rs = conn.createStatement().executeQuery("SELECT " + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP +
-                " FROM " + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + 
-                " WHERE " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL" +
-                " AND " + PhoenixDatabaseMetaData.TENANT_ID + " IS NULL" +
-                " AND " + PhoenixDatabaseMetaData.TABLE_SCHEM + " IS NULL" +
-                " AND " + PhoenixDatabaseMetaData.TABLE_NAME +  " = '" + indexName + "'");
-        assertTrue(rs.next());
-        long ts = rs.getLong(1);
-        return (!rs.wasNull() && ts > 0);
-    }
-
-    
-    public static class FailingRegionObserver extends SimpleRegionObserver {
-        @Override
-        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
-            if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
-                throw new DoNotRetryIOException();
-            }
-            Mutation operation = miniBatchOp.getOperation(0);
-            Set<byte[]> keySet = operation.getFamilyMap().keySet();
-            for(byte[] family: keySet) {
-                if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
-                    throw new DoNotRetryIOException();
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index cee545a..fe9be6e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -79,6 +79,7 @@ import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
@@ -533,6 +534,10 @@ public class DeleteCompiler {
             } else if (runOnServer) {
                 // TODO: better abstraction
                 Scan scan = context.getScan();
+                // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
+                // future dated data row mutations that will get in the way of generating the
+                // correct index rows on replay.
+                scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
                 scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
     
                 // Build an ungrouped aggregate query: select COUNT(*) from <table> where <where>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index bbbd483..e5307d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -713,6 +713,10 @@ public class UpsertCompiler {
                      */
                     final StatementContext context = queryPlan.getContext();
                     final Scan scan = context.getScan();
+                    // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
+                    // future dated data row mutations that will get in the way of generating the
+                    // correct index rows on replay.
+                    scan.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
                     scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserver.serialize(projectedTable));
                     scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions));
                     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 4ad3a8c..d36bd7e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3418,7 +3418,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex);
                     long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
                             newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
-                    if(curTimeStampVal > 0 && curTimeStampVal < newDisableTimeStamp){
+                    // We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active case (negative)
+                    // from the block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to
+                    // drive the partial index rebuild rather than update it with each attempt to update the index
+                    // when a new data table write occurs.
+                    if (curTimeStampVal != 0 && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) {
                         // not reset disable timestamp
                         newKVs.remove(disableTimeStampKVIndex);
                         disableTimeStampKVIndex = -1;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 9482d37..ce42de6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.coprocessor;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
-import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -74,7 +73,6 @@ import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PLong;
@@ -100,7 +98,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
     protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
     private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
     private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
-    private boolean blockWriteRebuildIndex = false;
     private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>();
 
     @Override
@@ -128,8 +125,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD);
         rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
-        blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
-        	QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
         
     }
     
@@ -172,7 +167,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         t.setDaemon(true);
         t.start();
 
-        if (!enableRebuildIndex && !blockWriteRebuildIndex) {
+        if (!enableRebuildIndex) {
             LOG.info("Failure Index Rebuild is skipped by configuration.");
             return;
         }
@@ -229,7 +224,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                 Scan scan = new Scan();
                 SingleColumnValueFilter filter = new SingleColumnValueFilter(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
-                    CompareFilter.CompareOp.GREATER, PLong.INSTANCE.toBytes(0L));
+                    CompareFilter.CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0L));
                 filter.setFilterIfMissing(true);
                 scan.setFilter(filter);
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
@@ -240,10 +235,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
-                PreparedStatement updateDisabledTimeStampSmt = null;
 
                 Map<PTable, List<PTable>> dataTableToIndexesMap = null;
-                MetaDataClient client = null;
                 boolean hasMore = false;
                 List<Cell> results = new ArrayList<Cell>();
                 scanner = this.env.getRegion().getScanner(scan);
@@ -259,17 +252,10 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                             PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
 
-                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0 || (indexState != null
-                            && PIndexState.BUILDING == PIndexState.fromSerializedValue(Bytes.toString(indexState)))) {
-                        // Don't rebuild the building index , because they are marked for aysnc
+                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
                         continue;
                     }
 
-                    // disableTimeStamp has to be a positive value
-                    long disabledTimeStampVal = PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0, SortOrder.getDefault());
-                    if (disabledTimeStampVal <= 0) {
-                        continue;
-                    }
                     byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
                     if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) {
@@ -302,7 +288,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     	// don't run a second index populations upsert select 
                         props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0"); 
                         conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class);
-                        client = new MetaDataClient(conn);
                         dataTableToIndexesMap = Maps.newHashMap();
                     }
                     String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
@@ -331,7 +316,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                         dataTableToIndexesMap.put(dataPTable, indexesToPartiallyRebuild);
                     }
                     LOG.debug("We have found " + indexPTable.getIndexState() + " Index:" + indexPTable.getName()
-                            + " on data table:" + dataPTable.getName() + " which was disabled at "
+                            + " on data table:" + dataPTable.getName() + " which failed to be updated at "
                             + indexPTable.getIndexDisableTimestamp());
                     indexesToPartiallyRebuild.add(indexPTable);
                 } while (hasMore);
@@ -349,9 +334,22 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 							long earliestDisableTimestamp = Long.MAX_VALUE;
 							List<IndexMaintainer> maintainers = Lists
 									.newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+							int signOfDisableTimeStamp = 0;
 							for (PTable index : indexesToPartiallyRebuild) {
+					            // We need a way of differentiating the block writes to data table case from
+					            // the leave index active case. In either case, we need to know the time stamp
+					            // at which writes started failing so we can rebuild from that point. If we
+					            // keep the index active *and* have a positive INDEX_DISABLE_TIMESTAMP_BYTES,
+					            // then writes to the data table will be blocked (this is client side logic
+					            // and we can't change this in a minor release). So we use the sign of the
+					            // time stamp to differentiate.
 								long disabledTimeStampVal = index.getIndexDisableTimestamp();
-								if (disabledTimeStampVal > 0) {
+								if (disabledTimeStampVal != 0) {
+                                    if (signOfDisableTimeStamp != 0 && signOfDisableTimeStamp != Long.signum(disabledTimeStampVal)) {
+                                        LOG.warn("Found unexpected mix of signs with INDEX_DISABLE_TIMESTAMP for " + dataPTable.getName().getString() + " with " + indexesToPartiallyRebuild); 
+                                    }
+								    signOfDisableTimeStamp = Long.signum(disabledTimeStampVal);
+	                                disabledTimeStampVal = Math.abs(disabledTimeStampVal);
 									if (disabledTimeStampVal < earliestDisableTimestamp) {
 										earliestDisableTimestamp = disabledTimeStampVal;
 									}
@@ -409,8 +407,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 									batchExecutedPerTableMap.remove(dataPTable.getName());
                                     LOG.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding");
 								} else {
-
-									updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime, metaTable);
+								    // Maintain sign of INDEX_DISABLE_TIMESTAMP (see comment above)
+									updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime * signOfDisableTimeStamp, metaTable);
 									Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName());
 									if (noOfBatches == null) {
 										noOfBatches = 0l;
@@ -507,7 +505,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
 		put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
 				PLong.INSTANCE.toBytes(disabledTimestamp));
 		metaTable.checkAndPut(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-				PhoenixDatabaseMetaData.INDEX_STATE_BYTES, CompareOp.EQUAL, PIndexState.INACTIVE.getSerializedBytes(),
+				PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, CompareOp.NOT_EQUAL, PLong.INSTANCE.toBytes(0),
 				put);
 
 	}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 49ef884..a056807 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -379,6 +379,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         
         RegionScanner theScanner = s;
         
+        boolean replayMutations = scan.getAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
         byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
         byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
         List<Expression> selectExpressions = null;
@@ -610,6 +611,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             Cell firstKV = results.get(0);
                             Delete delete = new Delete(firstKV.getRowArray(),
                                 firstKV.getRowOffset(), firstKV.getRowLength(),ts);
+                            if (replayMutations) {
+                                delete.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                            }
                             mutations.add(delete);
                             // force tephra to ignore this deletes
                             delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
@@ -661,6 +665,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                 }
                             }
                             for (Mutation mutation : row.toRowMutations()) {
+                                if (replayMutations) {
+                                    mutation.setAttribute(IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                                }
                                 mutations.add(mutation);
                             }
                             for (i = 0; i < selectExpressions.size(); i++) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 2836c45..35ba187 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -182,6 +182,8 @@ public enum SQLExceptionCode {
      ROWTIMESTAMP_COL_INVALID_TYPE(530, "42907", "A column can be added as ROW_TIMESTAMP only if it is of type DATE, BIGINT, TIME OR TIMESTAMP."),
      ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW(531, "42908", "Declaring a column as row_timestamp is not allowed for views."),
      INVALID_SCN(532, "42909", "Value of SCN cannot be less than zero."),
+     INVALID_REPLAY_AT(533, "42910", "Value of REPLAY_AT cannot be less than zero."),
+     UNEQUAL_SCN_AND_REPLAY_AT(534, "42911", "If both specified, values of CURRENT_SCN and REPLAY_AT must be equal."),
      /**
      * HBase and Phoenix specific implementation defined sub-classes.
      * Column family related exceptions.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
index a9d8311..b0d22d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -24,10 +24,16 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 public class CommitException extends SQLException {
     private static final long serialVersionUID = 2L;
     private final int[] uncommittedStatementIndexes;
+    private final long serverTimestamp;
 
-    public CommitException(Exception e, int[] uncommittedStatementIndexes) {
+    public CommitException(Exception e, int[] uncommittedStatementIndexes, long serverTimestamp) {
         super(e);
         this.uncommittedStatementIndexes = uncommittedStatementIndexes;
+        this.serverTimestamp = serverTimestamp;
+    }
+    
+    public long getServerTimestamp() {
+        return this.serverTimestamp;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d32199b..6144c7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -82,6 +82,7 @@ import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.trace.util.Tracing;
 import org.apache.phoenix.util.ByteUtil;
@@ -673,6 +674,14 @@ public class MutationState implements SQLCloseable {
                 rowMutationsPertainingToIndex = rowMutations;
             }
             mutationList.addAll(rowMutations);
+            if (connection.isReplayMutations()) {
+                // Propagate IGNORE_NEWER_MUTATIONS when replaying mutations since there will be
+                // future dated data row mutations that will get in the way of generating the
+                // correct index rows on replay.
+                for (Mutation mutation : rowMutations) {
+                    mutation.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+                }
+            }
             if (mutationsPertainingToIndex != null) mutationsPertainingToIndex
                     .addAll(rowMutationsPertainingToIndex);
         }
@@ -1030,6 +1039,7 @@ public class MutationState implements SQLCloseable {
                     joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
                 }
             }
+            long serverTimestamp = HConstants.LATEST_TIMESTAMP;
             Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator = physicalTableMutationMap.entrySet().iterator();
             while (mutationsIterator.hasNext()) {
                 Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
@@ -1106,6 +1116,7 @@ public class MutationState implements SQLCloseable {
                         // Remove batches as we process them
                         mutations.remove(origTableRef);
                     } catch (Exception e) {
+                        serverTimestamp = ServerUtil.parseServerTimestamp(e);
                         SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
                         if (inferredE != null) {
                             if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
@@ -1127,7 +1138,7 @@ public class MutationState implements SQLCloseable {
                         }
                         // Throw to client an exception that indicates the statements that
                         // were not committed successfully.
-                        sqlE = new CommitException(e, getUncommittedStatementIndexes());
+                        sqlE = new CommitException(e, getUncommittedStatementIndexes(), serverTimestamp);
                     } finally {
                         try {
                             if (cache!=null) 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index 831aa16..a037e92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -61,6 +61,10 @@ public class IndexWriter implements Stoppable {
     this(getCommitter(env), getFailurePolicy(env), env, name);
   }
 
+  public IndexWriter(IndexFailurePolicy failurePolicy, RegionCoprocessorEnvironment env, String name) throws IOException {
+      this(getCommitter(env), failurePolicy, env, name);
+    }
+
   public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
     Configuration conf = env.getConfiguration();
     try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f1ecc97/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
new file mode 100644
index 0000000..edacd3a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/LeaveIndexActiveFailurePolicy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.util.ServerUtil;
+
+import com.google.common.collect.Multimap;
+
+/**
+ * 
+ * Implementation of IndexFailurePolicy which takes no action against the
+ * index when it cannot be updated. As with the standard flow of control, an
+ * exception (carrying the timestamp of the first attempted cell) will still
+ * be thrown back to the client. Using this failure policy means that the
+ * action to take upon failure is completely up to the client.
+ *
+ */
+public class LeaveIndexActiveFailurePolicy implements IndexFailurePolicy {
+
+    @Override
+    public boolean isStopped() {
+        return false;
+    }
+
+    @Override
+    public void stop(String arg0) {
+    }
+
+    @Override
+    public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+    }
+
+    @Override
+    public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+            throws IOException {
+        // Take the timestamp of the first cell of the first attempted mutation and pass it along with the failure
+        long ts = attempted.values().iterator().next().getFamilyCellMap().values().iterator().next().get(0).getTimestamp();
+        throw ServerUtil.wrapInDoNotRetryIOException("Unable to update the following indexes: " + attempted.keySet(), cause, ts);
+    }
+
+}