You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2021/07/14 13:11:49 UTC

[impala] 01/03: IMPALA-10490: Fix illegalStateException in drop stats

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 85ebfe220aa6fe654b860d5f3fe1d8902940de0f
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Wed Jul 7 14:58:07 2021 -0700

    IMPALA-10490: Fix illegalStateException in drop stats
    
    If a table was created and incremental stats were computed on the table
    while events processing is turned off, then after the events processing
    is turned on a drop stats or truncate table command on such a table
    fails with IllegalStateException. This happens because the
    catalog service identifiers are not found in the table properties
    while partition for the table are being altered. This patch adds
    the catalog service identifiers for drop stats, truncate and comment on
    statement code paths to fix the error.
    
    Testing:
    1. Added more statements to the test_self_events test to cover the
    newly added logic.
    2. Manually tested the following scenario which was previously failing
      a. starting catalogd without events processing.
      b. create partitioned table and compute incremental stats on it.
      c. restart catalogd with events processing turned on.
      d. issue drop stats command.
    
    Change-Id: Iaa0b4043879370c52049d22acb49c847b0be1c68
    Reviewed-on: http://gerrit.cloudera.org:8080/17659
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/service/CatalogOpExecutor.java    | 12 ++++++++++++
 tests/custom_cluster/test_event_processing.py           | 17 ++++++++++++++---
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 1fc20fc..3b77cc9 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2099,6 +2099,8 @@ public class CatalogOpExecutor {
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
+      addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(),
+          newCatalogVersion);
       if (params.getPartition_set() == null) {
         // TODO: Report the number of updated partitions/columns to the user?
         // TODO: bulk alter the partitions.
@@ -2124,6 +2126,7 @@ public class CatalogOpExecutor {
       }
       loadTableMetadata(table, newCatalogVersion, /*reloadFileMetadata=*/false,
           /*reloadTableSchema=*/true, /*partitionsToUpdate=*/null, "DROP STATS");
+      catalog_.addVersionsForInflightEvents(false, table, newCatalogVersion);
       addTableToCatalogUpdate(table, wantMinimalResult, resp.result);
       addSummary(resp, "Stats have been dropped.");
     } finally {
@@ -2667,6 +2670,7 @@ public class CatalogOpExecutor {
       Preconditions.checkState(newCatalogVersion > 0);
       addSummary(resp, "Table has been truncated.");
       loadTableMetadata(table, newCatalogVersion, true, true, null, "TRUNCATE");
+      catalog_.addVersionsForInflightEvents(false, table, newCatalogVersion);
       addTableToCatalogUpdate(table, wantMinimalResult, resp.result);
     } finally {
       UnlockWriteLockIfErronouslyLocked();
@@ -2710,6 +2714,8 @@ public class CatalogOpExecutor {
             table.getFullName(), sw.elapsed(TimeUnit.MILLISECONDS));
         newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
         catalog_.getLock().writeLock().unlock();
+        addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(),
+            newCatalogVersion);
         TblTransaction tblTxn = MetastoreShim.createTblTransaction(hmsClient,
             table.getMetaStoreTable(), txn.getId());
         HdfsTable hdfsTable = (HdfsTable) table;
@@ -2810,6 +2816,8 @@ public class CatalogOpExecutor {
     Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread());
     long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
     catalog_.getLock().writeLock().unlock();
+    addCatalogServiceIdentifiers(table, catalog_.getCatalogServiceId(),
+        newCatalogVersion);
     HdfsTable hdfsTable = (HdfsTable) table;
     boolean isTableBeingReplicated = false;
     Stopwatch sw = Stopwatch.createStarted();
@@ -6334,6 +6342,7 @@ public class CatalogOpExecutor {
       }
       applyAlterTable(msTbl);
       loadTableMetadata(tbl, newCatalogVersion, false, false, null, "ALTER COMMENT");
+      catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
       addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
       addSummary(response, String.format("Updated %s.", (isView) ? "view" : "table"));
     } finally {
@@ -6350,6 +6359,8 @@ public class CatalogOpExecutor {
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
+      addCatalogServiceIdentifiers(tbl, catalog_.getCatalogServiceId(),
+          newCatalogVersion);
       if (tbl instanceof KuduTable) {
         TColumn new_col = new TColumn(columnName,
             tbl.getColumn(columnName).getType().toThrift());
@@ -6369,6 +6380,7 @@ public class CatalogOpExecutor {
       }
       loadTableMetadata(tbl, newCatalogVersion, false, true, null,
           "ALTER COLUMN COMMENT");
+      catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion);
       addTableToCatalogUpdate(tbl, wantMinimalResult, response.result);
       addSummary(response, "Column has been altered.");
     } finally {
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index c9679e6..e5ed34d 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -659,7 +659,10 @@ class TestEventProcessing(CustomClusterTestSuite):
           "alter table {0}.{1} set TBLPROPERTIES ('k'='v')".format(db_name, tbl_name),
           "alter table {0}.{1} ADD COLUMN c1 int".format(db_name, tbl_name),
           "alter table {0}.{1} ALTER COLUMN C1 set comment 'c1 comment'".format(db_name,
-                                                                                tbl_name),
+            tbl_name),
+          "comment on table {0}.{1} IS 'table level comment'".format(db_name, tbl_name),
+          "comment on column {0}.{1}.C1 IS 'column level comment'".format(db_name,
+            tbl_name),
           "alter table {0}.{1} ADD COLUMNS (c2 int, c3 string)".format(db_name, tbl_name),
           "alter table {0}.{1} DROP COLUMN c1".format(db_name, tbl_name),
           "alter table {0}.{1} DROP COLUMN c2".format(db_name, tbl_name),
@@ -670,6 +673,8 @@ class TestEventProcessing(CustomClusterTestSuite):
           "alter view {0}.{1} set owner role `test-view-role`".format(db_name, view_name),
           # compute stats will generates ALTER_PARTITION
           "compute stats {0}.{1}".format(db_name, tbl_name),
+          "compute incremental stats {0}.{1}".format(db_name, tbl_name),
+          "drop stats {0}.{1}".format(db_name, tbl_name),
           # insert into a existing partition; generates INSERT self-event
           "insert into table {0}.{1} partition "
           "(year, month) select * from functional.alltypessmall where year=2009 "
@@ -677,8 +682,14 @@ class TestEventProcessing(CustomClusterTestSuite):
           # insert overwrite query from Impala also generates a INSERT self-event
           "insert overwrite table {0}.{1} partition "
           "(year, month) select * from functional.alltypessmall where year=2009 "
-          "and month=1".format(db_name, tbl_name)],
-      # Queries which will not increment the events-skipped counter
+          "and month=1".format(db_name, tbl_name),
+          # events processor doesn't process delete column stats events currently,
+          # however, in case of incremental stats, there could be alter table and
+          # alter partition events which should be ignored. Hence we run compute stats
+          # before to make sure that the truncate table command generated alter events
+          # are ignored.
+          "compute incremental stats {0}.{1}".format(db_name, tbl_name),
+          "truncate table {0}.{1}".format(db_name, tbl_name)],
       False: [
           "create table {0}.{1} like functional.alltypessmall "
           "stored as parquet".format(db_name, tbl_name),