Posted to commits@impala.apache.org by jo...@apache.org on 2020/08/06 20:12:55 UTC

[impala] 02/02: IMPALA-10005: Fix Snappy decompression for non-block filesystems

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit dbbd40308a6d1cef77bfe45e016e775c918e0539
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Thu Jul 23 20:44:30 2020 -0700

    IMPALA-10005: Fix Snappy decompression for non-block filesystems
    
    Snappy-compressed text always uses the THdfsCompression::SNAPPY_BLOCKED
    compression type in the backend. However, for non-block filesystems,
    the frontend incorrectly passes THdfsCompression::SNAPPY instead.
    On debug builds, this triggers a DCHECK when reading Snappy-compressed
    text. On release builds, decompression of the data fails.
    
    This fixes the frontend to always pass THdfsCompression::SNAPPY_BLOCKED
    for Snappy-compressed text.
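    
    For context, the two codecs differ in framing: raw Snappy
    (THdfsCompression::SNAPPY) is a single compressed buffer, while
    Hadoop's blocked format (THdfsCompression::SNAPPY_BLOCKED) wraps one
    or more Snappy chunks in length-prefixed framing, so decoding one as
    the other fails. A minimal Python sketch of the blocked framing,
    assuming the third-party python-snappy package
    (decode_hadoop_snappy_block is a hypothetical helper, not Impala code):
    
        import struct
    
        import snappy  # third-party python-snappy package
    
        def decode_hadoop_snappy_block(data):
            """Decode Hadoop's blocked Snappy framing: each block is a
            4-byte big-endian uncompressed size followed by one or more
            chunks of [4-byte big-endian compressed size][Snappy bytes]."""
            out = []
            pos = 0
            while pos < len(data):
                (uncompressed_len,) = struct.unpack_from(">I", data, pos)
                pos += 4
                produced = 0
                while produced < uncompressed_len:
                    (chunk_len,) = struct.unpack_from(">I", data, pos)
                    pos += 4
                    chunk = snappy.decompress(data[pos:pos + chunk_len])
                    pos += chunk_len
                    produced += len(chunk)
                    out.append(chunk)
            return b"".join(out)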
    
    This reworks query_test/test_compressed_formats.py to provide better
    coverage (the reworked helper flow is sketched after this list):
     - Changed the RC and Seq test cases to verify that the file extension
       doesn't matter. Added Avro to this case as well.
     - Fixed the text case to use appropriate extensions (resolving IMPALA-9004)
     - Changed the utility function so it doesn't use Hive. This allows it
       to be enabled on non-HDFS filesystems like S3.
     - Changed the test to use unique_database and allow parallel execution.
     - Changed the test to run in the core job, so it now has coverage on
       the usual S3 test configuration. It is reasonably quick (1-2 minutes)
       and runs in parallel.
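    
    Condensed, hypothetical outline of the reworked helper (the real
    implementation is in the diff below; client and fs stand in for the
    test suite's existing client/filesystem_client fixtures):
    
        def copy_and_query(client, fs, unique_db, table, db_suffix,
                           basename, src_ext, dest_ext):
            # Copy a data file from the functional schema into a fresh
            # table, optionally renaming its extension, then query it with
            # Impala only (no Hive), so the test can run on S3/ABFS/ADLS
            # as well as HDFS.
            src = "/test-warehouse/{0}{1}/{2}{3}".format(
                table, db_suffix, basename, src_ext)
            dest_dir = "/test-warehouse/{0}.db/{1}".format(unique_db, table)
            client.execute("CREATE TABLE {0}.{1} LIKE functional{2}.{1} "
                           "LOCATION '{3}'".format(
                               unique_db, table, db_suffix, dest_dir))
            fs.copy(src, "{0}/{1}{2}".format(dest_dir, basename, dest_ext),
                    overwrite=True)
            client.execute("REFRESH {0}.{1}".format(unique_db, table))
            return client.execute(
                "SELECT count(*) FROM {0}.{1}".format(unique_db, table))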
    
    Testing:
     - Exhaustive job
     - Core s3 job
     - Changed the frontend to force the non-block filesystem code path
       (i.e. the TFileSplitGeneratorSpec code) and verified that it can
       now read Snappy-compressed text.
    
    Change-Id: I0879f2fc0bf75bb5c15cecb845ece46a901601ac
    Reviewed-on: http://gerrit.cloudera.org:8080/16278
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Sahil Takiar <st...@cloudera.com>
---
 .../org/apache/impala/catalog/HdfsCompression.java |  20 +-
 tests/query_test/test_compressed_formats.py        | 202 +++++++++++++--------
 2 files changed, 135 insertions(+), 87 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
index df76463..153106d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
@@ -24,13 +24,15 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
 /**
- * Support for recognizing compression suffixes on data files.
+ * Support for recognizing compression suffixes on data files. This is currently
+ * limited to text files. Other file formats embed metadata about the compression
+ * type and do not use the file suffixes.
  * Compression of a file is recognized in mapreduce by looking for suffixes of
  * supported codecs.
- * For now Impala supports GZIP, SNAPPY, BZIP2 and some additional formats if plugins
- * are available. Even if a plugin is available, we need to add the file suffixes here so
- * that we can resolve the compression type from the file name. LZO can use the specific
- * HIVE input class.
+ * For now Impala supports GZIP, SNAPPY_BLOCKED, BZIP2 and some additional formats if
+ * plugins are available. Even if a plugin is available, we need to add the file suffixes
+ * here so that we can resolve the compression type from the file name. LZO can use the
+ * specific HIVE input class.
  * Some compression types here are detected even though they are not supported. This
  * allows for better error messages (e.g. LZ4, LZO).
  */
@@ -39,7 +41,7 @@ public enum HdfsCompression {
   DEFLATE,
   GZIP,
   BZIP2,
-  SNAPPY,
+  SNAPPY_BLOCKED,
   LZO,
   LZO_INDEX, //Lzo index file.
   LZ4,
@@ -51,7 +53,7 @@ public enum HdfsCompression {
           put("deflate", DEFLATE).
           put("gz", GZIP).
           put("bz2", BZIP2).
-          put("snappy", SNAPPY).
+          put("snappy", SNAPPY_BLOCKED).
           put("lzo", LZO).
           put("index", LZO_INDEX).
           put("lz4", LZ4).
@@ -76,7 +78,7 @@ public enum HdfsCompression {
     case DEFLATE: return THdfsCompression.DEFLATE;
     case GZIP: return THdfsCompression.GZIP;
     case BZIP2: return THdfsCompression.BZIP2;
-    case SNAPPY: return THdfsCompression.SNAPPY_BLOCKED;
+    case SNAPPY_BLOCKED: return THdfsCompression.SNAPPY_BLOCKED;
     case LZO: return THdfsCompression.LZO;
     case LZ4: return THdfsCompression.LZ4;
     case ZSTD: return THdfsCompression.ZSTD;
@@ -90,7 +92,7 @@ public enum HdfsCompression {
       case DEFLATE: return FbCompression.DEFLATE;
       case GZIP: return FbCompression.GZIP;
       case BZIP2: return FbCompression.BZIP2;
-      case SNAPPY: return FbCompression.SNAPPY;
+      case SNAPPY_BLOCKED: return FbCompression.SNAPPY_BLOCKED;
       case LZO: return FbCompression.LZO;
       case LZ4: return FbCompression.LZ4;
       case ZSTD: return FbCompression.ZSTD;
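
The net effect of the enum change above: text files are resolved by file
suffix, and ".snappy" now maps to the blocked variant. A rough Python
equivalent of the lookup (hypothetical names, for illustration only; the
authoritative mapping is the Java code above):

    # Suffix -> codec resolution, loosely mirroring HdfsCompression.
    SUFFIX_TO_CODEC = {
        "deflate": "DEFLATE",
        "gz": "GZIP",
        "bz2": "BZIP2",
        "snappy": "SNAPPY_BLOCKED",  # was SNAPPY before this fix
        "lzo": "LZO",
        "lz4": "LZ4",
    }

    def codec_from_file_name(file_name):
        # Only the last dot-suffix matters; an unknown or missing suffix
        # means the file is treated as uncompressed.
        parts = file_name.rsplit(".", 1)
        if len(parts) == 2:
            return SUFFIX_TO_CODEC.get(parts[1].lower(), "NONE")
        return "NONE"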
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 8aa4705..f295bed 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -18,13 +18,12 @@
 import math
 import os
 import pytest
+import random
 import struct
 import subprocess
 from os.path import join
 
-from tests.common.environ import EXTERNAL_WAREHOUSE_DIR
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.common.test_result_verifier import verify_query_result_is_equal
 from tests.common.test_vector import ImpalaTestDimension
@@ -33,25 +32,19 @@ from tests.util.filesystem_utils import get_fs_path
 
 # (file extension, table suffix) pairs
 compression_formats = [
-  ('.bz2',     'bzip'),
+  ('.bz2', 'bzip'),
   ('.deflate', 'def'),
-  ('.gz',      'gzip'),
-  ('.snappy',  'snap'),
+  ('.gz', 'gzip'),
+  ('.snappy', 'snap'),
  ]
 
+compression_extensions = ['.bz2', '.deflate', '.gz', '.snappy']
 
-# Missing Coverage: Compressed data written by Hive is queriable by Impala on a non-hdfs
-# filesystem.
-@SkipIfS3.hive
-@SkipIfABFS.hive
-@SkipIfADLS.hive
-@SkipIfIsilon.hive
-@SkipIfLocal.hive
-class TestCompressedFormats(ImpalaTestSuite):
+
+class TestCompressedFormatsBase(ImpalaTestSuite):
   """
-  Tests that we support compressed RC, sequence and text files and that unsupported
-  formats fail gracefully (see IMPALA-14: Files with .gz extension reported as 'not
-  supported').
+  Base class to provide utility functions for testing support for compressed
+  data files.
   """
   @classmethod
   def get_workload(self):
@@ -59,64 +52,43 @@ class TestCompressedFormats(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestCompressedFormats, cls).add_test_dimensions()
-    cls.ImpalaTestMatrix.clear()
-    cls.ImpalaTestMatrix.add_dimension(\
-        ImpalaTestDimension('file_format', *['rc', 'seq', 'text']))
-    cls.ImpalaTestMatrix.add_dimension(\
-        ImpalaTestDimension('compression_format', *compression_formats))
-    if cls.exploration_strategy() == 'core':
-      # Don't run on core.  This test is very slow and we are unlikely
-      # to regress here.
-      cls.ImpalaTestMatrix.add_constraint(lambda v: False);
+    super(TestCompressedFormatsBase, cls).add_test_dimensions()
 
-  @pytest.mark.execute_serially
-  def test_compressed_formats(self, vector):
-    file_format = vector.get_value('file_format')
-    extension, suffix = vector.get_value('compression_format')
-    if file_format in ['rc', 'seq']:
-      # Test that {gzip,snappy,bzip,deflate}-compressed
-      # {RC,sequence,text} files are supported.
-      db_suffix = '_%s_%s' % (file_format, suffix)
-      self._copy_and_query_compressed_file(
-        'tinytable', db_suffix, suffix, '000000_0', extension)
-    elif file_format is 'text':
-        pytest.xfail('IMPALA-9004: TestCompressedFormats is broken for text files')
-    else:
-      assert False, "Unknown file_format: %s" % file_format
-
-  # TODO: switch to using hive metastore API rather than hive shell.
-  def _copy_and_query_compressed_file(self, table_name, db_suffix, compression_codec,
-                                     file_name, extension, expected_error=None):
-    # We want to create a test table with a compressed file that has a file
-    # extension. We'll do this by making a copy of an existing table with hive.
+  def _copy_and_query_compressed_file(self, unique_database, table_name, db_suffix,
+      file_basename, src_extension, dest_extension, expected_error=None):
+    """
+    This is a utility function to test the behavior for compressed file formats
+    with different file extensions. It creates a new table in the unique_database
+    as a copy of the provided table_name from the functional schema with the
+    specified db_suffix. It copies file_basename + src_extension to the new
+    table as file_basename + dest_extension. It then runs a query on the
+    new table. Unless expected_error is set, it expects the query to run successfully.
+    """
+    # Calculate locations for the source table
     base_dir = '/test-warehouse'
-    src_table = 'functional%s.%s' % (db_suffix, table_name)
-    src_table_dir = "%s%s" % (table_name, db_suffix)
-    src_table_dir = join(base_dir, src_table_dir)
-    src_file = join(src_table_dir, file_name)
-
-    # Make sure destination table uses suffix, even if use_suffix=False, so
-    # unique tables are created for each compression format
-    # In Hive3+ create table like behavior is still in discussion, add location
-    # to avoid impact on Impala test.
-    dest_base_dir = '/{0}'.format(EXTERNAL_WAREHOUSE_DIR)
-    dest_table = '%s_%s_copy' % (table_name, compression_codec)
-    dest_table_dir = join(dest_base_dir, dest_table)
-    dest_file = join(dest_table_dir, file_name + extension)
-
-    drop_cmd = 'DROP TABLE IF EXISTS %s;' % (dest_table)
-    hive_cmd = drop_cmd + 'CREATE TABLE %s LIKE %s LOCATION \'%s\';' % \
-      (dest_table, src_table, dest_table_dir)
-
-    # Create the table
-    self.run_stmt_in_hive(hive_cmd)
+    src_table = "functional{0}.{1}".format(db_suffix, table_name)
+    src_table_dir = join(base_dir, table_name + db_suffix)
+    src_file = join(src_table_dir, file_basename + src_extension)
+
+    # Calculate locations for the destination table
+    dest_table_dir = "/test-warehouse/{0}.db/{1}".format(unique_database, table_name)
+    dest_table = "{0}.{1}".format(unique_database, table_name)
+    dest_file = join(dest_table_dir, file_basename + dest_extension)
+
+    # Use a specific location to avoid any interactions with Hive behavior changes.
+    drop_cmd = "DROP TABLE IF EXISTS {0};".format(dest_table)
+    create_cmd = "CREATE TABLE {0} LIKE {1} LOCATION \'{2}\';".format(
+      dest_table, src_table, dest_table_dir)
+
+    # Create the table and copy in the data file
+    self.client.execute(drop_cmd)
+    self.client.execute(create_cmd)
     self.filesystem_client.copy(src_file, dest_file, overwrite=True)
     # Try to read the compressed file with extension
-    query = 'select count(*) from %s' % dest_table
+    query = "select count(*) from {0};".format(dest_table)
     try:
-      # Need to invalidate the metadata because the table was created external to Impala.
-      self.client.execute("invalidate metadata %s" % dest_table)
+      # Need to refresh the metadata to see the file copied in.
+      self.client.execute("refresh {0}".format(dest_table))
       result = self.execute_scalar(query)
       # Fail iff we expected an error
       assert expected_error is None, 'Query is expected to fail'
@@ -125,12 +97,84 @@ class TestCompressedFormats(ImpalaTestSuite):
       error_msg = str(e)
       print error_msg
       if expected_error is None or expected_error not in error_msg:
-        print "Unexpected error:\n%s", error_msg
+        print("Unexpected error:\n{0}".format(error_msg))
         raise
     finally:
-      self.run_stmt_in_hive(drop_cmd)
+      self.client.execute(drop_cmd)
       self.filesystem_client.delete_file_dir(dest_file)
 
+
+class TestCompressedNonText(TestCompressedFormatsBase):
+  """Tests behavior for compressed non-text formats (avro, rc, seq)."""
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestCompressedNonText, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.clear()
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('file_format', *['rc', 'seq', 'avro']))
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('compression_format', *compression_formats))
+
+  def test_insensitivity_to_extension(self, vector, unique_database):
+    """
+    Avro, RC, and Sequence files do not use the file extension to determine the
+    type of compression. This verifies that they work regardless of the
+    extension.
+    """
+    file_format = vector.get_value('file_format')
+    right_extension, suffix = vector.get_value('compression_format')
+    # Avro is only loaded in a subset of the compression types. Bail out for
+    # the ones that are not loaded.
+    if file_format == 'avro' and suffix not in ['def', 'snap']:
+      pytest.xfail('Avro is only created for Deflate and Snappy compression codecs')
+    db_suffix = '_{0}_{1}'.format(file_format, suffix)
+
+    # Pick one wrong extension randomly
+    wrong_extensions = [ext for ext in compression_extensions if ext != right_extension]
+    random.shuffle(wrong_extensions)
+    wrong_extension = wrong_extensions[0]
+    # Test with the "right" extension that matches the file's compression, one wrong
+    # extension, and no extension. By default, Hive does not use a file extension.
+    src_extension = ""
+    for ext in [right_extension, wrong_extension, ""]:
+      self._copy_and_query_compressed_file(
+        unique_database, 'tinytable', db_suffix, '000000_0', src_extension, ext)
+
+
+class TestCompressedText(TestCompressedFormatsBase):
+  """
+  Tests behavior for compressed text files, which determine the compression codec from
+  the file extension.
+  """
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestCompressedText, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.clear()
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('file_format', *['text']))
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('compression_format', *compression_formats))
+
+  def test_correct_extension(self, vector, unique_database):
+    """
+    Text files use the file extension to determine the type of compression.
+    By default, Hive creates files with the appropriate file extension.
+    This verifies the positive case that the correct extension works.
+
+    This is a somewhat trivial test. However, it runs with the core exploration
+    strategy and runs on all filesystems. It verifies formats on core that are
+    otherwise limited to exhaustive. That is important for coverage on non-HDFS
+    filesystems like s3.
+    """
+    file_format = vector.get_value('file_format')
+    extension, suffix = vector.get_value('compression_format')
+    db_suffix = '_{0}_{1}'.format(file_format, suffix)
+    self._copy_and_query_compressed_file(
+      unique_database, 'tinytable', db_suffix, '000000_0', extension, extension)
+
+
 class TestUnsupportedTableWriters(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):
@@ -143,7 +187,7 @@ class TestUnsupportedTableWriters(ImpalaTestSuite):
     # This class tests different formats, but doesn't use constraints.
     # The constraint added below is only to make sure that the test file runs once.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        (v.get_value('table_format').file_format =='text' and
+        (v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'none'))
 
   def test_error_message(self, vector, unique_database):
@@ -151,6 +195,7 @@ class TestUnsupportedTableWriters(ImpalaTestSuite):
     # compressed text, avro and sequence.
     self.run_test_case('QueryTest/unsupported-writers', vector, unique_database)
 
+
 @pytest.mark.execute_serially
 class TestLargeCompressedFile(ImpalaTestSuite):
   """
@@ -182,7 +227,7 @@ class TestLargeCompressedFile(ImpalaTestSuite):
     if cls.exploration_strategy() != 'exhaustive':
       pytest.skip("skipping if it's not exhaustive test.")
     cls.ImpalaTestMatrix.add_constraint(lambda v:
-        (v.get_value('table_format').file_format =='text' and
+        (v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'snap'))
 
   def teardown_method(self, method):
@@ -228,24 +273,25 @@ class TestLargeCompressedFile(ImpalaTestSuite):
     hdfs_put.wait()
 
   def test_query_large_file(self, vector):
-    self.__create_test_table();
+    self.__create_test_table()
     dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
     file_size = self.MAX_FILE_SIZE
     self.__generate_file(dst_path, file_size)
     self.client.execute("refresh %s" % self.TABLE_NAME)
 
     # Query the table
-    result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
+    self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
 
   def __create_test_table(self):
     self.__drop_test_table()
-    self.client.execute("CREATE TABLE %s (col string) " \
+    self.client.execute("CREATE TABLE %s (col string) "
         "ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'"
         % (self.TABLE_NAME, self.TABLE_LOCATION))
 
   def __drop_test_table(self):
     self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME)
 
+
 class TestBzip2Streaming(ImpalaTestSuite):
   MAX_SCAN_RANGE_LENGTHS = [0, 5]
 
@@ -261,8 +307,8 @@ class TestBzip2Streaming(ImpalaTestSuite):
       pytest.skip("skipping if it's not exhaustive test.")
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('max_scan_range_length', *cls.MAX_SCAN_RANGE_LENGTHS))
-    cls.ImpalaTestMatrix.add_constraint(lambda v:\
-        v.get_value('table_format').file_format == 'text' and\
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'text' and
         v.get_value('table_format').compression_codec == 'bzip')
 
   def test_bzip2_streaming(self, vector):