Posted to commits@impala.apache.org by jo...@apache.org on 2020/08/06 20:12:55 UTC
[impala] 02/02: IMPALA-10005: Fix Snappy decompression for non-block filesystems
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit dbbd40308a6d1cef77bfe45e016e775c918e0539
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Thu Jul 23 20:44:30 2020 -0700
IMPALA-10005: Fix Snappy decompression for non-block filesystems
Snappy-compressed text always uses the THdfsCompression::SNAPPY_BLOCKED
compression type in the backend. However, for non-block filesystems, the
frontend incorrectly passes THdfsCompression::SNAPPY instead. On debug
builds, this triggers a DCHECK when reading Snappy-compressed text; on
release builds, decompression of the data fails.
This fixes the frontend to always pass THdfsCompression::SNAPPY_BLOCKED
for Snappy-compressed text.
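As a rough illustration (a standalone sketch, not code from this commit;
the helper below is hypothetical, though the codec names match the Java
diff further down), the corrected behavior amounts to resolving a text
file's suffix to the blocked Snappy variant, the only Snappy framing the
backend text scanner accepts:

    # Hypothetical standalone mirror of the corrected suffix -> codec
    # mapping; not part of the Impala code base.
    SUFFIX_TO_CODEC = {
        'deflate': 'DEFLATE',
        'gz': 'GZIP',
        'bz2': 'BZIP2',
        # Before the fix this resolved to SNAPPY, which the backend text
        # scanner rejects (DCHECK on debug builds).
        'snappy': 'SNAPPY_BLOCKED',
    }

    def codec_for_text_file(file_name):
        """Return the codec name for a compressed text file, or NONE."""
        suffix = file_name.rsplit('.', 1)[-1] if '.' in file_name else ''
        return SUFFIX_TO_CODEC.get(suffix, 'NONE')

    assert codec_for_text_file('000000_0.snappy') == 'SNAPPY_BLOCKED'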
This reworks query_test/test_compressed_formats.py to provide better
coverage:
- Changed the RC and Seq test cases to verify that the file extension
doesn't matter. Added Avro to this case as well.
- Fixed the text case to use appropriate extensions (fixing IMPALA-9004)
- Changed the utility function so it doesn't use Hive (a condensed sketch
of the new flow follows this list). This allows it to be enabled on
non-HDFS filesystems like S3.
- Changed the test to use unique_database and allow parallel execution.
- Changed the test to run in the core job, so it now has coverage on
the usual S3 test configuration. It is reasonably quick (1-2 minutes)
and runs in parallel.
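The Hive-free utility reduces to three Impala-side steps: create the
destination table with Impala itself, copy the data file into the table
location, and REFRESH. A condensed sketch (statement shapes and the
client/filesystem_client helpers are taken from the diff below; this is
an abbreviation of the full utility, not a drop-in replacement):

    # Condensed sketch of the reworked utility; see the complete version
    # in the diff below. Assumes Impala's test framework helpers.
    def copy_and_query(self, unique_database, table_name, db_suffix,
                       file_basename, src_ext, dest_ext):
        src_dir = "/test-warehouse/{0}{1}".format(table_name, db_suffix)
        dest_dir = "/test-warehouse/{0}.db/{1}".format(
            unique_database, table_name)
        dest_table = "{0}.{1}".format(unique_database, table_name)
        # Impala, not Hive, creates the table, so the test also runs on
        # non-HDFS filesystems such as S3.
        self.client.execute(
            "CREATE TABLE {0} LIKE functional{1}.{2} LOCATION '{3}'".format(
                dest_table, db_suffix, table_name, dest_dir))
        self.filesystem_client.copy(
            src_dir + "/" + file_basename + src_ext,
            dest_dir + "/" + file_basename + dest_ext, overwrite=True)
        # REFRESH (rather than INVALIDATE METADATA) suffices because
        # Impala created the table and only the new file must be picked up.
        self.client.execute("refresh {0}".format(dest_table))
        return self.execute_scalar(
            "select count(*) from {0}".format(dest_table))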
Testing:
- Exhaustive job
- Core s3 job
- Changed the frontend to force it to use the code for non-block
filesystems (i.e. the TFileSplitGeneratorSpec code) and
verified that it is now able to read Snappy-compressed text.
Change-Id: I0879f2fc0bf75bb5c15cecb845ece46a901601ac
Reviewed-on: http://gerrit.cloudera.org:8080/16278
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Sahil Takiar <st...@cloudera.com>
---
.../org/apache/impala/catalog/HdfsCompression.java | 20 +-
tests/query_test/test_compressed_formats.py | 202 +++++++++++++--------
2 files changed, 135 insertions(+), 87 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
index df76463..153106d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
@@ -24,13 +24,15 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
/**
- * Support for recognizing compression suffixes on data files.
+ * Support for recognizing compression suffixes on data files. This is currently
+ * limited to text files. Other file formats embed metadata about the compression
+ * type and do not use the file suffixes.
* Compression of a file is recognized in mapreduce by looking for suffixes of
* supported codecs.
- * For now Impala supports GZIP, SNAPPY, BZIP2 and some additional formats if plugins
- * are available. Even if a plugin is available, we need to add the file suffixes here so
- * that we can resolve the compression type from the file name. LZO can use the specific
- * HIVE input class.
+ * For now Impala supports GZIP, SNAPPY_BLOCKED, BZIP2 and some additional formats if
+ * plugins are available. Even if a plugin is available, we need to add the file suffixes
+ * here so that we can resolve the compression type from the file name. LZO can use the
+ * specific HIVE input class.
* Some compression types here are detected even though they are not supported. This
* allows for better error messages (e.g. LZ4, LZO).
*/
@@ -39,7 +41,7 @@ public enum HdfsCompression {
DEFLATE,
GZIP,
BZIP2,
- SNAPPY,
+ SNAPPY_BLOCKED,
LZO,
LZO_INDEX, //Lzo index file.
LZ4,
@@ -51,7 +53,7 @@ public enum HdfsCompression {
put("deflate", DEFLATE).
put("gz", GZIP).
put("bz2", BZIP2).
- put("snappy", SNAPPY).
+ put("snappy", SNAPPY_BLOCKED).
put("lzo", LZO).
put("index", LZO_INDEX).
put("lz4", LZ4).
@@ -76,7 +78,7 @@ public enum HdfsCompression {
case DEFLATE: return THdfsCompression.DEFLATE;
case GZIP: return THdfsCompression.GZIP;
case BZIP2: return THdfsCompression.BZIP2;
- case SNAPPY: return THdfsCompression.SNAPPY_BLOCKED;
+ case SNAPPY_BLOCKED: return THdfsCompression.SNAPPY_BLOCKED;
case LZO: return THdfsCompression.LZO;
case LZ4: return THdfsCompression.LZ4;
case ZSTD: return THdfsCompression.ZSTD;
@@ -90,7 +92,7 @@ public enum HdfsCompression {
case DEFLATE: return FbCompression.DEFLATE;
case GZIP: return FbCompression.GZIP;
case BZIP2: return FbCompression.BZIP2;
- case SNAPPY: return FbCompression.SNAPPY;
+ case SNAPPY_BLOCKED: return FbCompression.SNAPPY_BLOCKED;
case LZO: return FbCompression.LZO;
case LZ4: return FbCompression.LZ4;
case ZSTD: return FbCompression.ZSTD;
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 8aa4705..f295bed 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -18,13 +18,12 @@
import math
import os
import pytest
+import random
import struct
import subprocess
from os.path import join
-from tests.common.environ import EXTERNAL_WAREHOUSE_DIR
from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.common.test_result_verifier import verify_query_result_is_equal
from tests.common.test_vector import ImpalaTestDimension
@@ -33,25 +32,19 @@ from tests.util.filesystem_utils import get_fs_path
# (file extension, table suffix) pairs
compression_formats = [
- ('.bz2', 'bzip'),
+ ('.bz2', 'bzip'),
('.deflate', 'def'),
- ('.gz', 'gzip'),
- ('.snappy', 'snap'),
+ ('.gz', 'gzip'),
+ ('.snappy', 'snap'),
]
+compression_extensions = ['.bz2', '.deflate', '.gz', '.snappy']
-# Missing Coverage: Compressed data written by Hive is queriable by Impala on a non-hdfs
-# filesystem.
-@SkipIfS3.hive
-@SkipIfABFS.hive
-@SkipIfADLS.hive
-@SkipIfIsilon.hive
-@SkipIfLocal.hive
-class TestCompressedFormats(ImpalaTestSuite):
+
+class TestCompressedFormatsBase(ImpalaTestSuite):
"""
- Tests that we support compressed RC, sequence and text files and that unsupported
- formats fail gracefully (see IMPALA-14: Files with .gz extension reported as 'not
- supported').
+ Base class to provide utility functions for testing support for compressed
+ data files.
"""
@classmethod
def get_workload(self):
@@ -59,64 +52,43 @@ class TestCompressedFormats(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
- super(TestCompressedFormats, cls).add_test_dimensions()
- cls.ImpalaTestMatrix.clear()
- cls.ImpalaTestMatrix.add_dimension(\
- ImpalaTestDimension('file_format', *['rc', 'seq', 'text']))
- cls.ImpalaTestMatrix.add_dimension(\
- ImpalaTestDimension('compression_format', *compression_formats))
- if cls.exploration_strategy() == 'core':
- # Don't run on core. This test is very slow and we are unlikely
- # to regress here.
- cls.ImpalaTestMatrix.add_constraint(lambda v: False);
+ super(TestCompressedFormatsBase, cls).add_test_dimensions()
- @pytest.mark.execute_serially
- def test_compressed_formats(self, vector):
- file_format = vector.get_value('file_format')
- extension, suffix = vector.get_value('compression_format')
- if file_format in ['rc', 'seq']:
- # Test that {gzip,snappy,bzip,deflate}-compressed
- # {RC,sequence,text} files are supported.
- db_suffix = '_%s_%s' % (file_format, suffix)
- self._copy_and_query_compressed_file(
- 'tinytable', db_suffix, suffix, '000000_0', extension)
- elif file_format is 'text':
- pytest.xfail('IMPALA-9004: TestCompressedFormats is broken for text files')
- else:
- assert False, "Unknown file_format: %s" % file_format
-
- # TODO: switch to using hive metastore API rather than hive shell.
- def _copy_and_query_compressed_file(self, table_name, db_suffix, compression_codec,
- file_name, extension, expected_error=None):
- # We want to create a test table with a compressed file that has a file
- # extension. We'll do this by making a copy of an existing table with hive.
+ def _copy_and_query_compressed_file(self, unique_database, table_name, db_suffix,
+ file_basename, src_extension, dest_extension, expected_error=None):
+ """
+ This is a utility function to test the behavior for compressed file formats
+ with different file extensions. It creates a new table in the unique_database
+ as a copy of the provided table_name from the functional schema with the
+ specified db_suffix. It copies file_basename + src_extension to the new
+ table as file_basename + dest_extension. It then runs a query on the
+ new table. Unless expected_error is set, it expects the query to run successfully.
+ """
+ # Calculate locations for the source table
base_dir = '/test-warehouse'
- src_table = 'functional%s.%s' % (db_suffix, table_name)
- src_table_dir = "%s%s" % (table_name, db_suffix)
- src_table_dir = join(base_dir, src_table_dir)
- src_file = join(src_table_dir, file_name)
-
- # Make sure destination table uses suffix, even if use_suffix=False, so
- # unique tables are created for each compression format
- # In Hive3+ create table like behavior is still in discussion, add location
- # to avoid impact on Impala test.
- dest_base_dir = '/{0}'.format(EXTERNAL_WAREHOUSE_DIR)
- dest_table = '%s_%s_copy' % (table_name, compression_codec)
- dest_table_dir = join(dest_base_dir, dest_table)
- dest_file = join(dest_table_dir, file_name + extension)
-
- drop_cmd = 'DROP TABLE IF EXISTS %s;' % (dest_table)
- hive_cmd = drop_cmd + 'CREATE TABLE %s LIKE %s LOCATION \'%s\';' % \
- (dest_table, src_table, dest_table_dir)
-
- # Create the table
- self.run_stmt_in_hive(hive_cmd)
+ src_table = "functional{0}.{1}".format(db_suffix, table_name)
+ src_table_dir = join(base_dir, table_name + db_suffix)
+ src_file = join(src_table_dir, file_basename + src_extension)
+
+ # Calculate locations for the destination table
+ dest_table_dir = "/test-warehouse/{0}.db/{1}".format(unique_database, table_name)
+ dest_table = "{0}.{1}".format(unique_database, table_name)
+ dest_file = join(dest_table_dir, file_basename + dest_extension)
+
+ # Use a specific location to avoid any interactions with Hive behavior changes.
+ drop_cmd = "DROP TABLE IF EXISTS {0};".format(dest_table)
+ create_cmd = "CREATE TABLE {0} LIKE {1} LOCATION \'{2}\';".format(
+ dest_table, src_table, dest_table_dir)
+
+ # Create the table and copy in the data file
+ self.client.execute(drop_cmd)
+ self.client.execute(create_cmd)
self.filesystem_client.copy(src_file, dest_file, overwrite=True)
# Try to read the compressed file with extension
- query = 'select count(*) from %s' % dest_table
+ query = "select count(*) from {0};".format(dest_table)
try:
- # Need to invalidate the metadata because the table was created external to Impala.
- self.client.execute("invalidate metadata %s" % dest_table)
+ # Need to refresh the metadata to see the file copied in.
+ self.client.execute("refresh {0}".format(dest_table))
result = self.execute_scalar(query)
# Fail iff we expected an error
assert expected_error is None, 'Query is expected to fail'
@@ -125,12 +97,84 @@ class TestCompressedFormats(ImpalaTestSuite):
error_msg = str(e)
print error_msg
if expected_error is None or expected_error not in error_msg:
- print "Unexpected error:\n%s", error_msg
+ print("Unexpected error:\n{0}".format(error_msg))
raise
finally:
- self.run_stmt_in_hive(drop_cmd)
+ self.client.execute(drop_cmd)
self.filesystem_client.delete_file_dir(dest_file)
+
+class TestCompressedNonText(TestCompressedFormatsBase):
+ """Tests behavior for compressed non-text formats (avro, rc, seq)."""
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestCompressedNonText, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.clear()
+ cls.ImpalaTestMatrix.add_dimension(
+ ImpalaTestDimension('file_format', *['rc', 'seq', 'avro']))
+ cls.ImpalaTestMatrix.add_dimension(
+ ImpalaTestDimension('compression_format', *compression_formats))
+
+ def test_insensitivity_to_extension(self, vector, unique_database):
+ """
+ Avro, RC, and Sequence files do not use the file extension to determine the
+ type of compression. This verifies that they work regardless of the
+ extension.
+ """
+ file_format = vector.get_value('file_format')
+ right_extension, suffix = vector.get_value('compression_format')
+ # Avro is only loaded in a subset of the compression types. Bail out for
+ # the ones that are not loaded.
+ if file_format == 'avro' and suffix not in ['def', 'snap']:
+ pytest.xfail('Avro is only created for Deflate and Snappy compression codecs')
+ db_suffix = '_{0}_{1}'.format(file_format, suffix)
+
+ # Pick one wrong extension randomly
+ wrong_extensions = [ext for ext in compression_extensions if ext != right_extension]
+ random.shuffle(wrong_extensions)
+ wrong_extension = wrong_extensions[0]
+ # Test with the "right" extension that matches the file's compression, one wrong
+ # extension, and no extension. By default, Hive does not use a file extension.
+ src_extension = ""
+ for ext in [right_extension, wrong_extension, ""]:
+ self._copy_and_query_compressed_file(
+ unique_database, 'tinytable', db_suffix, '000000_0', src_extension, ext)
+
+
+class TestCompressedText(TestCompressedFormatsBase):
+ """
+ Tests behavior for compressed text files, which determine the compression codec from
+ the file extension.
+ """
+
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestCompressedText, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.clear()
+ cls.ImpalaTestMatrix.add_dimension(
+ ImpalaTestDimension('file_format', *['text']))
+ cls.ImpalaTestMatrix.add_dimension(
+ ImpalaTestDimension('compression_format', *compression_formats))
+
+ def test_correct_extension(self, vector, unique_database):
+ """
+ Text files use the file extension to determine the type of compression.
+ By default, Hive creates files with the appropriate file extension.
+ This verifies the positive case that the correct extension works.
+
+ This is a somewhat trivial test. However, it runs with the core exploration
+ strategy and runs on all filesystems. It verifies formats on core that are
+ otherwise limited to exhaustive. That is important for coverage on non-HDFS
+ filesystems like s3.
+ """
+ file_format = vector.get_value('file_format')
+ extension, suffix = vector.get_value('compression_format')
+ db_suffix = '_{0}_{1}'.format(file_format, suffix)
+ self._copy_and_query_compressed_file(
+ unique_database, 'tinytable', db_suffix, '000000_0', extension, extension)
+
+
class TestUnsupportedTableWriters(ImpalaTestSuite):
@classmethod
def get_workload(cls):
@@ -143,7 +187,7 @@ class TestUnsupportedTableWriters(ImpalaTestSuite):
# This class tests different formats, but doesn't use constraints.
# The constraint added below is only to make sure that the test file runs once.
cls.ImpalaTestMatrix.add_constraint(lambda v:
- (v.get_value('table_format').file_format =='text' and
+ (v.get_value('table_format').file_format == 'text' and
v.get_value('table_format').compression_codec == 'none'))
def test_error_message(self, vector, unique_database):
@@ -151,6 +195,7 @@ class TestUnsupportedTableWriters(ImpalaTestSuite):
# compressed text, avro and sequence.
self.run_test_case('QueryTest/unsupported-writers', vector, unique_database)
+
@pytest.mark.execute_serially
class TestLargeCompressedFile(ImpalaTestSuite):
"""
@@ -182,7 +227,7 @@ class TestLargeCompressedFile(ImpalaTestSuite):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip("skipping if it's not exhaustive test.")
cls.ImpalaTestMatrix.add_constraint(lambda v:
- (v.get_value('table_format').file_format =='text' and
+ (v.get_value('table_format').file_format == 'text' and
v.get_value('table_format').compression_codec == 'snap'))
def teardown_method(self, method):
@@ -228,24 +273,25 @@ class TestLargeCompressedFile(ImpalaTestSuite):
hdfs_put.wait()
def test_query_large_file(self, vector):
- self.__create_test_table();
+ self.__create_test_table()
dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
file_size = self.MAX_FILE_SIZE
self.__generate_file(dst_path, file_size)
self.client.execute("refresh %s" % self.TABLE_NAME)
# Query the table
- result = self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
+ self.client.execute("select * from %s limit 1" % self.TABLE_NAME)
def __create_test_table(self):
self.__drop_test_table()
- self.client.execute("CREATE TABLE %s (col string) " \
+ self.client.execute("CREATE TABLE %s (col string) "
"ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '%s'"
% (self.TABLE_NAME, self.TABLE_LOCATION))
def __drop_test_table(self):
self.client.execute("DROP TABLE IF EXISTS %s" % self.TABLE_NAME)
+
class TestBzip2Streaming(ImpalaTestSuite):
MAX_SCAN_RANGE_LENGTHS = [0, 5]
@@ -261,8 +307,8 @@ class TestBzip2Streaming(ImpalaTestSuite):
pytest.skip("skipping if it's not exhaustive test.")
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('max_scan_range_length', *cls.MAX_SCAN_RANGE_LENGTHS))
- cls.ImpalaTestMatrix.add_constraint(lambda v:\
- v.get_value('table_format').file_format == 'text' and\
+ cls.ImpalaTestMatrix.add_constraint(lambda v:
+ v.get_value('table_format').file_format == 'text' and
v.get_value('table_format').compression_codec == 'bzip')
def test_bzip2_streaming(self, vector):