You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/11/12 06:37:53 UTC
[phoenix] branch master updated: PHOENIX-5546: TASK_TS being set as
HConstants.LATEST_TIMESTAMP in SYSTEM.TASK table
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 05e26a3 PHOENIX-5546: TASK_TS being set as HConstants.LATEST_TIMESTAMP in SYSTEM.TASK table
05e26a3 is described below
commit 05e26a31c46d7a77b719647847835683a1f8f491
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Mon Nov 11 16:39:35 2019 -0800
PHOENIX-5546: TASK_TS being set as HConstants.LATEST_TIMESTAMP in SYSTEM.TASK table
---
.../org/apache/phoenix/end2end/ViewMetadataIT.java | 67 +++++++++++++++++++++-
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 5 +-
.../phoenix/coprocessor/TaskRegionObserver.java | 3 +-
.../org/apache/phoenix/execute/MutationState.java | 14 +++--
.../org/apache/phoenix/query/QueryConstants.java | 3 +-
.../org/apache/phoenix/schema/MetaDataClient.java | 34 ++++++-----
.../java/org/apache/phoenix/schema/task/Task.java | 31 +---------
7 files changed, 106 insertions(+), 51 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
index 0e50fc2..944609f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
@@ -18,10 +18,19 @@
package org.apache.phoenix.end2end;
import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
+import static org.apache.phoenix.coprocessor.TaskRegionObserver.TASK_DETAILS;
import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_MODIFY_VIEW_PK;
import static org.apache.phoenix.exception.SQLExceptionCode.NOT_NULLABLE_COLUMN_IN_ROW_KEY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.schema.PTable.TaskType.DROP_CHILD_VIEWS;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -30,6 +39,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Timestamp;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -37,6 +47,7 @@ import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -340,6 +351,60 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
validateCols(view);
}
+ /**
+  * Regression test for PHOENIX-5546 (TASK_TS being set as HConstants.LATEST_TIMESTAMP in
+  * SYSTEM.TASK). Creates a table with a view and drops the table with CASCADE, twice in a
+  * row using the same names. After each round the view must be gone and the SYSTEM.TASK
+  * validation is run with the round number as the expected task count.
+  */
+ @Test
+ public void testRepeatedCreateAndDropCascadeTableWorks() throws Exception {
+     final int rounds = 2;
+     String baseTableName = generateUniqueName();
+     String qualifiedTableName = SchemaUtil.getTableName(SCHEMA1, baseTableName);
+     String qualifiedViewName = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
+
+     try (Connection conn = DriverManager.getConnection(getUrl())) {
+         // Round 1 is the initial create/drop; round 2 repeats it and checks that the view
+         // still doesn't exist.
+         for (int round = 1; round <= rounds; round++) {
+             createTableViewAndDropCascade(conn, qualifiedTableName, qualifiedViewName);
+             validateViewDoesNotExist(conn, qualifiedViewName);
+             validateSystemTaskContainsCompletedDropChildViewsTasks(conn, SCHEMA1,
+                     baseTableName, round);
+         }
+     }
+ }
+
+ /**
+  * Creates a table and a view on top of it, drops the table with CASCADE and then runs the
+  * drop-child-views task through {@code runDropChildViewsTask()}.
+  *
+  * @param conn          connection used to issue the DDL statements
+  * @param fullTableName schema-qualified name of the table to create and drop
+  * @param fullViewName  schema-qualified name of the view created on the table
+  * @throws SQLException if any of the DDL statements fails
+  */
+ private void createTableViewAndDropCascade(Connection conn, String fullTableName,
+         String fullViewName) throws SQLException {
+     String tableDdl = "CREATE TABLE " + fullTableName +
+             " (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)";
+     String viewDdl = "CREATE VIEW " + fullViewName +
+             " (v2 VARCHAR) AS SELECT * FROM " + fullTableName + " WHERE k > 5";
+
+     // A single statement is reused for all DDL and closed deterministically, instead of
+     // leaking one unclosed Statement per call to conn.createStatement().
+     try (Statement stmt = conn.createStatement()) {
+         stmt.execute(tableDdl);
+         stmt.execute(viewDdl);
+
+         // drop table cascade should succeed
+         stmt.execute("DROP TABLE " + fullTableName + " CASCADE");
+     }
+     runDropChildViewsTask();
+ }
+
+ /**
+  * Checks that SYSTEM.TASK contains at least {@code numTasks} DROP_CHILD_VIEWS rows for the
+  * given table with a null tenant id, and that each of those rows has a timestamp other than
+  * HConstants.LATEST_TIMESTAMP, a COMPLETED status, a non-null end time and task data that
+  * records the final status.
+  *
+  * @param conn       connection used to query SYSTEM.TASK
+  * @param schemaName schema of the table the tasks were created for
+  * @param tableName  name (without schema) of the table the tasks were created for
+  * @param numTasks   number of task rows that must be present and valid
+  * @throws SQLException if the query fails
+  */
+ private void validateSystemTaskContainsCompletedDropChildViewsTasks(Connection conn,
+         String schemaName, String tableName, int numTasks) throws SQLException {
+     String query = "SELECT * FROM " + SYSTEM_TASK_NAME +
+             " WHERE " + TASK_TYPE + "=" + DROP_CHILD_VIEWS.getSerializedValue() +
+             " AND " + TENANT_ID + " IS NULL" +
+             " AND " + TABLE_SCHEM + "='" + schemaName +
+             "' AND " + TABLE_NAME + "='" + tableName + "'";
+     Timestamp maxTs = new Timestamp(HConstants.LATEST_TIMESTAMP);
+     String completed = PTable.TaskStatus.COMPLETED.toString();
+
+     try (Statement stmt = conn.createStatement();
+             ResultSet rs = stmt.executeQuery(query)) {
+         for (int i = 0; i < numTasks; i++) {
+             // Advance the cursor on every iteration so that each task row is validated;
+             // checking rs.next() only once would re-validate the first row numTasks times.
+             assertTrue("Expected " + numTasks + " task row(s) but found only " + i,
+                     rs.next());
+
+             // Columns are read by position from SELECT *. Going by the assertion messages:
+             // 2 = task timestamp, 6 = task status, 7 = task end time, 9 = task data.
+             // NOTE(review): confirm these positions against the SYSTEM.TASK DDL.
+             assertNotEquals("Should have got a valid timestamp", maxTs, rs.getTimestamp(2));
+             assertEquals("Task should be completed", completed, rs.getString(6));
+             assertNotNull("Task end time should not be null", rs.getTimestamp(7));
+             String taskData = rs.getString(9);
+             assertTrue("Task data should contain final status", taskData != null &&
+                     taskData.contains(TASK_DETAILS) &&
+                     taskData.contains(completed));
+         }
+     }
+ }
+
@Test
public void testViewAndTableInDifferentSchemasWithNamespaceMappingEnabled() throws Exception {
testViewAndTableInDifferentSchemas(true);
@@ -351,7 +416,7 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
}
- public void testViewAndTableInDifferentSchemas(boolean isNamespaceMapped) throws Exception {
+ private void testViewAndTableInDifferentSchemas(boolean isNamespaceMapped) throws Exception {
Properties props = new Properties();
props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
Connection conn = DriverManager.getConnection(getUrl(),props);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 10455f2..c142e63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -2408,7 +2408,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
.unwrap(PhoenixConnection.class);
Task.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS,
Bytes.toString(tenantId), Bytes.toString(schemaName),
- Bytes.toString(tableName), this.accessCheckEnabled);
+ Bytes.toString(tableName),
+ PTable.TaskStatus.CREATED.toString(),
+ null, null, null, null,
+ this.accessCheckEnabled);
} catch (Throwable t) {
LOGGER.error("Adding a task to drop child views failed!", t);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index 872ea3b..34e2413 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory;
public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
public static final Logger LOGGER = LoggerFactory.getLogger(TaskRegionObserver.class);
+ public static final String TASK_DETAILS = "TaskDetails";
protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(TaskType.values().length);
private long timeInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS;
@@ -252,7 +253,7 @@ public class TaskRegionObserver implements RegionObserver, RegionCoprocessor {
}
JsonParser jsonParser = new JsonParser();
JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject();
- jsonObject.addProperty("TaskDetails", taskStatus);
+ jsonObject.addProperty(TASK_DETAILS, taskStatus);
data = jsonObject.toString();
Timestamp endTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 10f29d0..8c38f1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -84,7 +83,6 @@ import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaData;
import org.apache.phoenix.schema.PName;
-import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableRef;
@@ -104,9 +102,7 @@ import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -777,8 +773,14 @@ public class MutationState implements SQLCloseable {
// If we're auto committing, we've already validated the schema when we got the ColumnResolver,
// so no need to do it again here.
PTable table = tableRef.getTable();
- MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName()
- .getString());
+
+ // We generally don't re-resolve SYSTEM tables, but if the table has a ROW_TIMESTAMP column,
+ // we must hit the server to get the latest timestamp so that data is upserted with the
+ // correct server-side timestamp when the ROW_TIMESTAMP is not provided in the UPSERT statement.
+ boolean hitServerForLatestTimestamp =
+ table.getRowTimestampColPos() != -1 && table.getType() == PTableType.SYSTEM;
+ MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(),
+ table.getTableName().getString(), hitServerForLatestTimestamp);
PTable resolvedTable = result.getTable();
if (resolvedTable == null) { throw new TableNotFoundException(table.getSchemaName().getString(), table
.getTableName().getString()); }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 659993c..8d727ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -399,5 +399,6 @@ public interface QueryConstants {
HConstants.VERSIONS + "=%s,\n" +
ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
ColumnFamilyDescriptorBuilder.TTL + "=" + TASK_TABLE_TTL + ",\n" + // 10 days
- PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
+ PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
+ STORE_NULLS + "=" + Boolean.TRUE;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index da38328..7311d41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -520,14 +520,11 @@ public class MetaDataClient {
return updateCache(schemaName, tableName, false);
}
- private MetaDataMutationResult updateCache(String schemaName, String tableName, boolean alwaysHitServer) throws SQLException {
+ public MetaDataMutationResult updateCache(String schemaName, String tableName,
+ boolean alwaysHitServer) throws SQLException {
return updateCache(connection.getTenantId(), schemaName, tableName, alwaysHitServer);
}
- public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName) throws SQLException {
- return updateCache(tenantId, schemaName, tableName, false);
- }
-
public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName, boolean alwaysHitServer) throws SQLException {
return updateCache(tenantId, schemaName, tableName, alwaysHitServer, null);
}
@@ -598,14 +595,11 @@ public class MetaDataClient {
connection.getMutationState().startTransaction(table.getTransactionProvider());
}
resolvedTimestamp = resolvedTimestamp==null ? TransactionUtil.getResolvedTimestamp(connection, isTransactional, HConstants.LATEST_TIMESTAMP) : resolvedTimestamp;
- // Do not make rpc to getTable if
- // 1. table is a system table
- // 2. table was already resolved as of that timestamp
- // 3. table does not have a ROW_TIMESTAMP column and age is less then UPDATE_CACHE_FREQUENCY
- if (table != null && !alwaysHitServer
- && (systemTable || resolvedTimestamp == tableResolvedTimestamp ||
- (table.getRowTimestampColPos() == -1 && connection.getMetaDataCache().getAge(tableRef) < table.getUpdateCacheFrequency() ))) {
- return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, QueryConstants.UNSET_TIMESTAMP, table);
+
+ if (avoidRpcToGetTable(alwaysHitServer, resolvedTimestamp, systemTable, table, tableRef,
+ tableResolvedTimestamp)) {
+ return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
+ QueryConstants.UNSET_TIMESTAMP, table);
}
MetaDataMutationResult result;
@@ -724,6 +718,20 @@ public class MetaDataClient {
return result;
}
+ /**
+  * Decides whether the rpc to getTable can be skipped. No rpc is made when a table is
+  * supplied, the caller did not ask to always hit the server, and at least one of the
+  * following holds:
+  * 1. the table is a system table that does not have a ROW_TIMESTAMP column, OR
+  * 2. the table was already resolved as of that timestamp, OR
+  * 3. the table does not have a ROW_TIMESTAMP column and its cache age is less than
+  *    UPDATE_CACHE_FREQUENCY.
+  */
+ private boolean avoidRpcToGetTable(boolean alwaysHitServer, Long resolvedTimestamp,
+         boolean systemTable, PTable table, PTableRef tableRef, long tableResolvedTimestamp) {
+     if (table == null || alwaysHitServer) {
+         return false;
+     }
+     // 1. System table without a ROW_TIMESTAMP column.
+     if (systemTable && table.getRowTimestampColPos() == -1) {
+         return true;
+     }
+     // 2. Already resolved as of this timestamp (the Long is unboxed and compared by value).
+     if (resolvedTimestamp == tableResolvedTimestamp) {
+         return true;
+     }
+     // 3. No ROW_TIMESTAMP column and the cached entry is still fresh enough.
+     return table.getRowTimestampColPos() == -1 &&
+             connection.getMetaDataCache().getAge(tableRef) < table.getUpdateCacheFrequency();
+ }
+
public MetaDataMutationResult updateCache(String schemaName) throws SQLException {
return updateCache(schemaName, false);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
index 4161667..988004b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
@@ -35,7 +35,6 @@ import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.List;
public class Task {
@@ -73,7 +72,8 @@ public class Task {
}
private static PreparedStatement setValuesToAddTaskPS(PreparedStatement stmt, PTable.TaskType taskType,
- String tenantId, String schemaName, String tableName) throws SQLException {
+ String tenantId, String schemaName, String tableName, String taskStatus, String data,
+ Integer priority, Timestamp startTs, Timestamp endTs) throws SQLException {
stmt.setByte(1, taskType.getSerializedValue());
if (tenantId != null) {
stmt.setString(2, tenantId);
@@ -86,13 +86,6 @@ public class Task {
stmt.setNull(3, Types.VARCHAR);
}
stmt.setString(4, tableName);
- return stmt;
- }
-
- private static PreparedStatement setValuesToAddTaskPS(PreparedStatement stmt, PTable.TaskType taskType,
- String tenantId, String schemaName, String tableName, String taskStatus, String data,
- Integer priority, Timestamp startTs, Timestamp endTs) throws SQLException {
- stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName);
if (taskStatus != null) {
stmt.setString(5, taskStatus);
} else {
@@ -127,28 +120,10 @@ public class Task {
}
public static void addTask(PhoenixConnection conn, PTable.TaskType taskType, String tenantId, String schemaName,
- String tableName, boolean accessCheckEnabled)
- throws IOException {
- PreparedStatement stmt = null;
- try {
- stmt = conn.prepareStatement("UPSERT INTO " +
- PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +
- PhoenixDatabaseMetaData.TASK_TYPE + ", " +
- PhoenixDatabaseMetaData.TENANT_ID + ", " +
- PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
- PhoenixDatabaseMetaData.TABLE_NAME + " ) VALUES(?,?,?,?)");
- stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName);
- } catch (SQLException e) {
- throw new IOException(e);
- }
- mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
- }
-
- public static void addTask(PhoenixConnection conn, PTable.TaskType taskType, String tenantId, String schemaName,
String tableName, String taskStatus, String data, Integer priority, Timestamp startTs, Timestamp endTs,
boolean accessCheckEnabled)
throws IOException {
- PreparedStatement stmt = null;
+ PreparedStatement stmt;
try {
stmt = conn.prepareStatement("UPSERT INTO " +
PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +