Posted to commits@impala.apache.org by lv...@apache.org on 2019/07/21 23:36:23 UTC

[impala] 05/05: IMPALA-8776: Bump timeout in test_event_processing

This is an automated email from the ASF dual-hosted git repository.

lv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 51092260b69b9082f0ecb2a9212e680f5dbc6f43
Author: Lars Volker <lv...@cloudera.com>
AuthorDate: Sat Jul 20 11:47:21 2019 -0700

    IMPALA-8776: Bump timeout in test_event_processing
    
    This change increases the timeout in test_event_processing and adds a
    delay between repeated reads of the /events page. The latter should
    reduce load on the catalog, which should further reduce the chance of
    hitting a timeout.
    
    Change-Id: I42d278209b9c01eba44decf1972930afd72522e9
    Reviewed-on: http://gerrit.cloudera.org:8080/13888
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/custom_cluster/test_event_processing.py | 89 +++++++++++++++------------
 1 file changed, 49 insertions(+), 40 deletions(-)
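
The heart of the change is visible in wait_for_insert_event_processing below:
an elapsed-time check inside a tight loop is replaced by a deadline-based poll
that sleeps between probes of the /events page. A minimal, self-contained
sketch of that pattern (poll_until and condition_met are illustrative names,
not part of the patch):

    import time

    def poll_until(condition_met, timeout_s=10, interval_s=0.1):
        """Polls condition_met() until it returns True or timeout_s elapses.

        Returns True on success, False on timeout. The sleep between probes
        keeps the polled endpoint (here, the catalog's /events page) from
        being hit in a tight loop.
        """
        end_time = time.time() + timeout_s
        while time.time() < end_time:
            if condition_met():
                return True
            time.sleep(interval_s)
        return False

    # Example: returns True immediately because the condition already holds.
    assert poll_until(lambda: True, timeout_s=1)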

diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index 7984664..4397e25 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -18,10 +18,11 @@
 import pytest
 import json
 import time
-from tests.common.environ import build_flavor_timeout
 import requests
-from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
+
+from tests.common.environ import build_flavor_timeout
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.util.hive_utils import HiveDbWrapper
 
 
@@ -31,14 +32,16 @@ from tests.util.hive_utils import HiveDbWrapper
 @SkipIfIsilon.hive
 @SkipIfLocal.hive
 class TestEventProcessing(CustomClusterTestSuite):
+  """This class contains tests that exercise the event processing mechanism in the
+  catalog."""
+  CATALOG_URL = "http://localhost:25020"
+  PROCESSING_TIMEOUT_S = 10
+
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(
-    catalogd_args="--hms_event_polling_interval_s=2"
-  )
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
   def test_insert_events(self):
-    """
-    Test for insert event processing. Events are created in Hive and processed in Impala.
-    The following cases are tested :
+    """Test for insert event processing. Events are created in Hive and processed in
+    Impala. The following cases are tested:
     Insert into table --> for partitioned and non-partitioned table
     Insert overwrite table --> for partitioned and non-partitioned table
     Insert into partition --> for partitioned table
@@ -53,18 +56,20 @@ class TestEventProcessing(CustomClusterTestSuite):
      # Test insert into table, this will fire an insert event.
      self.run_stmt_in_hive("insert into %s.%s values(101, 200)"
          % (db_name, TBL_INSERT_NOPART))
-     # With MetastoreEventProcessor running, the insert event will be processsed. Query
-     # the table from Impala
+     # With MetastoreEventProcessor running, the insert event will be processed. Query the
+     # table from Impala.
      assert self.wait_for_insert_event_processing(last_synced_event_id) is True
+     # Verify that the data is present in Impala.
      data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_NOPART))
      assert data.split('\t') == ['101', '200']
-     # Test insert overwrite. Overwite the existing value.
+
+     # Test insert overwrite. Overwrite the existing value.
      last_synced_event_id = self.get_last_synced_event_id()
      self.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
          % (db_name, TBL_INSERT_NOPART))
      # Make sure the event has been processed.
      assert self.wait_for_insert_event_processing(last_synced_event_id) is True
-     # Query table from Impala
+     # Verify that the data is present in Impala.
      data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_NOPART))
      assert data.split('\t') == ['101', '201']
 
@@ -73,12 +78,12 @@ class TestEventProcessing(CustomClusterTestSuite):
      TBL_INSERT_PART = 'tbl_insert_part'
      self.run_stmt_in_hive("create table %s.%s (id int, name string) "
          "partitioned by(day int, month int, year int)" % (db_name, TBL_INSERT_PART))
-     # Insert data into partitions
+     # Insert data into partitions.
      self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
          "values(101, 'x')" % (db_name, TBL_INSERT_PART))
-     #  Make sure the event is  processed.
+     # Make sure the event has been processed.
      assert self.wait_for_insert_event_processing(last_synced_event_id) is True
-     # Test if the data is present in Impala
+     # Verify that the data is present in Impala.
      data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_PART))
      assert data.split('\t') == ['101', 'x', '28', '3', '2019']
 
@@ -87,6 +92,7 @@ class TestEventProcessing(CustomClusterTestSuite):
      self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
          "values(102, 'y')" % (db_name, TBL_INSERT_PART))
      assert self.wait_for_insert_event_processing(last_synced_event_id) is True
+     # Verify that the data is present in Impala.
      data = self.execute_scalar("select count(*) from %s.%s where day=28 and month=3 "
          "and year=2019" % (db_name, TBL_INSERT_PART))
      assert data.split('\t') == ['2']
@@ -96,44 +102,47 @@ class TestEventProcessing(CustomClusterTestSuite):
      self.run_stmt_in_hive("insert overwrite table %s.%s partition(day=28, month=03, "
          "year=2019)" "values(101, 'z')" % (db_name, TBL_INSERT_PART))
      assert self.wait_for_insert_event_processing(last_synced_event_id) is True
+     # Verify that the data is present in Impala.
      data = self.execute_scalar("select * from %s.%s where day=28 and month=3 and"
          " year=2019 and id=101" % (db_name, TBL_INSERT_PART))
      assert data.split('\t') == ['101', 'z', '28', '3', '2019']
 
   def wait_for_insert_event_processing(self, previous_event_id):
-    """ Wait till the event processor has finished processing insert events. This is
-    detected by scrapping the /events webpage for changes in last_synced_event_id.
-    Since two events are created for every insert done through hive, we wait till the
-    event id is incremented by at least two. Returns true if at least two events were
-    processed within 10 sec. False otherwise.
+    """Waits until the event processor has finished processing insert events. Since two
+    events are created for every insert done through Hive, we wait until the event id is
+    incremented by at least two. Returns True if at least two events were processed within
+    self.PROCESSING_TIMEOUT_S, False otherwise.
     """
     new_event_id = self.get_last_synced_event_id()
-    success = True
-    start_time = time.time()
-    while new_event_id - previous_event_id < 2:
+    success = False
+    end_time = time.time() + self.PROCESSING_TIMEOUT_S
+    while time.time() < end_time:
       new_event_id = self.get_last_synced_event_id()
-      # Prevent infinite loop
-      time_delta = time.time() - start_time
-      if time_delta > 10:
-        success = False
+      if new_event_id - previous_event_id >= 2:
+        success = True
         break
+      time.sleep(0.1)
     # Wait for catalog update to be propagated.
     time.sleep(build_flavor_timeout(2, slow_build_timeout=4))
     return success
 
-  def get_last_synced_event_id(self):
-    """
-    Scrape the /events webpage and return the last_synced_event_id.
-    """
-    response = requests.get("http://localhost:25020/events?json")
+  def get_event_processor_metrics(self):
+    """Scrapes the catalog's /events webpage and return a dictionary with the event
+    processor metrics."""
+    response = requests.get("%s/events?json" % self.CATALOG_URL)
     assert response.status_code == requests.codes.ok
     varz_json = json.loads(response.text)
-    metrics = varz_json["event_processor_metrics"].strip().split('\n')
-    kv_map = {}
-    for kv in metrics:
-      if len(kv) > 0:
-        pair = kv.split(':')
-        kv_map[pair[0].strip()] = pair[1].strip()
+    metrics = varz_json["event_processor_metrics"].strip().splitlines()
 
-    last_synced_event_id = int(kv_map['last-synced-event-id'])
-    return last_synced_event_id
+    # Helper to strip whitespace from both elements of a key/value pair.
+    def strip_pair(p):
+      return (p[0].strip(), p[1].strip())
+
+    pairs = [strip_pair(kv.split(':')) for kv in metrics if kv]
+    return dict(pairs)
+
+  def get_last_synced_event_id(self):
+    """Returns the last_synced_event_id."""
+    metrics = self.get_event_processor_metrics()
+    assert 'last-synced-event-id' in metrics.keys()
+    return int(metrics['last-synced-event-id'])
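
For reference, get_event_processor_metrics boils down to parsing "key : value"
lines from the catalog's debug page into a dict. A standalone sketch of that
parsing against a canned payload (the JSON shape mirrors what the test reads;
the payload values are made up, and splitting on the first ':' only is a small
hardening over the patch's kv.split(':')):

    import json

    # Canned response in the shape the test reads from
    # http://localhost:25020/events?json (assumed; verify against a live
    # catalogd).
    sample = json.dumps({
        "event_processor_metrics":
            "last-synced-event-id : 42\nevents-received : 7\n"
    })

    varz_json = json.loads(sample)
    lines = varz_json["event_processor_metrics"].strip().splitlines()

    # Split each non-empty line on the first ':' and strip whitespace,
    # mirroring get_event_processor_metrics.
    metrics = dict((k.strip(), v.strip())
                   for k, v in (kv.split(':', 1) for kv in lines if kv))

    assert int(metrics['last-synced-event-id']) == 42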