You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2021/10/21 17:32:39 UTC

[impala] 03/04: IMPALA-10975: Refactor alter table ddl operation

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit be80b1c442b56ff12c504c210a7a97076cb3672c
Author: Sourabh Goyal <so...@cloudera.com>
AuthorDate: Wed Oct 13 12:01:38 2021 -0700

    IMPALA-10975: Refactor alter table ddl operation
    
    For almost all alter table DDL operations in catalogOpExecutor,
    a table is added to catalog update at a common place in the end
    if reloadMetadata is true. However for few sub ddl operations like ADD
    and DROP partitions, addTableToCatalogUpdate() is called locally.
    This patch is to refactor addTableToCatalogUpdate() and call it at one
    place for all the sub ddls.
    
    After this patch, a table (after alter table ddl) is added to catalog
    update if its old catalog version does not match the current catalog
    version.
    
    This patch also addresses a bug in which a HMS event was getting
    added to delete event log even if event processing is not active.
    
    Testing:
    Relying on existing tests since it is a minor refactoring.
    
    Change-Id: Ifbadcab68b4599ad18b681b1284052a47b74d802
    Reviewed-on: http://gerrit.cloudera.org:8080/17919
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Tested-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 67 ++++++++++++----------
 1 file changed, 36 insertions(+), 31 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 5728752..71322cf 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -917,6 +917,8 @@ public class CatalogOpExecutor {
               BLACKLISTED_DBS_INCONSISTENT_ERR_STR));
     }
     tryWriteLock(tbl);
+    // get table's catalogVersion before altering it
+    long oldCatalogVersion = tbl.getCatalogVersion();
     // Get a new catalog version to assign to the table being altered.
     long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
     addCatalogServiceIdentifiers(tbl, catalog_.getCatalogServiceId(), newCatalogVersion);
@@ -940,6 +942,7 @@ public class CatalogOpExecutor {
 
       Table refreshedTable = null;
       boolean reloadMetadata = true;
+      String responseSummaryMsg = null;
       catalog_.getLock().writeLock().unlock();
 
       if (tbl instanceof KuduTable && altersKuduTable(params.getAlter_type())) {
@@ -959,16 +962,16 @@ public class CatalogOpExecutor {
               addColParams.isIf_not_exists());
           reloadTableSchema = true;
           if (added) {
-            addSummary(response, "New column(s) have been added to the table.");
+            responseSummaryMsg = "New column(s) have been added to the table.";
           } else {
-            addSummary(response, "No new column(s) have been added to the table.");
+            responseSummaryMsg = "No new column(s) have been added to the table.";
           }
           break;
         case REPLACE_COLUMNS:
           TAlterTableReplaceColsParams replaceColParams = params.getReplace_cols_params();
           alterTableReplaceCols(tbl, replaceColParams.getColumns());
           reloadTableSchema = true;
-          addSummary(response, "Table columns have been replaced.");
+          responseSummaryMsg = "Table columns have been replaced.";
           break;
         case ADD_PARTITION:
           // Create and add HdfsPartition objects to the corresponding HdfsTable and load
@@ -983,23 +986,22 @@ public class CatalogOpExecutor {
             // only add the versions for in-flight events when we are sure that the
             // partition was really added.
             catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
-            addTableToCatalogUpdate(refreshedTable, wantMinimalResult, response.result);
           }
           reloadMetadata = false;
-          addSummary(response, "New partition has been added to the table.");
+          responseSummaryMsg = "New partition has been added to the table.";
           break;
         case DROP_COLUMN:
           TAlterTableDropColParams dropColParams = params.getDrop_col_params();
           alterTableDropCol(tbl, dropColParams.getCol_name());
           reloadTableSchema = true;
-          addSummary(response, "Column has been dropped.");
+          responseSummaryMsg = "Column has been dropped.";
           break;
         case ALTER_COLUMN:
           TAlterTableAlterColParams alterColParams = params.getAlter_col_params();
           alterTableAlterCol(tbl, alterColParams.getCol_name(),
               alterColParams.getNew_col_def());
           reloadTableSchema = true;
-          addSummary(response, "Column has been altered.");
+          responseSummaryMsg = "Column has been altered.";
           break;
         case DROP_PARTITION:
           TAlterTableDropPartitionParams dropPartParams =
@@ -1018,10 +1020,9 @@ public class CatalogOpExecutor {
             // since by the time the event is received, the partition is already
             // removed from catalog and there is nothing to compare against during
             // self-event evaluation
-            addTableToCatalogUpdate(refreshedTable, wantMinimalResult, response.result);
           }
-          addSummary(response,
-              "Dropped " + numUpdatedPartitions.getRef() + " partition(s).");
+          responseSummaryMsg =
+              "Dropped " + numUpdatedPartitions.getRef() + " partition(s).";
           reloadMetadata = false;
           break;
         case RENAME_TABLE:
@@ -1037,10 +1038,10 @@ public class CatalogOpExecutor {
                   fileFormatParams.getFile_format(), numUpdatedPartitions);
 
           if (fileFormatParams.isSetPartition_set()) {
-            addSummary(response,
-                "Updated " + numUpdatedPartitions.getRef() + " partition(s).");
+            responseSummaryMsg =
+                "Updated " + numUpdatedPartitions.getRef() + " partition(s).";
           } else {
-            addSummary(response, "Updated table.");
+            responseSummaryMsg = "Updated table.";
           }
           break;
         case SET_ROW_FORMAT:
@@ -1050,10 +1051,10 @@ public class CatalogOpExecutor {
               rowFormatParams.getPartition_set(), rowFormatParams.getRow_format(),
               numUpdatedPartitions);
           if (rowFormatParams.isSetPartition_set()) {
-            addSummary(response,
-                "Updated " + numUpdatedPartitions.getRef() + " partition(s).");
+            responseSummaryMsg =
+                "Updated " + numUpdatedPartitions.getRef() + " partition(s).";
           } else {
-            addSummary(response, "Updated table.");
+            responseSummaryMsg = "Updated table.";
           }
           break;
         case SET_LOCATION:
@@ -1063,9 +1064,9 @@ public class CatalogOpExecutor {
           reloadFileMetadata = alterTableSetLocation(tbl, partitionSpec,
                setLocationParams.getLocation());
           if (partitionSpec == null) {
-            addSummary(response, "New location has been set.");
+            responseSummaryMsg = "New location has been set.";
           } else {
-            addSummary(response, "New location has been set for the specified partition.");
+            responseSummaryMsg = "New location has been set for the specified partition.";
           }
           break;
         case SET_TBL_PROPERTIES:
@@ -1073,10 +1074,10 @@ public class CatalogOpExecutor {
               numUpdatedPartitions);
           reloadTableSchema = true;
           if (params.getSet_tbl_properties_params().isSetPartition_set()) {
-            addSummary(response,
-                "Updated " + numUpdatedPartitions.getRef() + " partition(s).");
+            responseSummaryMsg =
+                "Updated " + numUpdatedPartitions.getRef() + " partition(s).";
           } else {
-            addSummary(response, "Updated table.");
+            responseSummaryMsg = "Updated table.";
           }
           break;
         case UNSET_TBL_PROPERTIES:
@@ -1084,10 +1085,10 @@ public class CatalogOpExecutor {
             numUpdatedPartitions);
           reloadTableSchema = true;
           if (params.getUnset_tbl_properties_params().isSetPartition_set()) {
-            addSummary(response, "Updated " + numUpdatedPartitions.getRef() +
-              " partition(s).");
+            responseSummaryMsg =
+                "Updated " + numUpdatedPartitions.getRef() + " partition(s).";
           } else {
-            addSummary(response, "Updated table.");
+            responseSummaryMsg = "Updated table.";
           }
           break;
         case UPDATE_STATS:
@@ -1096,8 +1097,8 @@ public class CatalogOpExecutor {
           alterTableUpdateStats(tbl, params.getUpdate_stats_params(),
               numUpdatedPartitions, numUpdatedColumns, debugAction);
           reloadTableSchema = true;
-          addSummary(response, "Updated " + numUpdatedPartitions.getRef() +
-              " partition(s) and " + numUpdatedColumns.getRef() + " column(s).");
+          responseSummaryMsg = "Updated " + numUpdatedPartitions.getRef() +
+              " partition(s) and " + numUpdatedColumns.getRef() + " column(s).";
           break;
         case SET_CACHED:
           Preconditions.checkState(params.isSetSet_cached_params());
@@ -1106,22 +1107,21 @@ public class CatalogOpExecutor {
           if (params.getSet_cached_params().getPartition_set() == null) {
             reloadFileMetadata =
                 alterTableSetCached(tbl, params.getSet_cached_params());
-            addSummary(response, op + "table.");
+            responseSummaryMsg = op + "table.";
           } else {
             alterPartitionSetCached(tbl, params.getSet_cached_params(),
                 numUpdatedPartitions);
-            addSummary(response,
-                op + numUpdatedPartitions.getRef() + " partition(s).");
+            responseSummaryMsg = op + numUpdatedPartitions.getRef() + " partition(s).";
           }
           break;
         case RECOVER_PARTITIONS:
           alterTableRecoverPartitions(tbl, debugAction);
-          addSummary(response, "Partitions have been recovered.");
+          responseSummaryMsg = "Partitions have been recovered.";
           break;
         case SET_OWNER:
           Preconditions.checkState(params.isSetSet_owner_params());
           alterTableOrViewSetOwner(tbl, params.getSet_owner_params(), response);
-          addSummary(response, "Updated table/view.");
+          responseSummaryMsg = "Updated table/view.";
           break;
         default:
           throw new UnsupportedOperationException(
@@ -1136,6 +1136,10 @@ public class CatalogOpExecutor {
         // now that HMS alter operation has succeeded, add this version to list of
         // inflight events in catalog table if event processing is enabled
         catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
+      }
+      addSummary(response, responseSummaryMsg);
+      // add table to catalog update if its old and existing versions do not match
+      if (tbl.getCatalogVersion() != oldCatalogVersion) {
         addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
       }
       // Make sure all the modifications are done.
@@ -4613,6 +4617,7 @@ public class CatalogOpExecutor {
     if (!catalog_.isEventProcessingActive()) {
       LOG.trace("Not adding event {}:{} since events processing is not active", eventId,
           objectKey);
+      return;
     }
     catalog_.getMetastoreEventProcessor().getDeleteEventLog()
         .addRemovedObject(eventId, objectKey);