You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2024/01/11 18:08:00 UTC

(impala) 04/05: IMPALA-12687: Fix key conflicts in tracking in-flight catalog operations

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2d5307418b57efd16196cf0538095339d3b8d96d
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Jan 10 13:58:38 2024 +0800

    IMPALA-12687: Fix key conflicts in tracking in-flight catalog operations
    
    In-flight catalog operations are tracked in a map using the query id as
    the key. This is fine by default, since catalog clients use 0 as the RPC
    timeout (see --catalog_client_rpc_timeout_ms), i.e. catalog RPCs never
    time out, which means each query has at most one in-flight catalog RPC
    at a time.
    
    However, when catalog_client_rpc_timeout_ms is set to a non-zero value,
    impalad may retry a catalog RPC that it considers timed out. This
    results in several in-flight catalog operations coming from the same
    query, all of which use the same query id as the map key.
    
    To fix the key conflicts, this patch uses the pair (queryId, threadId)
    as the key of the in-flight operations map. 'threadId' comes from the
    thrift thread that handles the RPC, so it is unique across different
    retries.
    
    Tests:
     - Add a custom-cluster test to verify that all retries are shown on
       the /operations page.
    
    Change-Id: Icd94ac7532fe7f3d68028c2da82298037be706c4
    Reviewed-on: http://gerrit.cloudera.org:8080/20877
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../catalog/monitor/CatalogOperationTracker.java   | 115 ++++++++++++++-------
 tests/custom_cluster/test_web_pages.py             |  70 +++++++++++--
 2 files changed, 136 insertions(+), 49 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
index 47dc8d18f..f66f0cfd7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
+++ b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogOperationTracker.java
@@ -29,6 +29,9 @@ import org.apache.impala.thrift.TResetMetadataRequest;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
+import org.apache.impala.util.TUniqueIdUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -48,29 +51,59 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  *  are also kept in memory and the size is controlled by 'catalog_operation_log_size'.
  */
 public final class CatalogOperationTracker {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CatalogOperationTracker.class);
   public final static CatalogOperationTracker INSTANCE = new CatalogOperationTracker();
 
   // Keeps track of the on-going DDL operations
-  CatalogDdlCounter catalogDdlCounter;
+  CatalogDdlCounter catalogDdlCounter_;
 
   // Keeps track of the on-going reset metadata requests (refresh/invalidate)
-  CatalogResetMetadataCounter catalogResetMetadataCounter;
+  CatalogResetMetadataCounter catalogResetMetadataCounter_;
 
   // Keeps track of the on-going finalize DML requests (insert/CTAS/upgrade)
-  CatalogFinalizeDmlCounter catalogFinalizeDmlCounter;
+  CatalogFinalizeDmlCounter catalogFinalizeDmlCounter_;
 
-  private final Map<TUniqueId, TCatalogOpRecord> inFlightOperations =
+  /**
+   * Key to track in-flight catalog operations. Each operation is triggered by an RPC.
+   * Each RPC is identified by the query id and the thrift thread id that handles it.
+   * Note that the thread id is important to identify different RPC retries.
+   */
+  private static class RpcKey {
+    private final TUniqueId queryId_;
+    private final long threadId_;
+
+    public RpcKey(TUniqueId queryId) {
+      queryId_ = queryId;
+      threadId_ = Thread.currentThread().getId();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof RpcKey)) return false;
+      RpcKey key = (RpcKey) o;
+      return queryId_.equals(key.queryId_) && threadId_ == key.threadId_;
+    }
+
+    @Override
+    public int hashCode() {
+      return queryId_.hashCode() * 31 + Long.hashCode(threadId_);
+    }
+  }
+
+  private final Map<RpcKey, TCatalogOpRecord> inFlightOperations_ =
       new ConcurrentHashMap<>();
-  private final Queue<TCatalogOpRecord> finishedOperations =
+  private final Queue<TCatalogOpRecord> finishedOperations_ =
       new ConcurrentLinkedQueue<>();
-  private final int catalogOperationLogSize;
+  private final int catalogOperationLogSize_;
 
   private CatalogOperationTracker() {
-    catalogDdlCounter = new CatalogDdlCounter();
-    catalogResetMetadataCounter = new CatalogResetMetadataCounter();
-    catalogFinalizeDmlCounter = new CatalogFinalizeDmlCounter();
-    catalogOperationLogSize = BackendConfig.INSTANCE.catalogOperationLogSize();
-    Preconditions.checkState(catalogOperationLogSize >= 0);
+    catalogDdlCounter_ = new CatalogDdlCounter();
+    catalogResetMetadataCounter_ = new CatalogResetMetadataCounter();
+    catalogFinalizeDmlCounter_ = new CatalogFinalizeDmlCounter();
+    catalogOperationLogSize_ = BackendConfig.INSTANCE.catalogOperationLogSize();
+    Preconditions.checkState(catalogOperationLogSize_ >= 0);
   }
 
   private void addRecord(TCatalogServiceRequestHeader header,
@@ -91,29 +124,33 @@ public final class CatalogOperationTracker {
     if (queryId != null) {
       TCatalogOpRecord record = new TCatalogOpRecord(Thread.currentThread().getId(),
           queryId, clientIp, coordinator, catalogOpName,
-          catalogDdlCounter.getTableName(tTableName), user,
+          catalogDdlCounter_.getTableName(tTableName), user,
           System.currentTimeMillis(), -1, "STARTED", details);
-      inFlightOperations.put(queryId, record);
+      inFlightOperations_.put(new RpcKey(queryId), record);
     }
   }
 
   private void archiveRecord(TUniqueId queryId, String errorMsg) {
-    if (queryId != null && inFlightOperations.containsKey(queryId)) {
-      TCatalogOpRecord record = inFlightOperations.remove(queryId);
-      if (catalogOperationLogSize == 0) return;
-      record.setFinish_time_ms(System.currentTimeMillis());
-      if (errorMsg != null) {
-        record.setStatus("FAILED");
-        record.setDetails(record.getDetails() + ", error=" + errorMsg);
-      } else {
-        record.setStatus("FINISHED");
-      }
-      synchronized (finishedOperations) {
-        if (finishedOperations.size() >= catalogOperationLogSize) {
-          finishedOperations.poll();
-        }
-        finishedOperations.add(record);
+    if (queryId == null) return;
+    RpcKey key = new RpcKey(queryId);
+    TCatalogOpRecord record = inFlightOperations_.remove(key);
+    if (record == null) {
+      LOG.error("Null record for query {}", TUniqueIdUtil.PrintId(queryId));
+      return;
+    }
+    if (catalogOperationLogSize_ == 0) return;
+    record.setFinish_time_ms(System.currentTimeMillis());
+    if (errorMsg != null) {
+      record.setStatus("FAILED");
+      record.setDetails(record.getDetails() + ", error=" + errorMsg);
+    } else {
+      record.setStatus("FINISHED");
+    }
+    synchronized (finishedOperations_) {
+      if (finishedOperations_.size() >= catalogOperationLogSize_) {
+        finishedOperations_.poll();
       }
+      finishedOperations_.add(record);
     }
   }
 
@@ -129,13 +166,13 @@ public final class CatalogOperationTracker {
       String details = "query_options=" + ddlRequest.query_options.toString();
       addRecord(ddlRequest.getHeader(), getDdlType(ddlRequest), tTableName, details);
     }
-    catalogDdlCounter.incrementOperation(ddlRequest.ddl_type, tTableName);
+    catalogDdlCounter_.incrementOperation(ddlRequest.ddl_type, tTableName);
   }
 
   public void decrement(TDdlType tDdlType, TUniqueId queryId,
       Optional<TTableName> tTableName, String errorMsg) {
     archiveRecord(queryId, errorMsg);
-    catalogDdlCounter.decrementOperation(tDdlType, tTableName);
+    catalogDdlCounter_.decrementOperation(tDdlType, tTableName);
   }
 
   public void increment(TResetMetadataRequest req) {
@@ -152,14 +189,14 @@ public final class CatalogOperationTracker {
           CatalogResetMetadataCounter.getResetMetadataType(req, tTableName).name(),
           tTableName, details);
     }
-    catalogResetMetadataCounter.incrementOperation(req);
+    catalogResetMetadataCounter_.incrementOperation(req);
   }
 
   public void decrement(TResetMetadataRequest req, String errorMsg) {
     if (req.isSetHeader()) {
       archiveRecord(req.getHeader().getQuery_id(), errorMsg);
     }
-    catalogResetMetadataCounter.decrementOperation(req);
+    catalogResetMetadataCounter_.decrementOperation(req);
   }
 
   public void increment(TUpdateCatalogRequest req) {
@@ -181,14 +218,14 @@ public final class CatalogOperationTracker {
           CatalogFinalizeDmlCounter.getDmlType(req.getHeader().redacted_sql_stmt).name(),
           tTableName, details);
     }
-    catalogFinalizeDmlCounter.incrementOperation(req);
+    catalogFinalizeDmlCounter_.incrementOperation(req);
   }
 
   public void decrement(TUpdateCatalogRequest req, String errorMsg) {
     if (req.isSetHeader()) {
       archiveRecord(req.getHeader().getQuery_id(), errorMsg);
     }
-    catalogFinalizeDmlCounter.decrementOperation(req);
+    catalogFinalizeDmlCounter_.decrementOperation(req);
   }
 
   /**
@@ -197,14 +234,14 @@ public final class CatalogOperationTracker {
    */
   public TGetOperationUsageResponse getOperationMetrics() {
     List<TOperationUsageCounter> merged = new ArrayList<>();
-    merged.addAll(catalogDdlCounter.getOperationUsage());
-    merged.addAll(catalogResetMetadataCounter.getOperationUsage());
-    merged.addAll(catalogFinalizeDmlCounter.getOperationUsage());
+    merged.addAll(catalogDdlCounter_.getOperationUsage());
+    merged.addAll(catalogResetMetadataCounter_.getOperationUsage());
+    merged.addAll(catalogFinalizeDmlCounter_.getOperationUsage());
     TGetOperationUsageResponse res = new TGetOperationUsageResponse(merged);
-    for (TCatalogOpRecord record : inFlightOperations.values()) {
+    for (TCatalogOpRecord record : inFlightOperations_.values()) {
       res.addToIn_flight_catalog_operations(record);
     }
-    List<TCatalogOpRecord> records = new ArrayList<>(finishedOperations);
+    List<TCatalogOpRecord> records = new ArrayList<>(finishedOperations_);
     // Reverse the list to show recent operations first.
     Collections.reverse(records);
     res.setFinished_catalog_operations(records);
diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py
index 2644f70a2..2c5b87015 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -22,7 +22,9 @@ import re
 import requests
 import psutil
 import pytest
+import time
 
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.custom_cluster_test_suite import (
   DEFAULT_CLUSTER_SIZE,
   CustomClusterTestSuite)
@@ -260,6 +262,22 @@ class TestWebPage(CustomClusterTestSuite):
       assert 'Content-Security-Policy' not in response.headers, \
         "CSP header present despite being disabled (port %s)" % port
 
+  @staticmethod
+  def _get_inflight_catalog_operations():
+    response = requests.get("http://localhost:25020/operations?json")
+    assert response.status_code == requests.codes.ok
+    operations = json.loads(response.text)
+    assert "inflight_catalog_operations" in operations
+    return operations["inflight_catalog_operations"]
+
+  @staticmethod
+  def _get_finished_catalog_operations():
+    response = requests.get("http://localhost:25020/operations?json")
+    assert response.status_code == requests.codes.ok
+    operations = json.loads(response.text)
+    assert "finished_catalog_operations" in operations
+    return operations["finished_catalog_operations"]
+
   @CustomClusterTestSuite.with_args(catalogd_args="--catalog_operation_log_size=2")
   def test_catalog_operations_limit(self, unique_database):
     tbl = unique_database + ".tbl"
@@ -267,11 +285,7 @@ class TestWebPage(CustomClusterTestSuite):
     self.execute_query("create table {0}_2 (id int)".format(tbl))
     self.execute_query("create table {0}_3 (id int)".format(tbl))
     self.execute_query("drop table {0}_1".format(tbl))
-    response = requests.get("http://localhost:25020/operations?json")
-    assert response.status_code == requests.codes.ok
-    operations = json.loads(response.text)
-    assert "finished_catalog_operations" in operations
-    finished_operations = operations["finished_catalog_operations"]
+    finished_operations = self._get_finished_catalog_operations()
     # Verify only 2 operations are shown
     assert len(finished_operations) == 2
     op = finished_operations[0]
@@ -293,11 +307,7 @@ class TestWebPage(CustomClusterTestSuite):
     num = 500
     for i in range(num):
       self.execute_query("invalidate metadata " + tbl)
-    response = requests.get("http://localhost:25020/operations?json")
-    assert response.status_code == requests.codes.ok
-    operations = json.loads(response.text)
-    assert "finished_catalog_operations" in operations
-    finished_operations = operations["finished_catalog_operations"]
+    finished_operations = self._get_finished_catalog_operations()
     # Verify all operations are in the history. There are one DROP_DATABASE, one
     # CREATE_DATABASE, one CREATE_TABLE and 'num' INVALIDATEs in the list.
     assert len(finished_operations) == 3 + num
@@ -319,6 +329,46 @@ class TestWebPage(CustomClusterTestSuite):
     assert op["catalog_op_name"] == "DROP_DATABASE"
     assert op["target_name"] == unique_database
 
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--catalog_client_rpc_timeout_ms=10 "
+                 "--catalog_client_rpc_retry_interval_ms=10 "
+                 "--catalog_client_connection_num_retries=2")
+  def test_catalog_operations_with_rpc_retry(self):
+    """Test that catalog RPC retries are all shown in the /operations page"""
+    # Run a DESCRIBE to ensure the table is loaded. So the first RPC attempt will
+    # time out in its real work.
+    self.execute_query("describe functional.alltypes")
+    try:
+      self.execute_query("refresh functional.alltypes", {
+        "debug_action": "catalogd_refresh_hdfs_listing_delay:SLEEP@30"
+      })
+    except ImpalaBeeswaxException as e:
+      assert "RPC recv timed out" in str(e)
+    # In impalad side, the query fails by the above error. However, in catalogd side,
+    # the RPCs are still running. Check the in-flight operations.
+    inflight_operations = self._get_inflight_catalog_operations()
+    assert len(inflight_operations) == 2
+    for op in inflight_operations:
+      assert op["status"] == "STARTED"
+      assert op["catalog_op_name"] == "REFRESH"
+      assert op["target_name"] == "functional.alltypes"
+    assert inflight_operations[0]["query_id"] == inflight_operations[1]["query_id"]
+    assert inflight_operations[0]["thread_id"] != inflight_operations[1]["thread_id"]
+
+    # Wait until the catalog operations finish
+    while len(self._get_inflight_catalog_operations()) != 0:
+      time.sleep(1)
+
+    # Verify both RPC attempts are shown as finished operations.
+    finished_operations = self._get_finished_catalog_operations()
+    assert len(finished_operations) == 2
+    for op in finished_operations:
+      assert op["status"] == "FINISHED"
+      assert op["catalog_op_name"] == "REFRESH"
+      assert op["target_name"] == "functional.alltypes"
+    assert finished_operations[0]["query_id"] == finished_operations[1]["query_id"]
+    assert finished_operations[0]["thread_id"] != finished_operations[1]["thread_id"]
+
   def _verify_topic_size_metrics(self):
     # Calculate the total topic metrics from the /topics page
     response = requests.get("http://localhost:25010/topics?json")