You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by db...@apache.org on 2024/01/31 09:07:17 UTC
(impala) branch master updated: IMPALA-12448: Avoid getting stuck when refreshing a non-existent partition
This is an automated email from the ASF dual-hosted git repository.
dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 00247c26e IMPALA-12448: Avoid getting stuck when refreshing a non-existent partition
00247c26e is described below
commit 00247c26e2a73d4ece4962be6fd5078843d938ae
Author: ttttttz <24...@qq.com>
AuthorDate: Wed Jan 31 00:21:32 2024 +0800
IMPALA-12448: Avoid getting stuck when refreshing a non-existent partition
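In some cases, such as refreshing a non-existent partition, the table
version does not change after resetting metadata, so SYNC_DDL should not
wait for the catalog version of 'result'; otherwise the query may be
stuck for a long time. If the catalog version of 'result' is less than
or equal to 'oldestTopicUpdateToGc_', return the catalog version of
'result' directly. The topic update log GC frequency, previously
hard-coded to 1000, is now configurable through the new
--topic_update_log_gc_frequency flag so that tests can trigger a GC quickly.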
Tests:
* added e2e tests
Change-Id: Iace7cdadda300b03896f92415822266354421887
Reviewed-on: http://gerrit.cloudera.org:8080/20490
Reviewed-by: Quanlong Huang <hu...@gmail.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/catalog/catalog-server.cc | 4 +
be/src/util/backend-gflag-util.cc | 2 +
common/thrift/BackendGflags.thrift | 2 +
.../impala/catalog/CatalogServiceCatalog.java | 35 ++++----
.../org/apache/impala/catalog/TopicUpdateLog.java | 21 +++--
.../test_refresh_invalid_partition.py | 93 ++++++++++++++++++++++
6 files changed, 133 insertions(+), 24 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index cc1667731..be0ed3769 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -155,6 +155,10 @@ DEFINE_int32(catalog_operation_log_size, 100, "Number of catalog operation log r
DEFINE_bool(catalogd_ha_reset_metadata_on_failover, false, "If true, reset all metadata "
"when the catalogd becomes active.");
+DEFINE_int32(topic_update_log_gc_frequency, 1000, "Frequency at which the entries "
+ "of the catalog topic update log are garbage collected. An entry may survive "
+ "for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates.");
+
DECLARE_string(state_store_host);
DECLARE_int32(state_store_port);
DECLARE_string(state_store_2_host);
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 40414c871..ec7f63876 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -115,6 +115,7 @@ DECLARE_bool(iceberg_allow_datafiles_in_table_location_only);
DECLARE_int32(catalog_operation_log_size);
DECLARE_string(hostname);
DECLARE_bool(allow_catalog_cache_op_from_masked_users);
+DECLARE_int32(topic_update_log_gc_frequency);
// HS2 SAML2.0 configuration
// Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -451,6 +452,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
cfg.__set_hostname(FLAGS_hostname);
cfg.__set_allow_catalog_cache_op_from_masked_users(
FLAGS_allow_catalog_cache_op_from_masked_users);
+ cfg.__set_topic_update_log_gc_frequency(FLAGS_topic_update_log_gc_frequency);
return Status::OK();
}
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 35aecb63d..a0c41a1b2 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -276,4 +276,6 @@ struct TBackendGflags {
122: required bool allow_catalog_cache_op_from_masked_users
123: required bool iceberg_allow_datafiles_in_table_location_only
+
+ 124: required i32 topic_update_log_gc_frequency
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 83db6f7b9..88753aa14 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -189,7 +189,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* modify them. Each entry includes the number of times a catalog object has
* skipped a topic update, which version of the object was last sent in a topic update
* and what was the version of that topic update. Entries of the topic update log are
- * garbage-collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates by the topic
+ * garbage-collected every topicUpdateLogGcFrequency_ topic updates by the topic
* update processing thread to prevent the log from growing indefinitely. Metadata
* operations using SYNC_DDL are inspecting this log to identify the catalog topic
* version that the issuing impalad must wait for in order to ensure that the effects
@@ -200,10 +200,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
* operations that use SYNC_DDL to hang while waiting for specific topic update log
* entries. That could happen if the thread processing the metadata operation stalls
* for a long period of time (longer than the time to process
- * TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates) between the time the operation was
+ * topicUpdateLogGcFrequency_ topic updates) between the time the operation was
* applied in the catalog cache and the time the SYNC_DDL version was checked. To reduce
* the probability of such an event, we set the value of the
- * TOPIC_UPDATE_LOG_GC_FREQUENCY to a large value. Also, to prevent metadata operations
+ * topicUpdateLogGcFrequency_ to a large value. Also, to prevent metadata operations
* from hanging in that path due to unknown issues (e.g. bugs), operations using
* SYNC_DDL are not allowed to wait indefinitely for specific topic log entries and an
* exception is thrown if the specified max wait time is exceeded. See
@@ -3300,18 +3300,19 @@ public class CatalogServiceCatalog extends Catalog {
public CatalogDeltaLog getDeleteLog() { return deleteLog_; }
/**
- * Returns the version of the topic update that an operation using SYNC_DDL must wait
- * for in order to ensure that its result set ('result') has been broadcast to all the
- * coordinators. For operations that don't produce a result set, e.g. INVALIDATE
- * METADATA, return the version specified in 'result.version'.
+ * Returns the catalog version of the topic update that an operation using SYNC_DDL
+ * must wait for in order to ensure that its result set ('result') has been broadcast
+ * to all the coordinators. For operations that don't produce a result set,
+ * e.g. INVALIDATE METADATA, return the version specified in 'result.version'.
*/
public long waitForSyncDdlVersion(TCatalogUpdateResult result) throws CatalogException {
- if (!result.isSetUpdated_catalog_objects() &&
- !result.isSetRemoved_catalog_objects()) {
+ if (result.getVersion() <= topicUpdateLog_.getOldestTopicUpdateToGc()
+ || (!result.isSetUpdated_catalog_objects() &&
+ !result.isSetRemoved_catalog_objects())) {
return result.getVersion();
}
long lastSentTopicUpdate = lastSentTopicUpdate_.get();
- // Maximum number of attempts (topic updates) to find the catalog topic version that
+ // Maximum number of attempts (topic updates) to find the catalog version that
// an operation using SYNC_DDL must wait for.
// this maximum attempt limit is only applicable when topicUpdateTblLockMaxWaitTimeMs_
// is disabled (set to 0). This is due to the fact that the topic update thread will
@@ -3338,7 +3339,7 @@ public class CatalogServiceCatalog extends Catalog {
long topicVersionForDeletes =
getCoveringTopicUpdateVersion(result.getRemoved_catalog_objects());
if (topicVersionForUpdates == -1 || topicVersionForDeletes == -1) {
- LOG.info("Topic version for {} not found yet. Last sent topic version: {}. " +
+ LOG.info("Topic update for {} not found yet. Last sent catalog version: {}. " +
"Updated objects: {}, deleted objects: {}",
topicVersionForUpdates == -1 ? "updates" : "deletes",
lastSentTopicUpdate_.get(),
@@ -3358,11 +3359,11 @@ public class CatalogServiceCatalog extends Catalog {
if (lastSentTopicUpdate != currentTopicUpdate) {
++numAttempts;
if (shouldTimeOut(numAttempts, maxNumAttempts, begin)) {
- LOG.error(String.format("Couldn't retrieve the covering topic version for "
+ LOG.error(String.format("Couldn't retrieve the covering topic update for "
+ "catalog objects. Updated objects: %s, deleted objects: %s",
FeCatalogUtils.debugString(result.updated_catalog_objects),
FeCatalogUtils.debugString(result.removed_catalog_objects)));
- throw new CatalogException("Couldn't retrieve the catalog topic version " +
+ throw new CatalogException("Couldn't retrieve the catalog topic update " +
"for the SYNC_DDL operation after " + numAttempts + " attempts and " +
"elapsed time of " + (System.currentTimeMillis() - begin) + " msec. " +
"The operation has been successfully executed but its effects may have " +
@@ -3375,9 +3376,9 @@ public class CatalogServiceCatalog extends Catalog {
}
}
Preconditions.checkState(versionToWaitFor >= 0);
- LOG.info("Operation using SYNC_DDL is waiting for catalog topic version: " +
- versionToWaitFor + ". Time to identify topic version (msec): " +
- (System.currentTimeMillis() - begin));
+ LOG.info("Operation using SYNC_DDL is waiting for catalog version {} " +
+ "to be sent. Time to identify topic update (msec): {}.",
+ versionToWaitFor, (System.currentTimeMillis() - begin));
return versionToWaitFor;
}
@@ -3421,7 +3422,7 @@ public class CatalogServiceCatalog extends Catalog {
// a) It corresponds to a new catalog object that hasn't been processed by a catalog
// update yet.
// b) It corresponds to a catalog object that hasn't been modified for at least
- // TOPIC_UPDATE_LOG_GC_FREQUENCY updates and hence its entry was garbage
+ // topicUpdateLogGcFrequency_ updates and hence its entry was garbage
// collected.
// In both cases, -1 is returned to indicate that we're waiting for the
// entry to show up in the topic update log.
diff --git a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
index 040e0cf2d..a20abd5ae 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TopicUpdateLog.java
@@ -26,6 +26,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import org.apache.impala.service.BackendConfig;
+
// A log of topic update information for each catalog object. An entry is added to
// the log when a catalog object is processed (added/removed/skipped) in a topic
// update and it is replaced every time the catalog object is processed in a
@@ -33,18 +35,19 @@ import com.google.common.base.Strings;
//
// To prevent the log from growing indefinitely, the oldest entries
// (in terms of last topic update that processed the associated catalog objects) are
-// garbage collected every TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates. That will cause
+// garbage collected every topicUpdateLogGcFrequency_ topic updates. That will cause
// entries of deleted catalog objects or entries of objects that haven't been processed
-// by the catalog for at least TOPIC_UPDATE_LOG_GC_FREQUENCY updates to be removed from
+// by the catalog for at least topicUpdateLogGcFrequency_ updates to be removed from
// the log.
public class TopicUpdateLog {
private static final Logger LOG = LoggerFactory.getLogger(TopicUpdateLog.class);
// Frequency at which the entries of the topic update log are garbage collected.
- // An entry may survive for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates.
- private final static int TOPIC_UPDATE_LOG_GC_FREQUENCY = 1000;
+ // An entry may survive for (2 * topicUpdateLogGcFrequency_) - 1 topic updates.
+ private int topicUpdateLogGcFrequency_ = BackendConfig.INSTANCE
+ .getBackendCfg().topic_update_log_gc_frequency;
// Number of topic updates left to trigger a gc of topic update log entries.
- private int numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
+ private int numTopicUpdatesToGc_ = topicUpdateLogGcFrequency_;
// In the next gc cycle of topic update log entries, all the entries that were last
// added to a topic update with version less than or equal to
@@ -109,7 +112,7 @@ public class TopicUpdateLog {
/**
* Garbage-collects topic update log entries. These are entries that haven't been
- * added to any of the last TOPIC_UPDATE_LOG_GC_FREQUENCY topic updates.
+ * added to any of the last topicUpdateLogGcFrequency_ topic updates.
*/
public void garbageCollectUpdateLogEntries(long lastTopicUpdateVersion) {
if (oldestTopicUpdateToGc_ == -1) {
@@ -130,7 +133,7 @@ public class TopicUpdateLog {
}
}
}
- numTopicUpdatesToGc_ = TOPIC_UPDATE_LOG_GC_FREQUENCY;
+ numTopicUpdatesToGc_ = topicUpdateLogGcFrequency_;
oldestTopicUpdateToGc_ = lastTopicUpdateVersion;
LOG.info("Topic update log GC finished. Removed {} entries.",
numEntriesRemoved);
@@ -159,5 +162,9 @@ public class TopicUpdateLog {
if (entry == null) entry = new Entry();
return entry;
}
+
+ public long getOldestTopicUpdateToGc() {
+ return oldestTopicUpdateToGc_;
+ }
}
diff --git a/tests/custom_cluster/test_refresh_invalid_partition.py b/tests/custom_cluster/test_refresh_invalid_partition.py
new file mode 100644
index 000000000..57b26fded
--- /dev/null
+++ b/tests/custom_cluster/test_refresh_invalid_partition.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+
+class TestRefreshInvalidPartition(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      catalogd_args="--topic_update_log_gc_frequency=10")
+  def test_refresh_invalid_partition_with_sync_ddl(self, vector, unique_database):
+    """Regression test for IMPALA-12448. Avoid getting stuck when refreshing a
+    non-existent partition with sync_ddl after a topic update log GC."""
+    table_name_1 = unique_database + '.' + "partition_test_table_1"
+    table_name_2 = unique_database + '.' + "partition_test_table_2"
+    self.client.execute(
+        'create table %s (y int) partitioned by (x int)' % table_name_1)
+    self.client.execute(
+        'create table %s (y int) partitioned by (x int)' % table_name_2)
+
+    self.client.execute('insert into table %s partition (x=1) values(1)'
+        % table_name_1)
+    self.client.execute('insert into table %s partition (x=1) values(1)'
+        % table_name_2)
+
+    assert '1\t1' == self.client.execute(
+        'select * from %s' % table_name_1).get_data()
+    assert '1\t1' == self.client.execute(
+        'select * from %s' % table_name_2).get_data()
+
+    # Run it multiple times so that at least one topic update log GC is triggered
+    # (the catalogd runs with --topic_update_log_gc_frequency=10).
+    for _ in range(15):
+      self.execute_query('refresh %s' % table_name_1,
+          query_options={"SYNC_DDL": "true"})
+
+    # Refresh a non-existent partition with sync_ddl. Before IMPALA-12448 this
+    # could get stuck waiting for a catalog version that is never sent again.
+    self.execute_query_expect_success(self.client, 'refresh %s partition (x=999)'
+        % table_name_2, query_options={"SYNC_DDL": "true", "EXEC_TIME_LIMIT_S": "30"})
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      statestored_args="--statestore_update_frequency_ms=5000")
+  def test_refresh_missing_partition(self, unique_database):
+    """After dropping partition p=0 and then refreshing it with SYNC_DDL on another
+    coordinator, every coordinator should see only the remaining partition."""
+    client1 = self.cluster.impalads[1].service.create_beeswax_client()
+    client2 = self.cluster.impalads[2].service.create_beeswax_client()
+    try:
+      self.client.execute('create table {}.tbl (i int) partitioned by (p int)'
+          .format(unique_database))
+      self.execute_query(
+          'insert into {}.tbl partition(p) values (0,0), (1,1)'.format(unique_database),
+          query_options={"SYNC_DDL": "true"})
+      self.execute_query_expect_success(
+          self.client,
+          'alter table {}.tbl drop partition(p=0)'.format(unique_database),
+          {"SYNC_DDL": "false"})
+      self.execute_query_expect_success(
+          client1,
+          'refresh {}.tbl partition(p=0)'.format(unique_database),
+          {"SYNC_DDL": "true"})
+      show_parts_stmt = 'show partitions {}.tbl'.format(unique_database)
+      # Only partition p=1 should be left, so each result has two rows: that
+      # partition plus the trailing 'Total' summary row of SHOW PARTITIONS.
+      for client in (client2, client1, self.client):
+        res = self.execute_query_expect_success(client, show_parts_stmt)
+        assert len(res.data) == 2
+    finally:
+      client1.close()
+      client2.close()