You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gj...@apache.org on 2019/05/30 19:53:38 UTC
[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5272 Implement
ALTER INDEX REBUILD ALL ASYNC
This is an automated email from the ASF dual-hosted git repository.
gjacoby pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
new 6827889 PHOENIX-5272 Implement ALTER INDEX REBUILD ALL ASYNC
6827889 is described below
commit 6827889f018a326f49926878be86b59ca5b0fa4d
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Wed May 1 16:06:19 2019 -0700
PHOENIX-5272 Implement ALTER INDEX REBUILD ALL ASYNC
Signed-off-by: Geoffrey Jacoby <gj...@apache.org>
---
.../apache/phoenix/end2end/IndexRebuildTaskIT.java | 31 ++++++-
.../phoenix/end2end/index/IndexMetadataIT.java | 96 ++++++++++++++++++++--
phoenix-core/src/main/antlr3/PhoenixSQL.g | 4 +-
.../coprocessor/tasks/IndexRebuildTask.java | 31 ++++---
.../org/apache/phoenix/jdbc/PhoenixStatement.java | 8 +-
.../apache/phoenix/mapreduce/index/IndexTool.java | 6 +-
.../apache/phoenix/parse/AlterIndexStatement.java | 12 ++-
.../org/apache/phoenix/parse/ParseNodeFactory.java | 6 +-
.../org/apache/phoenix/schema/MetaDataClient.java | 58 +++++++++----
.../java/org/apache/phoenix/util/TestUtil.java | 27 +++++-
10 files changed, 229 insertions(+), 50 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
index 3c53808..28821ee 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
@@ -39,6 +39,8 @@ import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Collections;
import java.util.Map;
@@ -47,6 +49,7 @@ import java.util.Properties;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
protected static String TENANT1 = "tenant1";
@@ -146,15 +149,12 @@ public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true);
task.run();
- Thread.sleep(15000);
-
Table systemHTable= queryServices.getTable(Bytes.toBytes("SYSTEM."+PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE));
count = getUtility().countRows(systemHTable);
assertEquals(1, count);
// Check task status and other column values.
- DropTableWithViewsIT.assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.INDEX_REBUILD,
- null);
+ waitForTaskState(conn, PTable.TaskType.INDEX_REBUILD, PTable.TaskStatus.COMPLETED);
// See that index is rebuilt and confirm index has rows
Table htable= queryServices.getTable(Bytes.toBytes(viewIndexTableName));
@@ -171,4 +171,27 @@ public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
}
}
}
+
+ public static void waitForTaskState(Connection conn, PTable.TaskType taskType, PTable.TaskStatus expectedTaskStatus) throws InterruptedException,
+ SQLException {
+ int maxTries = 100, nTries = 0;
+ do {
+ Thread.sleep(2000);
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * " +
+ " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+ " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " +
+ taskType.getSerializedValue());
+
+ String taskStatus = null;
+
+ if (rs.next()) {
+ taskStatus = rs.getString(PhoenixDatabaseMetaData.TASK_STATUS);
+ boolean matchesExpected = (expectedTaskStatus.toString().equals(taskStatus));
+ if (matchesExpected) {
+ return;
+ }
+ }
+ } while (++nTries < maxTries);
+ fail("Ran out of time waiting for task state to become " + expectedTaskStatus);
+ }
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
index bb7e6ad..395fcdf 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.end2end.index;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_OR_ALTER_UPDATE_CACHE_FREQ_FOR_INDEX;
@@ -37,7 +39,10 @@ import java.sql.Statement;
import java.sql.Types;
import java.util.Properties;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.DropTableWithViewsIT;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -49,7 +54,6 @@ import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -120,7 +124,7 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)).getIndexMaintainers(ptr, pconn);
assertTrue(ptr.getLength() == 0);
}
-
+
@Test
public void testIndexCreateDrop() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -579,17 +583,18 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
Statement stmt = conn.createStatement();
stmt.execute(ddl);
String indexName = "R_ASYNCIND_" + generateUniqueName();
-
+
ddl = "CREATE INDEX " + indexName + "1 ON " + testTable + " (v1) ";
stmt.execute(ddl);
ddl = "CREATE INDEX " + indexName + "2 ON " + testTable + " (v2) ";
stmt.execute(ddl);
ddl = "CREATE INDEX " + indexName + "3 ON " + testTable + " (v3)";
stmt.execute(ddl);
+
conn.createStatement().execute("ALTER INDEX "+indexName+"1 ON " + testTable +" DISABLE ");
conn.createStatement().execute("ALTER INDEX "+indexName+"2 ON " + testTable +" REBUILD ");
conn.createStatement().execute("ALTER INDEX "+indexName+"3 ON " + testTable +" REBUILD ASYNC");
-
+
ResultSet rs = conn.createStatement().executeQuery(
"select table_name, " + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " +
"from \"SYSTEM\".catalog (" + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() + ") " +
@@ -610,7 +615,88 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
"order by table_name" );
assertFalse(rs.next());
}
-
+
+ @Test
+ public void testAsyncRebuildAll() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String testTable = generateUniqueName();
+
+ String ddl = "create table " + testTable + " (k varchar primary key, v4 varchar)";
+ Statement stmt = conn.createStatement();
+ stmt.execute(ddl);
+
+ PreparedStatement upsertStmt = conn.prepareStatement("UPSERT INTO " + testTable + " VALUES(?, ?)");
+ upsertStmt.setString(1, "key1");
+ upsertStmt.setString(2, "val1");
+ upsertStmt.execute();
+
+ String indexName = "R_ASYNCIND_" + generateUniqueName();
+ ddl = "CREATE INDEX " + indexName + " ON " + testTable + " (k, v4)";
+ stmt.execute(ddl);
+
+ // Check that index value is same as table
+ String val = getIndexValue(conn, indexName, 2);
+ assertEquals("val1", val);
+
+ // Update index value, check that index value is still not updated
+ conn.createStatement()
+ .execute("ALTER INDEX " + indexName + " ON " + testTable + " DISABLE");
+ upsertStmt = conn.prepareStatement("UPSERT INTO " + testTable + " VALUES(?, ?)");
+ upsertStmt.setString(1, "key1");
+ upsertStmt.setString(2, "val2");
+ upsertStmt.execute();
+ conn.commit();
+
+ val = getIndexValue(conn, indexName, 2);
+ assertEquals("val1", val);
+
+ // Add extra row to Index
+ upsertStmt = conn.prepareStatement("UPSERT INTO " + indexName + " VALUES(?, ?)");
+ upsertStmt.setString(1, "key3");
+ upsertStmt.setString(2, "val3");
+ upsertStmt.execute();
+
+ conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+ conn.commit();
+
+ conn.createStatement().execute(
+ "ALTER INDEX " + indexName + " ON " + testTable + " REBUILD ALL ASYNC");
+
+ String
+ queryTaskTable =
+ "SELECT * FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
+ ResultSet rs = conn.createStatement().executeQuery(queryTaskTable);
+ assertTrue(rs.next());
+ assertEquals(testTable, rs.getString(TABLE_NAME));
+ assertFalse(rs.next());
+
+ TestUtil.waitForIndexState(conn, indexName, PIndexState.ACTIVE);
+
+ // Check task status and other column values.
+ DropTableWithViewsIT.assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(),
+ PTable.TaskType.INDEX_REBUILD, null);
+
+ // Check that the value is updated to correct one
+ val = getIndexValue(conn, indexName, 2);
+ assertEquals("val2", val);
+
+ Table indexTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName));
+ int count = getUtility().countRows(indexTable);
+ assertEquals(1, count);
+ }
+ }
+
+ private String getIndexValue(Connection conn, String indexName, int column)
+ throws SQLException {
+ ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + indexName);
+ assertTrue(rs.next());
+ String val = rs.getString(column);
+ assertFalse(rs.next());
+ return val;
+ }
+
@Test
public void testImmutableTableOnlyHasPrimaryKeyIndex() throws Exception {
helpTestTableOnlyHasPrimaryKeyIndex(false, false);
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 87153cd..2191007 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -606,8 +606,8 @@ drop_index_node returns [DropIndexStatement ret]
// Parse a alter index statement
alter_index_node returns [AlterIndexStatement ret]
: ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name
- ((s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE)) (async=ASYNC)? ((SET?)p=fam_properties)?)
- {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText())), async!=null, p); }
+ ((s=(USABLE | UNUSABLE | REBUILD (isRebuildAll=ALL)? | DISABLE | ACTIVE)) (async=ASYNC)? ((SET?)p=fam_properties)?)
+ {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText())), isRebuildAll!=null, async!=null, p); }
;
// Parse a trace statement.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
index c2bdf51..754ea8e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
@@ -26,8 +26,11 @@ import java.util.Map;
*
*/
public class IndexRebuildTask extends BaseTask {
- public static final String IndexName = "IndexName";
- public static final String JobID = "JobID";
+ public static final String INDEX_NAME = "IndexName";
+ public static final String JOB_ID = "JobID";
+ public static final String DISABLE_BEFORE = "DisableBefore";
+ public static final String REBUILD_ALL = "RebuildAll";
+
public static final Log LOG = LogFactory.getLog(IndexRebuildTask.class);
@Override
@@ -56,18 +59,26 @@ public class IndexRebuildTask extends BaseTask {
}
boolean shouldDisable = false;
- if (jsonObject.has("DisableBefore")) {
- String disableBefore = jsonObject.get("DisableBefore").toString();
+ if (jsonObject.has(DISABLE_BEFORE)) {
+ String disableBefore = jsonObject.get(DISABLE_BEFORE).toString();
if (!Strings.isNullOrEmpty(disableBefore)) {
shouldDisable = Boolean.valueOf(disableBefore);
}
}
+ boolean rebuildAll = false;
+ if (jsonObject.has(REBUILD_ALL)) {
+ String rebuildAllStr = jsonObject.get(REBUILD_ALL).toString();
+ if (!Strings.isNullOrEmpty(rebuildAllStr)) {
+ rebuildAll = Boolean.valueOf(rebuildAllStr);
+ }
+ }
+
// Run index tool async.
boolean runForeground = false;
Map.Entry<Integer, Job> indexToolRes = IndexTool
.run(conf, taskRecord.getSchemaName(), taskRecord.getTableName(), indexName, true,
- false, taskRecord.getTenantId(), shouldDisable, runForeground);
+ false, taskRecord.getTenantId(), shouldDisable, rebuildAll, runForeground);
int status = indexToolRes.getKey();
if (status != 0) {
return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.FAIL, "Index tool returned : " + status);
@@ -75,7 +86,7 @@ public class IndexRebuildTask extends BaseTask {
Job job = indexToolRes.getValue();
- jsonObject.addProperty(JobID, job.getJobID().toString());
+ jsonObject.addProperty(JOB_ID, job.getJobID().toString());
Task.addTask(conn.unwrap(PhoenixConnection.class ), taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(), jsonObject.toString(), taskRecord.getPriority(),
taskRecord.getTimeStamp(), null, true);
@@ -104,8 +115,8 @@ public class IndexRebuildTask extends BaseTask {
private String getIndexName(JsonObject jsonObject) {
String indexName = null;
// Get index name from data column.
- if (jsonObject.has(IndexName)) {
- indexName = jsonObject.get(IndexName).toString().replaceAll("\"", "");
+ if (jsonObject.has(INDEX_NAME)) {
+ indexName = jsonObject.get(INDEX_NAME).toString().replaceAll("\"", "");
}
return indexName;
}
@@ -117,8 +128,8 @@ public class IndexRebuildTask extends BaseTask {
JsonParser jsonParser = new JsonParser();
JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject();
String jobId = null;
- if (jsonObject.has(JobID)) {
- jobId = jsonObject.get(JobID).toString().replaceAll("\"", "");
+ if (jsonObject.has(JOB_ID)) {
+ jobId = jsonObject.get(JOB_ID).toString().replaceAll("\"", "");
}
return jobId;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 55e2ead..7f5897b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -1248,8 +1248,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
private static class ExecutableAlterIndexStatement extends AlterIndexStatement implements CompilableStatement {
- public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async, ListMultimap<String,Pair<String,Object>> props) {
- super(indexTableNode, dataTableName, ifExists, state, async, props);
+ public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean isRebuildAll, boolean async, ListMultimap<String,Pair<String,Object>> props) {
+ super(indexTableNode, dataTableName, ifExists, state, isRebuildAll, async, props);
}
@SuppressWarnings("unchecked")
@@ -1597,8 +1597,8 @@ public class PhoenixStatement implements Statement, SQLCloseable {
}
@Override
- public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async, ListMultimap<String,Pair<String,Object>> props) {
- return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async, props);
+ public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean isRebuildAll, boolean async, ListMultimap<String,Pair<String,Object>> props) {
+ return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state, isRebuildAll, async, props);
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 8a7181a..16fb538 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -892,7 +892,7 @@ public class IndexTool extends Configured implements Tool {
}
public static Map.Entry<Integer, Job> run(Configuration conf, String schemaName, String dataTable, String indexTable,
- boolean directApi, boolean useSnapshot, String tenantId, boolean disableBefore, boolean runForeground) throws Exception {
+ boolean directApi, boolean useSnapshot, String tenantId, boolean disableBefore, boolean shouldDeleteBeforeRebuild, boolean runForeground) throws Exception {
final List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
@@ -919,6 +919,10 @@ public class IndexTool extends Configured implements Tool {
args.add(tenantId);
}
+ if (shouldDeleteBeforeRebuild) {
+ args.add("-deleteall");
+ }
+
args.add("-op");
args.add("/tmp/" + UUID.randomUUID().toString());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
index de04505..2a5fca4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java
@@ -28,19 +28,21 @@ public class AlterIndexStatement extends SingleTableStatement {
private final boolean ifExists;
private final PIndexState indexState;
private boolean async;
+ private boolean isRebuildAll;
private ListMultimap<String,Pair<String,Object>> props;
private static final PTableType tableType=PTableType.INDEX;
- public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState, boolean async) {
- this(indexTableNode,dataTableName,ifExists,indexState,async,null);
+ public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState, boolean isRebuildAll, boolean async) {
+ this(indexTableNode,dataTableName,ifExists,indexState, isRebuildAll, async,null);
}
- public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState, boolean async, ListMultimap<String,Pair<String,Object>> props) {
+ public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState, boolean isRebuildAll, boolean async, ListMultimap<String,Pair<String,Object>> props) {
super(indexTableNode,0);
this.dataTableName = dataTableName;
this.ifExists = ifExists;
this.indexState = indexState;
this.async = async;
+ this.isRebuildAll = isRebuildAll;
this.props= props==null ? ImmutableListMultimap.<String,Pair<String,Object>>of() : props;
}
@@ -65,6 +67,10 @@ public class AlterIndexStatement extends SingleTableStatement {
return async;
}
+ public boolean isRebuildAll() {
+ return isRebuildAll;
+ }
+
public ListMultimap<String,Pair<String,Object>> getProps() { return props; }
public PTableType getTableType(){ return tableType; }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index aef2a84..1080996 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -373,12 +373,12 @@ public class ParseNodeFactory {
return new DropIndexStatement(indexName, tableName, ifExists);
}
- public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async, ListMultimap<String,Pair<String,Object>> props) {
- return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async, props);
+ public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean isRebuildAll, boolean async, ListMultimap<String,Pair<String,Object>> props) {
+ return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, isRebuildAll, async, props);
}
public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
- return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, false);
+ return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, false, false);
}
public TraceStatement trace(boolean isTraceOn, double samplingRate) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 8d65dde..ab7d881 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -19,9 +19,9 @@ package org.apache.phoenix.schema;
import static com.google.common.collect.Sets.newLinkedHashSet;
import static com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
-import static org.apache.hadoop.hbase.HColumnDescriptor.TTL;
-import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ANALYZE_TABLE;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
+import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.INDEX_NAME;
+import static org.apache.phoenix.coprocessor.tasks.IndexRebuildTask.REBUILD_ALL;
import static org.apache.phoenix.exception.SQLExceptionCode.INSUFFICIENT_MULTI_TENANT_COLUMNS;
import static org.apache.phoenix.exception.SQLExceptionCode.PARENT_TABLE_NOT_FOUND;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA;
@@ -99,7 +99,6 @@ import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL
import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_DROP_METADATA;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_RUN_UPDATE_STATS_ASYNC;
-import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_USE_STATS_FOR_PARALLELIZATION;
import static org.apache.phoenix.schema.PTable.EncodedCQCounter.NULL_COUNTER;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS;
@@ -117,6 +116,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
import java.util.BitSet;
@@ -135,6 +135,7 @@ import java.util.Set;
import org.apache.hadoop.hbase.HColumnDescriptor;
import com.google.common.base.Objects;
+import com.google.gson.JsonObject;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.ClusterConnection;
@@ -159,7 +160,6 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.ServerBuildIndexCompiler;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.StatementNormalizer;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
@@ -220,9 +220,9 @@ import org.apache.phoenix.schema.PTable.QualifierEncodingScheme.QualifierOutOfRa
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.stats.GuidePostsKey;
import org.apache.phoenix.schema.stats.StatisticsUtil;
+import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDate;
-import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.schema.types.PUnsignedLong;
@@ -4289,6 +4289,7 @@ public class MetaDataClient {
String dataTableName = statement.getTableName();
String indexName = statement.getTable().getName().getTableName();
boolean isAsync = statement.isAsync();
+ boolean isRebuildAll = statement.isRebuildAll();
String tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getString();
PTable table = FromCompiler.getIndexResolver(statement, connection)
.getTables().get(0).getTable();
@@ -4303,6 +4304,7 @@ public class MetaDataClient {
boolean changingPhoenixTableProperty= evaluateStmtProperties(metaProperties,metaPropertiesEvaluated,table,schemaName,tableName);
PIndexState newIndexState = statement.getIndexState();
+
if (isAsync && newIndexState != PIndexState.REBUILD) { throw new SQLExceptionInfo.Builder(
SQLExceptionCode.ASYNC_NOT_ALLOWED)
.setMessage(" ASYNC building of index is allowed only with REBUILD index state")
@@ -4363,18 +4365,40 @@ public class MetaDataClient {
// Set so that we get the table below with the potentially modified rowKeyOrderOptimizable flag set
indexRef.setTable(result.getTable());
if (newIndexState == PIndexState.BUILDING && isAsync) {
- try {
- tableUpsert = connection.prepareStatement(UPDATE_INDEX_REBUILD_ASYNC_STATE);
- tableUpsert.setString(1,
- connection.getTenantId() == null ? null : connection.getTenantId().getString());
- tableUpsert.setString(2, schemaName);
- tableUpsert.setString(3, indexName);
- tableUpsert.setLong(4, result.getTable().getTimeStamp());
- tableUpsert.execute();
- connection.commit();
- } finally {
- if (tableUpsert != null) {
- tableUpsert.close();
+ if (isRebuildAll) {
+ List<Task.TaskRecord> tasks = Task.queryTaskTable(connection, schemaName, tableName, PTable.TaskType.INDEX_REBUILD,
+ tenantId, indexName);
+ if (tasks == null || tasks.size() == 0) {
+ Timestamp ts = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty(INDEX_NAME, indexName);
+ jsonObject.addProperty(REBUILD_ALL, true);
+ try {
+ Task.addTask(connection, PTable.TaskType.INDEX_REBUILD,
+ tenantId, schemaName,
+ dataTableName, PTable.TaskStatus.CREATED.toString(),
+ jsonObject.toString(), null, ts, null, true);
+ connection.commit();
+ } catch (IOException e) {
+ throw new SQLException("Exception happened while adding a System.Task" + e.toString());
+ }
+ }
+ } else {
+ try {
+ tableUpsert = connection.prepareStatement(UPDATE_INDEX_REBUILD_ASYNC_STATE);
+ tableUpsert.setString(1, connection.getTenantId() == null ?
+ null :
+ connection.getTenantId().getString());
+ tableUpsert.setString(2, schemaName);
+ tableUpsert.setString(3, indexName);
+ long beginTimestamp = result.getTable().getTimeStamp();
+ tableUpsert.setLong(4, beginTimestamp);
+ tableUpsert.execute();
+ connection.commit();
+ } finally {
+ if (tableUpsert != null) {
+ tableUpsert.close();
+ }
}
}
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 945aacb..0e077e9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -51,6 +51,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -73,6 +74,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.phoenix.compile.AggregationManager;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.SequenceManager;
@@ -933,7 +935,30 @@ public class TestUtil {
this.success = success;
}
}
-
+
+ public static void waitForIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException {
+ int maxTries = 60, nTries = 0;
+ do {
+ String schema = SchemaUtil.getSchemaNameFromFullName(fullIndexName);
+ String index = SchemaUtil.getTableNameFromFullName(fullIndexName);
+ Thread.sleep(1000); // sleep 1 sec
+ String query = "SELECT " + PhoenixDatabaseMetaData.INDEX_STATE + " FROM " +
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME
+ + ") = (" + "'" + schema + "','" + index + "') "
+ + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ PIndexState actualIndexState = null;
+ if (rs.next()) {
+ actualIndexState = PIndexState.fromSerializedValue(rs.getString(1));
+ boolean matchesExpected = (actualIndexState == expectedIndexState);
+ if (matchesExpected) {
+ return;
+ }
+ }
+ } while (++nTries < maxTries);
+ fail("Ran out of time waiting for index state to become " + expectedIndexState);
+ }
+
public static void waitForIndexState(Connection conn, String fullIndexName, PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws InterruptedException, SQLException {
int maxTries = 60, nTries = 0;
do {