You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2017/01/27 01:18:50 UTC
[06/26] phoenix git commit: PHOENIX-2890 Extend IndexTool to allow
incremental index rebuilds
PHOENIX-2890 Extend IndexTool to allow incremental index rebuilds
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/83827cd8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/83827cd8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/83827cd8
Branch: refs/heads/calcite
Commit: 83827cd8c2876c6b6dccf3a5678889b40a76261b
Parents: 70dc383
Author: Ankit Singhal <an...@gmail.com>
Authored: Mon Dec 26 11:59:19 2016 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Mon Dec 26 11:59:19 2016 +0530
----------------------------------------------------------------------
.../phoenix/end2end/AutomaticRebuildIT.java | 219 +++++++++
.../end2end/IndexToolForPartialBuildIT.java | 298 ++++++++++++
...olForPartialBuildWithNamespaceEnabledIT.java | 70 +++
.../phoenix/end2end/index/IndexMetadataIT.java | 58 +++
.../end2end/index/MutableIndexFailureIT.java | 10 +-
phoenix-core/src/main/antlr3/PhoenixSQL.g | 4 +-
.../coprocessor/MetaDataEndpointImpl.java | 9 +-
.../coprocessor/MetaDataRegionObserver.java | 291 +++++++-----
.../phoenix/exception/SQLExceptionCode.java | 3 +-
.../index/PhoenixIndexFailurePolicy.java | 52 +--
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 2 +
.../apache/phoenix/jdbc/PhoenixStatement.java | 8 +-
.../phoenix/mapreduce/index/IndexTool.java | 455 +++++++++++++------
.../phoenix/mapreduce/index/IndexToolUtil.java | 6 +-
.../index/PhoenixIndexImportDirectMapper.java | 2 +-
.../index/PhoenixIndexPartialBuildMapper.java | 182 ++++++++
.../util/PhoenixConfigurationUtil.java | 31 ++
.../phoenix/parse/AlterIndexStatement.java | 8 +-
.../apache/phoenix/parse/ParseNodeFactory.java | 6 +-
.../org/apache/phoenix/query/QueryServices.java | 4 +
.../apache/phoenix/schema/MetaDataClient.java | 47 +-
.../java/org/apache/phoenix/util/IndexUtil.java | 61 ++-
22 files changed, 1504 insertions(+), 322 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
new file mode 100644
index 0000000..cbb7745
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AutomaticRebuildIT.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for the {@link AutomaticRebuildIT}
+ */
+@RunWith(Parameterized.class)
+public class AutomaticRebuildIT extends BaseOwnClusterIT {
+
+ private final boolean localIndex;
+ protected boolean isNamespaceEnabled = false;
+ protected final String tableDDLOptions;
+
+ public AutomaticRebuildIT(boolean localIndex) {
+ this.localIndex = localIndex;
+ StringBuilder optionBuilder = new StringBuilder();
+ optionBuilder.append(" SPLIT ON(1,2)");
+ this.tableDDLOptions = optionBuilder.toString();
+ }
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
+ serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+ serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
+ serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+ serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
+ serverProps.put("hbase.client.pause", "5000");
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "1000");
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, "5");
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ @Parameters(name = "localIndex = {0}")
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList(new Boolean[][] { { false }, { true } });
+ }
+
+ @Test
+ public void testSecondaryAutomaticRebuildIndex() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+ props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
+ props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
+ final Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement();
+ try {
+ if (isNamespaceEnabled) {
+ conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+ }
+ stmt.execute(String.format(
+ "CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
+ fullTableName, tableDDLOptions));
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+ FailingRegionObserver.FAIL_WRITE = false;
+ // insert two rows
+ upsertRow(stmt1, 1000);
+ upsertRow(stmt1, 2000);
+
+ conn.commit();
+ stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
+ (localIndex ? "LOCAL" : ""), indxTable, fullTableName));
+ FailingRegionObserver.FAIL_WRITE = true;
+ upsertRow(stmt1, 3000);
+ upsertRow(stmt1, 4000);
+ upsertRow(stmt1, 5000);
+ try {
+ conn.commit();
+ fail();
+ } catch (SQLException e) {
+ } catch (Exception e) {
+ }
+ FailingRegionObserver.FAIL_WRITE = false;
+ ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
+ new String[] { PTableType.INDEX.toString() });
+ assertTrue(rs.next());
+ assertEquals(indxTable, rs.getString(3));
+ String indexState = rs.getString("INDEX_STATE");
+ assertEquals(PIndexState.DISABLE.toString(), indexState);
+ assertFalse(rs.next());
+ upsertRow(stmt1, 6000);
+ upsertRow(stmt1, 7000);
+ conn.commit();
+ int maxTries = 4, nTries = 0;
+ boolean isInactive = false;
+ do {
+ rs = conn.createStatement()
+ .executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + ","
+ + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
+ + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " ("
+ + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
+ + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
+ + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
+ rs.next();
+ if (PIndexState.INACTIVE.getSerializedValue().equals(rs.getString(1)) && rs.getLong(2) > 3000) {
+ isInactive = true;
+ break;
+ }
+ Thread.sleep(10 * 1000); // sleep 10 secs
+ } while (++nTries < maxTries);
+ assertTrue(isInactive);
+ nTries = 0;
+ boolean isActive = false;
+ do {
+ Thread.sleep(15 * 1000); // sleep 15 secs
+ rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
+ new String[] { PTableType.INDEX.toString() });
+ assertTrue(rs.next());
+ if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
+ isActive = true;
+ break;
+ }
+ } while (++nTries < maxTries);
+ assertTrue(isActive);
+
+ } finally {
+ conn.close();
+ }
+ }
+
+ public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+ // insert row
+ stmt.setInt(1, i);
+ stmt.setString(2, "uname" + String.valueOf(i));
+ stmt.setInt(3, 95050 + i);
+ stmt.executeUpdate();
+ }
+
+ public static class FailingRegionObserver extends SimpleRegionObserver {
+ public static volatile boolean FAIL_WRITE = false;
+ public static final String INDEX_NAME = "IDX";
+
+ @Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+ if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+ throw new DoNotRetryIOException();
+ }
+ Mutation operation = miniBatchOp.getOperation(0);
+ Set<byte[]> keySet = operation.getFamilyMap().keySet();
+ for (byte[] family : keySet) {
+ if (Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
+ throw new DoNotRetryIOException();
+ }
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
new file mode 100644
index 0000000..116c47f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for the {@link IndexToolForPartialBuildIT}
+ */
+@RunWith(Parameterized.class)
+public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
+
+ private final boolean localIndex;
+ protected boolean isNamespaceEnabled = false;
+ protected final String tableDDLOptions;
+
+ public IndexToolForPartialBuildIT(boolean localIndex) {
+
+ this.localIndex = localIndex;
+ StringBuilder optionBuilder = new StringBuilder();
+ optionBuilder.append(" SPLIT ON(1,2)");
+ this.tableDDLOptions = optionBuilder.toString();
+ }
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
+ serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+ serverProps.put(" yarn.scheduler.capacity.maximum-am-resource-percent", "1.0");
+ serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+ serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
+ serverProps.put("hbase.client.pause", "5000");
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.FALSE.toString());
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ @Parameters(name="localIndex = {0}")
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList(new Boolean[][] {
+ { false},{ true }
+ });
+ }
+
+ @Test
+ public void testSecondaryIndex() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String fullTableName = SchemaUtil.getTableName(schemaName, dataTableName);
+ final String indxTable = String.format("%s_%s", dataTableName, FailingRegionObserver.INDEX_NAME);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+ props.setProperty(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.FALSE.toString());
+ props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
+ final Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement();
+ try {
+ if (isNamespaceEnabled) {
+ conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+ }
+ stmt.execute(
+ String.format("CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
+ fullTableName, tableDDLOptions));
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+ FailingRegionObserver.FAIL_WRITE = false;
+ // insert two rows
+ upsertRow(stmt1, 1000);
+ upsertRow(stmt1, 2000);
+
+ conn.commit();
+ stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),11,'x')||'_xyz') ",
+ (localIndex ? "LOCAL" : ""), indxTable, fullTableName));
+ FailingRegionObserver.FAIL_WRITE = true;
+ upsertRow(stmt1, 3000);
+ upsertRow(stmt1, 4000);
+ upsertRow(stmt1, 5000);
+ try {
+ conn.commit();
+ fail();
+ } catch (SQLException e) {} catch (Exception e) {}
+ conn.createStatement()
+ .execute(String.format("ALTER INDEX %s on %s REBUILD ASYNC", indxTable, fullTableName));
+
+ FailingRegionObserver.FAIL_WRITE = false;
+ ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), indxTable,
+ new String[] { PTableType.INDEX.toString() });
+ assertTrue(rs.next());
+ assertEquals(indxTable, rs.getString(3));
+ String indexState = rs.getString("INDEX_STATE");
+ assertEquals(PIndexState.BUILDING.toString(), indexState);
+ assertFalse(rs.next());
+ upsertRow(stmt1, 6000);
+ upsertRow(stmt1, 7000);
+ conn.commit();
+
+ rs = conn.createStatement()
+ .executeQuery(String.format("SELECT " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + ","
+ + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " FROM "
+ + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " ("
+ + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " bigint) where "
+ + PhoenixDatabaseMetaData.TABLE_SCHEM + "='" + schemaName + "' and "
+ + PhoenixDatabaseMetaData.TABLE_NAME + "='" + indxTable + "'"));
+ rs.next();
+ PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
+ assertEquals(PIndexState.BUILDING, pindexTable.getIndexState());
+ assertEquals(rs.getLong(1), pindexTable.getTimeStamp());
+ //assert disabled timestamp
+ assertEquals(rs.getLong(2), 3000);
+
+ String selectSql = String.format("SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName);
+ rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+ String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+
+ // assert we are pulling from data table.
+ assertExplainPlan(actualExplainPlan, schemaName, dataTableName, null, false, isNamespaceEnabled);
+
+ rs = stmt1.executeQuery(selectSql);
+ for (int i = 1; i <= 7; i++) {
+ assertTrue(rs.next());
+ assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
+ }
+
+ // Validate Index table data till disabled timestamp
+ rs = stmt1.executeQuery(String.format("SELECT * FROM %s", SchemaUtil.getTableName(schemaName, indxTable)));
+ for (int i = 1; i <= 2; i++) {
+ assertTrue(rs.next());
+ assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
+ }
+ assertFalse(rs.next());
+ // run the index MR job.
+ final IndexTool indexingTool = new IndexTool();
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
+ indexingTool.setConf(conf);
+
+ final String[] cmdArgs = getArgValues(schemaName, dataTableName);
+ int status = indexingTool.run(cmdArgs);
+ assertEquals(0, status);
+
+ // insert two more rows
+ upsertRow(stmt1, 8000);
+ upsertRow(stmt1, 9000);
+ conn.commit();
+
+ // assert we are pulling from index table.
+ rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+ actualExplainPlan = QueryUtil.getExplainPlan(rs);
+ assertExplainPlan(actualExplainPlan, schemaName, dataTableName, indxTable, localIndex, isNamespaceEnabled);
+
+ rs = stmt.executeQuery(selectSql);
+
+ for (int i = 1; i <= 9; i++) {
+ assertTrue(rs.next());
+ assertEquals("xxUNAME" + i*1000 + "_xyz", rs.getString(1));
+ }
+
+ assertFalse(rs.next());
+
+ // conn.createStatement().execute(String.format("DROP INDEX %s ON %s", indxTable, fullTableName));
+ } finally {
+ conn.close();
+ }
+ }
+
+ public static void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
+ String indxTable, boolean isLocal, boolean isNamespaceMapped) {
+
+ String expectedExplainPlan = "";
+ if (indxTable != null) {
+ if (isLocal) {
+ final String localIndexName = SchemaUtil
+ .getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable), isNamespaceMapped,
+ PTableType.INDEX)
+ .getString();
+ expectedExplainPlan = String.format("CLIENT PARALLEL 3-WAY RANGE SCAN OVER %s [1]", localIndexName);
+ } else {
+ expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
+ SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, indxTable),
+ isNamespaceMapped, PTableType.INDEX));
+ }
+ } else {
+ expectedExplainPlan = String.format("CLIENT PARALLEL 1-WAY FULL SCAN OVER %s",
+ SchemaUtil.getPhysicalHBaseTableName(SchemaUtil.getTableName(schemaName, dataTable),
+ isNamespaceMapped, PTableType.TABLE));
+ }
+ assertTrue(actualExplainPlan.contains(expectedExplainPlan));
+ }
+
+ public String[] getArgValues(String schemaName, String dataTable) {
+ final List<String> args = Lists.newArrayList();
+ if (schemaName!=null) {
+ args.add("-s");
+ args.add(schemaName);
+ }
+ args.add("-dt");
+ args.add(dataTable);
+ args.add("-pr");
+ args.add("-op");
+ args.add("/tmp/output/partialTable_"+localIndex);
+ return args.toArray(new String[0]);
+ }
+
+ public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+ // insert row
+ stmt.setInt(1, i);
+ stmt.setString(2, "uname" + String.valueOf(i));
+ stmt.setInt(3, 95050 + i);
+ stmt.executeUpdate();
+ }
+
+
+ public static class FailingRegionObserver extends SimpleRegionObserver {
+ public static volatile boolean FAIL_WRITE = false;
+ public static final String INDEX_NAME = "IDX";
+ @Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+ if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+ throw new DoNotRetryIOException();
+ }
+ Mutation operation = miniBatchOp.getOperation(0);
+ Set<byte[]> keySet = operation.getFamilyMap().keySet();
+ for(byte[] family: keySet) {
+ if(Bytes.toString(family).startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX) && FAIL_WRITE) {
+ throw new DoNotRetryIOException();
+ }
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
new file mode 100644
index 0000000..5e16b05
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildWithNamespaceEnabledIT.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for the {@link IndexToolForPartialBuildWithNamespaceEnabled}
+ */
+@RunWith(Parameterized.class)
+public class IndexToolForPartialBuildWithNamespaceEnabled extends IndexToolForPartialBuildIT {
+
+
+ public IndexToolForPartialBuildWithNamespaceEnabled(boolean localIndex, boolean isNamespaceEnabled) {
+ super(localIndex);
+ this.isNamespaceEnabled=isNamespaceEnabled;
+ }
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(7);
+ serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+ serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+ serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
+ serverProps.put("hbase.client.pause", "5000");
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, "2000");
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
+ serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, "true");
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ @Parameters(name="localIndex = {0} , isNamespaceEnabled = {1}")
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList(new Boolean[][] {
+ { false, true},{ true, false }
+ });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
index f0c670b..63a6bd6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
@@ -31,6 +31,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.sql.Types;
import java.util.Properties;
@@ -43,10 +44,13 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnAlreadyExistsException;
import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
@@ -216,6 +220,15 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
assertFalse(rs.next());
assertActiveIndex(conn, INDEX_DATA_SCHEMA, indexDataTable);
+
+ ddl = "ALTER INDEX " + indexName + " ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + indexDataTable + " REBUILD ASYNC";
+ conn.createStatement().execute(ddl);
+ // Verify the metadata for index is correct.
+ rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(INDEX_DATA_SCHEMA), indexName , new String[] {PTableType.INDEX.toString()});
+ assertTrue(rs.next());
+ assertEquals(indexName , rs.getString(3));
+ assertEquals(PIndexState.BUILDING.toString(), rs.getString("INDEX_STATE"));
+ assertFalse(rs.next());
ddl = "DROP INDEX " + indexName + " ON " + INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + indexDataTable;
stmt = conn.prepareStatement(ddl);
@@ -568,4 +581,49 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
assertTrue(d2.after(d1));
assertFalse(rs.next());
}
+
+ @Test
+ public void testAsyncRebuildTimestamp() throws Exception {
+ long startTimestamp = System.currentTimeMillis();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(false);
+ String testTable = generateUniqueName();
+
+
+ String ddl = "create table " + testTable + " (k varchar primary key, v1 varchar, v2 varchar, v3 varchar)";
+ Statement stmt = conn.createStatement();
+ stmt.execute(ddl);
+ String indexName = "R_ASYNCIND_" + generateUniqueName();
+
+ ddl = "CREATE INDEX " + indexName + "1 ON " + testTable + " (v1) ";
+ stmt.execute(ddl);
+ ddl = "CREATE INDEX " + indexName + "2 ON " + testTable + " (v2) ";
+ stmt.execute(ddl);
+ ddl = "CREATE INDEX " + indexName + "3 ON " + testTable + " (v3)";
+ stmt.execute(ddl);
+ conn.createStatement().execute("ALTER INDEX "+indexName+"1 ON " + testTable +" DISABLE ");
+ conn.createStatement().execute("ALTER INDEX "+indexName+"2 ON " + testTable +" REBUILD ");
+ conn.createStatement().execute("ALTER INDEX "+indexName+"3 ON " + testTable +" REBUILD ASYNC");
+
+ ResultSet rs = conn.createStatement().executeQuery(
+ "select table_name, " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " +
+ "from system.catalog (" + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() + ") " +
+ "where " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " !=0 and table_name like 'R_ASYNCIND_%' " +
+ "order by table_name");
+ assertTrue(rs.next());
+ assertEquals(indexName + "3", rs.getString(1));
+ long asyncTimestamp = rs.getLong(2);
+ assertTrue("Async timestamp is recent timestamp", asyncTimestamp > startTimestamp);
+ PTable table = PhoenixRuntime.getTable(conn, indexName+"3");
+ assertEquals(table.getTimeStamp(), asyncTimestamp);
+ assertFalse(rs.next());
+ conn.createStatement().execute("ALTER INDEX "+indexName+"3 ON " + testTable +" DISABLE");
+ rs = conn.createStatement().executeQuery(
+ "select table_name, " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " +
+ "from system.catalog (" + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() + ") " +
+ "where " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " !=0 and table_name like 'ASYNCIND_%' " +
+ "order by table_name" );
+ assertFalse(rs.next());
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 687b2c2..e9205c9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -102,7 +102,7 @@ public class MutableIndexFailureIT extends BaseTest {
this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "");
this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "")
+ (isNamespaceMapped ? "_NM" : "");
- this.indexName = INDEX_NAME;
+ this.indexName = FailingRegionObserver.INDEX_NAME;
fullTableName = SchemaUtil.getTableName(schema, tableName);
this.fullIndexName = SchemaUtil.getTableName(schema, indexName);
this.isNamespaceMapped = isNamespaceMapped;
@@ -155,7 +155,7 @@ public class MutableIndexFailureIT extends BaseTest {
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
- FAIL_WRITE = false;
+ FailingRegionObserver.FAIL_WRITE = false;
conn.createStatement().execute(
"CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
// Create other index which should be local/global if the other index is global/local to
@@ -202,7 +202,7 @@ public class MutableIndexFailureIT extends BaseTest {
assertEquals("z", rs.getString(2));
assertFalse(rs.next());
- FAIL_WRITE = true;
+ FailingRegionObserver.FAIL_WRITE = true;
updateTable(conn, fullTableName);
updateTable(conn, secondTableName);
// Verify the metadata for index is correct.
@@ -259,7 +259,7 @@ public class MutableIndexFailureIT extends BaseTest {
}
// re-enable index table
- FAIL_WRITE = false;
+ FailingRegionObserver.FAIL_WRITE = false;
waitForIndexToBeActive(conn,indexName);
waitForIndexToBeActive(conn,indexName+"_2");
waitForIndexToBeActive(conn,secondIndexName);
@@ -391,6 +391,8 @@ public class MutableIndexFailureIT extends BaseTest {
}
public static class FailingRegionObserver extends SimpleRegionObserver {
+ public static volatile boolean FAIL_WRITE = false;
+ public static final String INDEX_NAME = "IDX";
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 3e09766..07a51ce 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -567,8 +567,8 @@ drop_index_node returns [DropIndexStatement ret]
// Parse a alter index statement
alter_index_node returns [AlterIndexStatement ret]
- : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE)
- {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText()))); }
+ : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE) (async=ASYNC)?
+ {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText())), async!=null); }
;
// Parse a trace statement.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 38dc494..a0681fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3296,11 +3296,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
List<Cell> newKVs = tableMetadata.get(0).getFamilyCellMap().get(TABLE_FAMILY_BYTES);
Cell newKV = null;
int disableTimeStampKVIndex = -1;
+ int indexStateKVIndex = 0;
int index = 0;
for(Cell cell : newKVs){
if(Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
INDEX_STATE_BYTES, 0, INDEX_STATE_BYTES.length) == 0){
newKV = cell;
+ indexStateKVIndex = index;
} else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
INDEX_DISABLE_TIMESTAMP_BYTES, 0, INDEX_DISABLE_TIMESTAMP_BYTES.length) == 0){
disableTimeStampKVIndex = index;
@@ -3378,11 +3380,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if ((currentState == PIndexState.UNUSABLE && newState == PIndexState.ACTIVE)
|| (currentState == PIndexState.ACTIVE && newState == PIndexState.UNUSABLE)) {
newState = PIndexState.INACTIVE;
- newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
+ newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
} else if (currentState == PIndexState.INACTIVE && newState == PIndexState.USABLE) {
newState = PIndexState.ACTIVE;
- newKVs.set(0, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
+ newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
}
@@ -3414,7 +3416,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if(dataTableKey != null) {
metaDataCache.invalidate(new ImmutableBytesPtr(dataTableKey));
}
- if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1) {
+ if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1
+ || currentState == PIndexState.DISABLE || newState == PIndexState.BUILDING) {
returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index e790b59..a60de03 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -17,12 +17,19 @@
*/
package org.apache.phoenix.coprocessor;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
import java.io.IOException;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -45,6 +52,7 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -69,6 +77,7 @@ import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
@@ -97,6 +106,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD;
private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL;
private boolean blockWriteRebuildIndex = false;
+ private static Map<PName, Long> batchExecutedPerTableMap = new HashMap<PName, Long>();
@Override
public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -125,6 +135,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL);
blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
+
}
@Override
@@ -195,9 +206,15 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
// running
private final static AtomicInteger inProgress = new AtomicInteger(0);
RegionCoprocessorEnvironment env;
+ private long rebuildIndexBatchSize = HConstants.LATEST_TIMESTAMP;
+ private long configuredBatches = 10;
public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) {
this.env = env;
+ this.rebuildIndexBatchSize = env.getConfiguration().getLong(
+ QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, HConstants.LATEST_TIMESTAMP);
+ this.configuredBatches = env.getConfiguration().getLong(
+ QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, configuredBatches);
}
@Override
@@ -228,6 +245,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+ PreparedStatement updateDisabledTimeStampSmt = null;
Map<PTable, List<PTable>> dataTableToIndexesMap = null;
MetaDataClient client = null;
@@ -243,8 +261,13 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
Result r = Result.create(results);
byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+ byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
+
+ if (disabledTimeStamp == null || disabledTimeStamp.length == 0 || (indexState != null
+ && PIndexState.BUILDING == PIndexState.fromSerializedValue(Bytes.toString(indexState)))) {
- if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
+ // Don't rebuild the building index , because they are marked for aysnc
continue;
}
@@ -255,8 +278,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
}
byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
- byte[] indexState = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
- PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
if ((dataTable == null || dataTable.length == 0) || (indexState == null || indexState.length == 0)) {
// data table name can't be empty
continue;
@@ -317,109 +338,169 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
indexesToPartiallyRebuild.add(indexPTable);
} while (hasMore);
- if (dataTableToIndexesMap != null) {
- long overlapTime = env.getConfiguration().getLong(
- QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
- QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
- for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) {
- PTable dataPTable = entry.getKey();
- List<PTable> indexesToPartiallyRebuild = entry.getValue();
- try {
- long earliestDisableTimestamp = Long.MAX_VALUE;
- List<IndexMaintainer> maintainers = Lists
- .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
- for (PTable index : indexesToPartiallyRebuild) {
- long disabledTimeStampVal = index.getIndexDisableTimestamp();
- if (disabledTimeStampVal > 0) {
- if (disabledTimeStampVal < earliestDisableTimestamp) {
- earliestDisableTimestamp = disabledTimeStampVal;
- }
-
- maintainers.add(index.getIndexMaintainer(dataPTable, conn));
- }
- }
- // No indexes are disabled, so skip this table
- if (earliestDisableTimestamp == Long.MAX_VALUE) {
- continue;
- }
-
- long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
- LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
- + " from timestamp=" + timeStamp);
- TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
- // TODO Need to set high timeout
- PostDDLCompiler compiler = new PostDDLCompiler(conn);
- MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
- HConstants.LATEST_TIMESTAMP);
- Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
- maintainers);
- dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
- dataTableScan.setCacheBlocks(false);
- dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
-
- ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
- ByteUtil.EMPTY_BYTE_ARRAY);
- IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
- conn);
- byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-
- dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
- MutationState mutationState = plan.execute();
- long rowCount = mutationState.getUpdateCount();
- LOG.info(rowCount + " rows of index which are rebuild");
- for (PTable indexPTable : indexesToPartiallyRebuild) {
- String indexTableFullName = SchemaUtil.getTableName(indexPTable.getSchemaName()
- .getString(), indexPTable.getTableName().getString());
- updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE, PIndexState.ACTIVE);
- }
- } catch (Exception e) { // Log, but try next table's indexes
- LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
- + ". Will try again next on next scheduled invocation.", e);
- }
- }
- }
- } catch (Throwable t) {
- LOG.warn("ScheduledBuildIndexTask failed!", t);
- } finally {
- inProgress.decrementAndGet();
- if (scanner != null) {
- try {
- scanner.close();
- } catch (IOException ignored) {
- LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored);
- }
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException ignored) {
- LOG.debug("ScheduledBuildIndexTask can't close connection", ignored);
- }
- }
- }
+ if (dataTableToIndexesMap != null) {
+ long overlapTime = env.getConfiguration().getLong(
+ QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
+ QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
+ for (Map.Entry<PTable, List<PTable>> entry : dataTableToIndexesMap.entrySet()) {
+ PTable dataPTable = entry.getKey();
+ List<PTable> indexesToPartiallyRebuild = entry.getValue();
+ ReadOnlyProps props = new ReadOnlyProps(env.getConfiguration().iterator());
+ try (HTableInterface metaTable = env.getTable(
+ SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props))) {
+ long earliestDisableTimestamp = Long.MAX_VALUE;
+ List<IndexMaintainer> maintainers = Lists
+ .newArrayListWithExpectedSize(indexesToPartiallyRebuild.size());
+ for (PTable index : indexesToPartiallyRebuild) {
+ long disabledTimeStampVal = index.getIndexDisableTimestamp();
+ if (disabledTimeStampVal > 0) {
+ if (disabledTimeStampVal < earliestDisableTimestamp) {
+ earliestDisableTimestamp = disabledTimeStampVal;
+ }
+
+ maintainers.add(index.getIndexMaintainer(dataPTable, conn));
+ }
+ }
+ // No indexes are disabled, so skip this table
+ if (earliestDisableTimestamp == Long.MAX_VALUE) {
+ continue;
+ }
+ long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
+ LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
+ + " from timestamp=" + timeStamp);
+
+ TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
+ // TODO Need to set high timeout
+ PostDDLCompiler compiler = new PostDDLCompiler(conn);
+ MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
+ HConstants.LATEST_TIMESTAMP);
+ Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
+ maintainers);
+
+ long scanEndTime = getTimestampForBatch(timeStamp,
+ batchExecutedPerTableMap.get(dataPTable.getName()));
+ dataTableScan.setTimeRange(timeStamp, scanEndTime);
+ dataTableScan.setCacheBlocks(false);
+ dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+
+ ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
+ ByteUtil.EMPTY_BYTE_ARRAY);
+ IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
+ conn);
+ byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+ dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ MutationState mutationState = plan.execute();
+ long rowCount = mutationState.getUpdateCount();
+ LOG.info(rowCount + " rows of index which are rebuild");
+ for (PTable indexPTable : indexesToPartiallyRebuild) {
+ String indexTableFullName = SchemaUtil.getTableName(
+ indexPTable.getSchemaName().getString(),
+ indexPTable.getTableName().getString());
+ if (scanEndTime == HConstants.LATEST_TIMESTAMP) {
+ updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE,
+ PIndexState.ACTIVE);
+ batchExecutedPerTableMap.remove(dataPTable.getName());
+ } else {
+
+ updateDisableTimestamp(conn, indexTableFullName, env, scanEndTime, metaTable);
+ Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName());
+ if (noOfBatches == null) {
+ noOfBatches = 0l;
+ }
+ batchExecutedPerTableMap.put(dataPTable.getName(), ++noOfBatches);
+ // clearing cache to get the updated
+ // disabled timestamp
+ new MetaDataClient(conn).updateCache(dataPTable.getSchemaName().getString(),
+ dataPTable.getTableName().getString());
+ new MetaDataClient(conn).updateCache(indexPTable.getSchemaName().getString(),
+ indexPTable.getTableName().getString());
+ LOG.info(
+ "During Round-robin build: Successfully updated index disabled timestamp for "
+ + indexTableFullName + " to " + scanEndTime);
+ }
+
+ }
+ } catch (Exception e) { // Log, but try next table's
+ // indexes
+ LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
+ + ". Will try again next on next scheduled invocation.", e);
+ }
+ }
+ }
+ } catch (Throwable t) {
+ LOG.warn("ScheduledBuildIndexTask failed!", t);
+ } finally {
+ inProgress.decrementAndGet();
+ if (scanner != null) {
+ try {
+ scanner.close();
+ } catch (IOException ignored) {
+ LOG.debug("ScheduledBuildIndexTask can't close scanner.", ignored);
+ }
+ }
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException ignored) {
+ LOG.debug("ScheduledBuildIndexTask can't close connection", ignored);
+ }
+ }
+ }
}
- }
-
- private static void updateIndexState(PhoenixConnection conn, String indexTableName, RegionCoprocessorEnvironment env, PIndexState oldState,
- PIndexState newState) throws ServiceException, Throwable {
- byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
- String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
- String indexName = SchemaUtil.getTableNameFromFullName(indexTableName);
- // Mimic the Put that gets generated by the client on an update of the index state
- Put put = new Put(indexTableKey);
- put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
- newState.getSerializedBytes());
- if (newState == PIndexState.ACTIVE) {
- put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
- PLong.INSTANCE.toBytes(0));
+
+ private long getTimestampForBatch(long disabledTimeStamp, Long noOfBatches) {
+ if (disabledTimeStamp < 0 || rebuildIndexBatchSize > (HConstants.LATEST_TIMESTAMP
+ - disabledTimeStamp)) { return HConstants.LATEST_TIMESTAMP; }
+ long timestampForNextBatch = disabledTimeStamp + rebuildIndexBatchSize;
+ if (timestampForNextBatch < 0 || timestampForNextBatch > System.currentTimeMillis()
+ || (noOfBatches != null && noOfBatches > configuredBatches)) {
+ // if timestampForNextBatch cross current time , then we should
+ // build the complete index
+ timestampForNextBatch = HConstants.LATEST_TIMESTAMP;
+ }
+ return timestampForNextBatch;
}
- final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put);
- MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null);
- MutationCode code = result.getMutationCode();
- if (code == MutationCode.TABLE_NOT_FOUND) { throw new TableNotFoundException(schemaName, indexName); }
- if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { throw new SQLExceptionInfo.Builder(
- SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION)
- .setMessage(" currentState=" + oldState + ". requestedState=" + newState).setSchemaName(schemaName)
- .setTableName(indexName).build().buildException(); }
}
+
+ private static void updateIndexState(PhoenixConnection conn, String indexTableName,
+ RegionCoprocessorEnvironment env, PIndexState oldState, PIndexState newState)
+ throws ServiceException, Throwable {
+ byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+ String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
+ String indexName = SchemaUtil.getTableNameFromFullName(indexTableName);
+ // Mimic the Put that gets generated by the client on an update of the
+ // index state
+ Put put = new Put(indexTableKey);
+ put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+ newState.getSerializedBytes());
+ if (newState == PIndexState.ACTIVE) {
+ put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
+ put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
+ }
+ final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put);
+ MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null);
+ MutationCode code = result.getMutationCode();
+ if (code == MutationCode.TABLE_NOT_FOUND) {
+ throw new TableNotFoundException(schemaName, indexName);
+ }
+ if (code == MutationCode.UNALLOWED_TABLE_MUTATION) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION)
+ .setMessage(" currentState=" + oldState + ". requestedState=" + newState).setSchemaName(schemaName)
+ .setTableName(indexName).build().buildException();
+ }
+ }
+
+ private static void updateDisableTimestamp(PhoenixConnection conn, String indexTableName,
+ RegionCoprocessorEnvironment env, long disabledTimestamp, HTableInterface metaTable) throws IOException {
+ byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+ Put put = new Put(indexTableKey);
+ put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
+ PLong.INSTANCE.toBytes(disabledTimestamp));
+ metaTable.checkAndPut(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+ PhoenixDatabaseMetaData.INDEX_STATE_BYTES, CompareOp.EQUAL, PIndexState.INACTIVE.getSerializedBytes(),
+ put);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index fb4e3c3..fde403c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -421,7 +421,8 @@ public enum SQLExceptionCode {
724, "43M07", "Schema name not allowed!!"), CREATE_SCHEMA_NOT_ALLOWED(725, "43M08",
"Cannot create schema because config " + QueryServices.IS_NAMESPACE_MAPPING_ENABLED
+ " for enabling name space mapping isn't enabled."), INCONSISTENET_NAMESPACE_MAPPING_PROPERTIES(
- 726, "43M10", " Inconsistent namespace mapping properites..");
+ 726, "43M10", " Inconsistent namespace mapping properites.."), ASYNC_NOT_ALLOWED(
+ 727, "43M11", " ASYNC option is not allowed.. ");
private final int errorCode;
private final String sqlState;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index eb73d6b..e515dbb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -35,30 +35,21 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
-import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
-import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
-import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
@@ -153,47 +144,10 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
String indexTableName = tableTimeElement.getKey();
long minTimeStamp = tableTimeElement.getValue();
// Disable the index by using the updateIndexState method of MetaDataProtocol end point coprocessor.
- byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
HTableInterface systemTable = env.getTable(SchemaUtil
.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, env.getConfiguration()));
- // Mimic the Put that gets generated by the client on an update of the index state
- Put put = new Put(indexTableKey);
- if (blockWriteRebuildIndex)
- put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
- PIndexState.ACTIVE.getSerializedBytes());
- else
- put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
- PIndexState.DISABLE.getSerializedBytes());
- put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
- PLong.INSTANCE.toBytes(minTimeStamp));
- final List<Mutation> tableMetadata = Collections.<Mutation>singletonList(put);
-
- final Map<byte[], MetaDataResponse> results =
- systemTable.coprocessorService(MetaDataService.class, indexTableKey, indexTableKey,
- new Batch.Call<MetaDataService, MetaDataResponse>() {
- @Override
- public MetaDataResponse call(MetaDataService instance) throws IOException {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<MetaDataResponse> rpcCallback =
- new BlockingRpcCallback<MetaDataResponse>();
- UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder();
- for (Mutation m : tableMetadata) {
- MutationProto mp = ProtobufUtil.toProto(m);
- builder.addTableMetadataMutations(mp.toByteString());
- }
- instance.updateIndexState(controller, builder.build(), rpcCallback);
- if (controller.getFailedOn() != null) {
- throw controller.getFailedOn();
- }
- return rpcCallback.get();
- }
- });
- if (results.isEmpty()) {
- throw new IOException("Didn't get expected result size");
- }
- MetaDataResponse tmpResponse = results.values().iterator().next();
- MetaDataMutationResult result = MetaDataMutationResult.constructFromProto(tmpResponse);
-
+ MetaDataMutationResult result = IndexUtil.disableIndexWithTimestamp(indexTableName, minTimeStamp,
+ systemTable, blockWriteRebuildIndex);
if (result.getMutationCode() == MutationCode.TABLE_NOT_FOUND) {
LOG.info("Index " + indexTableName + " has been dropped. Ignore uncommitted mutations");
continue;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 54080d1..5142b57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -279,6 +279,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final String LAST_STATS_UPDATE_TIME = "LAST_STATS_UPDATE_TIME";
public static final byte[] LAST_STATS_UPDATE_TIME_BYTES = Bytes.toBytes(LAST_STATS_UPDATE_TIME);
public static final String GUIDE_POST_KEY = "GUIDE_POST_KEY";
+ public static final String ASYNC_REBUILD_TIMESTAMP = "ASYNC_REBUILD_TIMESTAMP";
+ public static final byte[] ASYNC_REBUILD_TIMESTAMP_BYTES = Bytes.toBytes(ASYNC_REBUILD_TIMESTAMP);
public static final String PARENT_TENANT_ID = "PARENT_TENANT_ID";
public static final byte[] PARENT_TENANT_ID_BYTES = Bytes.toBytes(PARENT_TENANT_ID);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d57c250..f3c6d30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -989,8 +989,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
private static class ExecutableAlterIndexStatement extends AlterIndexStatement implements CompilableStatement {
- public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
- super(indexTableNode, dataTableName, ifExists, state);
+ public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
+ super(indexTableNode, dataTableName, ifExists, state, async);
}
@SuppressWarnings("unchecked")
@@ -1302,8 +1302,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
@Override
- public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
- return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state);
+ public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) {
+ return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async);
}
@Override