You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2024/01/11 18:08:00 UTC
(impala) 04/05: IMPALA-12687: Fix key conflicts in tracking in-flight catalog operations
This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 2d5307418b57efd16196cf0538095339d3b8d96d
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Jan 10 13:58:38 2024 +0800
IMPALA-12687: Fix key conflicts in tracking in-flight catalog operations
In-flight catalog operations are tracked in a map using the query id as
the key. This is safe when catalog clients use the default timeout of 0
(see --catalog_client_rpc_timeout_ms), i.e. catalog RPCs never time out,
so each query has at most one in-flight catalog RPC at a time.
However, if catalog_client_rpc_timeout_ms is set to a non-zero value,
impalad may retry a catalog RPC once it is considered timed out. That
results in several in-flight catalog operations coming from the same
query, all of which use the same query id as the map key.
To fix the key conflicts, this patch uses the pair (queryId, threadId)
as the key of the in-flight operations map. 'threadId' comes from the
thrift thread that handles the RPC, so it's unique across different
retries.
Tests:
- Add a custom-cluster test to verify that all retries are shown on the
/operations page.
Change-Id: Icd94ac7532fe7f3d68028c2da82298037be706c4
Reviewed-on: http://gerrit.cloudera.org:8080/20877
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../catalog/monitor/CatalogOperationTracker.java | 115 ++++++++++++++-------
tests/custom_cluster/test_web_pages.py | 70 +++++++++++--
2 files changed, 136 insertions(+), 49 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
index 47dc8d18f..f66f0cfd7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
+++ b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
@@ -29,6 +29,9 @@ import org.apache.impala.thrift.TResetMetadataRequest;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TUpdateCatalogRequest;
+import org.apache.impala.util.TUniqueIdUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
@@ -48,29 +51,59 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* are also kept in memory and the size is controlled by 'catalog_operation_log_size'.
*/
public final class CatalogOperationTracker {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CatalogOperationTracker.class);
public final static CatalogOperationTracker INSTANCE = new CatalogOperationTracker();
// Keeps track of the on-going DDL operations
- CatalogDdlCounter catalogDdlCounter;
+ CatalogDdlCounter catalogDdlCounter_;
// Keeps track of the on-going reset metadata requests (refresh/invalidate)
- CatalogResetMetadataCounter catalogResetMetadataCounter;
+ CatalogResetMetadataCounter catalogResetMetadataCounter_;
// Keeps track of the on-going finalize DML requests (insert/CTAS/upgrade)
- CatalogFinalizeDmlCounter catalogFinalizeDmlCounter;
+ CatalogFinalizeDmlCounter catalogFinalizeDmlCounter_;
- private final Map<TUniqueId, TCatalogOpRecord> inFlightOperations =
+ /**
+ * Key to track in-flight catalog operations. Each operation is triggered by an RPC.
+ * Each RPC is identified by the query id and the thrift thread id that handles it.
+ * Note that the thread id is important to identify different RPC retries.
+ */
+ private static class RpcKey {
+ private final TUniqueId queryId_;
+ private final long threadId_;
+
+ public RpcKey(TUniqueId queryId) {
+ queryId_ = queryId;
+ threadId_ = Thread.currentThread().getId();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof RpcKey)) return false;
+ RpcKey key = (RpcKey) o;
+ return queryId_.equals(key.queryId_) && threadId_ == key.threadId_;
+ }
+
+ @Override
+ public int hashCode() {
+ return queryId_.hashCode() * 31 + Long.hashCode(threadId_);
+ }
+ }
+
+ private final Map<RpcKey, TCatalogOpRecord> inFlightOperations_ =
new ConcurrentHashMap<>();
- private final Queue<TCatalogOpRecord> finishedOperations =
+ private final Queue<TCatalogOpRecord> finishedOperations_ =
new ConcurrentLinkedQueue<>();
- private final int catalogOperationLogSize;
+ private final int catalogOperationLogSize_;
private CatalogOperationTracker() {
- catalogDdlCounter = new CatalogDdlCounter();
- catalogResetMetadataCounter = new CatalogResetMetadataCounter();
- catalogFinalizeDmlCounter = new CatalogFinalizeDmlCounter();
- catalogOperationLogSize = BackendConfig.INSTANCE.catalogOperationLogSize();
- Preconditions.checkState(catalogOperationLogSize >= 0);
+ catalogDdlCounter_ = new CatalogDdlCounter();
+ catalogResetMetadataCounter_ = new CatalogResetMetadataCounter();
+ catalogFinalizeDmlCounter_ = new CatalogFinalizeDmlCounter();
+ catalogOperationLogSize_ = BackendConfig.INSTANCE.catalogOperationLogSize();
+ Preconditions.checkState(catalogOperationLogSize_ >= 0);
}
private void addRecord(TCatalogServiceRequestHeader header,
@@ -91,29 +124,33 @@ public final class CatalogOperationTracker {
if (queryId != null) {
TCatalogOpRecord record = new TCatalogOpRecord(Thread.currentThread().getId(),
queryId, clientIp, coordinator, catalogOpName,
- catalogDdlCounter.getTableName(tTableName), user,
+ catalogDdlCounter_.getTableName(tTableName), user,
System.currentTimeMillis(), -1, "STARTED", details);
- inFlightOperations.put(queryId, record);
+ inFlightOperations_.put(new RpcKey(queryId), record);
}
}
private void archiveRecord(TUniqueId queryId, String errorMsg) {
- if (queryId != null && inFlightOperations.containsKey(queryId)) {
- TCatalogOpRecord record = inFlightOperations.remove(queryId);
- if (catalogOperationLogSize == 0) return;
- record.setFinish_time_ms(System.currentTimeMillis());
- if (errorMsg != null) {
- record.setStatus("FAILED");
- record.setDetails(record.getDetails() + ", error=" + errorMsg);
- } else {
- record.setStatus("FINISHED");
- }
- synchronized (finishedOperations) {
- if (finishedOperations.size() >= catalogOperationLogSize) {
- finishedOperations.poll();
- }
- finishedOperations.add(record);
+ if (queryId == null) return;
+ RpcKey key = new RpcKey(queryId);
+ TCatalogOpRecord record = inFlightOperations_.remove(key);
+ if (record == null) {
+ LOG.error("Null record for query {}", TUniqueIdUtil.PrintId(queryId));
+ return;
+ }
+ if (catalogOperationLogSize_ == 0) return;
+ record.setFinish_time_ms(System.currentTimeMillis());
+ if (errorMsg != null) {
+ record.setStatus("FAILED");
+ record.setDetails(record.getDetails() + ", error=" + errorMsg);
+ } else {
+ record.setStatus("FINISHED");
+ }
+ synchronized (finishedOperations_) {
+ if (finishedOperations_.size() >= catalogOperationLogSize_) {
+ finishedOperations_.poll();
}
+ finishedOperations_.add(record);
}
}
@@ -129,13 +166,13 @@ public final class CatalogOperationTracker {
String details = "query_options=" + ddlRequest.query_options.toString();
addRecord(ddlRequest.getHeader(), getDdlType(ddlRequest), tTableName, details);
}
- catalogDdlCounter.incrementOperation(ddlRequest.ddl_type, tTableName);
+ catalogDdlCounter_.incrementOperation(ddlRequest.ddl_type, tTableName);
}
public void decrement(TDdlType tDdlType, TUniqueId queryId,
Optional<TTableName> tTableName, String errorMsg) {
archiveRecord(queryId, errorMsg);
- catalogDdlCounter.decrementOperation(tDdlType, tTableName);
+ catalogDdlCounter_.decrementOperation(tDdlType, tTableName);
}
public void increment(TResetMetadataRequest req) {
@@ -152,14 +189,14 @@ public final class CatalogOperationTracker {
CatalogResetMetadataCounter.getResetMetadataType(req, tTableName).name(),
tTableName, details);
}
- catalogResetMetadataCounter.incrementOperation(req);
+ catalogResetMetadataCounter_.incrementOperation(req);
}
public void decrement(TResetMetadataRequest req, String errorMsg) {
if (req.isSetHeader()) {
archiveRecord(req.getHeader().getQuery_id(), errorMsg);
}
- catalogResetMetadataCounter.decrementOperation(req);
+ catalogResetMetadataCounter_.decrementOperation(req);
}
public void increment(TUpdateCatalogRequest req) {
@@ -181,14 +218,14 @@ public final class CatalogOperationTracker {
CatalogFinalizeDmlCounter.getDmlType(req.getHeader().redacted_sql_stmt).name(),
tTableName, details);
}
- catalogFinalizeDmlCounter.incrementOperation(req);
+ catalogFinalizeDmlCounter_.incrementOperation(req);
}
public void decrement(TUpdateCatalogRequest req, String errorMsg) {
if (req.isSetHeader()) {
archiveRecord(req.getHeader().getQuery_id(), errorMsg);
}
- catalogFinalizeDmlCounter.decrementOperation(req);
+ catalogFinalizeDmlCounter_.decrementOperation(req);
}
/**
@@ -197,14 +234,14 @@ public final class CatalogOperationTracker {
*/
public TGetOperationUsageResponse getOperationMetrics() {
List<TOperationUsageCounter> merged = new ArrayList<>();
- merged.addAll(catalogDdlCounter.getOperationUsage());
- merged.addAll(catalogResetMetadataCounter.getOperationUsage());
- merged.addAll(catalogFinalizeDmlCounter.getOperationUsage());
+ merged.addAll(catalogDdlCounter_.getOperationUsage());
+ merged.addAll(catalogResetMetadataCounter_.getOperationUsage());
+ merged.addAll(catalogFinalizeDmlCounter_.getOperationUsage());
TGetOperationUsageResponse res = new TGetOperationUsageResponse(merged);
- for (TCatalogOpRecord record : inFlightOperations.values()) {
+ for (TCatalogOpRecord record : inFlightOperations_.values()) {
res.addToIn_flight_catalog_operations(record);
}
- List<TCatalogOpRecord> records = new ArrayList<>(finishedOperations);
+ List<TCatalogOpRecord> records = new ArrayList<>(finishedOperations_);
// Reverse the list to show recent operations first.
Collections.reverse(records);
res.setFinished_catalog_operations(records);
diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py
index 2644f70a2..2c5b87015 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -22,7 +22,9 @@ import re
import requests
import psutil
import pytest
+import time
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.custom_cluster_test_suite import (
DEFAULT_CLUSTER_SIZE,
CustomClusterTestSuite)
@@ -260,6 +262,22 @@ class TestWebPage(CustomClusterTestSuite):
assert 'Content-Security-Policy' not in response.headers, \
"CSP header present despite being disabled (port %s)" % port
+ @staticmethod
+ def _get_inflight_catalog_operations():
+ response = requests.get("http://localhost:25020/operations?json")
+ assert response.status_code == requests.codes.ok
+ operations = json.loads(response.text)
+ assert "inflight_catalog_operations" in operations
+ return operations["inflight_catalog_operations"]
+
+ @staticmethod
+ def _get_finished_catalog_operations():
+ response = requests.get("http://localhost:25020/operations?json")
+ assert response.status_code == requests.codes.ok
+ operations = json.loads(response.text)
+ assert "finished_catalog_operations" in operations
+ return operations["finished_catalog_operations"]
+
@CustomClusterTestSuite.with_args(catalogd_args="--catalog_operation_log_size=2")
def test_catalog_operations_limit(self, unique_database):
tbl = unique_database + ".tbl"
@@ -267,11 +285,7 @@ class TestWebPage(CustomClusterTestSuite):
self.execute_query("create table {0}_2 (id int)".format(tbl))
self.execute_query("create table {0}_3 (id int)".format(tbl))
self.execute_query("drop table {0}_1".format(tbl))
- response = requests.get("http://localhost:25020/operations?json")
- assert response.status_code == requests.codes.ok
- operations = json.loads(response.text)
- assert "finished_catalog_operations" in operations
- finished_operations = operations["finished_catalog_operations"]
+ finished_operations = self._get_finished_catalog_operations()
# Verify only 2 operations are shown
assert len(finished_operations) == 2
op = finished_operations[0]
@@ -293,11 +307,7 @@ class TestWebPage(CustomClusterTestSuite):
num = 500
for i in range(num):
self.execute_query("invalidate metadata " + tbl)
- response = requests.get("http://localhost:25020/operations?json")
- assert response.status_code == requests.codes.ok
- operations = json.loads(response.text)
- assert "finished_catalog_operations" in operations
- finished_operations = operations["finished_catalog_operations"]
+ finished_operations = self._get_finished_catalog_operations()
# Verify all operations are in the history. There are one DROP_DATABASE, one
# CREATE_DATABASE, one CREATE_TABLE and 'num' INVALIDATEs in the list.
assert len(finished_operations) == 3 + num
@@ -319,6 +329,46 @@ class TestWebPage(CustomClusterTestSuite):
assert op["catalog_op_name"] == "DROP_DATABASE"
assert op["target_name"] == unique_database
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--catalog_client_rpc_timeout_ms=10 "
+ "--catalog_client_rpc_retry_interval_ms=10 "
+ "--catalog_client_connection_num_retries=2")
+ def test_catalog_operations_with_rpc_retry(self):
+ """Test that catalog RPC retries are all shown in the /operations page"""
+ # Run a DESCRIBE to ensure the table is loaded. So the first RPC attempt will
+ # time out in its real work.
+ self.execute_query("describe functional.alltypes")
+ try:
+ self.execute_query("refresh functional.alltypes", {
+ "debug_action": "catalogd_refresh_hdfs_listing_delay:SLEEP@30"
+ })
+ except ImpalaBeeswaxException as e:
+ assert "RPC recv timed out" in str(e)
+ # In impalad side, the query fails by the above error. However, in catalogd side,
+ # the RPCs are still running. Check the in-flight operations.
+ inflight_operations = self._get_inflight_catalog_operations()
+ assert len(inflight_operations) == 2
+ for op in inflight_operations:
+ assert op["status"] == "STARTED"
+ assert op["catalog_op_name"] == "REFRESH"
+ assert op["target_name"] == "functional.alltypes"
+ assert inflight_operations[0]["query_id"] == inflight_operations[1]["query_id"]
+ assert inflight_operations[0]["thread_id"] != inflight_operations[1]["thread_id"]
+
+ # Wait until the catalog operations finish
+ while len(self._get_inflight_catalog_operations()) != 0:
+ time.sleep(1)
+
+ # Verify both RPC attempts are shown as finished operations.
+ finished_operations = self._get_finished_catalog_operations()
+ assert len(finished_operations) == 2
+ for op in finished_operations:
+ assert op["status"] == "FINISHED"
+ assert op["catalog_op_name"] == "REFRESH"
+ assert op["target_name"] == "functional.alltypes"
+ assert finished_operations[0]["query_id"] == finished_operations[1]["query_id"]
+ assert finished_operations[0]["thread_id"] != finished_operations[1]["thread_id"]
+
def _verify_topic_size_metrics(self):
# Calculate the total topic metrics from the /topics page
response = requests.get("http://localhost:25010/topics?json")