Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/11/12 06:32:32 UTC

[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5546: TASK_TS being set as HConstants.LATEST_TIMESTAMP in SYSTEM.TASK table

This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
     new fcb9e9d  PHOENIX-5546: TASK_TS being set as HConstants.LATEST_TIMESTAMP in SYSTEM.TASK table
fcb9e9d is described below

commit fcb9e9d93554d117fd857fab5a1766dcdd9d86f2
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Mon Nov 11 16:39:35 2019 -0800

    PHOENIX-5546: TASK_TS being set as HConstants.LATEST_TIMESTAMP in SYSTEM.TASK table
---
 .../org/apache/phoenix/end2end/ViewMetadataIT.java | 67 +++++++++++++++++++++-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  5 +-
 .../phoenix/coprocessor/TaskRegionObserver.java    |  5 +-
 .../org/apache/phoenix/execute/MutationState.java  | 15 ++---
 .../org/apache/phoenix/query/QueryConstants.java   |  3 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  | 34 ++++++-----
 .../java/org/apache/phoenix/schema/task/Task.java  | 31 +---------
 7 files changed, 106 insertions(+), 54 deletions(-)
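
The root cause, in short: SYSTEM.TASK keys its rows on TASK_TS, the table's ROW_TIMESTAMP column, and the
client previously never re-resolved SYSTEM tables against the server, so DROP_CHILD_VIEWS tasks could be
written with HConstants.LATEST_TIMESTAMP as their TASK_TS. The new ViewMetadataIT test below asserts exactly
that. As a rough standalone check against an existing cluster (a sketch, not part of this commit; the JDBC
URL is a placeholder), the same comparison can be run by hand:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Timestamp;
    import org.apache.hadoop.hbase.HConstants;

    public class Phoenix5546Check {
        public static void main(String[] args) throws Exception {
            Timestamp latest = new Timestamp(HConstants.LATEST_TIMESTAMP);
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 ResultSet rs = conn.createStatement().executeQuery(
                         "SELECT TASK_TYPE, TASK_TS FROM SYSTEM.TASK")) {
                while (rs.next()) {
                    Timestamp taskTs = rs.getTimestamp(2);
                    // A TASK_TS equal to LATEST_TIMESTAMP is the symptom tracked by PHOENIX-5546
                    System.out.println("type=" + rs.getByte(1) + " ts=" + taskTs
                            + " suspect=" + latest.equals(taskTs));
                }
            }
        }
    }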

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
index 3d46393..8c0eafb 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewMetadataIT.java
@@ -18,10 +18,19 @@
 package org.apache.phoenix.end2end;
 
 import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
+import static org.apache.phoenix.coprocessor.TaskRegionObserver.TASK_DETAILS;
 import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_MODIFY_VIEW_PK;
 import static org.apache.phoenix.exception.SQLExceptionCode.NOT_NULLABLE_COLUMN_IN_ROW_KEY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TASK_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID;
+import static org.apache.phoenix.schema.PTable.TaskType.DROP_CHILD_VIEWS;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -30,6 +39,7 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Timestamp;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -37,6 +47,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -340,6 +351,60 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
         validateCols(view);
     }
 
+    // Test case to ensure the issue seen in PHOENIX-5546 does not recur
+    @Test
+    public void testRepeatedCreateAndDropCascadeTableWorks() throws Exception {
+        String tableName = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(SCHEMA1, tableName);
+        String fullViewName = SchemaUtil.getTableName(SCHEMA2, generateUniqueName());
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            createTableViewAndDropCascade(conn, fullTableName, fullViewName);
+            validateViewDoesNotExist(conn, fullViewName);
+            validateSystemTaskContainsCompletedDropChildViewsTasks(conn, SCHEMA1, tableName, 1);
+
+            // Repeat this and check that the view still doesn't exist
+            createTableViewAndDropCascade(conn, fullTableName, fullViewName);
+            validateViewDoesNotExist(conn, fullViewName);
+            validateSystemTaskContainsCompletedDropChildViewsTasks(conn, SCHEMA1, tableName, 2);
+        }
+    }
+
+    private void createTableViewAndDropCascade(Connection conn, String fullTableName,
+            String fullViewName) throws SQLException {
+        String tableDdl = "CREATE TABLE " + fullTableName +
+                "  (k INTEGER NOT NULL PRIMARY KEY, v1 DATE)";
+        conn.createStatement().execute(tableDdl);
+        String ddl = "CREATE VIEW " + fullViewName +
+                " (v2 VARCHAR) AS SELECT * FROM " + fullTableName + " WHERE k > 5";
+        conn.createStatement().execute(ddl);
+
+        // drop table cascade should succeed
+        conn.createStatement().execute("DROP TABLE " + fullTableName + " CASCADE");
+        runDropChildViewsTask();
+    }
+
+    private void validateSystemTaskContainsCompletedDropChildViewsTasks(Connection conn,
+            String schemaName, String tableName, int numTasks) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + SYSTEM_TASK_NAME +
+                " WHERE " + TASK_TYPE + "=" + DROP_CHILD_VIEWS.getSerializedValue() +
+                " AND " + TENANT_ID + " IS NULL" +
+                " AND " + TABLE_SCHEM + "='" + schemaName +
+                "' AND " + TABLE_NAME + "='" + tableName + "'");
+        assertTrue(rs.next());
+        for (int i = 0; i < numTasks; i++) {
+            Timestamp maxTs = new Timestamp(HConstants.LATEST_TIMESTAMP);
+            assertNotEquals("Should have got a valid timestamp", maxTs, rs.getTimestamp(2));
+            assertTrue("Task should be completed",
+                    PTable.TaskStatus.COMPLETED.toString().equals(rs.getString(6)));
+            assertNotNull("Task end time should not be null", rs.getTimestamp(7));
+            String taskData = rs.getString(9);
+            assertTrue("Task data should contain final status", taskData != null &&
+                    taskData.contains(TASK_DETAILS) &&
+                    taskData.contains(PTable.TaskStatus.COMPLETED.toString()));
+        }
+    }
+
     @Test
     public void testViewAndTableInDifferentSchemasWithNamespaceMappingEnabled() throws Exception {
         testViewAndTableInDifferentSchemas(true);
@@ -351,7 +416,7 @@ public class ViewMetadataIT extends SplitSystemCatalogIT {
 
     }
 
-    public void testViewAndTableInDifferentSchemas(boolean isNamespaceMapped) throws Exception {
+    private void testViewAndTableInDifferentSchemas(boolean isNamespaceMapped) throws Exception {
         Properties props = new Properties();
         props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
         Connection conn = DriverManager.getConnection(getUrl(),props);
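
For readers following the positional getters in the new validateSystemTaskContainsCompletedDropChildViewsTasks
assertions, the column indexes line up with SYSTEM.TASK roughly as follows (an illustrative mapping inferred
from the assertion messages, not an authoritative schema listing):

    // rs.getTimestamp(2) -> TASK_TS      (the ROW_TIMESTAMP column under test)
    // rs.getString(6)    -> TASK_STATUS  (expected to be COMPLETED)
    // rs.getTimestamp(7) -> TASK_END_TS  (expected to be non-null)
    // rs.getString(9)    -> TASK_DATA    (JSON expected to contain "TaskDetails")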
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 5e6ad9f..9b85be6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -2392,7 +2392,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                                 .unwrap(PhoenixConnection.class);
                                 Task.addTask(conn, PTable.TaskType.DROP_CHILD_VIEWS,
                                         Bytes.toString(tenantId), Bytes.toString(schemaName),
-                                        Bytes.toString(tableName), this.accessCheckEnabled);
+                                        Bytes.toString(tableName),
+                                        PTable.TaskStatus.CREATED.toString(),
+                                        null, null, null, null,
+                                        this.accessCheckEnabled);
                             } catch (Throwable t) {
                                 LOGGER.error("Adding a task to drop child views failed!", t);
                             }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
index 6721986..731ca62 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TaskRegionObserver.java
@@ -22,10 +22,8 @@ import java.lang.reflect.Method;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.TimerTask;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -61,6 +59,7 @@ import org.slf4j.LoggerFactory;
 
 public class TaskRegionObserver extends BaseRegionObserver {
     public static final Logger LOGGER = LoggerFactory.getLogger(TaskRegionObserver.class);
+    public static final String TASK_DETAILS = "TaskDetails";
 
     protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(TaskType.values().length);
     private long timeInterval = QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS;
@@ -248,7 +247,7 @@ public class TaskRegionObserver extends BaseRegionObserver {
             }
             JsonParser jsonParser = new JsonParser();
             JsonObject jsonObject = jsonParser.parse(data).getAsJsonObject();
-            jsonObject.addProperty("TaskDetails", taskStatus);
+            jsonObject.addProperty(TASK_DETAILS, taskStatus);
             data = jsonObject.toString();
 
             Timestamp endTs = new Timestamp(EnvironmentEdgeManager.currentTimeMillis());
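
The observer records the final status inside the task's JSON payload under the new shared TASK_DETAILS key;
a minimal Gson sketch (not part of this commit) of the status entry that the new test asserts on in TASK_DATA:

    import com.google.gson.JsonObject;

    public class TaskDataSketch {
        public static void main(String[] args) {
            JsonObject taskData = new JsonObject();
            // Same key the observer uses: TaskRegionObserver.TASK_DETAILS == "TaskDetails"
            taskData.addProperty("TaskDetails", "COMPLETED");
            System.out.println(taskData);   // {"TaskDetails":"COMPLETED"}
        }
    }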
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 77bec54..d31dda7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -42,7 +42,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.concurrent.Immutable;
 
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -83,8 +82,6 @@ import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PMetaData;
 import org.apache.phoenix.schema.PName;
-import org.apache.phoenix.schema.PNameFactory;
-import org.apache.phoenix.schema.PNameImpl;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableRef;
@@ -105,8 +102,6 @@ import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
-import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -776,8 +771,14 @@ public class MutationState implements SQLCloseable {
         // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
         // so no need to do it again here.
         PTable table = tableRef.getTable();
-        MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName()
-                .getString());
+
+        // We generally don't re-resolve SYSTEM tables, but if it relies on ROW_TIMESTAMP, we must
+        // get the latest timestamp in order to upsert data with the correct server-side timestamp
+        // in case the ROW_TIMESTAMP is not provided in the UPSERT statement.
+        boolean hitServerForLatestTimestamp =
+                table.getRowTimestampColPos() != -1 && table.getType() == PTableType.SYSTEM;
+        MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(),
+                table.getTableName().getString(), hitServerForLatestTimestamp);
         PTable resolvedTable = result.getTable();
         if (resolvedTable == null) { throw new TableNotFoundException(table.getSchemaName().getString(), table
                 .getTableName().getString()); }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 910ce92..1b04706 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -394,5 +394,6 @@ public interface QueryConstants {
             HConstants.VERSIONS + "=%s,\n" +
             HColumnDescriptor.KEEP_DELETED_CELLS + "=%s,\n" +
             HColumnDescriptor.TTL + "=" + TASK_TABLE_TTL + ",\n" +     // 10 days
-            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
+            PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
+            STORE_NULLS + "=" + Boolean.TRUE;
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 28e3441..8cd1a59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -520,14 +520,11 @@ public class MetaDataClient {
         return updateCache(schemaName, tableName, false);
     }
 
-    private MetaDataMutationResult updateCache(String schemaName, String tableName, boolean alwaysHitServer) throws SQLException {
+    public MetaDataMutationResult updateCache(String schemaName, String tableName,
+            boolean alwaysHitServer) throws SQLException {
         return updateCache(connection.getTenantId(), schemaName, tableName, alwaysHitServer);
     }
 
-    public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName) throws SQLException {
-        return updateCache(tenantId, schemaName, tableName, false);
-    }
-
     public MetaDataMutationResult updateCache(PName tenantId, String schemaName, String tableName, boolean alwaysHitServer) throws SQLException {
         return updateCache(tenantId, schemaName, tableName, alwaysHitServer, null);
     }
@@ -598,14 +595,11 @@ public class MetaDataClient {
             connection.getMutationState().startTransaction(table.getTransactionProvider());
         }
         resolvedTimestamp = resolvedTimestamp==null ? TransactionUtil.getResolvedTimestamp(connection, isTransactional, HConstants.LATEST_TIMESTAMP) : resolvedTimestamp;
-        // Do not make rpc to getTable if
-        // 1. table is a system table
-        // 2. table was already resolved as of that timestamp
-        // 3. table does not have a ROW_TIMESTAMP column and age is less then UPDATE_CACHE_FREQUENCY
-        if (table != null && !alwaysHitServer
-                && (systemTable || resolvedTimestamp == tableResolvedTimestamp || 
-                (table.getRowTimestampColPos() == -1 && connection.getMetaDataCache().getAge(tableRef) < table.getUpdateCacheFrequency() ))) {
-            return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, QueryConstants.UNSET_TIMESTAMP, table);
+
+        if (avoidRpcToGetTable(alwaysHitServer, resolvedTimestamp, systemTable, table, tableRef,
+                tableResolvedTimestamp)) {
+            return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS,
+                    QueryConstants.UNSET_TIMESTAMP, table);
         }
 
         MetaDataMutationResult result;
@@ -724,6 +718,20 @@ public class MetaDataClient {
         return result;
     }
 
+    // Do not make rpc to getTable if
+    // 1. table is a system table that does not have a ROW_TIMESTAMP column OR
+    // 2. table was already resolved as of that timestamp OR
+    // 3. table does not have a ROW_TIMESTAMP column and age is less than UPDATE_CACHE_FREQUENCY
+    private boolean avoidRpcToGetTable(boolean alwaysHitServer, Long resolvedTimestamp,
+            boolean systemTable, PTable table, PTableRef tableRef, long tableResolvedTimestamp) {
+        return table != null && !alwaysHitServer &&
+                (systemTable && table.getRowTimestampColPos() == -1 ||
+                        resolvedTimestamp == tableResolvedTimestamp ||
+                        (table.getRowTimestampColPos() == -1 &&
+                                connection.getMetaDataCache().getAge(tableRef) <
+                                        table.getUpdateCacheFrequency()));
+    }
+
     public MetaDataMutationResult updateCache(String schemaName) throws SQLException {
         return updateCache(schemaName, false);
     }
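
Putting the MutationState and MetaDataClient pieces together, the client-side path for a SYSTEM.TASK upsert
now looks roughly like this condensed excerpt (reassembled from the hunks above rather than new code in the
commit):

    PTable table = tableRef.getTable();
    // SYSTEM tables are normally answered from the client cache, but a SYSTEM table with a
    // ROW_TIMESTAMP column (i.e. SYSTEM.TASK) must re-resolve against the server so an UPSERT
    // that omits the ROW_TIMESTAMP value is written with a real server-side timestamp.
    boolean hitServerForLatestTimestamp =
            table.getRowTimestampColPos() != -1 && table.getType() == PTableType.SYSTEM;
    // Passing true for alwaysHitServer makes avoidRpcToGetTable() return false, forcing the RPC.
    MetaDataMutationResult result = client.updateCache(
            table.getSchemaName().getString(),
            table.getTableName().getString(),
            hitServerForLatestTimestamp);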
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
index 1a7bb7f..c810b44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/task/Task.java
@@ -35,7 +35,6 @@ import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
 
 public class Task {
@@ -73,7 +72,8 @@ public class Task {
     }
 
     private static  PreparedStatement setValuesToAddTaskPS(PreparedStatement stmt, PTable.TaskType taskType,
-            String tenantId, String schemaName, String tableName) throws SQLException {
+            String tenantId, String schemaName, String tableName, String taskStatus, String data,
+            Integer priority, Timestamp startTs, Timestamp endTs) throws SQLException {
         stmt.setByte(1, taskType.getSerializedValue());
         if (tenantId != null) {
             stmt.setString(2, tenantId);
@@ -86,13 +86,6 @@ public class Task {
             stmt.setNull(3, Types.VARCHAR);
         }
         stmt.setString(4, tableName);
-        return stmt;
-    }
-
-    private static  PreparedStatement setValuesToAddTaskPS(PreparedStatement stmt, PTable.TaskType taskType,
-            String tenantId, String schemaName, String tableName, String taskStatus, String data,
-            Integer priority, Timestamp startTs, Timestamp endTs) throws SQLException {
-        stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName);
         if (taskStatus != null) {
             stmt.setString(5, taskStatus);
         } else {
@@ -127,28 +120,10 @@ public class Task {
     }
 
     public static void addTask(PhoenixConnection conn, PTable.TaskType taskType, String tenantId, String schemaName,
-            String tableName, boolean accessCheckEnabled)
-            throws IOException {
-        PreparedStatement stmt = null;
-        try {
-            stmt = conn.prepareStatement("UPSERT INTO " +
-                    PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +
-                    PhoenixDatabaseMetaData.TASK_TYPE + ", " +
-                    PhoenixDatabaseMetaData.TENANT_ID + ", " +
-                    PhoenixDatabaseMetaData.TABLE_SCHEM + ", " +
-                    PhoenixDatabaseMetaData.TABLE_NAME + " ) VALUES(?,?,?,?)");
-            stmt = setValuesToAddTaskPS(stmt, taskType, tenantId, schemaName, tableName);
-        } catch (SQLException e) {
-            throw new IOException(e);
-        }
-        mutateSystemTaskTable(conn, stmt, accessCheckEnabled);
-    }
-
-    public static void addTask(PhoenixConnection conn, PTable.TaskType taskType, String tenantId, String schemaName,
             String tableName, String taskStatus, String data, Integer priority, Timestamp startTs, Timestamp endTs,
             boolean accessCheckEnabled)
             throws IOException {
-        PreparedStatement stmt = null;
+        PreparedStatement stmt;
         try {
             stmt = conn.prepareStatement("UPSERT INTO " +
                     PhoenixDatabaseMetaData.SYSTEM_TASK_NAME + " ( " +