You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/08/16 22:53:35 UTC
[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5348: Fix flaky
test: testIndexRebuildTask
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
new 77cf9ee PHOENIX-5348: Fix flaky test: testIndexRebuildTask
77cf9ee is described below
commit 77cf9eeb5a9760d362087f2879514f1516f900ac
Author: Gokcen Iskender <gi...@salesforce.com>
AuthorDate: Thu Jul 25 11:04:18 2019 -0700
PHOENIX-5348: Fix flaky test: testIndexRebuildTask
Signed-off-by: Chinmay Kulkarni <ch...@apache.org>
---
.../phoenix/end2end/DropTableWithViewsIT.java | 33 ++++++++-
.../apache/phoenix/end2end/IndexRebuildTaskIT.java | 85 +++++++++++-----------
.../phoenix/end2end/index/IndexMetadataIT.java | 5 +-
.../phoenix/coprocessor/TaskRegionObserver.java | 9 ++-
.../coprocessor/tasks/IndexRebuildTask.java | 4 +-
.../index/PhoenixIndexImportDirectReducer.java | 7 +-
.../org/apache/phoenix/schema/MetaDataClient.java | 2 +-
.../java/org/apache/phoenix/schema/task/Task.java | 19 ++++-
8 files changed, 103 insertions(+), 61 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
index 5836c56..6663dde 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DropTableWithViewsIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.end2end;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -26,6 +27,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collection;
@@ -137,7 +139,8 @@ public class DropTableWithViewsIT extends SplitSystemCatalogIT {
task.run();
task.run();
- assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.DROP_CHILD_VIEWS, null);
+ assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(), PTable.TaskType.DROP_CHILD_VIEWS,
+ null, null, null, null, null);
// Views should be dropped by now
TableName linkTable = TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES);
@@ -156,7 +159,9 @@ public class DropTableWithViewsIT extends SplitSystemCatalogIT {
}
}
- public static void assertTaskColumns(Connection conn, String expectedStatus, PTable.TaskType taskType, String expectedData)
+ public static void assertTaskColumns(Connection conn, String expectedStatus, PTable.TaskType taskType,
+ String expectedTableName, String expectedTenantId, String expectedSchema, Timestamp expectedTs,
+ String expectedIndexName)
throws SQLException {
ResultSet rs = conn.createStatement().executeQuery("SELECT * " +
" FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
@@ -166,9 +171,29 @@ public class DropTableWithViewsIT extends SplitSystemCatalogIT {
String taskStatus = rs.getString(PhoenixDatabaseMetaData.TASK_STATUS);
assertEquals(expectedStatus, taskStatus);
- if (expectedData != null) {
+ if (expectedTableName != null) {
+ String tableName = rs.getString(PhoenixDatabaseMetaData.TABLE_NAME);
+ assertEquals(expectedTableName, tableName);
+ }
+
+ if (expectedTenantId != null) {
+ String tenantId = rs.getString(PhoenixDatabaseMetaData.TENANT_ID);
+ assertEquals(expectedTenantId, tenantId);
+ }
+
+ if (expectedSchema != null) {
+ String schema = rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM);
+ assertEquals(expectedSchema, schema);
+ }
+
+ if (expectedTs != null) {
+ Timestamp ts = rs.getTimestamp(PhoenixDatabaseMetaData.TASK_TS);
+ assertEquals(expectedTs, ts);
+ }
+
+ if (expectedIndexName != null) {
String data = rs.getString(PhoenixDatabaseMetaData.TASK_DATA);
- assertEquals(expectedData, data);
+ assertEquals(true, data.contains("\"IndexName\":\"" + expectedIndexName));
}
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
index c4bcb30..8d6bb06 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRebuildTaskIT.java
@@ -27,6 +27,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.task.Task;
import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -47,12 +48,11 @@ import java.util.Map;
import java.util.Properties;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.waitForIndexRebuild;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-// disabled -- see PHOENIX-5348
-public abstract class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
+public class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
protected static String TENANT1 = "tenant1";
private static RegionCoprocessorEnvironment TaskRegionEnvironment;
@@ -81,15 +81,16 @@ public abstract class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
@Test
public void testIndexRebuildTask() throws Throwable {
String baseTable = generateUniqueName();
+ String viewName = generateUniqueName();
Connection conn = null;
- Connection viewConn = null;
+ Connection tenantConn = null;
try {
conn = DriverManager.getConnection(getUrl());
conn.setAutoCommit(false);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, TENANT1);
- viewConn =DriverManager.getConnection(getUrl(), props);
+ tenantConn =DriverManager.getConnection(getUrl(), props);
String ddlFormat =
"CREATE TABLE IF NOT EXISTS " + baseTable + " ("
+ " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR "
@@ -97,45 +98,37 @@ public abstract class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute(generateDDL(ddlFormat));
conn.commit();
// Create a view
- String viewName = generateUniqueName();
String viewDDL = "CREATE VIEW " + viewName + " AS SELECT * FROM " + baseTable;
- viewConn.createStatement().execute(viewDDL);
+ tenantConn.createStatement().execute(viewDDL);
// Create index
String indexName = generateUniqueName();
String idxSDDL = String.format("CREATE INDEX %s ON %s (V1)", indexName, viewName);
- viewConn.createStatement().execute(idxSDDL);
+ tenantConn.createStatement().execute(idxSDDL);
// Insert rows
int numOfValues = 1000;
for (int i=0; i < numOfValues; i++){
- viewConn.createStatement().execute(
+ tenantConn.createStatement().execute(
String.format("UPSERT INTO %s VALUES('%s', '%s', '%s')", viewName, String.valueOf(i), "y",
"z"));
}
- viewConn.commit();
-
- String data = "{IndexName:" + indexName + "}";
- // Run IndexRebuildTask
- TaskRegionObserver.SelfHealingTask task =
- new TaskRegionObserver.SelfHealingTask(
- TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
-
- Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
- // Add a task to System.Task to build indexes
- Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD,
- TENANT1, null, viewName,
- PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true);
-
-
- task.run();
+ tenantConn.commit();
+ waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
String viewIndexTableName = MetaDataUtil.getViewIndexPhysicalName(baseTable);
ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
- int count = getUtility().countRows(queryServices.getTable(Bytes.toBytes(viewIndexTableName)));
- assertTrue(count == numOfValues);
+ Table indexHTable = queryServices.getTable(Bytes.toBytes(viewIndexTableName));
+ int count = getUtility().countRows(indexHTable);
+ assertEquals(numOfValues, count);
+
+ // Alter to Unusable makes the index status inactive.
+ // If we ALTER it to DISABLE instead, the IndexTool fails while setting the state to ACTIVE, due to an invalid state transition.
+ tenantConn.createStatement().execute(
+ String.format("ALTER INDEX %s ON %s UNUSABLE", indexName, viewName));
+ tenantConn.commit();
// Remove index contents and try again
Admin admin = queryServices.getAdmin();
@@ -143,50 +136,56 @@ public abstract class IndexRebuildTaskIT extends BaseUniqueNamesOwnClusterIT {
admin.disableTable(tableName);
admin.truncateTable(tableName, false);
- data = "{IndexName:" + indexName + ", DisableBefore:true}";
+ count = getUtility().countRows(indexHTable);
+ assertEquals(0, count);
+
+ String data = "{IndexName:" + indexName + ", DisableBefore: true}";
+
+ // Run IndexRebuildTask
+ TaskRegionObserver.SelfHealingTask task =
+ new TaskRegionObserver.SelfHealingTask(
+ TaskRegionEnvironment, QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
- // Add a new task (update status to created) to System.Task to rebuild indexes
+ Timestamp startTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
Task.addTask(conn.unwrap(PhoenixConnection.class), PTable.TaskType.INDEX_REBUILD,
TENANT1, null, viewName,
PTable.TaskStatus.CREATED.toString(), data, null, startTs, null, true);
task.run();
- Table systemHTable= queryServices.getTable(Bytes.toBytes("SYSTEM."+PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE));
- count = getUtility().countRows(systemHTable);
- assertEquals(1, count);
-
// Check task status and other column values.
- waitForTaskState(conn, PTable.TaskType.INDEX_REBUILD, PTable.TaskStatus.COMPLETED);
+ waitForTaskState(conn, PTable.TaskType.INDEX_REBUILD, viewName, PTable.TaskStatus.COMPLETED);
// See that index is rebuilt and confirm index has rows
- Table htable= queryServices.getTable(Bytes.toBytes(viewIndexTableName));
- count = getUtility().countRows(htable);
+ count = getUtility().countRows(indexHTable);
assertEquals(numOfValues, count);
} finally {
- conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
- conn.commit();
if (conn != null) {
+ conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME
+ + " WHERE TABLE_NAME ='" + viewName + "'");
+ conn.commit();
conn.close();
}
- if (viewConn != null) {
- viewConn.close();
+ if (tenantConn != null) {
+ tenantConn.close();
}
}
}
- public static void waitForTaskState(Connection conn, PTable.TaskType taskType, PTable.TaskStatus expectedTaskStatus) throws InterruptedException,
+ public static void waitForTaskState(Connection conn, PTable.TaskType taskType, String expectedTableName,
+ PTable.TaskStatus expectedTaskStatus) throws InterruptedException,
SQLException {
int maxTries = 100, nTries = 0;
do {
Thread.sleep(2000);
ResultSet rs = conn.createStatement().executeQuery("SELECT * " +
" FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
- " WHERE " + PhoenixDatabaseMetaData.TASK_TYPE + " = " +
+ " WHERE " + PhoenixDatabaseMetaData.TABLE_NAME + "='" + expectedTableName + "' AND " +
+ PhoenixDatabaseMetaData.TASK_TYPE + " = " +
taskType.getSerializedValue());
String taskStatus = null;
- if (rs.next()) {
+ while (rs.next()) {
taskStatus = rs.getString(PhoenixDatabaseMetaData.TASK_STATUS);
boolean matchesExpected = (expectedTaskStatus.toString().equals(taskStatus));
if (matchesExpected) {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
index 395fcdf..6352d35 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexMetadataIT.java
@@ -658,7 +658,8 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
upsertStmt.setString(2, "val3");
upsertStmt.execute();
- conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+ conn.createStatement().execute("DELETE " + " FROM " + PhoenixDatabaseMetaData.SYSTEM_TASK_NAME +
+ " WHERE TABLE_NAME ='" + testTable + "'");
conn.commit();
conn.createStatement().execute(
@@ -676,7 +677,7 @@ public class IndexMetadataIT extends ParallelStatsDisabledIT {
// Check task status and other column values.
DropTableWithViewsIT.assertTaskColumns(conn, PTable.TaskStatus.COMPLETED.toString(),
- PTable.TaskType.INDEX_REBUILD, null);
+ PTable.TaskType.INDEX_REBUILD, testTable, null, null, null,indexName);
// Check that the value is updated to correct one
val = getIndexValue(conn, indexName, 2);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index 671d888..6721986 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -186,16 +186,19 @@ public class TaskRegionObserver extends BaseRegionObserver {
if (result == null) {
// reread task record. There might be async setting of task status
- taskRecord = Task.queryTaskTable(connForTask, taskRecord.getSchemaName(), taskRecord.getTableName(),
+ taskRecord = Task.queryTaskTable(connForTask, taskRecord.getTimeStamp(),
+ taskRecord.getSchemaName(), taskRecord.getTableName(),
taskType, taskRecord.getTenantId(), null).get(0);
if (taskRecord.getStatus() != null && taskRecord.getStatus().equals(
PTable.TaskStatus.COMPLETED.toString())) {
continue;
}
+
// Change task status to STARTED
Task.addTask(connForTask, taskRecord.getTaskType(), taskRecord.getTenantId(), taskRecord.getSchemaName(),
taskRecord.getTableName(), PTable.TaskStatus.STARTED.toString(),
- taskRecord.getData(), taskRecord.getPriority(), taskRecord.getTimeStamp(), null, true);
+ taskRecord.getData(), taskRecord.getPriority(), taskRecord.getTimeStamp(), null,
+ true);
// invokes the method at runtime
result = (TaskResult) runMethod.invoke(obj, taskRecord);
@@ -237,7 +240,7 @@ public class TaskRegionObserver extends BaseRegionObserver {
}
public static void setEndTaskStatus(PhoenixConnection connForTask, Task.TaskRecord taskRecord, String taskStatus)
- throws IOException {
+ throws IOException, SQLException {
// update data with details.
String data = taskRecord.getData();
if (Strings.isNullOrEmpty(data)) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
index 0069592..a943499 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
@@ -163,7 +163,9 @@ public class IndexRebuildTask extends BaseTask {
Cluster cluster = new Cluster(configuration);
Job job = cluster.getJob(org.apache.hadoop.mapreduce.JobID.forName(jobID));
-
+ if (job == null) {
+ return new TaskRegionObserver.TaskResult(TaskRegionObserver.TaskResultCode.SKIPPED, "");
+ }
if (job != null && job.isComplete()) {
if (job.isSuccessful()) {
LOGGER.warn("IndexRebuildTask checkCurrentResult job is successful "
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 5424407..0e40aff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -20,11 +20,9 @@ package org.apache.phoenix.mapreduce.index;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
-import java.sql.Timestamp;
import java.util.List;
import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
@@ -36,7 +34,6 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.task.Task;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,12 +72,12 @@ public class PhoenixIndexImportDirectReducer extends
String schemaName = SchemaUtil.getSchemaNameFromFullName(fullTableName);
String tableName = SchemaUtil.getTableNameFromFullName(fullTableName);
String indexName = PhoenixConfigurationUtil.getDisableIndexes(context.getConfiguration());
- List<Task.TaskRecord> taskRecords = Task.queryTaskTable(connection, schemaName, tableName,
+ List<Task.TaskRecord> taskRecords = Task.queryTaskTable(connection, null, schemaName, tableName,
PTable.TaskType.INDEX_REBUILD, tenantId, indexName);
if (taskRecords != null && taskRecords.size() > 0) {
for (Task.TaskRecord taskRecord : taskRecords) {
TaskRegionObserver.SelfHealingTask.setEndTaskStatus(
- connection.unwrap(PhoenixConnection.class), taskRecords.get(0),
+ connection.unwrap(PhoenixConnection.class), taskRecord,
PTable.TaskStatus.COMPLETED.toString());
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 9c183de..3bc7d53 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -4362,7 +4362,7 @@ public class MetaDataClient {
indexRef.setTable(result.getTable());
if (newIndexState == PIndexState.BUILDING && isAsync) {
if (isRebuildAll) {
- List<Task.TaskRecord> tasks = Task.queryTaskTable(connection, schemaName, tableName, PTable.TaskType.INDEX_REBUILD,
+ List<Task.TaskRecord> tasks = Task.queryTaskTable(connection, null, schemaName, tableName, PTable.TaskType.INDEX_REBUILD,
tenantId, indexName);
if (tasks == null || tasks.size() == 0) {
Timestamp ts = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
index 11883d6..1a7bb7f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
@@ -35,6 +35,7 @@ import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
public class Task {
@@ -202,7 +203,8 @@ public class Task {
return result;
}
- public static List<TaskRecord> queryTaskTable(Connection connection, String schema, String tableName,
+ public static List<TaskRecord> queryTaskTable(Connection connection, Timestamp ts,
+ String schema, String tableName,
PTable.TaskType taskType, String tenantId, String indexName)
throws SQLException {
String taskQuery = "SELECT " +
@@ -230,7 +232,20 @@ public class Task {
taskQuery += " AND " + PhoenixDatabaseMetaData.TASK_DATA + " LIKE '%" + indexName + "%'";
}
- return populateTasks(connection, taskQuery);
+ List<TaskRecord> taskRecords = populateTasks(connection, taskQuery);
+ List<TaskRecord> result = new ArrayList<TaskRecord>();
+ if (ts != null) {
+ // Adding TASK_TS to the where clause did not work. It returns empty when directly querying with the timestamp.
+ for (TaskRecord tr : taskRecords) {
+ if (tr.getTimeStamp().equals(ts)) {
+ result.add(tr);
+ }
+ }
+ } else {
+ result = taskRecords;
+ }
+
+ return result;
}
public static List<TaskRecord> queryTaskTable(Connection connection, String[] excludedTaskStatus)