You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2019/08/06 04:18:56 UTC

[impala] 03/04: IMPALA-8781: Result spooling tests to cover edge cases and cancellation

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bbec8fa74961755269298706302477780019e7d5
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Mon Jul 22 11:27:33 2019 -0700

    IMPALA-8781: Result spooling tests to cover edge cases and cancellation
    
    Adds additional tests to test_result_spooling.py to cover various edge
    cases when fetching query results (ensure all Impala types are returned
    properly, UDFs are evaluated correctly, etc.). A new QueryTest file
    result-spooling.test is added to encapsulate all these tests. Tests with
    a decreased ROW_BATCH_SIZE are added as well to validate that
    BufferedPlanRootSink buffers row batches correctly.
    
    BufferedPlanRootSink requires careful synchronization of the producer
    and consumer threads, especially when queries are cancelled. The
    TestResultSpoolingCancellation class is dedicated to running
    cancellation tests with SPOOL_QUERY_RESULTS = true. The implementation
    is heavily borrowed from test_cancellation.py and some of the logic is
    refactored into a new utility module called cancel_util.py to avoid
    code duplication between test_cancellation.py and
    test_result_spooling.py.
    
    Testing:
    * Looped test_result_spooling.py overnight with no failures
    * Core tests passed
    
    Change-Id: Ib3b3a1539c4a5fa9b43c8ca315cea16c9701e283
    Reviewed-on: http://gerrit.cloudera.org:8080/13907
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../queries/QueryTest/result-spooling.test         | 113 +++++++++++++++++++++
 tests/hs2/test_fetch_first.py                      |  19 +++-
 tests/query_test/test_cancellation.py              |  58 +----------
 tests/query_test/test_result_spooling.py           | 101 ++++++++++++++++--
 tests/util/cancel_util.py                          |  87 ++++++++++++++++
 5 files changed, 314 insertions(+), 64 deletions(-)

diff --git a/testdata/workloads/functional-query/queries/QueryTest/result-spooling.test b/testdata/workloads/functional-query/queries/QueryTest/result-spooling.test
new file mode 100644
index 0000000..4a80805
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/result-spooling.test
@@ -0,0 +1,113 @@
+====
+---- QUERY
+# Validate reading all types from a table when result spooling is enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select * from alltypes order by id limit 10
+---- RESULTS
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+2,true,2,2,2,20,2.200000047683716,20.2,'01/01/09','2',2009-01-01 00:02:00.100000000,2009,1
+3,false,3,3,3,30,3.299999952316284,30.3,'01/01/09','3',2009-01-01 00:03:00.300000000,2009,1
+4,true,4,4,4,40,4.400000095367432,40.4,'01/01/09','4',2009-01-01 00:04:00.600000000,2009,1
+5,false,5,5,5,50,5.5,50.5,'01/01/09','5',2009-01-01 00:05:00.100000000,2009,1
+6,true,6,6,6,60,6.599999904632568,60.59999999999999,'01/01/09','6',2009-01-01 00:06:00.150000000,2009,1
+7,false,7,7,7,70,7.699999809265137,70.7,'01/01/09','7',2009-01-01 00:07:00.210000000,2009,1
+8,true,8,8,8,80,8.800000190734863,80.8,'01/01/09','8',2009-01-01 00:08:00.280000000,2009,1
+9,false,9,9,9,90,9.899999618530273,90.89999999999999,'01/01/09','9',2009-01-01 00:09:00.360000000,2009,1
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT
+====
+---- QUERY
+# Validates that column ordering is preserved when result spooling is enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select month, year, timestamp_col, string_col, date_string_col, double_col, float_col,
+bigint_col, int_col, smallint_col, tinyint_col, bool_col, id from alltypes
+order by id limit 10
+---- RESULTS
+1,2009,2009-01-01 00:00:00,'0','01/01/09',0,0,0,0,0,0,true,0
+1,2009,2009-01-01 00:01:00,'1','01/01/09',10.1,1.100000023841858,10,1,1,1,false,1
+1,2009,2009-01-01 00:02:00.100000000,'2','01/01/09',20.2,2.200000047683716,20,2,2,2,true,2
+1,2009,2009-01-01 00:03:00.300000000,'3','01/01/09',30.3,3.299999952316284,30,3,3,3,false,3
+1,2009,2009-01-01 00:04:00.600000000,'4','01/01/09',40.4,4.400000095367432,40,4,4,4,true,4
+1,2009,2009-01-01 00:05:00.100000000,'5','01/01/09',50.5,5.5,50,5,5,5,false,5
+1,2009,2009-01-01 00:06:00.150000000,'6','01/01/09',60.59999999999999,6.599999904632568,60,6,6,6,true,6
+1,2009,2009-01-01 00:07:00.210000000,'7','01/01/09',70.7,7.699999809265137,70,7,7,7,false,7
+1,2009,2009-01-01 00:08:00.280000000,'8','01/01/09',80.8,8.800000190734863,80,8,8,8,true,8
+1,2009,2009-01-01 00:09:00.360000000,'9','01/01/09',90.89999999999999,9.899999618530273,90,9,9,9,false,9
+---- TYPES
+INT,INT,TIMESTAMP,STRING,STRING,DOUBLE,FLOAT,BIGINT,INT,SMALLINT,TINYINT,BOOLEAN,INT
+====
+---- QUERY
+# Validates that UDFs are properly evaluated when result spooling is enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select abs(id), abs(int_col) + abs(int_col) from alltypes order by id limit 10
+---- RESULTS
+0,0
+1,2
+2,4
+3,6
+4,8
+5,10
+6,12
+7,14
+8,16
+9,18
+---- TYPES
+BIGINT,BIGINT
+====
+---- QUERY
+# Validates that queries with Agg nodes return correct results when result spooling is
+# enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select avg(int_col) + avg(bigint_col) from alltypes
+---- RESULTS
+49.5
+---- TYPES
+DOUBLE
+====
+---- QUERY
+# Validates that queries with Join nodes return correct results when result spooling is
+# enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select j.test_name, d.name from jointbl j inner join dimtbl d on
+(j.test_id = d.id)
+---- RESULTS
+'Name1','Name1'
+'Name2','Name2'
+'Name3','Name3'
+'Name4','Name4'
+'Name5','Name5'
+'Name16','Name6'
+'Name6','Name6'
+'Name16','Name6'
+'Name16','Name6'
+'Name6','Name6'
+'Name16','Name6'
+---- TYPES
+STRING,STRING
+====
+---- QUERY
+# Validates that NULLs are properly handled when result spooling is enabled.
+SET SPOOL_QUERY_RESULTS=true;
+select * from nullrows order by id limit 10
+---- RESULTS
+'a','','NULL',NULL,NULL,'a','a',true
+'b','','NULL',NULL,NULL,'a','NULL',false
+'c','','NULL',NULL,NULL,'a','NULL',NULL
+'d','','NULL',NULL,NULL,'a','NULL',NULL
+'e','','NULL',NULL,NULL,'a','NULL',NULL
+'f','','NULL',NULL,NULL,'f','f',true
+'g','','NULL',NULL,NULL,'f','NULL',false
+'h','','NULL',NULL,NULL,'f','NULL',NULL
+'i','','NULL',NULL,NULL,'f','NULL',NULL
+'j','','NULL',NULL,NULL,'f','NULL',NULL
+---- TYPES
+STRING,STRING,STRING,INT,DOUBLE,STRING,STRING,BOOLEAN
+====
+---- QUERY
+SET SPOOL_QUERY_RESULTS=true;
+select * from emptytable;
+---- RESULTS
+---- TYPES
+STRING,INT
+====
diff --git a/tests/hs2/test_fetch_first.py b/tests/hs2/test_fetch_first.py
index 12451bb..42feee8 100644
--- a/tests/hs2/test_fetch_first.py
+++ b/tests/hs2/test_fetch_first.py
@@ -104,14 +104,26 @@ class TestFetchFirst(HS2TestSuite):
   @pytest.mark.execute_serially
   @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6)
   def test_query_stmts_v6(self):
-    self.run_query_stmts_test();
+    self.run_query_stmts_test()
 
   @pytest.mark.execute_serially
   @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
   def test_query_stmts_v1(self):
-    self.run_query_stmts_test();
+    self.run_query_stmts_test()
 
-  def run_query_stmts_test(self):
+  @pytest.mark.xfail(reason="Unsupported until IMPALA-8819 is completed")
+  @pytest.mark.execute_serially
+  @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6)
+  def test_query_stmts_v6_with_result_spooling(self):
+    self.run_query_stmts_test({'spool_query_results': 'true'})
+
+  @pytest.mark.xfail(reason="Unsupported until IMPALA-8819 is completed")
+  @pytest.mark.execute_serially
+  @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1)
+  def test_query_stmts_v1_with_result_spooling(self):
+    self.run_query_stmts_test({'spool_query_results': 'true'})
+
+  def run_query_stmts_test(self, conf_overlay=dict()):
     """Tests Impala's limited support for the FETCH_FIRST fetch orientation for queries.
     Impala permits FETCH_FIRST for a particular query iff result caching is enabled
     via the 'impala.resultset.cache.size' confOverlay option. FETCH_FIRST will succeed as
@@ -125,6 +137,7 @@ class TestFetchFirst(HS2TestSuite):
     execute_statement_req = TCLIService.TExecuteStatementReq()
     execute_statement_req.sessionHandle = self.session_handle
     execute_statement_req.confOverlay = dict()
+    execute_statement_req.confOverlay.update(conf_overlay)
     execute_statement_req.statement =\
       "SELECT * FROM functional.alltypessmall ORDER BY id LIMIT 30"
     execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index ff21a7a..2fa5f03 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -25,6 +25,7 @@ from RuntimeProfile.ttypes import TRuntimeProfileFormat
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
 from tests.common.test_vector import ImpalaTestDimension
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.util.cancel_util import cancel_query_and_validate_state
 from tests.verifiers.metric_verifier import MetricVerifier
 
 # PRIMARY KEY for lineitem
@@ -135,7 +136,6 @@ class TestCancellation(ImpalaTestSuite):
         query = "create table ctas_cancel stored as %sfile as %s" %\
             (file_format, query)
 
-    join_before_close = vector.get_value('join_before_close')
     wait_action = vector.get_value('wait_action')
     fail_rpc_action = vector.get_value('fail_rpc_action')
 
@@ -148,63 +148,13 @@ class TestCancellation(ImpalaTestSuite):
 
     # Execute the query multiple times, cancelling it each time.
     for i in xrange(NUM_CANCELATION_ITERATIONS):
-      handle = self.execute_query_async(query, vector.get_value('exec_option'),
-                                        table_format=vector.get_value('table_format'))
-
-      def fetch_results():
-        threading.current_thread().fetch_results_error = None
-        threading.current_thread().query_profile = None
-        try:
-          new_client = self.create_impala_client()
-          new_client.fetch(query, handle)
-        except ImpalaBeeswaxException as e:
-          threading.current_thread().fetch_results_error = e
-
-      thread = threading.Thread(target=fetch_results)
-      thread.start()
-
-      sleep(vector.get_value('cancel_delay'))
-      assert self.client.get_state(handle) != self.client.QUERY_STATES['EXCEPTION']
-      cancel_result = self.client.cancel(handle)
-      assert cancel_result.status_code == 0,\
-          'Unexpected status code from cancel request: %s' % cancel_result
-
-      if join_before_close:
-        thread.join()
-
-      close_error = None
-      try:
-        self.client.close_query(handle)
-      except ImpalaBeeswaxException as e:
-        close_error = e
-
-      # Before accessing fetch_results_error we need to join the fetch thread
-      thread.join()
-
-      if thread.fetch_results_error is None:
-        # If the fetch rpc didn't result in CANCELLED (and auto-close the query) then
-        # the close rpc should have succeeded.
-        assert close_error is None
-      elif close_error is None:
-        # If the close rpc succeeded, then the fetch rpc should have either succeeded,
-        # failed with 'Cancelled' or failed with 'Invalid query handle' (if the close
-        # rpc occured before the fetch rpc).
-        if thread.fetch_results_error is not None:
-          assert 'Cancelled' in str(thread.fetch_results_error) or \
-            ('Invalid query handle' in str(thread.fetch_results_error) \
-             and not join_before_close)
-      else:
-        # If the close rpc encountered an exception, then it must be due to fetch
-        # noticing the cancellation and doing the auto-close.
-        assert 'Invalid or unknown query handle' in str(close_error)
-        assert 'Cancelled' in str(thread.fetch_results_error)
+      cancel_query_and_validate_state(self.client, query,
+          vector.get_value('exec_option'), vector.get_value('table_format'),
+          vector.get_value('cancel_delay'), vector.get_value('join_before_close'))
 
       if query_type == "CTAS":
         self.cleanup_test_table(vector.get_value('table_format'))
 
-      # TODO: Add some additional verification to check to make sure the query was
-      # actually canceled
-
     # Executing the same query without canceling should work fine. Only do this if the
     # query has a limit or aggregation
     if not debug_action and ('count' in query or 'limit' in query):
diff --git a/tests/query_test/test_result_spooling.py b/tests/query_test/test_result_spooling.py
index 40b55b6..470ad45 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -15,18 +15,105 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from time import sleep
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_vector import ImpalaTestDimension
+from tests.util.cancel_util import cancel_query_and_validate_state
+
+# Queries to execute. The TPC-H dataset is used because its tables are large, so queries
+# take some time to execute.
+CANCELLATION_QUERIES = ['select l_returnflag from tpch_parquet.lineitem',
+                        'select * from tpch_parquet.lineitem limit 50',
+                        'select * from tpch_parquet.lineitem order by l_orderkey']
+
+# Time to sleep between issuing query and canceling.
+CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4]
 
 
 class TestResultSpooling(ImpalaTestSuite):
   @classmethod
+  def add_test_dimensions(cls):
+    super(TestResultSpooling, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet')
+
+  @classmethod
   def get_workload(cls):
     return 'functional-query'
 
-  def test_result_spooling(self):
-    """Tests that setting SPOOL_QUERY_RESULTS = true for simple queries returns the
-    correct number of results."""
-    query_opts = {"spool_query_results": "true"}
-    query = "select * from functional.alltypes limit 10"
-    result = self.execute_query_expect_success(self.client, query, query_opts)
-    assert(len(result.data) == 10)
+  def test_result_spooling(self, vector):
+    self.run_test_case('QueryTest/result-spooling', vector)
+
+  def test_multi_batches(self, vector):
+    """Validates that reading multiple row batches works when result spooling is
+    enabled."""
+    vector.get_value('exec_option')['batch_size'] = 10
+    self.validate_query("select id from alltypes order by id limit 1000",
+        vector.get_value('exec_option'))
+
+  def validate_query(self, query, exec_options):
+    """Compares the results of the given query with and without result spooling
+    enabled."""
+    exec_options = exec_options.copy()
+    result = self.execute_query(query, exec_options)
+    assert result.success, "Failed to run {0} when result spooling is " \
+                           "disabled".format(query)
+    base_data = result.data
+    exec_options['spool_query_results'] = 'true'
+    result = self.execute_query(query, exec_options)
+    assert result.success, "Failed to run {0} when result spooling is " \
+                           "enabled".format(query)
+    assert len(result.data) == len(base_data), "{0} returned a different number of " \
+                                               "results when result spooling was " \
+                                               "enabled".format(query)
+    assert result.data == base_data, "{0} returned different results when result " \
+                                     "spooling was enabled".format(query)
+
+
+class TestResultSpoolingCancellation(ImpalaTestSuite):
+  """Test cancellation of queries when result spooling is enabled. This class heavily
+  borrows from the cancellation tests in test_cancellation.py. It uses the following test
+  dimensions: 'query' and 'cancel_delay'. 'query' is a list of queries to run
+  asynchronously and then cancel. 'cancel_delay' controls how long a query should run
+  before being cancelled.
+  """
+
+  @classmethod
+  def get_workload(cls):
+    return 'tpch'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestResultSpoolingCancellation, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('query',
+        *CANCELLATION_QUERIES))
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('cancel_delay',
+        *CANCEL_DELAY_IN_SECONDS))
+
+    # Result spooling should be independent of file format, so only testing for
+    # table_format=parquet/none in order to avoid a test dimension explosion.
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet' and
+        v.get_value('table_format').compression_codec == 'none')
+
+  def test_cancellation(self, vector):
+    vector.get_value('exec_option')['spool_query_results'] = 'true'
+    cancel_query_and_validate_state(self.client, vector.get_value('query'),
+        vector.get_value('exec_option'), vector.get_value('table_format'),
+        vector.get_value('cancel_delay'))
+
+  def test_cancel_no_fetch(self, vector):
+    """Test cancelling a query before any results are fetched. Unlike the
+    test_cancellation test, the query is cancelled before results are
+    fetched (there is no fetch thread)."""
+    vector.get_value('exec_option')['spool_query_results'] = 'true'
+    handle = None
+    try:
+      handle = self.execute_query_async(vector.get_value('query'),
+          vector.get_value('exec_option'))
+      sleep(vector.get_value('cancel_delay'))
+      cancel_result = self.client.cancel(handle)
+      assert cancel_result.status_code == 0,\
+          'Unexpected status code from cancel request: {0}'.format(cancel_result)
+    finally:
+      if handle: self.client.close_query(handle)
diff --git a/tests/util/cancel_util.py b/tests/util/cancel_util.py
new file mode 100644
index 0000000..5c08d6a
--- /dev/null
+++ b/tests/util/cancel_util.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import threading
+from time import sleep
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+def cancel_query_and_validate_state(client, query, exec_option, table_format,
+    cancel_delay, join_before_close=False):
+  """Runs the given query asynchronously and then cancels it after the specified delay.
+  The query is run with the given 'exec_option' against the specified 'table_format'. A
+  separate async thread is launched to fetch the results of the query. The method
+  validates that the query was successfully cancelled and that the error messages for the
+  calls to ImpalaConnection#fetch and #close are consistent. If 'join_before_close' is
+  True the method will join against the fetch results thread before closing the query.
+  """
+  if exec_option: client.set_configuration(exec_option)
+  if table_format: ImpalaTestSuite.change_database(client, table_format)
+  handle = client.execute_async(query)
+
+  thread = threading.Thread(target=__fetch_results, args=(query, handle))
+  thread.start()
+
+  sleep(cancel_delay)
+  assert client.get_state(handle) != client.QUERY_STATES['EXCEPTION']
+  cancel_result = client.cancel(handle)
+  assert cancel_result.status_code == 0,\
+      'Unexpected status code from cancel request: %s' % cancel_result
+
+  if join_before_close:
+    thread.join()
+
+  close_error = None
+  try:
+    client.close_query(handle)
+  except ImpalaBeeswaxException as e:
+    close_error = e
+
+  # Before accessing fetch_results_error we need to join the fetch thread
+  thread.join()
+
+  if thread.fetch_results_error is None:
+    # If the fetch rpc didn't result in CANCELLED (and auto-close the query) then
+    # the close rpc should have succeeded.
+    assert close_error is None
+  elif close_error is None:
+    # If the close rpc succeeded, then the fetch rpc should have either succeeded,
+    # failed with 'Cancelled' or failed with 'Invalid query handle' (if the close
+    # rpc occurred before the fetch rpc).
+    if thread.fetch_results_error is not None:
+      assert 'Cancelled' in str(thread.fetch_results_error) or \
+        ('Invalid query handle' in str(thread.fetch_results_error)
+         and not join_before_close)
+  else:
+    # If the close rpc encountered an exception, then it must be due to fetch
+    # noticing the cancellation and doing the auto-close.
+    assert 'Invalid or unknown query handle' in str(close_error)
+    assert 'Cancelled' in str(thread.fetch_results_error)
+
+  # TODO: Add some additional verification to check to make sure the query was
+  # actually canceled
+
+
+def __fetch_results(query, handle):
+  threading.current_thread().fetch_results_error = None
+  threading.current_thread().query_profile = None
+  try:
+    new_client = ImpalaTestSuite.create_impala_client()
+    new_client.fetch(query, handle)
+  except ImpalaBeeswaxException as e:
+    threading.current_thread().fetch_results_error = e