You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2022/09/19 21:11:46 UTC

[impala] 02/03: IMPALA-11572: deflake test_mt_dop_skew_lpt

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 190b5e41b1e9d24a91432cc470c91e6fff84a041
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Mon Sep 12 15:49:46 2022 +0200

    IMPALA-11572: deflake test_mt_dop_skew_lpt
    
    test_mt_dop_skew_lpt was flaky. Also, it calculated the
    min(bytes_read) / max(bytes_read) globally across all fragment
    insteances, not just among the intra-node fragment instances.
    
    To deflake the test, this test:
     * calculate intra-node min(bytes_read) / max(bytes_read) ratios
       instead of global ones
     * print out the ratios so we'll know the numbers when the test fails
     * eliminate compression codec test dimension which is not used anyway
    
    Change-Id: I823542c21fe8f10f43a501fe4175da883eaf2f99
    Reviewed-on: http://gerrit.cloudera.org:8080/18970
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/query_test/test_scanners.py | 39 ++++++++++++++++++++++++---------------
 1 file changed, 24 insertions(+), 15 deletions(-)

diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 45865bee6..f49ebbd53 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -375,7 +375,8 @@ class TestHdfsScannerSkew(ImpalaTestSuite):
   def add_test_dimensions(cls):
     super(TestHdfsScannerSkew, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        v.get_value('table_format').file_format in ('text'))
+        v.get_value('table_format').file_format in ('text') and
+        v.get_value('table_format').compression_codec == 'none')
 
   @SkipIfLocal.multiple_impalad
   def test_mt_dop_skew_lpt(self, vector, unique_database):
@@ -384,32 +385,41 @@ class TestHdfsScannerSkew(ImpalaTestSuite):
        load balancing with a shared queue between the instances. With IMPALA-11539
        the items in the queue are ordered by scan sizes from largest to smallest, i.e.
        we are doing Longest-Processing Time (LPT) scheduling."""
-    def bytes_read_statistics(profile):
+    def count_intra_node_skew(profile):
+      SKEW_THRESHOLD = 0.85
       lines = [line.strip() for line in profile.splitlines() if "- BytesRead: " in line]
       assert len(lines) == 7  # Averaged fragment + 6 fragment
-      min = None
-      max = None
+      bytes_read_array = []
       for i in range(1, len(lines)):
         # A line looks like:
         # - BytesRead: 202.77 MB (212617555)
         # we only need '212617555' from it
         bytes_read_str = re.findall(r'\((\d+)\)', lines[i])[0]
         bytes_read = int(bytes_read_str)
-        if min is None and max is None:
-          min = max = bytes_read
-          continue
-        if bytes_read < min: min = bytes_read
-        if bytes_read > max: max = bytes_read
-      return [min, max]
+        bytes_read_array.append(bytes_read)
+      count_skew = 0
+      # MT_DOP fragments are next to each other in the profile, so fragment instances
+      # belonging to a single executor starts at 0, 2, 4
+      for i in [0, 2, 4]:
+        a = bytes_read_array[i]
+        b = bytes_read_array[i + 1]
+        if a < b:
+          ratio = float(a) / float(b)
+        else:
+          ratio = float(b) / float(a)
+        print "Intra-node bytes read ratio:", ratio
+        if ratio < SKEW_THRESHOLD:
+          count_skew += 1
+      return count_skew
 
     tbl_name = unique_database + ".lineitem_skew"
     with self.create_impala_client() as imp_client:
       imp_client.set_configuration_option('mt_dop', '2')
       imp_client.execute("""create table {} like tpch.lineitem""".format(tbl_name))
       # Create a couple of small data files
-      for i in range(1, 5):
+      for i in range(1, 11):
         imp_client.execute("""insert into {} select * from tpch.lineitem
-                              where l_orderkey % 5 = 0""".format(tbl_name))
+                              where l_orderkey % 11 = 0""".format(tbl_name))
       # Create a couple of large files
       imp_client.execute("insert into {} select * from tpch.lineitem".format(tbl_name))
 
@@ -423,9 +433,8 @@ class TestHdfsScannerSkew(ImpalaTestSuite):
                       min(l_receiptdate),min(l_shipinstruct),min(l_shipmode),min(l_comment)
                from {}""".format(tbl_name))
         profile = results.runtime_profile
-        [min, max] = bytes_read_statistics(profile)
-        if float(min) / float(max) < 0.5: cnt_fail += 1
-      assert cnt_fail < 3
+        cnt_fail += count_intra_node_skew(profile)
+      assert cnt_fail <= 5
 
 
 class TestHudiParquet(ImpalaTestSuite):