You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/04/07 06:53:04 UTC

[impala] branch master updated: IMPALA-6671: Change wait for sync ddl timeout

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 34a183d  IMPALA-6671: Change wait for sync ddl timeout
34a183d is described below

commit 34a183d3d6632c70c19af939d8034af2b5c42f11
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Wed Mar 31 10:48:00 2021 -0700

    IMPALA-6671: Change wait for sync ddl timeout
    
    When skip locked tables from topic updates is enabled
    (topic_update_tbl_max_wait_time_ms > 0), it is possible that a thread
    waiting for a topic update during a sync ddl execution to terminate
    earlier. This is because the waitForSyncDdlVersion currently statically
    counts the number of instances of topic updates against a maximum
    and bails out when the maximum is reached. With topic update thread
    skipping locked tables, this number of instances of topic updates
    is more likely to hit the maximum attempt limit.
    
    This commit changes the logic so that when locked tables are skipped from
    topic updates, the sync ddl operation waits until a configurable
    timeout. This timeout value is specified in seconds using the configuration
    max_wait_time_for_sync_ddl_s. The default value of this configuration is 0
    which means it waits indefinitely. This makes sure that only the sync ddl
    queries on a table which has been locked wait for extended durations while
    the other unrelated sync ddl queries can finish appropriately.
    
    Also this commit changes the default values of following flags
    which were introduced in the earlier patch for IMPALA-6671.
    
    The current default value of topic_update_tbl_max_wait_time_ms
    of 500ms is too low and may skip the locked tables more
    aggressively than needed. The new defaults were set based on
    analysis of a real world deployment.
    
    topic_update_tbl_max_wait_time_ms = 120000
    catalog_max_lock_skipped_topic_updates = 3
    
    Testing:
    1. Ran existing test test_topic_update_frequency.
    2. Added a new test for the newly added flag.
    
    Change-Id: I79e64cdec0e6aa7b597a47851b4b5c5441ca5528
    Reviewed-on: http://gerrit.cloudera.org:8080/17253
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   | 13 +++++--
 be/src/util/backend-gflag-util.cc                  |  2 ++
 common/thrift/BackendGflags.thrift                 |  2 ++
 .../impala/catalog/CatalogServiceCatalog.java      | 30 ++++++++++++++--
 .../org/apache/impala/service/BackendConfig.java   |  4 +++
 .../custom_cluster/test_topic_update_frequency.py  | 40 ++++++++++++++++++++++
 6 files changed, 86 insertions(+), 5 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index ed0f118..d354deb 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -74,12 +74,12 @@ DEFINE_int64_hidden(catalog_partial_fetch_rpc_queue_timeout_s, LLONG_MAX, "Maxim
     "(in seconds) a partial catalog object fetch RPC spends in the queue waiting "
     "to run. Must be set to a value greater than zero.");
 
-DEFINE_int32(catalog_max_lock_skipped_topic_updates, 2, "Maximum number of topic "
+DEFINE_int32(catalog_max_lock_skipped_topic_updates, 3, "Maximum number of topic "
     "updates skipped for a table due to lock contention in catalogd after which it must"
     "be added to the topic the update log. This limit only applies to distinct lock "
     "operations which block the topic update thread.");
 
-DEFINE_int64(topic_update_tbl_max_wait_time_ms, 500, "Maximum time "
+DEFINE_int64(topic_update_tbl_max_wait_time_ms, 120000, "Maximum time "
      "(in milliseconds) catalog's topic update thread will wait to acquire lock on "
      "table. If the topic update thread cannot acquire a table lock it skips the table "
      "from that topic update and processes the table in the next update. However to "
@@ -88,6 +88,15 @@ DEFINE_int64(topic_update_tbl_max_wait_time_ms, 500, "Maximum time "
      "table lock. A value of 0 disables the timeout based locking which means topic "
      "update thread will always block until table lock is acquired.");
 
+DEFINE_int32(max_wait_time_for_sync_ddl_s, 0, "Maximum time (in seconds) until "
+     "which a sync ddl operation will wait for the updated tables "
+     "to be the added to the catalog topic. A value of 0 means sync ddl operation will "
+     "wait as long as necessary until the update is propogated to all the coordinators. "
+     "This flag only takes effect when topic_update_tbl_max_wait_time_ms is enabled."
+     "A value greater than 0 means catalogd will wait until that number of seconds "
+     "before throwing an error indicating that not all the "
+     "coordinators might have applied the changes caused due to the ddl.");
+
 DECLARE_string(debug_actions);
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index fb99457..54e4c39 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -90,6 +90,7 @@ DECLARE_bool(enable_incremental_metadata_updates);
 DECLARE_int64(topic_update_tbl_max_wait_time_ms);
 DECLARE_int32(catalog_max_lock_skipped_topic_updates);
 DECLARE_string(scratch_dirs);
+DECLARE_int32(max_wait_time_for_sync_ddl_s);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -277,6 +278,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_saml2_group_filter(FLAGS_saml2_group_filter);
   cfg.__set_saml2_ee_test_mode(FLAGS_saml2_ee_test_mode);
   cfg.__set_scratch_dirs(FLAGS_scratch_dirs);
+  cfg.__set_max_wait_time_for_sync_ddl_s(FLAGS_max_wait_time_for_sync_ddl_s);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 5dfcf50..7d75c69 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -193,4 +193,6 @@ struct TBackendGflags {
   84: required string scratch_dirs
 
   85: required bool enable_row_filtering
+
+  86: required i32 max_wait_time_for_sync_ddl_s
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 0f7d8f1..3703b43 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -3086,6 +3086,11 @@ public class CatalogServiceCatalog extends Catalog {
     long lastSentTopicUpdate = lastSentTopicUpdate_.get();
     // Maximum number of attempts (topic updates) to find the catalog topic version that
     // an operation using SYNC_DDL must wait for.
+    // this maximum attempt limit is only applicable when topicUpdateTblLockMaxWaitTimeMs_
+    // is disabled (set to 0). This is due to the fact that the topic update thread will
+    // not block until it add the table to the topics when this configuration set to a
+    // non-zero value. In such a case we don't know how many topic updates would be
+    // required before the required table version is added to the topics.
     long maxNumAttempts = 5;
     if (result.isSetUpdated_catalog_objects()) {
       maxNumAttempts = Math.max(maxNumAttempts,
@@ -3125,13 +3130,14 @@ public class CatalogServiceCatalog extends Catalog {
         // threshold.
         if (lastSentTopicUpdate != currentTopicUpdate) {
           ++numAttempts;
-          if (numAttempts > maxNumAttempts) {
+          if (shouldTimeOut(numAttempts, maxNumAttempts, begin)) {
             LOG.error(String.format("Couldn't retrieve the covering topic version for "
-                + "catalog objects. Updated objects: %s, deleted objects: %s",
+                    + "catalog objects. Updated objects: %s, deleted objects: %s",
                 FeCatalogUtils.debugString(result.updated_catalog_objects),
                 FeCatalogUtils.debugString(result.removed_catalog_objects)));
             throw new CatalogException("Couldn't retrieve the catalog topic version " +
-                "for the SYNC_DDL operation after " + maxNumAttempts + " attempts." +
+                "for the SYNC_DDL operation after " + numAttempts + " attempts and " +
+                "elapsed time of " + (System.currentTimeMillis() - begin) + " msec. " +
                 "The operation has been successfully executed but its effects may have " +
                 "not been broadcast to all the coordinators.");
           }
@@ -3149,6 +3155,24 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * This util method determines if the sync ddl operation which is waiting for the
+   * topic update to be available should timeout or not based on given number of attempts
+   * and startTime. If {@code max_wait_time_for_sync_ddl_s} flag is set, it
+   * checks for the time elapsed since the startTime otherwise checks if the numAttempts
+   * is greater than maxAttempts.
+   * @return true if the operation should timeout else false if it needs to wait more.
+   */
+  private boolean shouldTimeOut(long numAttempts, long maxNumAttempts, long startTime) {
+    int timeoutSecs = BackendConfig.INSTANCE.getMaxWaitTimeForSyncDdlSecs();
+    if (topicUpdateTblLockMaxWaitTimeMs_ > 0) {
+      if (timeoutSecs <= 0) return false;
+      return (System.currentTimeMillis() - startTime) > timeoutSecs * 1000L;
+    } else {
+      return numAttempts > maxNumAttempts;
+    }
+  }
+
+  /**
    * Returns the version of the topic update that covers a set of TCatalogObjects.
    * A topic update U covers a TCatalogObject T, corresponding to a catalog object O,
    * if last_sent_version(O) >= catalog_version(T) && catalog_version(U) >=
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 00e9f1c..9513b9b 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -276,4 +276,8 @@ public class BackendConfig {
   public boolean isDedicatedCoordinator() {
     return (backendCfg_.is_executor == false) && (backendCfg_.is_coordinator == true);
   }
+
+  public int getMaxWaitTimeForSyncDdlSecs() {
+    return backendCfg_.max_wait_time_for_sync_ddl_s;
+  }
 }
diff --git a/tests/custom_cluster/test_topic_update_frequency.py b/tests/custom_cluster/test_topic_update_frequency.py
index 66ad692..30069fe 100644
--- a/tests/custom_cluster/test_topic_update_frequency.py
+++ b/tests/custom_cluster/test_topic_update_frequency.py
@@ -212,3 +212,43 @@ class TestTopicUpdateFrequency(CustomClusterTestSuite):
     self.__run_topic_update_test(blocking_query,
       non_blocking_queries, init_queries, blocking_query_options=blocking_query_options,
       expect_topic_updates_to_block=True)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--topic_update_tbl_max_wait_time_ms=120000 "
+                  "--max_wait_time_for_sync_ddl_s=5")
+  def test_topic_sync_ddl_error(self, unique_database):
+    """Test makes sure that if a sync ddl query on a unrelated table is executed with
+     timeout, it errors out if table is not published in topic updates within the timeout
+     value."""
+    sync_ddl_query = "create table {0}.{1} (c int)".format(unique_database, "test1")
+    sync_ddl_query_2 = "create table {0}.{1} (c int)".format(unique_database, "test2")
+    blocking_query = "refresh tpcds.store_sales"
+    debug_action = "catalogd_refresh_hdfs_listing_delay:SLEEP@30"
+    self.client.execute(blocking_query)
+    blocking_query_options = {
+      "debug_action": debug_action,
+      "sync_ddl": "false"
+    }
+    slow_query_pool = ThreadPool(processes=1)
+    # run the slow query on the impalad-0 with the given query options
+    slow_query_future = slow_query_pool.apply_async(self.exec_and_time,
+      args=(blocking_query, blocking_query_options, 0))
+    # wait until the slow query is executing and blocking the topic update thread
+    # to avoid any race conditions in the test
+    time.sleep(1)
+    # now run the sync ddl query; we should expect this sync ddl query to fail
+    # since the timeout value is too low and topic update thread doesn't get unblocked
+    # before the timeout.
+    self.execute_query_expect_failure(self.client, sync_ddl_query, {"sync_ddl": "true"})
+    # wait for the slow query to complete
+    slow_query_future.get()
+    # if query is not sync ddl it should not error out
+    slow_query_future = slow_query_pool.apply_async(self.exec_and_time,
+      args=(blocking_query, blocking_query_options, 0))
+    # wait until the slow query is executing and blocking the topic update thread
+    # to avoid any race conditions in the test
+    time.sleep(1)
+    self.execute_query_expect_success(self.client, sync_ddl_query_2,
+      {"sync_ddl": "false"})
+    slow_query_future.get()