You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/08 01:06:35 UTC
[impala] 03/03: IMPALA-8549: Add support for scanning DEFLATE text files
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 6d68c4f6c01c3d1f9d51a802476e0ef99fbfa208
Author: Ethan Xue <et...@cloudera.com>
AuthorDate: Thu Jul 11 13:24:35 2019 -0700
IMPALA-8549: Add support for scanning DEFLATE text files
This patch adds support to Impala for scanning .DEFLATE files of
tables stored as text. To avoid confusion, it should be noted that
although these files have a compression type of DEFLATE in Impala,
they should be treated as if their compression type is DEFAULT.
Hadoop tools such as Hive and MapReduce support reading and writing
text files compressed using the deflate algorithm, which is the default
compression type. Hadoop uses the zlib library (an implementation of
the DEFLATE algorithm) to compress text files into .DEFLATE files,
which are not in the raw deflate format but rather the zlib format
(the zlib library supports three flavors of deflate, and Hadoop uses
the flavor that compresses data into deflate with zlib wrappings rather
than just raw deflate).
Testing:
There is a pre-existing unit test that validates compressing and
decompressing data with compression type DEFLATE. Also, modified
existing end-to-end testing that simulates querying files of various
formats and compression types. All core and exhaustive tests pass.
Change-Id: I45e41ab5a12637d396fef0812a09d71fa839b27a
Reviewed-on: http://gerrit.cloudera.org:8080/13857
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
---
be/src/exec/hdfs-text-scanner.cc | 21 ++++++++++++++++-----
be/src/exec/hdfs-text-scanner.h | 1 +
testdata/datasets/functional/schema_constraints.csv | 2 ++
.../functional-query_exhaustive.csv | 1 +
tests/query_test/test_compressed_formats.py | 19 +++++--------------
5 files changed, 25 insertions(+), 19 deletions(-)
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 21ed123..f1e9d26 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -90,10 +90,11 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
case THdfsCompression::SNAPPY:
case THdfsCompression::SNAPPY_BLOCKED:
case THdfsCompression::BZIP2:
+ case THdfsCompression::DEFLATE:
for (int j = 0; j < files[i]->splits.size(); ++j) {
- // In order to decompress gzip-, snappy- and bzip2-compressed text files, we
- // need to read entire files. Only read a file if we're assigned the first split
- // to avoid reading multi-block files with multiple scanners.
+ // In order to decompress gzip-, snappy-, bzip2- and deflate-compressed text
+ // files, we need to read entire files. Only read a file if we're assigned the
+ // first split to avoid reading multi-block files with multiple scanners.
ScanRange* split = files[i]->splits[j];
// We only process the split that starts at offset 0.
@@ -192,10 +193,20 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
Status HdfsTextScanner::InitNewRange() {
DCHECK_EQ(scan_state_, CONSTRUCTED);
+
+ auto compression_type = stream_->file_desc()->file_compression;
// Update the decompressor based on the compression type of the file in the context.
- DCHECK(stream_->file_desc()->file_compression != THdfsCompression::SNAPPY)
+ DCHECK(compression_type != THdfsCompression::SNAPPY)
<< "FE should have generated SNAPPY_BLOCKED instead.";
- RETURN_IF_ERROR(UpdateDecompressor(stream_->file_desc()->file_compression));
+ // In Hadoop, text files compressed into .DEFLATE files contain
+ // deflate with zlib wrappings as opposed to raw deflate, which
+ // is what THdfsCompression::DEFLATE implies. Since deflate is
+ // the default compression algorithm used in Hadoop, it makes
+ // sense to map it to type DEFAULT in Impala instead
+ if (compression_type == THdfsCompression::DEFLATE) {
+ compression_type = THdfsCompression::DEFAULT;
+ }
+ RETURN_IF_ERROR(UpdateDecompressor(compression_type));
HdfsPartitionDescriptor* hdfs_partition = context_->partition_descriptor();
char field_delim = hdfs_partition->field_delim();
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 5b15372..3ef4904 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -74,6 +74,7 @@ class HdfsTextScanner : public HdfsScanner {
case THdfsCompression::SNAPPY:
case THdfsCompression::SNAPPY_BLOCKED:
case THdfsCompression::BZIP2:
+ case THdfsCompression::DEFLATE:
return true;
default:
return false;
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 2099035..75185f5 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -235,11 +235,13 @@ table_name:date_tbl, constraint:restrict_to, table_format:text/lzo/block
table_name:date_tbl, constraint:restrict_to, table_format:text/bzip/block
table_name:date_tbl, constraint:restrict_to, table_format:text/gzip/block
table_name:date_tbl, constraint:restrict_to, table_format:text/snap/block
+table_name:date_tbl, constraint:restrict_to, table_format:text/def/block
table_name:date_tbl_error, constraint:restrict_to, table_format:text/none/none
table_name:date_tbl_error, constraint:restrict_to, table_format:text/lzo/block
table_name:date_tbl_error, constraint:restrict_to, table_format:text/bzip/block
table_name:date_tbl_error, constraint:restrict_to, table_format:text/gzip/block
table_name:date_tbl_error, constraint:restrict_to, table_format:text/snap/block
+table_name:date_tbl_error, constraint:restrict_to, table_format:text/def/block
table_name:insert_date_tbl, constraint:restrict_to, table_format:hbase/none/none
# Full transactional table is only supported for ORC
diff --git a/testdata/workloads/functional-query/functional-query_exhaustive.csv b/testdata/workloads/functional-query/functional-query_exhaustive.csv
index a06ab52..c2ef09f 100644
--- a/testdata/workloads/functional-query/functional-query_exhaustive.csv
+++ b/testdata/workloads/functional-query/functional-query_exhaustive.csv
@@ -1,5 +1,6 @@
# Generated File.
file_format: text, dataset: functional, compression_codec: none, compression_type: none
+file_format: text, dataset: functional, compression_codec: def, compression_type: block
file_format: text, dataset: functional, compression_codec: gzip, compression_type: block
file_format: text, dataset: functional, compression_codec: bzip, compression_type: block
file_format: text, dataset: functional, compression_codec: snap, compression_type: block
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 926889b..2748bd4 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -71,22 +71,13 @@ class TestCompressedFormats(ImpalaTestSuite):
def test_compressed_formats(self, vector):
file_format = vector.get_value('file_format')
extension, suffix = vector.get_value('compression_format')
- if file_format in ['rc', 'seq']:
- # Test that compressed RC/sequence files are supported.
+ if file_format in ['rc', 'seq', 'text']:
+ # TODO: How about LZO?
+ # Test that {gzip,snappy,bzip,deflate}-compressed
+ # {RC,sequence,text} files are supported.
db_suffix = '_%s_%s' % (file_format, suffix)
self._copy_and_query_compressed_file(
- 'tinytable', db_suffix, suffix, '000000_0', extension)
- elif file_format is 'text':
- # TODO: How about LZO?
- if suffix in ['gzip', 'snap', 'bzip']:
- # Test that {gzip,snappy,bzip}-compressed text files are supported.
- db_suffix = '_%s_%s' % (file_format, suffix)
- self._copy_and_query_compressed_file(
- 'tinytable', db_suffix, suffix, '000000_0', extension)
- else:
- # Deflate-compressed (['def']) text files (or at least text files with a
- # compressed extension) have not been tested yet.
- pytest.skip("Skipping the text/def tests")
+ 'tinytable', db_suffix, suffix, '000000_0', extension)
else:
assert False, "Unknown file_format: %s" % file_format