Posted to commits@impala.apache.org by lv...@apache.org on 2019/07/21 23:36:23 UTC
[impala] 05/05: IMPALA-8776: Bump timeout in test_event_processing
This is an automated email from the ASF dual-hosted git repository.
lv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 51092260b69b9082f0ecb2a9212e680f5dbc6f43
Author: Lars Volker <lv...@cloudera.com>
AuthorDate: Sat Jul 20 11:47:21 2019 -0700
IMPALA-8776: Bump timeout in test_event_processing
This change increases the timeout in test_event_processing and adds a
delay between repeated reads of the /events page. The latter should
reduce load on the catalog, which should further reduce the chance of
hitting a timeout.
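The resulting wait loop computes an absolute deadline and sleeps briefly
between reads rather than re-querying the page in a tight loop. A minimal
standalone sketch of that pattern (the function and parameter names here are
illustrative, not the test's actual helpers):

import time

def poll_until(fetch_value, target, timeout_s=10, poll_interval_s=0.1):
  # Poll fetch_value() until it reaches target or timeout_s elapses.
  # Sleeping between reads keeps load on the polled endpoint low, which is
  # the same idea this patch applies to the catalog's /events page.
  deadline = time.time() + timeout_s
  while time.time() < deadline:
    if fetch_value() >= target:
      return True
    time.sleep(poll_interval_s)  # back off between reads
  return False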
Change-Id: I42d278209b9c01eba44decf1972930afd72522e9
Reviewed-on: http://gerrit.cloudera.org:8080/13888
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
tests/custom_cluster/test_event_processing.py | 89 +++++++++++++++------------
1 file changed, 49 insertions(+), 40 deletions(-)
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index 7984664..4397e25 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -18,10 +18,11 @@
import pytest
import json
import time
-from tests.common.environ import build_flavor_timeout
import requests
-from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
+
+from tests.common.environ import build_flavor_timeout
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
from tests.util.hive_utils import HiveDbWrapper
@@ -31,14 +32,16 @@ from tests.util.hive_utils import HiveDbWrapper
@SkipIfIsilon.hive
@SkipIfLocal.hive
class TestEventProcessing(CustomClusterTestSuite):
+ """This class contains tests that exercise the event processing mechanism in the
+ catalog."""
+ CATALOG_URL = "http://localhost:25020"
+ PROCESSING_TIMEOUT_S = 10
+
@pytest.mark.execute_serially
- @CustomClusterTestSuite.with_args(
- catalogd_args="--hms_event_polling_interval_s=2"
- )
+ @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=2")
def test_insert_events(self):
- """
- Test for insert event processing. Events are created in Hive and processed in Impala.
- The following cases are tested :
+ """Test for insert event processing. Events are created in Hive and processed in
+ Impala. The following cases are tested :
Insert into table --> for partitioned and non-partitioned table
Insert overwrite table --> for partitioned and non-partitioned table
Insert into partition --> for partitioned table
@@ -53,18 +56,20 @@ class TestEventProcessing(CustomClusterTestSuite):
# Test insert into table, this will fire an insert event.
self.run_stmt_in_hive("insert into %s.%s values(101, 200)"
% (db_name, TBL_INSERT_NOPART))
- # With MetastoreEventProcessor running, the insert event will be processsed. Query
- # the table from Impala
+ # With MetastoreEventProcessor running, the insert event will be processed. Query the
+ # table from Impala.
assert self.wait_for_insert_event_processing(last_synced_event_id) is True
+ # Verify that the data is present in Impala.
data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_NOPART))
assert data.split('\t') == ['101', '200']
- # Test insert overwrite. Overwite the existing value.
+
+ # Test insert overwrite. Overwrite the existing value.
last_synced_event_id = self.get_last_synced_event_id()
self.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
% (db_name, TBL_INSERT_NOPART))
# Make sure the event has been processed.
assert self.wait_for_insert_event_processing(last_synced_event_id) is True
- # Query table from Impala
+ # Verify that the data is present in Impala.
data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_NOPART))
assert data.split('\t') == ['101', '201']
@@ -73,12 +78,12 @@ class TestEventProcessing(CustomClusterTestSuite):
TBL_INSERT_PART = 'tbl_insert_part'
self.run_stmt_in_hive("create table %s.%s (id int, name string) "
"partitioned by(day int, month int, year int)" % (db_name, TBL_INSERT_PART))
- # Insert data into partitions
+ # Insert data into partitions.
self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
"values(101, 'x')" % (db_name, TBL_INSERT_PART))
- # Make sure the event is processed.
+ # Make sure the event has been processed.
assert self.wait_for_insert_event_processing(last_synced_event_id) is True
- # Test if the data is present in Impala
+ # Verify that the data is present in Impala.
data = self.execute_scalar("select * from %s.%s" % (db_name, TBL_INSERT_PART))
assert data.split('\t') == ['101', 'x', '28', '3', '2019']
@@ -87,6 +92,7 @@ class TestEventProcessing(CustomClusterTestSuite):
self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
"values(102, 'y')" % (db_name, TBL_INSERT_PART))
assert self.wait_for_insert_event_processing(last_synced_event_id) is True
+ # Verify that the data is present in Impala.
data = self.execute_scalar("select count(*) from %s.%s where day=28 and month=3 "
"and year=2019" % (db_name, TBL_INSERT_PART))
assert data.split('\t') == ['2']
@@ -96,44 +102,47 @@ class TestEventProcessing(CustomClusterTestSuite):
self.run_stmt_in_hive("insert overwrite table %s.%s partition(day=28, month=03, "
"year=2019)" "values(101, 'z')" % (db_name, TBL_INSERT_PART))
assert self.wait_for_insert_event_processing(last_synced_event_id) is True
+ # Verify that the data is present in Impala.
data = self.execute_scalar("select * from %s.%s where day=28 and month=3 and"
" year=2019 and id=101" % (db_name, TBL_INSERT_PART))
assert data.split('\t') == ['101', 'z', '28', '3', '2019']
def wait_for_insert_event_processing(self, previous_event_id):
- """ Wait till the event processor has finished processing insert events. This is
- detected by scrapping the /events webpage for changes in last_synced_event_id.
- Since two events are created for every insert done through hive, we wait till the
- event id is incremented by at least two. Returns true if at least two events were
- processed within 10 sec. False otherwise.
+ """Waits until the event processor has finished processing insert events. Since two
+ events are created for every insert done through hive, we wait until the event id is
+ incremented by at least two. Returns true if at least two events were processed within
+ self.PROCESSING_TIMEOUT_S, False otherwise.
"""
new_event_id = self.get_last_synced_event_id()
- success = True
- start_time = time.time()
- while new_event_id - previous_event_id < 2:
+ success = False
+ end_time = time.time() + self.PROCESSING_TIMEOUT_S
+ while time.time() < end_time:
new_event_id = self.get_last_synced_event_id()
- # Prevent infinite loop
- time_delta = time.time() - start_time
- if time_delta > 10:
- success = False
+ if new_event_id - previous_event_id >= 2:
+ success = True
break
+ time.sleep(0.1)
# Wait for catalog update to be propagated.
time.sleep(build_flavor_timeout(2, slow_build_timeout=4))
return success
- def get_last_synced_event_id(self):
- """
- Scrape the /events webpage and return the last_synced_event_id.
- """
- response = requests.get("http://localhost:25020/events?json")
+ def get_event_processor_metrics(self):
+ """Scrapes the catalog's /events webpage and return a dictionary with the event
+ processor metrics."""
+ response = requests.get("%s/events?json" % self.CATALOG_URL)
assert response.status_code == requests.codes.ok
varz_json = json.loads(response.text)
- metrics = varz_json["event_processor_metrics"].strip().split('\n')
- kv_map = {}
- for kv in metrics:
- if len(kv) > 0:
- pair = kv.split(':')
- kv_map[pair[0].strip()] = pair[1].strip()
+ metrics = varz_json["event_processor_metrics"].strip().splitlines()
- last_synced_event_id = int(kv_map['last-synced-event-id'])
- return last_synced_event_id
+ # Helper to strip a pair of elements
+ def strip_pair(p):
+ return (p[0].strip(), p[1].strip())
+
+ pairs = [strip_pair(kv.split(':')) for kv in metrics if kv]
+ return dict(pairs)
+
+ def get_last_synced_event_id(self):
+ """Returns the last_synced_event_id."""
+ metrics = self.get_event_processor_metrics()
+ assert 'last-synced-event-id' in metrics.keys()
+ return int(metrics['last-synced-event-id'])
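For reference, the new get_event_processor_metrics() parses the debug page's
plain-text metrics, one "key: value" pair per line. A standalone sketch of
that parsing with a made-up payload (only last-synced-event-id is a real
metric name taken from the code above; the other key and both values are
invented for illustration):

raw = "last-synced-event-id: 42\nevents-received: 7"
pairs = [kv.split(':') for kv in raw.splitlines() if kv]
metrics = dict((k.strip(), v.strip()) for k, v in pairs)
assert int(metrics['last-synced-event-id']) == 42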