Posted to commits@impala.apache.org by ta...@apache.org on 2016/07/20 06:30:07 UTC

incubator-impala git commit: IMPALA-3729: batch_size=1 coverage for avro scanner

Repository: incubator-impala
Updated Branches:
  refs/heads/master 3fbd9c338 -> bc8c55afc


IMPALA-3729: batch_size=1 coverage for avro scanner

Also fix a stale comment in the avro scanner header.

The main work here is to fix the handling of empty result sets in the
test result verifier. This matters because we want to verify that the
results in the test file are a superset of the rows returned, and that
check was thrown off by superfluous '' rows in the expected and actual
result sets.

The basic problem is that the way test file sections were parsed
conflated an empty result section with a non-empty result section
containing a single empty string. I.e.:

---- RESULTS
====

vs
---- RESULTS

====

both got resolved to [''].
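
The fix is to have the parser append a trailing newline to every
non-empty section, so the two cases round-trip differently. A minimal
sketch of the resulting convention, mirroring the split_section_lines()
and join_section_lines() helpers added to tests/util/test_file_parser.py
below:

  def split_section_lines(section_str):
      """Split a parsed section into rows. '' means an empty section."""
      if section_str == '':
          return []
      # Non-empty sections always carry a trailing newline; trim it before
      # splitting so the last row is not followed by a phantom '' row.
      assert section_str[-1] == '\n'
      return section_str[:-1].split('\n')

  def join_section_lines(lines):
      """The inverse of split_section_lines() for non-empty sections."""
      return '\n'.join(lines) + '\n'

  assert split_section_lines('') == []      # RESULTS followed directly by ====
  assert split_section_lines('\n') == ['']  # RESULTS containing one blank line
  assert join_section_lines(['a', 'b']) == 'a\nb\n'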

Change-Id: Ia007e558d92c7e4ce30be90446fdbb1f50a0ebc4
Reviewed-on: http://gerrit.cloudera.org:8080/3413
Tested-by: Internal Jenkins
Reviewed-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/bc8c55af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/bc8c55af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/bc8c55af

Branch: refs/heads/master
Commit: bc8c55afcd24054ad57a61f878e4a4669c190bf0
Parents: 3fbd9c3
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Jun 21 16:56:21 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Jul 19 23:30:02 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hdfs-avro-scanner.h                 |  4 +--
 bin/load-data.py                                |  7 ++--
 testdata/bin/generate-schema-statements.py      |  8 ++---
 .../queries/DataErrorsTest/avro-errors.test     |  6 ++--
 .../queries/QueryTest/load.test                 |  2 --
 .../QueryTest/test-unmatched-schema.test        |  5 ---
 tests/beeswax/impala_beeswax.py                 |  2 +-
 tests/common/test_result_verifier.py            | 36 ++++++++++++++------
 tests/data_errors/test_data_errors.py           |  8 +++++
 tests/unittests/test_file_parser.py             | 22 ++++++------
 tests/util/test_file_parser.py                  | 25 ++++++++++++--
 11 files changed, 84 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc8c55af/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index 961b2da..ac2ba40 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -260,8 +260,8 @@ class HdfsAvroScanner : public BaseSequenceScanner {
       bool write_slot, void* slot, MemPool* pool);
 
   /// Helper function for some of the above. Returns the length of certain varlen
-  /// types and updates 'data'. Returns true on success, returns false and updates
-  /// parse_status_ on error.
+  /// types and updates 'data'. If an error is encountered returns a non-ok result and
+  /// updates parse_status_.
   ReadWriteUtil::ZLongResult ReadFieldLen(uint8_t** data, uint8_t* data_end);
 
   /// Same as the above functions, except takes the size of the decimal slot (i.e. 4, 8, or

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc8c55af/bin/load-data.py
----------------------------------------------------------------------
diff --git a/bin/load-data.py b/bin/load-data.py
index b0fb535..7c0bbf6 100755
--- a/bin/load-data.py
+++ b/bin/load-data.py
@@ -5,6 +5,7 @@
 # all data via Hive except for parquet data which needs to be loaded via Impala.
 # Most ddl commands are executed by Impala.
 import collections
+import getpass
 import os
 import re
 import sqlparse
@@ -12,7 +13,7 @@ import subprocess
 import sys
 import tempfile
 import time
-import getpass
+import traceback
 from itertools import product
 from optparse import OptionParser
 from Queue import Queue
@@ -137,9 +138,11 @@ def exec_impala_query_from_file(file_name):
     for query in queries:
       query = sqlparse.format(query.rstrip(';'), strip_comments=True)
       print '(%s):\n%s\n' % (file_name, query.strip())
-      result = impala_client.execute(query)
+      if query.strip() != "":
+        result = impala_client.execute(query)
   except Exception as e:
     print "Data Loading from Impala failed with error: %s" % str(e)
+    traceback.print_exc()
     is_success = False
   finally:
     impala_client.close_connection()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc8c55af/testdata/bin/generate-schema-statements.py
----------------------------------------------------------------------
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index f372b18..959bb4d 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -479,7 +479,7 @@ def generate_statements(output_name, test_vectors, sections,
         [row.file_format, row.dataset, row.compression_codec, row.compression_type]
     table_format = '%s/%s/%s' % (file_format, codec, compression_type)
     for section in sections:
-      table_name = section['BASE_TABLE_NAME']
+      table_name = section['BASE_TABLE_NAME'].strip()
       db_suffix = build_db_suffix(file_format, codec, compression_type)
       db_name = '{0}{1}'.format(data_set, options.scale_factor)
       db = '{0}{1}'.format(db_name, db_suffix)
@@ -523,9 +523,9 @@ def generate_statements(output_name, test_vectors, sections,
       if not options.scale_factor and section['LOAD_LOCAL']:
         load = section['LOAD_LOCAL']
 
-      columns = eval_section(section['COLUMNS'])
-      partition_columns = section['PARTITION_COLUMNS']
-      row_format = section['ROW_FORMAT']
+      columns = eval_section(section['COLUMNS']).strip()
+      partition_columns = section['PARTITION_COLUMNS'].strip()
+      row_format = section['ROW_FORMAT'].strip()
 
       # Force reloading of the table if the user specified the --force option or
       # if the table is partitioned and there was no ALTER section specified. This is to

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc8c55af/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test b/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
index 87d2930..aaf59e9 100644
--- a/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
+++ b/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test
@@ -2,7 +2,8 @@
 ---- QUERY
 # Read from the corrupt files. We may get partial results.
 select * from bad_avro_snap_strings
----- RESULTS
+---- RESULTS: VERIFY_IS_SUPERSET
+'valid'
 ---- TYPES
 string
 ---- ERRORS
@@ -15,7 +16,8 @@ row_regex: .*File '.*/bad_avro_snap_strings_avro_snap/invalid_union.avro' is cor
 ---- QUERY
 # Read from the corrupt files. We may get partial results.
 select * from bad_avro_snap_floats
----- RESULTS
+---- RESULTS: VERIFY_IS_SUPERSET
+1
 ---- TYPES
 float
 ---- ERRORS

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc8c55af/testdata/workloads/functional-query/queries/QueryTest/load.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/load.test b/testdata/workloads/functional-query/queries/QueryTest/load.test
index 5719d96..060868f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/load.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/load.test
@@ -3,13 +3,11 @@
 alter table functional.test_load add partition
 (year=2009, month=1)
 ---- RESULTS
-
 ====
 ---- QUERY
 alter table functional.test_load add partition
 (year=2010, month=1)
 ---- RESULTS
-
 ====
 ---- QUERY
 # Insert some data into one of the partitions, used to verify we are not clobbering

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc8c55af/testdata/workloads/functional-query/queries/QueryTest/test-unmatched-schema.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/test-unmatched-schema.test b/testdata/workloads/functional-query/queries/QueryTest/test-unmatched-schema.test
index 2fb30f3..926c5bf 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/test-unmatched-schema.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/test-unmatched-schema.test
@@ -29,7 +29,6 @@ BIGINT, STRING, INT, INT
 ---- QUERY
 alter table jointbl_test add columns(new_col string)
 ---- RESULTS
-
 ====
 ---- QUERY
 select * from jointbl_test
@@ -59,7 +58,6 @@ BIGINT, STRING, INT, INT, STRING
 ---- QUERY
 alter table jointbl_test add columns(new_int_col int)
 ---- RESULTS
-
 ====
 ---- QUERY
 select * from jointbl_test
@@ -89,7 +87,6 @@ BIGINT, STRING, INT, INT, STRING, INT
 ---- QUERY
 alter table jointbl_test drop column new_int_col
 ---- RESULTS
-
 ====
 ---- QUERY
 select * from jointbl_test
@@ -119,12 +116,10 @@ BIGINT, STRING, INT, INT, STRING
 ---- QUERY
 alter table jointbl_test drop column new_col
 ---- RESULTS
-
 ====
 ---- QUERY
 alter table jointbl_test drop column alltypes_id
 ---- RESULTS
-
 ====
 ---- QUERY
 select * from jointbl_test

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc8c55af/tests/beeswax/impala_beeswax.py
----------------------------------------------------------------------
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index 983e1fa..319d04c 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -369,7 +369,7 @@ class ImpalaBeeswaxClient(object):
     if query_type == 'use':
       # TODO: "use <database>" does not currently throw an error. Need to update this
       # to handle the error case once that behavior has been changed.
-      return ImpalaBeeswaxResult(query=query_string, success=True, data=[''])
+      return ImpalaBeeswaxResult(query=query_string, success=True, data=[])
 
     # Result fetching for insert is different from other queries.
     exec_result = None

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc8c55af/tests/common/test_result_verifier.py
----------------------------------------------------------------------
diff --git a/tests/common/test_result_verifier.py b/tests/common/test_result_verifier.py
index 86db19e..4ec4883 100644
--- a/tests/common/test_result_verifier.py
+++ b/tests/common/test_result_verifier.py
@@ -6,7 +6,9 @@ import logging
 import math
 import re
 
-from tests.util.test_file_parser import remove_comments
+from functools import wraps
+from tests.util.test_file_parser import (join_section_lines, remove_comments,
+    split_section_lines)
 
 logging.basicConfig(level=logging.INFO, format='%(threadName)s: %(message)s')
 LOG = logging.getLogger('test_result_verfier')
@@ -189,12 +191,25 @@ def assert_args_not_none(*args):
   for arg in args:
     assert arg is not None
 
-def verify_query_result_is_subset(expected_results, actual_results):
+def convert_results_to_sets(expected_results, actual_results):
   assert_args_not_none(expected_results, actual_results)
   expected_set = set(map(str, expected_results.rows))
   actual_set = set(map(str, actual_results.rows))
+  return expected_set, actual_set
+
+def verify_query_result_is_subset(expected_results, actual_results):
+  """Check whether the results in expected_results are a subset of the results in
+  actual_results. This uses set semantics, i.e. any duplicates are ignored."""
+  expected_set, actual_set = convert_results_to_sets(expected_results, actual_results)
   assert expected_set <= actual_set
 
+def verify_query_result_is_superset(expected_results, actual_results):
+  """Check whether the results in expected_results are a superset of the results in
+  actual_results. This uses set semantics, i.e. any duplicates are ignored."""
+  expected_set, actual_set = convert_results_to_sets(expected_results, actual_results)
+  assert expected_set >= actual_set
+
+
 def verify_query_result_is_equal(expected_results, actual_results):
   assert_args_not_none(expected_results, actual_results)
   assert expected_results == actual_results
@@ -210,6 +225,7 @@ def verify_query_result_is_not_in(expected_results, actual_results):
 # add more verifiers in the future. If a tag is not found, it defaults to verifying
 # equality.
 VERIFIER_MAP = {'VERIFY_IS_SUBSET' : verify_query_result_is_subset,
+                'VERIFY_IS_SUPERSET' : verify_query_result_is_superset,
                 'VERIFY_IS_EQUAL_SORTED'  : verify_query_result_is_equal,
                 'VERIFY_IS_EQUAL'  : verify_query_result_is_equal,
                 'VERIFY_IS_NOT_IN' : verify_query_result_is_not_in,
@@ -271,13 +287,13 @@ def verify_raw_results(test_section, exec_result, file_format, update_section=Fa
     return
 
   if 'ERRORS' in test_section:
-    expected_errors = remove_comments(test_section['ERRORS']).split('\n')
+    expected_errors = split_section_lines(remove_comments(test_section['ERRORS']))
     actual_errors = apply_error_match_filter(exec_result.log.split('\n'))
     try:
       verify_errors(expected_errors, actual_errors)
     except AssertionError:
       if update_section:
-        test_section['ERRORS'] = '\n'.join(actual_errors)
+        test_section['ERRORS'] = join_section_lines(actual_errors)
       else:
         raise
 
@@ -285,7 +301,7 @@ def verify_raw_results(test_section, exec_result, file_format, update_section=Fa
     # Distinguish between an empty list and a list with an empty string.
     expected_types = list()
     if test_section.get('TYPES'):
-      expected_types = [c.strip().upper() for c in test_section['TYPES'].split(',')]
+      expected_types = [c.strip().upper() for c in test_section['TYPES'].rstrip('\n').split(',')]
 
     # Avro does not support as many types as Hive, so the Avro test tables may
     # have different column types than we expect (e.g., INT instead of
@@ -305,7 +321,7 @@ def verify_raw_results(test_section, exec_result, file_format, update_section=Fa
       verify_results(expected_types, actual_types, order_matters=True)
     except AssertionError:
       if update_section:
-        test_section['TYPES'] = ', '.join(actual_types)
+        test_section['TYPES'] = join_section_lines([', '.join(actual_types)])
       else:
         raise
   else:
@@ -327,7 +343,7 @@ def verify_raw_results(test_section, exec_result, file_format, update_section=Fa
       verify_results(expected_labels, actual_labels, order_matters=True)
     except AssertionError:
       if update_section:
-        test_section['LABELS'] = ', '.join(actual_labels)
+        test_section['LABELS'] = join_section_lines([', '.join(actual_labels)])
       else:
         raise
 
@@ -351,7 +367,7 @@ def verify_raw_results(test_section, exec_result, file_format, update_section=Fa
     expected_results_list = map(lambda s: s.replace('\n', '\\n'),
         re.findall(r'\[(.*?)\]', expected_results, flags=re.DOTALL))
   else:
-    expected_results_list = expected_results.split('\n')
+    expected_results_list = split_section_lines(expected_results)
   expected = QueryTestResult(expected_results_list, expected_types,
       actual_labels, order_matters)
   actual = QueryTestResult(parse_result_rows(exec_result), actual_types,
@@ -361,7 +377,7 @@ def verify_raw_results(test_section, exec_result, file_format, update_section=Fa
     VERIFIER_MAP[verifier](expected, actual)
   except AssertionError:
     if update_section:
-      test_section['RESULTS'] = '\n'.join(actual.result_list)
+      test_section['RESULTS'] = join_section_lines(actual.result_list)
     else:
       raise
 
@@ -392,7 +408,7 @@ def parse_result_rows(exec_result):
   """
   raw_result = exec_result.data
   if not raw_result:
-    return ['']
+    return []
 
   # If the schema is 'None' assume this is an insert statement
   if exec_result.schema is None:

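The new VERIFY_IS_SUPERSET verifier is what the avro-errors.test changes
above rely on: a corrupt file may yield any subset of its valid rows, so
the test file lists every row that could legally appear. A minimal
sketch of the set semantics, assuming the rows have already been
rendered as strings (the example rows here are illustrative):

  def verify_is_superset(expected_rows, actual_rows):
      # Set semantics: duplicates are ignored and order does not matter.
      expected_set = set(map(str, expected_rows))
      actual_set = set(map(str, actual_rows))
      assert expected_set >= actual_set

  # Partial results from a corrupt file pass...
  verify_is_superset(["'valid'", "'extra'"], ["'valid'"])
  # ...and an empty actual result, now parsed as [] rather than [''],
  # is trivially a subset of any expected set.
  verify_is_superset(["'valid'"], [])
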
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc8c55af/tests/data_errors/test_data_errors.py
----------------------------------------------------------------------
diff --git a/tests/data_errors/test_data_errors.py b/tests/data_errors/test_data_errors.py
index 1e94a54..2e84423 100644
--- a/tests/data_errors/test_data_errors.py
+++ b/tests/data_errors/test_data_errors.py
@@ -16,14 +16,22 @@
 # Tests Impala properly handles errors when reading and writing data.
 
 import pytest
+import random
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfS3, SkipIfLocal
+from tests.common.test_dimensions import create_exec_option_dimension
 
 class TestDataErrors(ImpalaTestSuite):
+  # batch_size of 1 can expose some interesting corner cases at row batch boundaries.
+  BATCH_SIZES = [0, 1]
+
   @classmethod
   def add_test_dimensions(cls):
     super(TestDataErrors, cls).add_test_dimensions()
+    cls.TestMatrix.add_dimension(
+        create_exec_option_dimension(batch_sizes=cls.BATCH_SIZES))
+
 
   @classmethod
   def get_workload(self):

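For context, batch_size=0 runs with Impala's default row batch size,
while batch_size=1 materializes one row per batch, exercising the
scanner's code at row batch boundaries. The added dimension is crossed
with the suite's existing test vectors, so each test now runs once per
batch size; conceptually (an illustrative sketch, not the test
framework's actual expansion logic):

  from itertools import product

  BATCH_SIZES = [0, 1]     # 0 means "use the default batch size"
  FILE_FORMATS = ['avro']  # illustrative; the real matrix has more dimensions

  # Each (file_format, batch_size) combination becomes its own test instance.
  matrix = list(product(FILE_FORMATS, BATCH_SIZES))
  assert matrix == [('avro', 0), ('avro', 1)]
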
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc8c55af/tests/unittests/test_file_parser.py
----------------------------------------------------------------------
diff --git a/tests/unittests/test_file_parser.py b/tests/unittests/test_file_parser.py
index b6f0860..7714020 100644
--- a/tests/unittests/test_file_parser.py
+++ b/tests/unittests/test_file_parser.py
@@ -48,8 +48,8 @@ class TestTestFileParser(BaseTestSuite):
     results = parse_test_file_text(test_text, VALID_SECTIONS)
     assert len(results) == 3
     print results[0]
-    expected_results = {'QUERY': '# comment\nSELECT blah from Foo\ns',
-                        'TYPES': 'string', 'RESULTS': "'Hi'"}
+    expected_results = {'QUERY': '# comment\nSELECT blah from Foo\ns\n',
+                        'TYPES': 'string\n', 'RESULTS': "'Hi'\n"}
     assert results[0] == expected_results
 
   def test_invalid_section(self):
@@ -57,8 +57,8 @@ class TestTestFileParser(BaseTestSuite):
     valid_sections = ['QUERY', 'RESULTS']
     results = parse_test_file_text(test_text, valid_sections, skip_unknown_sections=True)
     assert len(results) == 3
-    expected_results = {'QUERY': '# comment\nSELECT blah from Foo\ns',
-                        'RESULTS': "'Hi'"}
+    expected_results = {'QUERY': '# comment\nSELECT blah from Foo\ns\n',
+                        'RESULTS': "'Hi'\n"}
     assert results[0] == expected_results
 
     # In this case, instead of ignoring the invalid section we should get an error
@@ -72,20 +72,20 @@ class TestTestFileParser(BaseTestSuite):
   def test_parse_query_name(self):
     results = parse_test_file_text(test_text, VALID_SECTIONS, False)
     assert len(results) == 3
-    expected_results = {'QUERY': 'SELECT int_col from Bar',
-                        'TYPES': 'int', 'RESULTS': '231',
+    expected_results = {'QUERY': 'SELECT int_col from Bar\n',
+                        'TYPES': 'int\n', 'RESULTS': '231\n',
                         'QUERY_NAME': 'TEST_WORKLOAD_Q2'}
     assert results[2] == expected_results
 
   def test_parse_commented_out_test_as_comment(self):
     results = parse_test_file_text(test_text, VALID_SECTIONS)
     assert len(results) == 3
-    expected_results = {'QUERY': 'SELECT 2', 'RESULTS': "'Hello'",
+    expected_results = {'QUERY': 'SELECT 2\n', 'RESULTS': "'Hello'\n",
                         'TYPES': "string\n#====\n"\
-                        "# SHOULD PARSE COMMENTED OUT TEST PROPERLY\n"\
-                        "#---- QUERY: TEST_WORKLOAD_Q2\n"\
-                        "#SELECT int_col from Bar\n"\
-                        "#---- RESULTS\n#231\n#---- TYPES\n#int"}
+                        "# SHOULD PARSE COMMENTED OUT TEST PROPERLY\n"
+                        "#---- QUERY: TEST_WORKLOAD_Q2\n"
+                        "#SELECT int_col from Bar\n"
+                        "#---- RESULTS\n#231\n#---- TYPES\n#int\n"}
     print expected_results
     print results[1]
     assert results[1] == expected_results

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bc8c55af/tests/util/test_file_parser.py
----------------------------------------------------------------------
diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py
index 5e73756..c19125c 100644
--- a/tests/util/test_file_parser.py
+++ b/tests/util/test_file_parser.py
@@ -21,9 +21,9 @@ SUBSECTION_DELIMITER = "----"
 class QueryTestSectionReader(object):
   @staticmethod
   def build_query(query_section_text):
-    """Build a query by stripping comments and trailing semi-colons."""
+    """Build a query by stripping comments and trailing newlines and semi-colons."""
     query_section_text = remove_comments(query_section_text)
-    return query_section_text.rstrip(';')
+    return query_section_text.rstrip("\n;")
 
   @staticmethod
   def get_table_name_components(table_format, table_name, scale_factor=''):
@@ -175,7 +175,12 @@ def parse_test_file_text(text, valid_section_names, skip_unknown_sections=True):
         subsection_name, subsection_comment = subsection_info
 
       lines_content = lines[1:-1]
+
       subsection_str = '\n'.join([line for line in lines_content])
+      if len(lines_content) != 0:
+        # Add trailing newline to last line if present. This disambiguates between the
+        # case of no lines versus a single line with no text.
+        subsection_str += "\n"
 
       if subsection_name not in valid_section_names:
         if skip_unknown_sections or not subsection_name:
@@ -213,6 +218,22 @@ def parse_test_file_text(text, valid_section_names, skip_unknown_sections=True):
       sections.append(parsed_sections)
   return sections
 
+def split_section_lines(section_str):
+  """
+  Given a section string as produced by parse_test_file_text(), split it into separate
+  lines. The section string must have a trailing newline.
+  """
+  if section_str == '':
+    return []
+  assert section_str[-1] == '\n'
+  # Trim off the trailing newline and split into lines.
+  return section_str[:-1].split('\n')
+
+def join_section_lines(lines):
+  """
+  The inverse of split_section_lines().
+  """
+  return '\n'.join(lines) + '\n'
 
 def write_test_file(test_file_name, test_file_sections, encoding=None):
   """