You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/01/27 01:18:50 UTC

[06/26] phoenix git commit: PHOENIX-2890 Extend IndexTool to allow incremental index rebuilds

PHOENIX-2890 Extend IndexTool to allow incremental index rebuilds


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/83827cd8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/83827cd8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/83827cd8

Branch: refs/heads/calcite
Commit: 83827cd8c2876c6b6dccf3a5678889b40a76261b
Parents: 70dc383
Author: Ankit Singhal <an...@gmail.com>
Authored: Mon Dec 26 11:59:19 2016 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Mon Dec 26 11:59:19 2016 +0530

----------------------------------------------------------------------
 .../phoenix/end2end/AutomaticRebuildIT.java     | 219 +++++++++
 .../end2end/IndexToolForPartialBuildIT.java     | 298 ++++++++++++
 ...olForPartialBuildWithNamespaceEnabledIT.java |  70 +++
 .../phoenix/end2end/index/IndexMetadataIT.java  |  58 +++
 .../end2end/index/MutableIndexFailureIT.java    |  10 +-
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |   4 +-
 .../coprocessor/MetaDataEndpointImpl.java       |   9 +-
 .../coprocessor/MetaDataRegionObserver.java     | 291 +++++++-----
 .../phoenix/exception/SQLExceptionCode.java     |   3 +-
 .../index/PhoenixIndexFailurePolicy.java        |  52 +--
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   2 +
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   8 +-
 .../phoenix/mapreduce/index/IndexTool.java      | 455 +++++++++++++------
 .../phoenix/mapreduce/index/IndexToolUtil.java  |   6 +-
 .../index/PhoenixIndexImportDirectMapper.java   |   2 +-
 .../index/PhoenixIndexPartialBuildMapper.java   | 182 ++++++++
 .../util/PhoenixConfigurationUtil.java          |  31 ++
 .../phoenix/parse/AlterIndexStatement.java      |   8 +-
 .../apache/phoenix/parse/ParseNodeFactory.java  |   6 +-
 .../org/apache/phoenix/query/QueryServices.java |   4 +
 .../apache/phoenix/schema/MetaDataClient.java   |  47 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |  61 ++-
 22 files changed, 1504 insertions(+), 322 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
new file mode 100644
index 0000000..cbb7745
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests that an index disabled by a failed index write is rebuilt automatically and becomes ACTIVE again.
+ */
+@RunWith(Parameterized.class)
+public class AutomaticRebuildIT extends BaseOwnClusterIT {
+
+	private final boolean localIndex;
+	protected boolean isNamespaceEnabled = false;
+	protected final String tableDDLOptions;
+
+	/**
+	 * Creates the test instance for one parameterized run.
+	 *
+	 * @param localIndex whether the index under test is created with the LOCAL keyword
+	 */
+	public AutomaticRebuildIT(boolean localIndex) {
+		// Every run appends the same split clause to the data table DDL.
+		this.tableDDLOptions = " SPLIT ON(1,2)";
+		this.localIndex = localIndex;
+	}
+
+	/**
+	 * Starts the test driver with the server-side settings used by this test: the
+	 * {@link FailingRegionObserver} is registered through {@code hbase.coprocessor.region.classes},
+	 * client retries, RPC timeout and client pause are set explicitly, and the index rebuild
+	 * period and number of rebuild batches per table are configured.
+	 *
+	 * @throws Exception if the test driver cannot be set up
+	 */
+	@BeforeClass
+	public static void doSetup() throws Exception {
+		// Eight entries are stored below.
+		Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(8);
+		serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+		serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+		// Key written without a stray leading space so it does not depend on the consumer trimming it.
+		serverProps.put("yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
+		serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+		serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
+		serverProps.put("hbase.client.pause", "5000");
+		serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "1000");
+		serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, "5");
+		// No client-side overrides are needed; an empty map is passed on purpose.
+		Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+		setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+				new ReadOnlyProps(clientProps.entrySet().iterator()));
+	}
+
+	/**
+	 * Supplies the parameter sets for the runner: one run with {@code localIndex = false} and one
+	 * with {@code localIndex = true}.
+	 *
+	 * @return one single-element array per run holding the {@code localIndex} flag
+	 */
+	@Parameters(name = "localIndex = {0}")
+	public static Collection<Boolean[]> data() {
+		Boolean[] nonLocalRun = { false };
+		Boolean[] localRun = { true };
+		return Arrays.asList(nonLocalRun, localRun);
+	}
+
+	/**
+	 * Verifies that an index disabled by a failed index write is rebuilt without manual intervention.
+	 * <p>
+	 * The test writes two rows, creates the index, then makes index writes fail through
+	 * {@link FailingRegionObserver} so that the next commit fails and the index is reported as
+	 * DISABLE. Once index writes are allowed again it writes more rows, polls SYSTEM.CATALOG until
+	 * the index state is INACTIVE with a disable timestamp greater than 3000, and finally polls the
+	 * JDBC metadata until the index is reported as ACTIVE.
+	 *
+	 * @throws Exception if any step of the scenario fails unexpectedly
+	 */
+	@Test
+	public void testSecondaryAutomaticRebuildIndex() throws Exception {
+		String schemaName = generateUniqueName();
+		String dataTableName = generateUniqueName();
+		String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+		final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+		props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
+		props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
+		final Connection conn = DriverManager.getConnection(getUrl(), props);
+		try {
+			// Created inside the try block so the connection is closed even if this call fails.
+			Statement stmt = conn.createStatement();
+			if (isNamespaceEnabled) {
+				stmt.execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+			}
+			stmt.execute(String.format(
+					"CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
+					fullTableName, tableDDLOptions));
+			String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
+			PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+			FailingRegionObserver.FAIL_WRITE = false;
+			// insert two rows
+			upsertRow(stmt1, 1000);
+			upsertRow(stmt1, 2000);
+
+			conn.commit();
+			stmt.execute(String.format("CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
+					(localIndex ? "LOCAL" : ""), indxTable, fullTableName));
+
+			// From here on every write that touches the index is rejected by the observer.
+			FailingRegionObserver.FAIL_WRITE = true;
+			upsertRow(stmt1, 3000);
+			upsertRow(stmt1, 4000);
+			upsertRow(stmt1, 5000);
+			try {
+				conn.commit();
+				// fail() throws an AssertionError, which the catch below does not intercept.
+				fail("Commit should fail while index writes are rejected");
+			} catch (Exception expected) {
+				// Expected: the commit cannot succeed while index writes are being rejected.
+			}
+			FailingRegionObserver.FAIL_WRITE = false;
+
+			// The failed index write must have left the index in the DISABLE state.
+			ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
+					new String[] { PTableType.INDEX.toString() });
+			assertTrue(rs.next());
+			assertEquals(indxTable, rs.getString(3));
+			String indexState = rs.getString("INDEX_STATE");
+			assertEquals(PIndexState.DISABLE.toString(), indexState);
+			assertFalse(rs.next());
+			upsertRow(stmt1, 6000);
+			upsertRow(stmt1, 7000);
+			conn.commit();
+
+			// Built once; the same query is issued on every polling round.
+			final String indexStateQuery = "SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + ","
+					+ PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
+					+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " ("
+					+ PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
+					+ PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
+					+ PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'";
+			int maxTries = 4, nTries = 0;
+			boolean isInactive = false;
+			do {
+				rs = stmt.executeQuery(indexStateQuery);
+				// The index row must exist; reading columns without a current row would be an error.
+				assertTrue(rs.next());
+				if (PIndexState.INACTIVE.getSerializedValue().equals(rs.getString(1)) && rs.getLong(2) > 3000) {
+					isInactive = true;
+					break;
+				}
+				Thread.sleep(10 * 1000); // sleep 10 secs
+			} while (++nTries < maxTries);
+			assertTrue(isInactive);
+
+			// Now wait for the index to be reported as ACTIVE again.
+			nTries = 0;
+			boolean isActive = false;
+			do {
+				Thread.sleep(15 * 1000); // sleep 15 secs
+				rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
+						new String[] { PTableType.INDEX.toString() });
+				assertTrue(rs.next());
+				if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
+					isActive = true;
+					break;
+				}
+			} while (++nTries < maxTries);
+			assertTrue(isActive);
+
+		} finally {
+			conn.close();
+		}
+	}
+
+	/**
+	 * Binds one row derived from {@code i} to the given statement and executes it.
+	 *
+	 * @param stmt prepared statement with three bind parameters
+	 * @param i value bound to the first parameter; the other two values are derived from it
+	 * @throws SQLException if binding a parameter or executing the statement fails
+	 */
+	public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+		final String name = "uname" + i;
+		final int zip = 95050 + i;
+		stmt.setInt(1, i);
+		stmt.setString(2, name);
+		stmt.setInt(3, zip);
+		stmt.executeUpdate();
+	}
+
+	/**
+	 * Region observer that rejects index-related batch mutations while {@link #FAIL_WRITE} is set.
+	 */
+	public static class FailingRegionObserver extends SimpleRegionObserver {
+		public static final String INDEX_NAME = "IDX";
+		public static volatile boolean FAIL_WRITE = false;
+
+		/**
+		 * Throws a {@link DoNotRetryIOException} when {@link #FAIL_WRITE} is set and either the
+		 * region's table name contains {@link #INDEX_NAME} or the first operation of the batch
+		 * touches a column family whose name starts with the local index column family prefix.
+		 */
+		@Override
+		public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+				MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+			final String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString();
+			if (tableName.contains(INDEX_NAME) && FAIL_WRITE) {
+				throw new DoNotRetryIOException();
+			}
+			// Only the first operation of the mini batch is inspected.
+			for (byte[] family : miniBatchOp.getOperation(0).getFamilyMap().keySet()) {
+				final String familyName = Bytes.toString(family);
+				if (familyName.startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
+					throw new DoNotRetryIOException();
+				}
+			}
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
new file mode 100644
index 0000000..116c47f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for partial index rebuilds performed by the {@link IndexTool}
+ */
+@RunWith(Parameterized.class)
+public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
+    
+    private final boolean localIndex;
+    protected boolean isNamespaceEnabled = false;
+    protected final String tableDDLOptions;
+    
+    /**
+     * Creates the test instance for one parameterized run.
+     *
+     * @param localIndex whether the index under test is created with the LOCAL keyword
+     */
+    public IndexToolForPartialBuildIT(boolean localIndex) {
+        // Every run appends the same split clause to the data table DDL.
+        this.tableDDLOptions = " SPLIT ON(1,2)";
+        this.localIndex = localIndex;
+    }
+    
+    /**
+     * Starts the test driver with the server-side settings used by this test: the
+     * {@link FailingRegionObserver} is registered through {@code hbase.coprocessor.region.classes},
+     * client retries, RPC timeout and client pause are set explicitly, and
+     * {@link QueryServices#INDEX_FAILURE_HANDLING_REBUILD_ATTRIB} is set to {@code false}.
+     *
+     * @throws Exception if the test driver cannot be set up
+     */
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+        // Key written without a stray leading space so it does not depend on the consumer trimming it.
+        serverProps.put("yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
+        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
+        serverProps.put("hbase.client.pause", "5000");
+        // NOTE(review): presumably keeps the server from rebuilding the index in the background so
+        // that only the IndexTool run rebuilds it; confirm against the server-side handling.
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.FALSE.toString());
+        // No client-side overrides are needed; an empty map is passed on purpose.
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+    
+    /**
+     * Supplies the parameter sets for the runner: one run with {@code localIndex = false} and one
+     * with {@code localIndex = true}.
+     *
+     * @return one single-element array per run holding the {@code localIndex} flag
+     */
+    @Parameters(name="localIndex = {0}")
+    public static Collection<Boolean[]> data() {
+        Boolean[] nonLocalRun = { false };
+        Boolean[] localRun = { true };
+        return Arrays.asList(nonLocalRun, localRun);
+    }
+    
+    /**
+     * Verifies that an index whose writes failed can be brought up to date by running the
+     * {@link IndexTool} with the arguments from {@link #getArgValues(String, String)}.
+     * <p>
+     * The test writes two rows, creates the index, makes index writes fail through
+     * {@link FailingRegionObserver} so that the next commit fails, and issues
+     * {@code ALTER INDEX ... REBUILD ASYNC}. It then asserts that the index is in the BUILDING
+     * state, that the recorded disable timestamp is 3000, that queries are planned against the data
+     * table and that the index only holds the first two rows. After the tool has run with exit
+     * status 0 and two more rows have been written, it asserts that the query is planned against the
+     * index and returns all nine rows.
+     *
+     * @throws Exception if any step of the scenario fails unexpectedly
+     */
+    @Test
+    public void testSecondaryIndex() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+        final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+        props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
+        props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
+        final Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            // Created inside the try block so the connection is closed even if this call fails.
+            Statement stmt = conn.createStatement();
+            if (isNamespaceEnabled) {
+                stmt.execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+            }
+            stmt.execute(
+                    String.format("CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
+                            fullTableName, tableDDLOptions));
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            FailingRegionObserver.FAIL_WRITE = false;
+            // insert two rows
+            upsertRow(stmt1, 1000);
+            upsertRow(stmt1, 2000);
+
+            conn.commit();
+            stmt.execute(String.format("CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
+                    (localIndex ? "LOCAL" : ""), indxTable, fullTableName));
+
+            // From here on every write that touches the index is rejected by the observer.
+            FailingRegionObserver.FAIL_WRITE = true;
+            upsertRow(stmt1, 3000);
+            upsertRow(stmt1, 4000);
+            upsertRow(stmt1, 5000);
+            try {
+                conn.commit();
+                // fail() throws an AssertionError, which the catch below does not intercept.
+                fail("Commit should fail while index writes are rejected");
+            } catch (Exception expected) {
+                // Expected: the commit cannot succeed while index writes are being rejected.
+            }
+            stmt.execute(String.format("ALTER INDEX %s on %s REBUILD ASYNC", indxTable, fullTableName));
+
+            FailingRegionObserver.FAIL_WRITE = false;
+            ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
+                    new String[] { PTableType.INDEX.toString() });
+            assertTrue(rs.next());
+            assertEquals(indxTable, rs.getString(3));
+            String indexState = rs.getString("INDEX_STATE");
+            assertEquals(PIndexState.BUILDING.toString(), indexState);
+            assertFalse(rs.next());
+            upsertRow(stmt1, 6000);
+            upsertRow(stmt1, 7000);
+            conn.commit();
+
+            rs = stmt.executeQuery("SELECT " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + ","
+                    + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
+                    + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " ("
+                    + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
+                    + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
+                    + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'");
+            // The index row must exist; reading columns without a current row would be an error.
+            assertTrue(rs.next());
+            PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
+            assertEquals(PIndexState.BUILDING, pindexTable.getIndexState());
+            assertEquals(rs.getLong(1), pindexTable.getTimeStamp());
+            // assert disabled timestamp (expected value first, so failure messages read correctly)
+            assertEquals(3000, rs.getLong(2));
+
+            String selectSql = String.format("SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName);
+            rs = stmt.executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+
+            // assert we are pulling from data table.
+            assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, false, isNamespaceEnabled);
+
+            rs = stmt1.executeQuery(selectSql);
+            for (int i = 1; i <= 7; i++) {
+                assertTrue(rs.next());
+                assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
+            }
+
+            // Validate Index table data till disabled timestamp
+            rs = stmt1.executeQuery(String.format("SELECT * FROM %s", SchemaUtil.getTableName(schemaName, indxTable)));
+            for (int i = 1; i <= 2; i++) {
+                assertTrue(rs.next());
+                assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
+            }
+            assertFalse(rs.next());
+
+            // run the index MR job.
+            final IndexTool indexingTool = new IndexTool();
+            Configuration conf = new Configuration(getUtility().getConfiguration());
+            conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
+            indexingTool.setConf(conf);
+
+            final String[] cmdArgs = getArgValues(schemaName, dataTableName);
+            int status = indexingTool.run(cmdArgs);
+            assertEquals(0, status);
+
+            // insert two more rows
+            upsertRow(stmt1, 8000);
+            upsertRow(stmt1, 9000);
+            conn.commit();
+
+            // assert we are pulling from index table.
+            rs = stmt.executeQuery("EXPLAIN " + selectSql);
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertExplainPlan(actualExplainPlan, schemaName, dataTableName, indxTable, localIndex, isNamespaceEnabled);
+
+            rs = stmt.executeQuery(selectSql);
+
+            for (int i = 1; i <= 9; i++) {
+                assertTrue(rs.next());
+                assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
+            }
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+	/**
+	 * Asserts that the given explain plan contains the scan line expected for the table that should
+	 * serve the query.
+	 *
+	 * @param actualExplainPlan explain plan text to check
+	 * @param schemaName schema of the data table and index
+	 * @param dataTable data table name
+	 * @param indxTable index name, or {@code null} when the data table itself is expected to be scanned
+	 * @param isLocal whether the index is a local index; only consulted when {@code indxTable} is not null
+	 * @param isNamespaceMapped passed through to the physical table name lookup
+	 */
+	public static void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
+			String indxTable, boolean isLocal, boolean isNamespaceMapped) {
+
+		final String expectedExplainPlan;
+		if (indxTable == null) {
+			// No index expected: full scan over the data table.
+			expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
+					SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable),
+							isNamespaceMapped, PTableType.TABLE));
+		} else if (isLocal) {
+			// The local index name is derived from the data table name, not from the index name.
+			final String localIndexName = SchemaUtil
+					.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable), isNamespaceMapped,
+							PTableType.INDEX)
+					.getString();
+			expectedExplainPlan = String.format("CLIENT PARALLEL 3-WAY RANGE SCAN OVER %s [1]", localIndexName);
+		} else {
+			expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
+					SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, indxTable),
+							isNamespaceMapped, PTableType.INDEX));
+		}
+		assertTrue(actualExplainPlan.contains(expectedExplainPlan));
+	}
+
+    /**
+     * Builds the argument array handed to the index tool's {@code run} method.
+     *
+     * @param schemaName schema name passed after {@code -s}; the option is omitted when this is {@code null}
+     * @param dataTable data table name passed after {@code -dt}
+     * @return the arguments: optional {@code -s}, then {@code -dt}, {@code -pr} and {@code -op} with an
+     *         output path that ends in the value of the {@code localIndex} flag
+     */
+    public String[] getArgValues(String schemaName, String dataTable) {
+        final List<String> cmdLine = Lists.newArrayList();
+        if (schemaName != null) {
+            cmdLine.addAll(Arrays.asList("-s", schemaName));
+        }
+        cmdLine.addAll(Arrays.asList("-dt", dataTable, "-pr", "-op", "/tmp/output/partialTable_" + localIndex));
+        return cmdLine.toArray(new String[cmdLine.size()]);
+    }
+
+    /**
+     * Binds one row derived from {@code i} to the given statement and executes it.
+     *
+     * @param stmt prepared statement with three bind parameters
+     * @param i value bound to the first parameter; the other two values are derived from it
+     * @throws SQLException if binding a parameter or executing the statement fails
+     */
+    public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+        final String name = "uname" + i;
+        final int zip = 95050 + i;
+        stmt.setInt(1, i);
+        stmt.setString(2, name);
+        stmt.setInt(3, zip);
+        stmt.executeUpdate();
+    }
+    
+
+    /**
+     * Region observer that rejects index-related batch mutations while {@link #FAIL_WRITE} is set.
+     */
+    public static class FailingRegionObserver extends SimpleRegionObserver {
+        public static final String INDEX_NAME = "IDX";
+        public static volatile boolean FAIL_WRITE = false;
+
+        /**
+         * Throws a {@link DoNotRetryIOException} when {@link #FAIL_WRITE} is set and either the
+         * region's table name contains {@link #INDEX_NAME} or the first operation of the batch
+         * touches a column family whose name starts with the local index column family prefix.
+         */
+        @Override
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+            final String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString();
+            if (tableName.contains(INDEX_NAME) && FAIL_WRITE) {
+                throw new DoNotRetryIOException();
+            }
+            // Only the first operation of the mini batch is inspected.
+            for (byte[] family : miniBatchOp.getOperation(0).getFamilyMap().keySet()) {
+                final String familyName = Bytes.toString(family);
+                if (familyName.startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
+                    throw new DoNotRetryIOException();
+                }
+            }
+        }
+
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
new file mode 100644
index 0000000..5e16b05
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Runs the {@link IndexToolForPartialBuildIT} scenario against a test driver that is started with
+ * namespace mapping enabled on both the server and the client side.
+ */
+@RunWith(Parameterized.class)
+// The public class name must match the file it is declared in
+// (IndexToolForPartialBuildWithNamespaceEnabledIT.java), otherwise the file does not compile.
+public class IndexToolForPartialBuildWithNamespaceEnabledIT extends IndexToolForPartialBuildIT {
+
+    /**
+     * Creates the test instance for one parameterized run.
+     *
+     * @param localIndex passed to the superclass constructor
+     * @param isNamespaceEnabled value stored in the inherited {@code isNamespaceEnabled} field
+     */
+    public IndexToolForPartialBuildWithNamespaceEnabledIT(boolean localIndex, boolean isNamespaceEnabled) {
+        super(localIndex);
+        this.isNamespaceEnabled=isNamespaceEnabled;
+    }
+
+    /**
+     * Starts the test driver with the {@link FailingRegionObserver} registered, explicit client
+     * retry/timeout/pause values, explicit index rebuild period and interval, and
+     * {@link QueryServices#IS_NAMESPACE_MAPPING_ENABLED} set to {@code "true"} in both the server
+     * and the client properties. This static method has the same name as the superclass setup
+     * method and therefore shadows it.
+     *
+     * @throws Exception if the test driver cannot be set up
+     */
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        // Eight entries are stored below.
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(8);
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+        serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
+        serverProps.put("hbase.client.pause", "5000");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "2000");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
+        serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+
+    /**
+     * Supplies the parameter sets as {@code {localIndex, isNamespaceEnabled}} pairs.
+     *
+     * @return one two-element array per run
+     */
+    @Parameters(name="localIndex = {0} , isNamespaceEnabled = {1}")
+    public static Collection<Boolean[]> data() {
+        // NOTE(review): the second run passes isNamespaceEnabled = false although doSetup() enables
+        // namespace mapping for the whole cluster - confirm this combination is intended.
+        return Arrays.asList(new Boolean[][] {
+                 { false, true},{ true, false }
+           });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
index f0c670b..63a6bd6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
@@ -31,6 +31,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.sql.Types;
 import java.util.Properties;
 
@@ -43,10 +44,13 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnAlreadyExistsException;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
@@ -216,6 +220,15 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
             assertFalse(rs.next());
             
             assertActiveIndex(conn, INDEX_DATA_SCHEMA, indexDataTable);
+            
+            ddl = "ALTER INDEX " + indexName + " ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + indexDataTable + " REBUILD ASYNC";
+            conn.createStatement().execute(ddl);
+            // Verify the metadata for index is correct.
+            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(INDEX_DATA_SCHEMA), indexName , new String[] {PTableType.INDEX.toString()});
+            assertTrue(rs.next());
+            assertEquals(indexName , rs.getString(3));
+            assertEquals(PIndexState.BUILDING.toString(), rs.getString("INDEX_STATE"));
+            assertFalse(rs.next());
 
             ddl = "DROP INDEX " + indexName + " ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + indexDataTable;
             stmt = conn.prepareStatement(ddl);
@@ -568,4 +581,49 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
         assertTrue(d2.after(d1));
         assertFalse(rs.next());
     }
+    
+    /**
+     * Verifies that, among this test's indexes, only the one altered with REBUILD ASYNC has a non-zero
+     * ASYNC_REBUILD_TIMESTAMP, that it equals the index table's timestamp, and that DISABLE clears it.
+     */
+    @Test
+    public void testAsyncRebuildTimestamp() throws Exception {
+        long startTimestamp = System.currentTimeMillis();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String testTable = generateUniqueName();
+        String indexName = "R_ASYNCIND_" + generateUniqueName();
+        // One query for both checks: the second lookup used to filter on 'ASYNCIND_%', which can never
+        // match a name starting with "R_", so the final assertFalse passed regardless of DISABLE.
+        String asyncTimestampQuery =
+            "select table_name, " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " +
+            "from system.catalog (" + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() + ") " +
+            "where " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " !=0 and table_name like 'R_ASYNCIND_%' " +
+            "order by table_name";
+        // try-with-resources so the connection is closed even when an assertion fails.
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            Statement stmt = conn.createStatement();
+            stmt.execute("create table " + testTable  + " (k varchar primary key, v1 varchar, v2 varchar, v3 varchar)");
+            stmt.execute("CREATE INDEX " + indexName + "1 ON " + testTable  + " (v1) ");
+            stmt.execute("CREATE INDEX " + indexName + "2 ON " + testTable  + " (v2) ");
+            stmt.execute("CREATE INDEX " + indexName + "3 ON " + testTable  + " (v3)");
+            conn.createStatement().execute("ALTER INDEX "+indexName+"1 ON " + testTable +" DISABLE ");
+            conn.createStatement().execute("ALTER INDEX "+indexName+"2 ON " + testTable +" REBUILD ");
+            conn.createStatement().execute("ALTER INDEX "+indexName+"3 ON " + testTable +" REBUILD ASYNC");
+
+            ResultSet rs = conn.createStatement().executeQuery(asyncTimestampQuery);
+            assertTrue(rs.next());
+            assertEquals(indexName + "3", rs.getString(1));
+            long asyncTimestamp = rs.getLong(2);
+            assertTrue("Async timestamp is recent timestamp", asyncTimestamp > startTimestamp);
+            PTable table = PhoenixRuntime.getTable(conn, indexName+"3");
+            assertEquals(table.getTimeStamp(), asyncTimestamp);
+            assertFalse(rs.next());
+
+            // Disabling the async-rebuilt index must leave no non-zero async rebuild timestamp behind.
+            conn.createStatement().execute("ALTER INDEX "+indexName+"3 ON " + testTable +" DISABLE");
+            rs = conn.createStatement().executeQuery(asyncTimestampQuery);
+            assertFalse(rs.next());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 687b2c2..e9205c9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -102,7 +102,7 @@ public class MutableIndexFailureIT extends BaseTest {
         this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "");
         this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "")
                 + (isNamespaceMapped ? "_NM" : "");
-        this.indexName = INDEX_NAME;
+        this.indexName = FailingRegionObserver.INDEX_NAME;
         fullTableName = SchemaUtil.getTableName(schema, tableName);
         this.fullIndexName = SchemaUtil.getTableName(schema, indexName);
         this.isNamespaceMapped = isNamespaceMapped;
@@ -155,7 +155,7 @@ public class MutableIndexFailureIT extends BaseTest {
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
-            FAIL_WRITE = false;
+            FailingRegionObserver.FAIL_WRITE = false;
             conn.createStatement().execute(
                     "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
             // Create other index which should be local/global if the other index is global/local to
@@ -202,7 +202,7 @@ public class MutableIndexFailureIT extends BaseTest {
             assertEquals("z", rs.getString(2));
             assertFalse(rs.next());
 
-            FAIL_WRITE = true;
+            FailingRegionObserver.FAIL_WRITE = true;
             updateTable(conn, fullTableName);
             updateTable(conn, secondTableName);
             // Verify the metadata for index is correct.
@@ -259,7 +259,7 @@ public class MutableIndexFailureIT extends BaseTest {
             }
 
             // re-enable index table
-            FAIL_WRITE = false;
+            FailingRegionObserver.FAIL_WRITE = false;
             waitForIndexToBeActive(conn,indexName);
             waitForIndexToBeActive(conn,indexName+"_2");
             waitForIndexToBeActive(conn,secondIndexName);
@@ -391,6 +391,8 @@ public class MutableIndexFailureIT extends BaseTest {
     }
 
     public static class FailingRegionObserver extends SimpleRegionObserver {
+        public static volatile boolean FAIL_WRITE = false;
+        public static final String INDEX_NAME = "IDX";
         @Override
         public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
             if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 3e09766..07a51ce 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -567,8 +567,8 @@ drop_index_node returns [DropIndexStatement ret]
 
 // Parse a alter index statement
 alter_index_node returns [AlterIndexStatement ret]
-    : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE)
-      {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText()))); }
+    : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE) (async=ASYNC)?
+      {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText())), async!=null); }
     ;
 
 // Parse a trace statement.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 38dc494..a0681fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3296,11 +3296,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             List<Cell> newKVs = tableMetadata.get(0).getFamilyCellMap().get(TABLE_FAMILY_BYTES);
             Cell newKV = null;
             int disableTimeStampKVIndex = -1;
+            int indexStateKVIndex = 0;
             int index = 0;
             for(Cell cell : newKVs){
                 if(Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                       INDEX_STATE_BYTES, 0, INDEX_STATE_BYTES.length) == 0){
                   newKV = cell;
+                  indexStateKVIndex = index;
                 } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
                   INDEX_DISABLE_TIMESTAMP_BYTES, 0, INDEX_DISABLE_TIMESTAMP_BYTES.length) == 0){
                   disableTimeStampKVIndex = index;
@@ -3378,11 +3380,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                 if ((currentState == PIndexState.UNUSABLE && newState == PIndexState.ACTIVE)
                         || (currentState == PIndexState.ACTIVE && newState == PIndexState.UNUSABLE)) {
                     newState = PIndexState.INACTIVE;
-                    newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
+                    newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                         INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
                 } else if (currentState == PIndexState.INACTIVE && newState == PIndexState.USABLE) {
                     newState = PIndexState.ACTIVE;
-                    newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
+                    newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
                         INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
                 }
 
@@ -3414,7 +3416,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                     if(dataTableKey != null) {
                         metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
                     }
-                    if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1) {
+                    if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1
+                            || currentState == PIndexState.DISABLE || newState == PIndexState.BUILDING) {
                         returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index e790b59..a60de03 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -17,12 +17,19 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
+import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -45,6 +52,7 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -69,6 +77,7 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
@@ -97,6 +106,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
     private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
     private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
     private boolean blockWriteRebuildIndex = false;
+    private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>();
 
     @Override
     public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -125,6 +135,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
             QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
         blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
         	QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
+        
     }
     
     @Override
@@ -195,9 +206,15 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         // running
         private final static AtomicInteger inProgress = new AtomicInteger(0);
         RegionCoprocessorEnvironment env;
+        private long rebuildIndexBatchSize = HConstants.LATEST_TIMESTAMP;
+        private long configuredBatches = 10;
 
         public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) {
             this.env = env;
+            this.rebuildIndexBatchSize = env.getConfiguration().getLong(
+                    QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, HConstants.LATEST_TIMESTAMP);
+            this.configuredBatches = env.getConfiguration().getLong(
+                    QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, configuredBatches);
         }
 
         @Override
@@ -228,6 +245,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
                 scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                     PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+                PreparedStatement updateDisabledTimeStampSmt = null;
 
                 Map<PTable, List<PTable>> dataTableToIndexesMap = null;
                 MetaDataClient client = null;
@@ -243,8 +261,13 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     Result r = Result.create(results);
                     byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+                    byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                            PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
+
+                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0 || (indexState != null
+                            && PIndexState.BUILDING == PIndexState.fromSerializedValue(Bytes.toString(indexState)))) {
 
-                    if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
+                        // Don't rebuild an index in BUILDING state, because it is marked for async rebuild
                         continue;
                     }
 
@@ -255,8 +278,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     }
                     byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
-                    byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
-                        PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
                     if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) {
                         // data table name can't be empty
                         continue;
@@ -317,109 +338,169 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     indexesToPartiallyRebuild.add(indexPTable);
                 } while (hasMore);
 
-                if (dataTableToIndexesMap != null) {
-                    long overlapTime = env.getConfiguration().getLong(
-                            QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
-                            QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
-                    for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) {
-                        PTable dataPTable = entry.getKey();
-                        List<PTable> indexesToPartiallyRebuild = entry.getValue();
-                        try {
-                            long earliestDisableTimestamp = Long.MAX_VALUE;
-                            List<IndexMaintainer> maintainers = Lists
-                                    .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
-                            for (PTable index : indexesToPartiallyRebuild) {
-                                long disabledTimeStampVal = index.getIndexDisableTimestamp();
-                                if (disabledTimeStampVal > 0) {
-                                    if (disabledTimeStampVal < earliestDisableTimestamp) {
-                                        earliestDisableTimestamp = disabledTimeStampVal;
-                                    }
-    
-                                    maintainers.add(index.getIndexMaintainer(dataPTable, conn));
-                                }
-                            }
-                            // No indexes are disabled, so skip this table
-                            if (earliestDisableTimestamp == Long.MAX_VALUE) {
-                                continue;
-                            }
-
-                            long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
-                            LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
-                                    + " from timestamp=" + timeStamp);
-                            TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
-                            // TODO Need to set high timeout 
-                            PostDDLCompiler compiler = new PostDDLCompiler(conn);
-                            MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
-                                    HConstants.LATEST_TIMESTAMP);
-                            Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
-                                    maintainers);
-                            dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
-                            dataTableScan.setCacheBlocks(false);
-                            dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
-                            
-                            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
-                                    ByteUtil.EMPTY_BYTE_ARRAY);
-                            IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
-                                    conn);
-                            byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-
-                            dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                            MutationState mutationState = plan.execute();
-                            long rowCount = mutationState.getUpdateCount();
-                            LOG.info(rowCount + " rows of index which are rebuild");
-                            for (PTable indexPTable : indexesToPartiallyRebuild) {
-                                String indexTableFullName = SchemaUtil.getTableName(indexPTable.getSchemaName()
-                                        .getString(), indexPTable.getTableName().getString());
-                                updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE, PIndexState.ACTIVE);
-                            }
-                        } catch (Exception e) { // Log, but try next table's indexes
-                            LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
-                                    + ". Will try again next on next scheduled invocation.", e);
-                        }
-                    }
-                }
-            } catch (Throwable t) {
-                LOG.warn("ScheduledBuildIndexTask failed!", t);
-            } finally {
-                inProgress.decrementAndGet();
-                if (scanner != null) {
-                    try {
-                        scanner.close();
-                    } catch (IOException ignored) {
-                        LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored);
-                    }
-                }
-                if (conn != null) {
-                    try {
-                        conn.close();
-                    } catch (SQLException ignored) {
-                        LOG.debug("ScheduledBuildIndexTask can't close connection", ignored);
-                    }
-                }
-            }
+				if (dataTableToIndexesMap != null) {
+					long overlapTime = env.getConfiguration().getLong(
+							QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
+							QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
+					for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) {
+						PTable dataPTable = entry.getKey();
+						List<PTable> indexesToPartiallyRebuild = entry.getValue();
+						ReadOnlyProps props = new ReadOnlyProps(env.getConfiguration().iterator());
+						try (HTableInterface metaTable = env.getTable(
+								SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props))) {
+							long earliestDisableTimestamp = Long.MAX_VALUE;
+							List<IndexMaintainer> maintainers = Lists
+									.newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+							for (PTable index : indexesToPartiallyRebuild) {
+								long disabledTimeStampVal = index.getIndexDisableTimestamp();
+								if (disabledTimeStampVal > 0) {
+									if (disabledTimeStampVal < earliestDisableTimestamp) {
+										earliestDisableTimestamp = disabledTimeStampVal;
+									}
+
+									maintainers.add(index.getIndexMaintainer(dataPTable, conn));
+								}
+							}
+							// No indexes are disabled, so skip this table
+							if (earliestDisableTimestamp == Long.MAX_VALUE) {
+								continue;
+							}
+							long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
+							LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
+									+ " from timestamp=" + timeStamp);
+
+							TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
+							// TODO Need to set high timeout
+							PostDDLCompiler compiler = new PostDDLCompiler(conn);
+							MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
+									HConstants.LATEST_TIMESTAMP);
+							Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
+									maintainers);
+
+							long scanEndTime = getTimestampForBatch(timeStamp,
+									batchExecutedPerTableMap.get(dataPTable.getName()));
+							dataTableScan.setTimeRange(timeStamp, scanEndTime);
+							dataTableScan.setCacheBlocks(false);
+							dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+
+							ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
+									ByteUtil.EMPTY_BYTE_ARRAY);
+							IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
+									conn);
+							byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+							dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+							MutationState mutationState = plan.execute();
+							long rowCount = mutationState.getUpdateCount();
+							LOG.info(rowCount + " rows of index which are rebuild");
+							for (PTable indexPTable : indexesToPartiallyRebuild) {
+								String indexTableFullName = SchemaUtil.getTableName(
+										indexPTable.getSchemaName().getString(),
+										indexPTable.getTableName().getString());
+								if (scanEndTime == HConstants.LATEST_TIMESTAMP) {
+									updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE,
+											PIndexState.ACTIVE);
+									batchExecutedPerTableMap.remove(dataPTable.getName());
+								} else {
+
+									updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime, metaTable);
+									Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName());
+									if (noOfBatches == null) {
+										noOfBatches = 0l;
+									}
+									batchExecutedPerTableMap.put(dataPTable.getName(), ++noOfBatches);
+									// clearing cache to get the updated
+									// disabled timestamp
+									new MetaDataClient(conn).updateCache(dataPTable.getSchemaName().getString(),
+											dataPTable.getTableName().getString());
+									new MetaDataClient(conn).updateCache(indexPTable.getSchemaName().getString(),
+											indexPTable.getTableName().getString());
+									LOG.info(
+											"During Round-robin build: Successfully updated index disabled timestamp  for "
+													+ indexTableFullName + " to " + scanEndTime);
+								}
+
+							}
+						} catch (Exception e) { // Log, but try next table's
+												// indexes
+							LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
+									+ ". Will try again next on next scheduled invocation.", e);
+						}
+					}
+				}
+			} catch (Throwable t) {
+				LOG.warn("ScheduledBuildIndexTask failed!", t);
+			} finally {
+				inProgress.decrementAndGet();
+				if (scanner != null) {
+					try {
+						scanner.close();
+					} catch (IOException ignored) {
+						LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored);
+					}
+				}
+				if (conn != null) {
+					try {
+						conn.close();
+					} catch (SQLException ignored) {
+						LOG.debug("ScheduledBuildIndexTask can't close connection", ignored);
+					}
+				}
+			}
         }
-    }
-    
-    private static void updateIndexState(PhoenixConnection conn, String indexTableName, RegionCoprocessorEnvironment env, PIndexState oldState,
-            PIndexState newState) throws ServiceException, Throwable {
-        byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
-        String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
-        String indexName = SchemaUtil.getTableNameFromFullName(indexTableName);
-        // Mimic the Put that gets generated by the client on an update of the index state
-        Put put = new Put(indexTableKey);
-        put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                newState.getSerializedBytes());
-        if (newState == PIndexState.ACTIVE) {
-            put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
-                    PLong.INSTANCE.toBytes(0));
+
+        /** End timestamp of the next rebuild scan, or HConstants.LATEST_TIMESTAMP when the scan is not cut into a batch. */
+        private long getTimestampForBatch(long disabledTimeStamp, Long noOfBatches) {
+            // Guard: a negative start, or a window that would run past LATEST_TIMESTAMP, is not batched.
+            boolean pastLatest = rebuildIndexBatchSize > (HConstants.LATEST_TIMESTAMP - disabledTimeStamp);
+            if (disabledTimeStamp < 0 || pastLatest) { return HConstants.LATEST_TIMESTAMP; }
+            long batchEnd = disabledTimeStamp + rebuildIndexBatchSize;
+            // Also stop batching once the window is negative, reaches past the current time, or the
+            // batch count passed in by the caller exceeds the configured number of batches.
+            boolean stopBatching = batchEnd < 0 || batchEnd > System.currentTimeMillis()
+                    || (noOfBatches != null && noOfBatches > configuredBatches);
+            return stopBatching ? HConstants.LATEST_TIMESTAMP : batchEnd;
+        }
-        final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put);
-        MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null);
-        MutationCode code = result.getMutationCode();
-        if (code == MutationCode.TABLE_NOT_FOUND) { throw new TableNotFoundException(schemaName, indexName); }
-        if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { throw new SQLExceptionInfo.Builder(
-                SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION)
-                .setMessage(" currentState=" + oldState + ". requestedState=" + newState).setSchemaName(schemaName)
-                .setTableName(indexName).build().buildException(); }
     }
+    
+	/**
+	 * Writes {@code newState} into the index's INDEX_STATE cell through the query services; for ACTIVE the disable
+	 * and async-rebuild timestamps are zeroed too. {@code oldState} only feeds the error text; {@code env} is unused.
+	 */
+	private static void updateIndexState(PhoenixConnection conn, String indexTableName,
+			RegionCoprocessorEnvironment env, PIndexState oldState, PIndexState newState)
+					throws ServiceException, Throwable {
+		byte[] rowKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+		String schema = SchemaUtil.getSchemaNameFromFullName(indexTableName);
+		String index = SchemaUtil.getTableNameFromFullName(indexTableName);
+		final byte[] family = PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+		// Same shape as the Put a client generates when it updates the index state.
+		Put statePut = new Put(rowKey);
+		statePut.addColumn(family, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, newState.getSerializedBytes());
+		if (PIndexState.ACTIVE == newState) {
+			statePut.addColumn(family, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
+			statePut.addColumn(family, PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
+		}
+		MetaDataMutationResult reply = conn.getQueryServices()
+				.updateIndexState(Collections.<Mutation> singletonList(statePut), null);
+		MutationCode outcome = reply.getMutationCode();
+		if (outcome == MutationCode.TABLE_NOT_FOUND) {
+			throw new TableNotFoundException(schema, index);
+		} else if (outcome == MutationCode.UNALLOWED_TABLE_MUTATION) {
+			throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION)
+					.setMessage(" currentState=" + oldState + ". requestedState=" + newState).setSchemaName(schema)
+					.setTableName(index).build().buildException();
+		}
+	}
+
+	/** Sets INDEX_DISABLE_TIMESTAMP on the index row, but only while INDEX_STATE is still INACTIVE; conn and env are unused. */
+	private static void updateDisableTimestamp(PhoenixConnection conn, String indexTableName,
+			RegionCoprocessorEnvironment env, long disabledTimestamp, HTableInterface metaTable) throws IOException {
+		byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+		Put put = new Put(indexTableKey);
+		put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
+				PLong.INSTANCE.toBytes(disabledTimestamp));
+		boolean applied = metaTable.checkAndPut(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+				PhoenixDatabaseMetaData.INDEX_STATE_BYTES, CompareOp.EQUAL, PIndexState.INACTIVE.getSerializedBytes(), put);
+		if (!applied) { LOG.warn("Index " + indexTableName + " is not INACTIVE; disable timestamp not set to " + disabledTimestamp); } // surface the rejected put
+	}
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index fb4e3c3..fde403c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -421,7 +421,8 @@ public enum SQLExceptionCode {
             724, "43M07", "Schema name not allowed!!"), CREATE_SCHEMA_NOT_ALLOWED(725, "43M08",
                     "Cannot create schema because config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED
                             + " for enabling name space mapping isn't enabled."), INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES(
-                                    726, "43M10", " Inconsistent namespace mapping properites..");
+                                    726, "43M10", " Inconsistent namespace mapping properites.."), ASYNC_NOT_ALLOWED(
+                                    727, "43M11", " ASYNC option is not allowed.. ");
 
     private final int errorCode;
     private final String sqlState;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index eb73d6b..e515dbb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -35,30 +35,21 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
-import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
-import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
-import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy;
 import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
@@ -153,47 +144,10 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
             String indexTableName = tableTimeElement.getKey();
             long minTimeStamp = tableTimeElement.getValue();
             // Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
-            byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
             HTableInterface systemTable = env.getTable(SchemaUtil
                     .getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
-            // Mimic the Put that gets generated by the client on an update of the index state
-            Put put = new Put(indexTableKey);
-            if (blockWriteRebuildIndex) 
-                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                        PIndexState.ACTIVE.getSerializedBytes());
-            else  
-                put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
-                        PIndexState.DISABLE.getSerializedBytes());
-            put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
-                PLong.INSTANCE.toBytes(minTimeStamp));
-            final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
-
-            final Map<byte[], MetaDataResponse> results =
-                    systemTable.coprocessorService(MetaDataService.class, indexTableKey, indexTableKey,
-                            new Batch.Call<MetaDataService, MetaDataResponse>() {
-                                @Override
-                                public MetaDataResponse call(MetaDataService instance) throws IOException {
-                                    ServerRpcController controller = new ServerRpcController();
-                                    BlockingRpcCallback<MetaDataResponse> rpcCallback =
-                                            new BlockingRpcCallback<MetaDataResponse>();
-                                    UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder();
-                                    for (Mutation m : tableMetadata) {
-                                        MutationProto mp = ProtobufUtil.toProto(m);
-                                        builder.addTableMetadataMutations(mp.toByteString());
-                                    }
-                                    instance.updateIndexState(controller, builder.build(), rpcCallback);
-                                    if (controller.getFailedOn() != null) {
-                                        throw controller.getFailedOn();
-                                    }
-                                    return rpcCallback.get();
-                                }
-                            });
-            if (results.isEmpty()) {
-                throw new IOException("Didn't get expected result size");
-            }
-            MetaDataResponse tmpResponse = results.values().iterator().next();
-            MetaDataMutationResult result = MetaDataMutationResult.constructFromProto(tmpResponse);
-
+            MetaDataMutationResult result = IndexUtil.disableIndexWithTimestamp(indexTableName, minTimeStamp,
+                    systemTable, blockWriteRebuildIndex);
             if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
                 LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
                 continue;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 54080d1..5142b57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -279,6 +279,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
     public static final String LAST_STATS_UPDATE_TIME = "LAST_STATS_UPDATE_TIME";
     public static final byte[] LAST_STATS_UPDATE_TIME_BYTES = Bytes.toBytes(LAST_STATS_UPDATE_TIME);
     public static final String GUIDE_POST_KEY = "GUIDE_POST_KEY";
+    public static final String ASYNC_REBUILD_TIMESTAMP = "ASYNC_REBUILD_TIMESTAMP";
+    public static final byte[] ASYNC_REBUILD_TIMESTAMP_BYTES = Bytes.toBytes(ASYNC_REBUILD_TIMESTAMP);
 
     public static final String PARENT_TENANT_ID = "PARENT_TENANT_ID";
     public static final byte[] PARENT_TENANT_ID_BYTES = Bytes.toBytes(PARENT_TENANT_ID);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d57c250..f3c6d30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -989,8 +989,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
 
     private static class ExecutableAlterIndexStatement extends AlterIndexStatement implements CompilableStatement {
 
-        public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
-            super(indexTableNode, dataTableName, ifExists, state);
+        public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
+            super(indexTableNode, dataTableName, ifExists, state, async);
         }
 
         @SuppressWarnings("unchecked")
@@ -1302,8 +1302,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
         }
         
         @Override
-        public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
-            return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state);
+        public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
+            return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async);
         }
 
         @Override