You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2018/10/17 21:32:21 UTC

[5/5] impala git commit: IMPALA-7639: Move concurrent UDF tests to a custom cluster test

IMPALA-7639: Move concurrent UDF tests to a custom cluster test

Two test_udfs.py tests (test_native_functions_race and
test_concurrent_jar_drop_use) spawn dozens of connections to
test Impala behavior under concurrency. These connections
use up frontend service threads and can cause shell tests
to time out when trying to connect.

This moves both tests to a new TestUdfConcurrency custom
cluster test. The new custom cluster test uses a larger
fe_service_threads value to allow full concurrency. The
tests run serially and cannot impact other tests.

This also reduces the test dimensions for test_native_functions_race
so that it runs one configuration rather than eight.

Change-Id: I3f255823167a4dd807a07276f630ef02435900a3
Reviewed-on: http://gerrit.cloudera.org:8080/11701
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/6399a65a
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/6399a65a
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/6399a65a

Branch: refs/heads/master
Commit: 6399a65a00cfb6b48da29acbb0921a360bf3a019
Parents: de7f09d
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Tue Oct 16 13:07:35 2018 -0700
Committer: Joe McDonnell <jo...@cloudera.com>
Committed: Wed Oct 17 21:31:24 2018 +0000

----------------------------------------------------------------------
 tests/custom_cluster/test_udf_concurrency.py | 206 ++++++++++++++++++++++
 tests/query_test/test_udfs.py                | 162 -----------------
 2 files changed, 206 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/6399a65a/tests/custom_cluster/test_udf_concurrency.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_udf_concurrency.py b/tests/custom_cluster/test_udf_concurrency.py
new file mode 100644
index 0000000..f0fac27
--- /dev/null
+++ b/tests/custom_cluster/test_udf_concurrency.py
@@ -0,0 +1,206 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pytest
+import random
+import threading
+import time
+from subprocess import check_call
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_cluster import ImpalaCluster
+from tests.util.filesystem_utils import get_fs_path
+
+# This custom cluster test splits out concurrency tests to allow running with
+# a higher fe_service_threads (and thus higher concurrency). This also avoids
+# side-effects for other tests (see IMPALA-7639).
+
+
+class TestUdfConcurrency(CustomClusterTestSuite):
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestUdfConcurrency, cls).add_test_dimensions()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="--fe_service_threads=1000")
+  def test_native_functions_race(self, vector, unique_database):
+    """ IMPALA-6488: stress concurrent adds, uses, and deletes of native functions.
+        Exposes a crash caused by use-after-free in lib-cache."""
+
+    # Native function used by a query. Stresses lib-cache during analysis and
+    # backend expressions.
+    create_fn_to_use = \
+      """create function {0}.use_it(string) returns string
+         LOCATION '{1}'
+         SYMBOL='_Z8IdentityPN10impala_udf15FunctionContextERKNS_9StringValE'"""
+    use_fn = """select * from (select max(int_col) from functional.alltypesagg
+                where {0}.use_it(string_col) = 'blah' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(String_col) > '1' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(string_col) > '1'))) v"""
+    # Reference to another native function from the same 'so' file. Creating/dropping
+    # stresses lib-cache lookup, add, and refresh.
+    create_another_fn = """create function if not exists {0}.other(float)
+                           returns float location '{1}' symbol='Identity'"""
+    drop_another_fn = """drop function if exists {0}.other(float)"""
+    udf_path = get_fs_path('/test-warehouse/libTestUdfs.so')
+
+    # Tracks number of impalads prior to tests to check that none have crashed.
+    # All impalads are assumed to be coordinators.
+    cluster = ImpalaCluster()
+    exp_num_coordinators = cluster.num_responsive_coordinators()
+
+    setup_client = self.create_impala_client()
+    setup_query = create_fn_to_use.format(unique_database, udf_path)
+    try:
+      setup_client.execute(setup_query)
+    except Exception as e:
+      print "Unable to create initial function: {0}".format(setup_query)
+      raise
+
+    errors = []
+
+    def use_fn_method():
+      time.sleep(1 + random.random())
+      client = self.create_impala_client()
+      query = use_fn.format(unique_database)
+      try:
+        client.execute(query)
+      except Exception as e:
+        errors.append(e)
+
+    def load_fn_method():
+      time.sleep(1 + random.random())
+      client = self.create_impala_client()
+      drop = drop_another_fn.format(unique_database)
+      create = create_another_fn.format(unique_database, udf_path)
+      try:
+        client.execute(drop)
+        client.execute(create)
+      except Exception as e:
+        errors.append(e)
+
+    # number of uses/loads needed to reliably reproduce the bug.
+    num_uses = 200
+    num_loads = 200
+
+    # create threads to use native function.
+    runner_threads = []
+    for i in xrange(num_uses):
+      runner_threads.append(threading.Thread(target=use_fn_method))
+
+    # create threads to drop/create native functions.
+    for i in xrange(num_loads):
+      runner_threads.append(threading.Thread(target=load_fn_method))
+
+    # launch all runner threads.
+    for t in runner_threads: t.start()
+
+    # join all threads.
+    for t in runner_threads: t.join()
+
+    for e in errors: print e
+
+    # Checks that no impalad has crashed.
+    assert cluster.num_responsive_coordinators() == exp_num_coordinators
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="--fe_service_threads=1000")
+  def test_concurrent_jar_drop_use(self, vector, unique_database):
+    """IMPALA-6215: race between dropping/using java udf's defined in the same jar.
+       This test runs concurrent drop/use threads that result in class not found
+       exceptions when the race is present.
+    """
+    udf_src_path = os.path.join(
+      os.environ['IMPALA_HOME'], "testdata/udfs/impala-hive-udfs.jar")
+    udf_tgt_path = get_fs_path(
+      '/test-warehouse/{0}.db/impala-hive-udfs.jar'.format(unique_database))
+
+    create_fn_to_drop = """create function {0}.foo_{1}() returns string
+                           LOCATION '{2}' SYMBOL='org.apache.impala.TestUpdateUdf'"""
+    create_fn_to_use = """create function {0}.use_it(string) returns string
+                          LOCATION '{1}' SYMBOL='org.apache.impala.TestUdf'"""
+    drop_fn = "drop function if exists {0}.foo_{1}()"
+    use_fn = """select * from (select max(int_col) from functional.alltypesagg
+                where {0}.use_it(string_col) = 'blah' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(String_col) > '1' union all
+                (select max(int_col) from functional.alltypesagg
+                 where {0}.use_it(string_col) > '1'))) v"""
+    num_drops = 100
+    num_uses = 100
+
+    # use a unique jar for this test to avoid interactions with other tests
+    # that use the same jar
+    check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path])
+
+    # create all the functions.
+    setup_client = self.create_impala_client()
+    try:
+      s = create_fn_to_use.format(unique_database, udf_tgt_path)
+      setup_client.execute(s)
+    except Exception as e:
+      print e
+      assert False
+    for i in range(0, num_drops):
+      try:
+        setup_client.execute(create_fn_to_drop.format(unique_database, i, udf_tgt_path))
+      except Exception as e:
+        print e
+        assert False
+
+    errors = []
+
+    def use_fn_method():
+      time.sleep(5 + random.random())
+      client = self.create_impala_client()
+      try:
+        client.execute(use_fn.format(unique_database))
+      except Exception as e: errors.append(e)
+
+    def drop_fn_method(i):
+      time.sleep(1 + random.random())
+      client = self.create_impala_client()
+      try:
+        client.execute(drop_fn.format(unique_database, i))
+      except Exception as e: errors.append(e)
+
+    # create threads to use functions.
+    runner_threads = []
+    for i in range(0, num_uses):
+      runner_threads.append(threading.Thread(target=use_fn_method))
+
+    # create threads to drop functions.
+    for i in range(0, num_drops):
+      runner_threads.append(threading.Thread(target=drop_fn_method, args=(i, )))
+
+    # launch all runner threads.
+    for t in runner_threads: t.start()
+
+    # join all threads.
+    for t in runner_threads: t.join()
+
+    # Check for any errors.
+    for e in errors: print e
+    assert len(errors) == 0

http://git-wip-us.apache.org/repos/asf/impala/blob/6399a65a/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index e93f511..6bb9b94 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -18,9 +18,6 @@
 from copy import copy
 import os
 import pytest
-import random
-import threading
-import time
 import tempfile
 from subprocess import call, check_call
 
@@ -298,86 +295,6 @@ class TestUdfExecution(TestUdfBase):
       self.run_test_case('QueryTest/udf-non-deterministic', vector,
           use_db=unique_database)
 
-  def test_native_functions_race(self, vector, unique_database):
-    """ IMPALA-6488: stress concurrent adds, uses, and deletes of native functions.
-        Exposes a crash caused by use-after-free in lib-cache."""
-
-    # Native function used by a query. Stresses lib-cache during analysis and
-    # backend expressions.
-    create_fn_to_use = """create function {0}.use_it(string) returns string
-                          LOCATION '{1}'
-                          SYMBOL='_Z8IdentityPN10impala_udf15FunctionContextERKNS_9StringValE'"""
-    use_fn = """select * from (select max(int_col) from functional.alltypesagg
-                where {0}.use_it(string_col) = 'blah' union all
-                (select max(int_col) from functional.alltypesagg
-                 where {0}.use_it(String_col) > '1' union all
-                (select max(int_col) from functional.alltypesagg
-                 where {0}.use_it(string_col) > '1'))) v"""
-    # Reference to another native function from the same 'so' file. Creating/dropping
-    # stresses lib-cache lookup, add, and refresh.
-    create_another_fn = """create function if not exists {0}.other(float)
-                           returns float location '{1}' symbol='Identity'"""
-    drop_another_fn = """drop function if exists {0}.other(float)"""
-    udf_path = get_fs_path('/test-warehouse/libTestUdfs.so')
-
-    # Tracks number of impalads prior to tests to check that none have crashed.
-    # All impalads are assumed to be coordinators.
-    cluster = ImpalaCluster()
-    exp_num_coordinators = cluster.num_responsive_coordinators()
-
-    setup_client = self.create_impala_client()
-    setup_query = create_fn_to_use.format(unique_database, udf_path)
-    try:
-      setup_client.execute(setup_query)
-    except Exception as e:
-      print "Unable to create initial function: {0}".format(setup_query)
-      raise
-
-    errors = []
-    def use_fn_method():
-      time.sleep(1 + random.random())
-      client = self.create_impala_client()
-      query = use_fn.format(unique_database)
-      try:
-        client.execute(query)
-      except Exception as e:
-        errors.append(e)
-
-    def load_fn_method():
-      time.sleep(1 + random.random())
-      client = self.create_impala_client()
-      drop = drop_another_fn.format(unique_database)
-      create = create_another_fn.format(unique_database, udf_path)
-      try:
-        client.execute(drop)
-        client.execute(create)
-      except Exception as e:
-        errors.append(e)
-
-    # number of uses/loads needed to reliably reproduce the bug.
-    num_uses = 200
-    num_loads = 200
-
-    # create threads to use native function.
-    runner_threads = []
-    for i in xrange(num_uses):
-      runner_threads.append(threading.Thread(target=use_fn_method))
-
-    # create threads to drop/create native functions.
-    for i in xrange(num_loads):
-      runner_threads.append(threading.Thread(target=load_fn_method))
-
-    # launch all runner threads.
-    for t in runner_threads: t.start()
-
-    # join all threads.
-    for t in runner_threads: t.join();
-
-    for e in errors: print e
-
-    # Checks that no impalad has crashed.
-    assert cluster.num_responsive_coordinators() == exp_num_coordinators
-
   def test_ir_functions(self, vector, unique_database):
     if vector.get_value('exec_option')['disable_codegen']:
       # IR functions require codegen to be enabled.
@@ -516,85 +433,6 @@ class TestUdfTargeted(TestUdfBase):
       assert "Unable to find class" in str(ex)
     self.client.execute(drop_fn_stmt)
 
-  def test_concurrent_jar_drop_use(self, vector, unique_database):
-    """IMPALA-6215: race between dropping/using java udf's defined in the same jar.
-       This test runs concurrent drop/use threads that result in class not found
-       exceptions when the race is present.
-    """
-    udf_src_path = os.path.join(
-      os.environ['IMPALA_HOME'], "testdata/udfs/impala-hive-udfs.jar")
-    udf_tgt_path = get_fs_path(
-      '/test-warehouse/{0}.db/impala-hive-udfs.jar'.format(unique_database))
-
-    create_fn_to_drop = """create function {0}.foo_{1}() returns string
-                           LOCATION '{2}' SYMBOL='org.apache.impala.TestUpdateUdf'"""
-    create_fn_to_use = """create function {0}.use_it(string) returns string
-                          LOCATION '{1}' SYMBOL='org.apache.impala.TestUdf'"""
-    drop_fn = "drop function if exists {0}.foo_{1}()"
-    use_fn = """select * from (select max(int_col) from functional.alltypesagg
-                where {0}.use_it(string_col) = 'blah' union all
-                (select max(int_col) from functional.alltypesagg
-                 where {0}.use_it(String_col) > '1' union all
-                (select max(int_col) from functional.alltypesagg
-                 where {0}.use_it(string_col) > '1'))) v"""
-    num_drops = 100
-    num_uses = 100
-
-    # use a unique jar for this test to avoid interactions with other tests
-    # that use the same jar
-    check_call(["hadoop", "fs", "-put", "-f", udf_src_path, udf_tgt_path])
-
-    # create all the functions.
-    setup_client = self.create_impala_client()
-    try:
-      s = create_fn_to_use.format(unique_database, udf_tgt_path)
-      setup_client.execute(s)
-    except Exception as e:
-      print e
-      assert False
-    for i in range(0, num_drops):
-      try:
-        setup_client.execute(create_fn_to_drop.format(unique_database, i, udf_tgt_path))
-      except Exception as e:
-        print e
-        assert False
-
-    errors = []
-    def use_fn_method():
-      time.sleep(5 + random.random())
-      client = self.create_impala_client()
-      try:
-        client.execute(use_fn.format(unique_database))
-      except Exception as e: errors.append(e)
-
-    def drop_fn_method(i):
-      time.sleep(1 + random.random())
-      client = self.create_impala_client()
-      try:
-        client.execute(drop_fn.format(unique_database, i))
-      except Exception as e: errors.append(e)
-
-    # create threads to use functions.
-    runner_threads = []
-    for i in range(0, num_uses):
-      runner_threads.append(threading.Thread(target=use_fn_method))
-
-    # create threads to drop functions.
-    drop_threads = []
-    for i in range(0, num_drops):
-      runner_threads.append(threading.Thread(target=drop_fn_method, args=(i, )))
-
-    # launch all runner threads.
-    for t in runner_threads: t.start()
-
-    # join all threads.
-    for t in runner_threads: t.join();
-
-    # Check for any errors.
-    for e in errors: print e
-    assert len(errors) == 0
-
-
   @SkipIfLocal.multiple_impalad
   def test_hive_udfs_missing_jar(self, vector, unique_database):
     """ IMPALA-2365: Impalad shouldn't crash if the udf jar isn't present