Posted to commits@impala.apache.org by db...@apache.org on 2024/01/31 09:07:17 UTC

(impala) branch master updated: IMPALA-12448: Avoid getting stuck when refreshing a non-existent partition

This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 00247c26e IMPALA-12448: Avoid getting stuck when refreshing a non-existent partition
00247c26e is described below

commit 00247c26e2a73d4ece4962be6fd5078843d938ae
Author: ttttttz <24...@qq.com>
AuthorDate: Wed Jan 31 00:21:32 2024 +0800

    IMPALA-12448: Avoid getting stuck when refreshing a non-existent partition
    
    In some cases, such as refreshing a non-existent partition, we
    shouldn't wait for the catalog version of 'result', because the
    table version has not changed after resetting metadata. Otherwise,
    the query may be stuck for a long time. If the catalog version of
    'result' is less than or equal to 'oldestTopicUpdateToGc_', return
    the catalog version of 'result' directly.
    
    Tests:
    * added e2e tests
    
    Change-Id: Iace7cdadda300b03896f92415822266354421887
    Reviewed-on: http://gerrit.cloudera.org:8080/20490
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   |  4 +
 be/src/util/backend-gflag-util.cc                  |  2 +
 common/thrift/BackendGflags.thrift                 |  2 +
 .../impala/catalog/CatalogServiceCatalog.java      | 35 ++++----
 .../org/apache/impala/catalog/TopicUpdateLog.java  | 21 +++--
 .../test_refresh_invalid_partition.py              | 93 ++++++++++++++++++++++
 6 files changed, 133 insertions(+), 24 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index cc1667731..be0ed3769 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -155,6 +155,10 @@ DEFINE_int32(catalog_operation_log_size, 100, "Number of catalog operation log r
 DEFINE_bool(catalogd_ha_reset_metadata_on_failover, false, "If true, reset all metadata "
     "when the catalogd becomes active.");
 
+DEFINE_int32(topic_update_log_gc_frequency, 1000, "Frequency at which the entries "
+    "of the catalog topic update log are garbage collected. An entry may survive "
+    "for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates.");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_port);
 DECLARE_string(state_store_2_host);
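
The survival bound quoted in the new flag's help text follows from the GC
scheme in TopicUpdateLog.java (see the hunks further below): each GC cycle
removes only the entries that were last added to a topic update at or before
the previous cycle, so an entry created just after a GC survives its own cycle
plus all but the last update of the next one. A quick worked example with the
default value:

    topic_update_log_gc_frequency = 1000
    maximum entry lifetime        = (2 * 1000) - 1 = 1999 topic updates

The e2e test added by this patch shrinks the frequency to 10 so that a GC
cycle is triggered within a handful of SYNC_DDL statements.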
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 40414c871..ec7f63876 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -115,6 +115,7 @@ DECLARE_bool(iceberg_allow_datafiles_in_table_location_only);
 DECLARE_int32(catalog_operation_log_size);
 DECLARE_string(hostname);
 DECLARE_bool(allow_catalog_cache_op_from_masked_users);
+DECLARE_int32(topic_update_log_gc_frequency);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -451,6 +452,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_hostname(FLAGS_hostname);
   cfg.__set_allow_catalog_cache_op_from_masked_users(
       FLAGS_allow_catalog_cache_op_from_masked_users);
+  cfg.__set_topic_update_log_gc_frequency(FLAGS_topic_update_log_gc_frequency);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 35aecb63d..a0c41a1b2 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -276,4 +276,6 @@ struct TBackendGflags {
   122: required bool allow_catalog_cache_op_from_masked_users
 
   123: required bool iceberg_allow_datafiles_in_table_location_only
+
+  124: required i32 topic_update_log_gc_frequency
 }
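
Together with the gflag definition above, these two hunks are the plumbing
that exposes a backend flag to the Java frontend: declare the flag in the
backend, copy its value into TBackendGflags in PopulateThriftBackendGflags(),
and append a new Thrift field for it. The frontend then reads the value back
through BackendConfig, as the TopicUpdateLog.java hunk below does; a minimal
sketch of the read end:

    // Java frontend: read the plumbed-through gflag value.
    int gcFrequency =
        BackendConfig.INSTANCE.getBackendCfg().topic_update_log_gc_frequency;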
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 83db6f7b9..88753aa14 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -189,7 +189,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  *   modify them. Each entry includes the number of times a catalog object has
  *   skipped a topic update, which version of the object was last sent in a topic update
  *   and what was the version of that topic update. Entries of the topic update log are
- *   garbage-collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates by the topic
+ *   garbage-collected every topicUpdateLogGcFrequency_ topic updates by the topic
  *   update processing thread to prevent the log from growing indefinitely. Metadata
  *   operations using SYNC_DDL are inspecting this log to identify the catalog topic
  *   version that the issuing impalad must wait for in order to ensure that the effects
@@ -200,10 +200,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  *   operations that use SYNC_DDL to hang while waiting for specific topic update log
  *   entries. That could happen if the thread processing the metadata operation stalls
  *   for a long period of time (longer than the time to process
- *   TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates) between the time the operation was
+ *   topicUpdateLogGcFrequency_ topic updates) between the time the operation was
  *   applied in the catalog cache and the time the SYNC_DDL version was checked. To reduce
  *   the probability of such an event, we set the value of the
- *   TOPIC_UPDATE_LOG_GC_FREQUENCY to a large value. Also, to prevent metadata operations
+ *   topicUpdateLogGcFrequency_ to a large value. Also, to prevent metadata operations
  *   from hanging in that path due to unknown issues (e.g. bugs), operations using
  *   SYNC_DDL are not allowed to wait indefinitely for specific topic log entries and an
  *   exception is thrown if the specified max wait time is exceeded. See
@@ -3300,18 +3300,19 @@ public class CatalogServiceCatalog extends Catalog {
   public CatalogDeltaLog getDeleteLog() { return deleteLog_; }
 
   /**
-   * Returns the version of the topic update that an operation using SYNC_DDL must wait
-   * for in order to ensure that its result set ('result') has been broadcast to all the
-   * coordinators. For operations that don't produce a result set, e.g. INVALIDATE
-   * METADATA, return the version specified in 'result.version'.
+   * Returns the catalog version of the topic update that an operation using SYNC_DDL
+   * must wait for in order to ensure that its result set ('result') has been broadcast
+   * to all the coordinators. For operations that don't produce a result set,
+   * e.g. INVALIDATE METADATA, return the version specified in 'result.version'.
    */
   public long waitForSyncDdlVersion(TCatalogUpdateResult result) throws CatalogException {
-    if (!result.isSetUpdated_catalog_objects() &&
-        !result.isSetRemoved_catalog_objects()) {
+    if (result.getVersion() <= topicUpdateLog_.getOldestTopicUpdateToGc()
+        || (!result.isSetUpdated_catalog_objects() &&
+        !result.isSetRemoved_catalog_objects())) {
       return result.getVersion();
     }
     long lastSentTopicUpdate = lastSentTopicUpdate_.get();
-    // Maximum number of attempts (topic updates) to find the catalog topic version that
+    // Maximum number of attempts (topic updates) to find the catalog version that
     // an operation using SYNC_DDL must wait for.
     // this maximum attempt limit is only applicable when topicUpdateTblLockMaxWaitTimeMs_
     // is disabled (set to 0). This is due to the fact that the topic update thread will
@@ -3338,7 +3339,7 @@ public class CatalogServiceCatalog extends Catalog {
       long topicVersionForDeletes =
           getCoveringTopicUpdateVersion(result.getRemoved_catalog_objects());
       if (topicVersionForUpdates == -1 || topicVersionForDeletes == -1) {
-        LOG.info("Topic version for {} not found yet. Last sent topic version: {}. " +
+        LOG.info("Topic update for {} not found yet. Last sent catalog version: {}. " +
                 "Updated objects: {}, deleted objects: {}",
             topicVersionForUpdates == -1 ? "updates" : "deletes",
             lastSentTopicUpdate_.get(),
@@ -3358,11 +3359,11 @@ public class CatalogServiceCatalog extends Catalog {
         if (lastSentTopicUpdate != currentTopicUpdate) {
           ++numAttempts;
           if (shouldTimeOut(numAttempts, maxNumAttempts, begin)) {
-            LOG.error(String.format("Couldn't retrieve the covering topic version for "
+            LOG.error(String.format("Couldn't retrieve the covering topic update for "
                     + "catalog objects. Updated objects: %s, deleted objects: %s",
                 FeCatalogUtils.debugString(result.updated_catalog_objects),
                 FeCatalogUtils.debugString(result.removed_catalog_objects)));
-            throw new CatalogException("Couldn't retrieve the catalog topic version " +
+            throw new CatalogException("Couldn't retrieve the catalog topic update " +
                 "for the SYNC_DDL operation after " + numAttempts + " attempts and " +
                 "elapsed time of " + (System.currentTimeMillis() - begin) + " msec. " +
                 "The operation has been successfully executed but its effects may have " +
@@ -3375,9 +3376,9 @@ public class CatalogServiceCatalog extends Catalog {
       }
     }
     Preconditions.checkState(versionToWaitFor >= 0);
-    LOG.info("Operation using SYNC_DDL is waiting for catalog topic version: " +
-        versionToWaitFor + ". Time to identify topic version (msec): " +
-        (System.currentTimeMillis() - begin));
+    LOG.info("Operation using SYNC_DDL is waiting for catalog version {} " +
+            "to be sent. Time to identify topic update (msec): {}.",
+        versionToWaitFor, (System.currentTimeMillis() - begin));
     return versionToWaitFor;
   }
 
@@ -3421,7 +3422,7 @@ public class CatalogServiceCatalog extends Catalog {
       // a) It corresponds to a new catalog object that hasn't been processed by a catalog
       // update yet.
       // b) It corresponds to a catalog object that hasn't been modified for at least
-      // TOPIC_UPDATE_LOG_GC_FREQUENCY updates and hence its entry was garbage
+      // topicUpdateLogGcFrequency_ updates and hence its entry was garbage
       // collected.
       // In both cases, -1 is returned to indicate that we're waiting for the
       // entry to show up in the topic update log.
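
Assembled from the hunks above, the entry point of waitForSyncDdlVersion()
now begins roughly as follows (a condensed sketch of the patched method, with
the unchanged polling logic elided):

    public long waitForSyncDdlVersion(TCatalogUpdateResult result)
        throws CatalogException {
      // IMPALA-12448: if the version of 'result' is at or below the oldest
      // topic update version that has already been garbage-collected, no
      // topic update log entry can ever cover it, so return immediately
      // instead of polling. Operations that produced no updated/removed
      // objects (e.g. INVALIDATE METADATA) also have nothing to wait for.
      if (result.getVersion() <= topicUpdateLog_.getOldestTopicUpdateToGc()
          || (!result.isSetUpdated_catalog_objects()
              && !result.isSetRemoved_catalog_objects())) {
        return result.getVersion();
      }
      // Otherwise poll the topic update log, as before, until a covering
      // topic update is found or the attempt/time limits are exceeded.
      ...
    }

Without the new first condition, refreshing a partition whose log entry had
already been garbage-collected left the SYNC_DDL operation waiting for a
covering topic update that may never arrive, until the maximum wait time was
exceeded.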
diff --git a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
index 040e0cf2d..a20abd5ae 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
@@ -26,6 +26,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
+import org.apache.impala.service.BackendConfig;
+
 // A log of topic update information for each catalog object. An entry is added to
 // the log when a catalog object is processed (added/removed/skipped) in a topic
 // update and it is replaced every time the catalog object is processed in a
@@ -33,18 +35,19 @@ import com.google.common.base.Strings;
 //
 // To prevent the log from growing indefinitely, the oldest entries
 // (in terms of last topic update that processed the associated catalog objects) are
-// garbage collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates. That will cause
+// garbage collected every topicUpdateLogGcFrequency_ topic updates. That will cause
 // entries of deleted catalog objects or entries of objects that haven't been processed
-// by the catalog for at least TOPIC_UPDATE_LOG_GC_FREQUENCY updates to be removed from
+// by the catalog for at least topicUpdateLogGcFrequency_ updates to be removed from
 // the log.
 public class TopicUpdateLog {
   private static final Logger LOG = LoggerFactory.getLogger(TopicUpdateLog.class);
   // Frequency at which the entries of the topic update log are garbage collected.
-  // An entry may survive for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates.
-  private final static int TOPIC_UPDATE_LOG_GC_FREQUENCY = 1000;
+  // An entry may survive for (2 * topicUpdateLogGcFrequency_) - 1 topic updates.
+  private int topicUpdateLogGcFrequency_ = BackendConfig.INSTANCE
+      .getBackendCfg().topic_update_log_gc_frequency;
 
   // Number of topic updates left to trigger a gc of topic update log entries.
-  private int numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
+  private int numTopicUpdatesToGc_ = topicUpdateLogGcFrequency_;
 
   // In the next gc cycle of topic update log entries, all the entries that were last
   // added to a topic update with version less than or equal to
@@ -109,7 +112,7 @@ public class TopicUpdateLog {
 
   /**
    * Garbage-collects topic update log entries. These are entries that haven't been
-   * added to any of the last TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates.
+   * added to any of the last topicUpdateLogGcFrequency_ topic updates.
    */
   public void garbageCollectUpdateLogEntries(long lastTopicUpdateVersion) {
     if (oldestTopicUpdateToGc_ == -1) {
@@ -130,7 +133,7 @@ public class TopicUpdateLog {
           }
         }
       }
-      numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
+      numTopicUpdatesToGc_ = topicUpdateLogGcFrequency_;
       oldestTopicUpdateToGc_ = lastTopicUpdateVersion;
       LOG.info("Topic update log GC finished. Removed {} entries.",
           numEntriesRemoved);
@@ -159,5 +162,9 @@ public class TopicUpdateLog {
     if (entry == null) entry = new Entry();
     return entry;
   }
+
+  public long getOldestTopicUpdateToGc() {
+    return oldestTopicUpdateToGc_;
+  }
 }
 
diff --git a/tests/custom_cluster/test_refresh_invalid_partition.py b/tests/custom_cluster/test_refresh_invalid_partition.py
new file mode 100644
index 000000000..57b26fded
--- /dev/null
+++ b/tests/custom_cluster/test_refresh_invalid_partition.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+class TestRefreshInvalidPartition(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(self):
+      return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--topic_update_log_gc_frequency=10")
+  def test_refresh_invalid_partition_with_sync_ddl(self, vector, unique_database):
+    """
+    Regression test for IMPALA-12448. Avoid getting stuck when refreshing a
+    non-existent partition with sync_ddl.
+    """
+    table_name_1 = unique_database + '.' + "partition_test_table_1"
+    table_name_2 = unique_database + '.' + "partition_test_table_2"
+    self.client.execute(
+        'create table %s (y int) partitioned by (x int)' % table_name_1)
+    self.client.execute(
+        'create table %s (y int) partitioned by (x int)' % table_name_2)
+
+    self.client.execute('insert into table %s partition (x=1) values(1)'
+                        % table_name_1)
+    self.client.execute('insert into table %s partition (x=1) values(1)'
+                        % table_name_2)
+
+    assert '1\t1' == self.client.execute(
+        'select * from %s' % table_name_1).get_data()
+    assert '1\t1' == self.client.execute(
+        'select * from %s' % table_name_2).get_data()
+
+    """
+    Run it multiple times so that at least one topic update log GC is triggered.
+    """
+    i = 15
+    while i > 0:
+      self.execute_query('refresh %s' % table_name_1,
+        query_options={"SYNC_DDL": "true"})
+      i -= 1
+
+    """Refresh a non-existent partition with sync_ddl."""
+    self.execute_query_expect_success(self.client, 'refresh %s partition (x=999)'
+        % table_name_2, query_options={"SYNC_DDL": "true", "EXEC_TIME_LIMIT_S": "30"})
+
+  @CustomClusterTestSuite.with_args(
+    statestored_args="--statestore_update_frequency_ms=5000")
+  def test_refresh_missing_partition(self, unique_database):
+    client1 = self.cluster.impalads[1].service.create_beeswax_client()
+    client2 = self.cluster.impalads[2].service.create_beeswax_client()
+    self.client.execute('create table {}.tbl (i int) partitioned by (p int)'
+        .format(unique_database))
+    self.execute_query(
+        'insert into {}.tbl partition(p) values (0,0), (1,1)'.format(unique_database),
+        query_options={"SYNC_DDL": "true"})
+    self.execute_query_expect_success(
+        self.client,
+        'alter table {}.tbl drop partition(p=0)'.format(unique_database),
+        {"SYNC_DDL": "false"})
+    self.execute_query_expect_success(
+        client1,
+        'refresh {}.tbl partition(p=0)'.format(unique_database),
+        {"SYNC_DDL": "true"})
+    show_parts_stmt = 'show partitions {}.tbl'.format(unique_database)
+    res = self.execute_query_expect_success(client2, show_parts_stmt)
+    # First line is the header. Only one partition should be shown so the
+    # result has two lines.
+    assert len(res.data) == 2
+    res = self.execute_query_expect_success(client1, show_parts_stmt)
+    assert len(res.data) == 2
+    res = self.execute_query_expect_success(self.client, show_parts_stmt)
+    assert len(res.data) == 2
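
Both tests follow the usual custom-cluster pattern. The first restarts the
catalogd with --topic_update_log_gc_frequency=10 and issues 15 SYNC_DDL
refreshes so that at least one topic update log GC runs before the refresh of
the non-existent partition; EXEC_TIME_LIMIT_S=30 turns what used to be a long
hang into a quick failure. The second slows statestore updates down, refreshes
a partition that another coordinator has already dropped, and verifies that
all three coordinators agree on the single remaining partition. They should
run like any other custom-cluster test, e.g. via Impala's py.test wrapper
(something along the lines of
bin/impala-py.test tests/custom_cluster/test_refresh_invalid_partition.py),
though the exact invocation depends on the local dev environment.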