You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/03/17 05:40:27 UTC

[impala] branch master updated: IMPALA-10983: Wait more in wait_for_event_processing if there is progress

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new a6333aed6 IMPALA-10983: Wait more in wait_for_event_processing if there is progress
a6333aed6 is described below

commit a6333aed6b0fe2cf355aeaa1952735b9208b2f43
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Mon Mar 13 15:30:50 2023 +0100

    IMPALA-10983: Wait more in wait_for_event_processing if there is progress
    
    There are some flaky tests where wait_for_event_processing times out,
    e.g. TestEventProcessing.test_insert_events. My theory is that this
    is caused by parallel tests with DDL/DML statements that can also
    fire HMS events that have to be processed by catalogd.
    
    The change raises the timeout (default: 10 sec -> 100 sec) as long
    as there is some progress in event processing. If the same event is
    still being processed, then the old timeout is used.
    
    An alternative approach would be to mark the related test as serial,
    but I would prefer to avoid this as it would make test jobs slower.
    
    The event processor status is also checked in order to time out
    earlier if the event processor has no hope of recovery.
    
    Change-Id: I676854f7df9aea5fa10fb6ecf6381195bc8fa4b8
    Reviewed-on: http://gerrit.cloudera.org:8080/19614
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/util/event_processor_utils.py | 60 ++++++++++++++++++++++++++-----------
 1 file changed, 42 insertions(+), 18 deletions(-)

diff --git a/tests/util/event_processor_utils.py b/tests/util/event_processor_utils.py
index 4d2703cb1..08223f9e8 100644
--- a/tests/util/event_processor_utils.py
+++ b/tests/util/event_processor_utils.py
@@ -37,33 +37,58 @@ class EventProcessorUtils(object):
 
   DEFAULT_CATALOG_URL = "http://localhost:25020"
 
+  @staticmethod
+  def wait_for_synced_event_id(timeout, target_event_id):
+    LOG.info("Waiting until events processor syncs to event id:" + str(
+        target_event_id))
+    # Wait more than timeout in case there is some progress in synced events.
+    # The goal is to decrease the effect of parallel DML/DDL tests that can
+    # also fire events that have to be processed and delay syncing up to the
+    # last event.
+    TIMEOUT_MULTIPLIER_IF_THERE_IS_PROGRESS = 10
+    total_timeot = timeout * TIMEOUT_MULTIPLIER_IF_THERE_IS_PROGRESS
+    end_time = time.time() + total_timeot
+    last_synced_id = EventProcessorUtils.get_last_synced_event_id()
+    last_synced_time = time.time()
+    while True:
+      t = time.time()
+      current_synced_id = EventProcessorUtils.get_last_synced_event_id()
+      if current_synced_id >= target_event_id:
+        LOG.debug(
+            "Metric last-synced-event-id has reached the desired value: %d",
+            target_event_id)
+        break
+      status = EventProcessorUtils.get_event_processor_status()
+      if status not in ["ACTIVE", "PAUSED"]:
+        raise Exception("Event processor is not working. Status: {0}".format(status))
+      made_progress = current_synced_id > last_synced_id
+      if t >= end_time:
+        raise Exception(
+            "Event processor did not sync till last known event id {0} \
+            within {1} seconds".format(target_event_id, total_timeot))
+      elif not made_progress and t >= last_synced_time + timeout:
+        raise Exception(
+            "Event processor did not make progress since event id {0} \
+            within {1} seconds".format(last_synced_id, timeout))
+      if made_progress:
+        LOG.debug(
+            "Metric last-synced-event-id has been increased to %d but has not yet \
+            reached the desired value: %d", current_synced_id, target_event_id)
+        last_synced_id = current_synced_id
+        last_synced_time = t
+      time.sleep(0.1)
+
   @staticmethod
   def wait_for_event_processing(test_suite, timeout=10):
     """Waits till the event processor has synced to the latest event id from metastore
     or the timeout value in seconds whichever is earlier"""
     if EventProcessorUtils.get_event_processor_status() == "DISABLED":
       return
-    success = False
     assert timeout > 0
     assert test_suite.hive_client is not None
     current_event_id = EventProcessorUtils.get_current_notification_id(
       test_suite.hive_client)
-    LOG.info("Waiting until events processor syncs to event id:" + str(
-      current_event_id))
-    end_time = time.time() + timeout
-    while time.time() < end_time:
-      last_synced_id = EventProcessorUtils.get_last_synced_event_id()
-      if last_synced_id >= current_event_id:
-        LOG.debug(
-          "Metric last-synced-event-id has reached the desired value:" + str(
-            last_synced_id))
-        success = True
-        break
-      time.sleep(0.1)
-    if not success:
-      raise Exception(
-        "Event processor did not sync till last known event id {0} \
-        within {1} seconds".format(current_event_id, timeout))
+    EventProcessorUtils.wait_for_synced_event_id(timeout, current_event_id)
     if isinstance(test_suite, CustomClusterTestSuite):
       impala_cluster = test_suite.cluster
     else:
@@ -73,7 +98,6 @@ class EventProcessorUtils(object):
     for impalad in impala_cluster.impalads:
       impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version,
         allow_greater=True)
-    return success
 
   @staticmethod
   def get_event_processor_metrics():