You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2021/04/08 20:47:01 UTC

[impala] 02/03: IMPALA-10629: Fix parquet compression codecs for data load scripts

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d29fab1ad9a32c0200b71506c3b31f1ac8838e63
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Wed Mar 31 19:26:57 2021 -0700

    IMPALA-10629: Fix parquet compression codecs for data load scripts
    
    Currently, the dataload scripts don't respect non-standard
    compression codecs when loading Parquet data. It always
    loads snappy, even when specifying something else like
    --table_format=parquet/zstd.
    
    This fixes the dataload scripts so that they specify the
    compression_codec query option correctly and thus use the
    right codec when loading Parquet.
    
    For backwards compatibility, this preserves the behavior
    that parquet/none corresponds to the default compression
    codec (which is Snappy).
    
    This should make it easier to do performance testing on
    various Parquet codecs (like ZSTD).
    
    Testing:
     - Ran bin/load-data.py -w tpch --table_format=parquet/zstd
       and checked the codec in the file with the parquet-reader
       utility
    
    Change-Id: I1a346de3e5c4e38328e5a8ce8162697b7dd6553a
    Reviewed-on: http://gerrit.cloudera.org:8080/17259
    Reviewed-by: Joe McDonnell <jo...@cloudera.com>
    Tested-by: Joe McDonnell <jo...@cloudera.com>
---
 testdata/bin/generate-schema-statements.py | 40 +++++++++++++++++++++++++-----
 1 file changed, 34 insertions(+), 6 deletions(-)

diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index a69fe9e..72c39f0 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -151,9 +151,22 @@ AVRO_SCHEMA_DIR = "avro_schemas"
 DEFAULT_FS=os.environ['DEFAULT_FS']
 IMPALA_SUPPORTED_INSERT_FORMATS = ['parquet', 'hbase', 'text', 'kudu']
 
+IMPALA_PARQUET_COMPRESSION_MAP = \
+  {
+    'uncompressed': 'NONE',
+    # UGLY: parquet/none always referred to the default compression, which is SNAPPY
+    # Maintain that for backwards compatibility.
+    'none': 'SNAPPY',
+    'snap': 'SNAPPY',
+    'gzip': 'GZIP',
+    'lz4': 'LZ4',
+    'zstd': 'ZSTD'
+  }
+
 COMPRESSION_TYPE = "SET mapred.output.compression.type=%s;"
 COMPRESSION_ENABLED = "SET hive.exec.compress.output=%s;"
 COMPRESSION_CODEC = "SET mapred.output.compression.codec=%s;"
+IMPALA_COMPRESSION_CODEC = "SET compression_codec=%s;"
 AVRO_COMPRESSION_CODEC = "SET avro.output.codec=%s;"
 SET_DYNAMIC_PARTITION_STATEMENT = "SET hive.exec.dynamic.partition=true;"
 SET_PARTITION_MODE_NONSTRICT_STATEMENT = "SET hive.exec.dynamic.partition.mode=nonstrict;"
@@ -423,6 +436,11 @@ def build_codec_enabled_statement(codec):
   compression_enabled = 'false' if codec == 'none' else 'true'
   return COMPRESSION_ENABLED % compression_enabled
 
+
+def build_impala_parquet_codec_statement(codec):
+  parquet_codec = IMPALA_PARQUET_COMPRESSION_MAP[codec]
+  return IMPALA_COMPRESSION_CODEC % parquet_codec
+
 def build_insert_into_statement(insert, db_name, db_suffix, table_name, file_format,
                                 hdfs_path, for_impala=False):
   insert_hint = "/* +shuffle, clustered */" \
@@ -464,15 +482,24 @@ def build_hbase_insert(db_name, db_suffix, table_name):
   return hbase_insert
 
 def build_insert(insert, db_name, db_suffix, file_format,
-                 codec, compression_type, table_name, hdfs_path, create_hive=False):
+                 codec, compression_type, table_name, hdfs_path, create_hive=False,
+                 for_impala=False):
   # HBASE inserts don't need the hive options to be set, and don't require and HDFS
   # file location, so they're handled separately.
   if file_format == 'hbase' and not create_hive:
     return build_hbase_insert(db_name, db_suffix, table_name)
-  output = build_codec_enabled_statement(codec) + "\n"
-  output += build_compression_codec_statement(codec, compression_type, file_format) + "\n"
+  output = ''
+  if not for_impala:
+    # If this is not for Impala, then generate the hive codec statements
+    output += build_codec_enabled_statement(codec) + "\n"
+    output += build_compression_codec_statement(codec, compression_type,
+                                                file_format) + "\n"
+  elif file_format == 'parquet':
+    # This is for Impala parquet, add the appropriate codec statement
+    output += build_impala_parquet_codec_statement(codec) + "\n"
   output += build_insert_into_statement(insert, db_name, db_suffix,
-                                        table_name, file_format, hdfs_path) + "\n"
+                                        table_name, file_format, hdfs_path,
+                                        for_impala) + "\n"
   return output
 
 def build_load_statement(load_template, db_name, db_suffix, table_name):
@@ -760,8 +787,9 @@ def generate_statements(output_name, test_vectors, sections,
             hive_output.load.append(build_insert(insert_hive, db_name, db_suffix,
                 file_format, codec, compression_type, table_name, data_path))
           elif insert:
-            impala_load.load.append(build_insert_into_statement(insert, db_name,
-                db_suffix, table_name, file_format, data_path, for_impala=True))
+            impala_load.load.append(build_insert(insert, db_name, db_suffix,
+                file_format, codec, compression_type, table_name, data_path,
+                for_impala=True))
           else:
             print 'Empty parquet/kudu load for table %s. Skipping insert generation' \
               % table_name