You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/08 01:06:35 UTC

[impala] 03/03: IMPALA-8549: Add support for scanning DEFLATE text files

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6d68c4f6c01c3d1f9d51a802476e0ef99fbfa208
Author: Ethan Xue <et...@cloudera.com>
AuthorDate: Thu Jul 11 13:24:35 2019 -0700

    IMPALA-8549: Add support for scanning DEFLATE text files
    
    This patch adds support to Impala for scanning .DEFLATE files of
    tables stored as text. To avoid confusion, it should be noted that
    although these files have a compression type of DEFLATE in Impala,
    they should be treated as if their compression type is DEFAULT.
    
    Hadoop tools such as Hive and MapReduce support reading and writing
    text files compressed using the deflate algorithm, which is the default
    compression type. Hadoop uses the zlib library (an implementation of
    the DEFLATE algorithm) to compress text files into .DEFLATE files,
    which are not in the raw deflate format but rather the zlib format
    (the zlib library supports three flavors of deflate, and Hadoop uses
    the flavor that compresses data into deflate with zlib wrappings rather
    than just raw deflate)
    
    Testing:
    There is a pre-existing unit test that validates compressing and
    decompressing data with compression type DEFLATE. Also, modified
    existing end-to-end testing that simulates querying files of various
    formats and compression types. All core and exhaustive tests pass.
    
    Change-Id: I45e41ab5a12637d396fef0812a09d71fa839b27a
    Reviewed-on: http://gerrit.cloudera.org:8080/13857
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
---
 be/src/exec/hdfs-text-scanner.cc                    | 21 ++++++++++++++++-----
 be/src/exec/hdfs-text-scanner.h                     |  1 +
 testdata/datasets/functional/schema_constraints.csv |  2 ++
 .../functional-query_exhaustive.csv                 |  1 +
 tests/query_test/test_compressed_formats.py         | 19 +++++--------------
 5 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 21ed123..f1e9d26 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -90,10 +90,11 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
       case THdfsCompression::SNAPPY:
       case THdfsCompression::SNAPPY_BLOCKED:
       case THdfsCompression::BZIP2:
+      case THdfsCompression::DEFLATE:
         for (int j = 0; j < files[i]->splits.size(); ++j) {
-          // In order to decompress gzip-, snappy- and bzip2-compressed text files, we
-          // need to read entire files. Only read a file if we're assigned the first split
-          // to avoid reading multi-block files with multiple scanners.
+          // In order to decompress gzip-, snappy-, bzip2- and deflate-compressed text
+          // files, we need to read entire files. Only read a file if we're assigned the
+          // first split to avoid reading multi-block files with multiple scanners.
           ScanRange* split = files[i]->splits[j];
 
           // We only process the split that starts at offset 0.
@@ -192,10 +193,20 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
 
 Status HdfsTextScanner::InitNewRange() {
   DCHECK_EQ(scan_state_, CONSTRUCTED);
+
+  auto compression_type = stream_ ->file_desc()->file_compression;
   // Update the decompressor based on the compression type of the file in the context.
-  DCHECK(stream_->file_desc()->file_compression != THdfsCompression::SNAPPY)
+  DCHECK(compression_type != THdfsCompression::SNAPPY)
       << "FE should have generated SNAPPY_BLOCKED instead.";
-  RETURN_IF_ERROR(UpdateDecompressor(stream_->file_desc()->file_compression));
+  // In Hadoop, text files compressed into .DEFLATE files contain
+  // deflate with zlib wrappings as opposed to raw deflate, which
+  // is what THdfsCompression::DEFLATE implies. Since deflate is
+  // the default compression algorithm used in Hadoop, it makes
+  // sense to map it to type DEFAULT in Impala instead
+  if (compression_type == THdfsCompression::DEFLATE) {
+    compression_type = THdfsCompression::DEFAULT;
+  }
+  RETURN_IF_ERROR(UpdateDecompressor(compression_type));
 
   HdfsPartitionDescriptor* hdfs_partition = context_->partition_descriptor();
   char field_delim = hdfs_partition->field_delim();
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 5b15372..3ef4904 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -74,6 +74,7 @@ class HdfsTextScanner : public HdfsScanner {
       case THdfsCompression::SNAPPY:
       case THdfsCompression::SNAPPY_BLOCKED:
       case THdfsCompression::BZIP2:
+      case THdfsCompression::DEFLATE:
         return true;
       default:
         return false;
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 2099035..75185f5 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -235,11 +235,13 @@ table_name:date_tbl, constraint:restrict_to, table_format:text/lzo/block
 table_name:date_tbl, constraint:restrict_to, table_format:text/bzip/block
 table_name:date_tbl, constraint:restrict_to, table_format:text/gzip/block
 table_name:date_tbl, constraint:restrict_to, table_format:text/snap/block
+table_name:date_tbl, constraint:restrict_to, table_format:text/def/block
 table_name:date_tbl_error, constraint:restrict_to, table_format:text/none/none
 table_name:date_tbl_error, constraint:restrict_to, table_format:text/lzo/block
 table_name:date_tbl_error, constraint:restrict_to, table_format:text/bzip/block
 table_name:date_tbl_error, constraint:restrict_to, table_format:text/gzip/block
 table_name:date_tbl_error, constraint:restrict_to, table_format:text/snap/block
+table_name:date_tbl_error, constraint:restrict_to, table_format:text/def/block
 table_name:insert_date_tbl, constraint:restrict_to, table_format:hbase/none/none
 
 # Full transactional table is only supported for ORC
diff --git a/testdata/workloads/functional-query/functional-query_exhaustive.csv b/testdata/workloads/functional-query/functional-query_exhaustive.csv
index a06ab52..c2ef09f 100644
--- a/testdata/workloads/functional-query/functional-query_exhaustive.csv
+++ b/testdata/workloads/functional-query/functional-query_exhaustive.csv
@@ -1,5 +1,6 @@
 # Generated File.
 file_format: text, dataset: functional, compression_codec: none, compression_type: none
+file_format: text, dataset: functional, compression_codec: def, compression_type: block
 file_format: text, dataset: functional, compression_codec: gzip, compression_type: block
 file_format: text, dataset: functional, compression_codec: bzip, compression_type: block
 file_format: text, dataset: functional, compression_codec: snap, compression_type: block
diff --git a/tests/query_test/test_compressed_formats.py b/tests/query_test/test_compressed_formats.py
index 926889b..2748bd4 100644
--- a/tests/query_test/test_compressed_formats.py
+++ b/tests/query_test/test_compressed_formats.py
@@ -71,22 +71,13 @@ class TestCompressedFormats(ImpalaTestSuite):
   def test_compressed_formats(self, vector):
     file_format = vector.get_value('file_format')
     extension, suffix = vector.get_value('compression_format')
-    if file_format in ['rc', 'seq']:
-      # Test that compressed RC/sequence files are supported.
+    if file_format in ['rc', 'seq', 'text']:
+      # TODO: How about LZO?
+      # Test that {gzip,snappy,bzip,deflate}-compressed
+      # {RC,sequence,text} files are supported.
       db_suffix = '_%s_%s' % (file_format, suffix)
       self._copy_and_query_compressed_file(
-       'tinytable', db_suffix, suffix, '000000_0', extension)
-    elif file_format is 'text':
-      # TODO: How about LZO?
-      if suffix in ['gzip', 'snap', 'bzip']:
-        # Test that {gzip,snappy,bzip}-compressed text files are supported.
-        db_suffix = '_%s_%s' % (file_format, suffix)
-        self._copy_and_query_compressed_file(
-          'tinytable', db_suffix, suffix, '000000_0', extension)
-      else:
-        # Deflate-compressed (['def']) text files (or at least text files with a
-        # compressed extension) have not been tested yet.
-        pytest.skip("Skipping the text/def tests")
+        'tinytable', db_suffix, suffix, '000000_0', extension)
     else:
       assert False, "Unknown file_format: %s" % file_format