You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2024/01/03 23:26:27 UTC

(impala) branch master updated: IMPALA-12666: Fix incorrect statestore metrics when impalad is down

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new fcda98ad9 IMPALA-12666: Fix incorrect statestore metrics when impalad is down
fcda98ad9 is described below

commit fcda98ad99c13324e3ab09f2e92d331d0304bb8e
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Tue Jan 2 09:15:01 2024 +0800

    IMPALA-12666: Fix incorrect statestore metrics when impalad is down
    
    Statestore::Topic::DeleteIfVersionsMatch() is used when impalad is down.
    It incorrectly bumps the metrics of total value size and total topic
    size. This patch removes the code that does so.
    
    In order to verify the metrics against the /topics page, this patch
    changes the /topics page to also return raw numeric sizes in bytes,
    which the e2e test compares with the values on the /metrics page.
    
    Tests
     - Add e2e test
    
    Change-Id: I3ffcfb45b7cde0b40a87c9ca410ec634cb31cefb
    Reviewed-on: http://gerrit.cloudera.org:8080/20841
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/statestore/statestore.cc        |  7 ++++--
 tests/custom_cluster/test_web_pages.py | 42 +++++++++++++++++++++++++++++++++-
 2 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 0054da633..c171f6ca8 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -446,8 +446,6 @@ void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
     // entry
     topic_update_log_.erase(version);
     topic_update_log_.emplace(++last_version_, key);
-    value_size_metric_->Increment(entry_it->second.value().size());
-    topic_size_metric_->Increment(entry_it->second.value().size());
     entry_it->second.SetDeleted(true);
     entry_it->second.SetVersion(last_version_);
   }
@@ -543,6 +541,11 @@ void Statestore::Topic::ToJson(Document* document, Value* topic_json) {
       PrettyPrinter::Print(key_size + value_size, TUnit::BYTES).c_str(),
       document->GetAllocator());
   topic_json->AddMember("total_size", total_size_json, document->GetAllocator());
+
+  topic_json->AddMember("key_size_bytes", key_size, document->GetAllocator());
+  topic_json->AddMember("value_size_bytes", value_size, document->GetAllocator());
+  topic_json->AddMember("total_size_bytes", key_size + value_size,
+      document->GetAllocator());
   topic_json->AddMember("prioritized", IsPrioritizedTopic(topic_id_),
       document->GetAllocator());
 }
diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py
index d491452c4..2644f70a2 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -23,7 +23,9 @@ import requests
 import psutil
 import pytest
 
-from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.custom_cluster_test_suite import (
+  DEFAULT_CLUSTER_SIZE,
+  CustomClusterTestSuite)
 from tests.shell.util import run_impala_shell_cmd
 
 
@@ -316,3 +318,41 @@ class TestWebPage(CustomClusterTestSuite):
     assert op["status"] == "FINISHED"
     assert op["catalog_op_name"] == "DROP_DATABASE"
     assert op["target_name"] == unique_database
+
+  def _verify_topic_size_metrics(self):
+    # Calculate the total topic metrics from the /topics page
+    response = requests.get("http://localhost:25010/topics?json")
+    assert response.status_code == requests.codes.ok
+    topics_json = json.loads(response.text)
+    total_key_size = 0
+    total_value_size = 0
+    total_topic_size = 0
+    for topic in topics_json["topics"]:
+      total_key_size += topic["key_size_bytes"]
+      total_value_size += topic["value_size_bytes"]
+      total_topic_size += topic["total_size_bytes"]
+
+    # Retrieve and verify the total topic metrics from the /metrics page
+    response = requests.get("http://localhost:25010/metrics?json")
+    assert response.status_code == requests.codes.ok
+    metrics_json = json.loads(response.text)["metric_group"]["metrics"]
+    for metric in metrics_json:
+      if metric["name"] == "statestore.total-key-size-bytes":
+        assert metric["value"] == total_key_size
+      elif metric["name"] == "statestore.total-value-size-bytes":
+        assert metric["value"] == total_value_size
+      elif metric["name"] == "statestore.total-topic-size-bytes":
+        assert metric["value"] == total_topic_size
+
+  @CustomClusterTestSuite.with_args()
+  def test_transient_topic_size(self):
+    self._verify_topic_size_metrics()
+    # Kill an impalad and wait until it's removed
+    killed_impalad = self.cluster.impalads[2]
+    killed_impalad.kill()
+    # Before we kill an impalad, there are DEFAULT_CLUSTER_SIZE + 1 subscribers
+    # (DEFAULT_CLUSTER_SIZE impalads and 1 for catalogd). After we kill one impalad,
+    # there should be DEFAULT_CLUSTER_SIZE subscribers.
+    self.cluster.statestored.service.wait_for_live_subscribers(DEFAULT_CLUSTER_SIZE)
+    # Verify the topic metrics again
+    self._verify_topic_size_metrics()