You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2021/04/08 20:47:01 UTC
[impala] 02/03: IMPALA-10629: Fix parquet compression codecs for
data load scripts
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit d29fab1ad9a32c0200b71506c3b31f1ac8838e63
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Wed Mar 31 19:26:57 2021 -0700
IMPALA-10629: Fix parquet compression codecs for data load scripts
Currently, the dataload scripts don't respect non-standard
compression codecs when loading Parquet data. They always
load snappy, even when specifying something else like
--table_format=parquet/zstd.
This fixes the dataload scripts so that they specify the
compression_codec query option correctly and thus use the
right codec when loading Parquet.
For backwards compatibility, this preserves the behavior
that parquet/none corresponds to the default compression
codec (which is Snappy).
This should make it easier to do performance testing on
various Parquet codecs (like ZSTD).
Testing:
- Ran bin/load-data.py -w tpch --table_format=parquet/zstd
and checked the codec in the file with the parquet-reader
utility
Change-Id: I1a346de3e5c4e38328e5a8ce8162697b7dd6553a
Reviewed-on: http://gerrit.cloudera.org:8080/17259
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Joe McDonnell <jo...@cloudera.com>
---
testdata/bin/generate-schema-statements.py | 40 +++++++++++++++++++++++++-----
1 file changed, 34 insertions(+), 6 deletions(-)
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index a69fe9e..72c39f0 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -151,9 +151,22 @@ AVRO_SCHEMA_DIR = "avro_schemas"
DEFAULT_FS=os.environ['DEFAULT_FS']
IMPALA_SUPPORTED_INSERT_FORMATS = ['parquet', 'hbase', 'text', 'kudu']
+IMPALA_PARQUET_COMPRESSION_MAP = \
+ {
+ 'uncompressed': 'NONE',
+ # UGLY: parquet/none always referred to the default compression, which is SNAPPY
+ # Maintain that for backwards compatibility.
+ 'none': 'SNAPPY',
+ 'snap': 'SNAPPY',
+ 'gzip': 'GZIP',
+ 'lz4': 'LZ4',
+ 'zstd': 'ZSTD'
+ }
+
COMPRESSION_TYPE = "SET mapred.output.compression.type=%s;"
COMPRESSION_ENABLED = "SET hive.exec.compress.output=%s;"
COMPRESSION_CODEC = "SET mapred.output.compression.codec=%s;"
+IMPALA_COMPRESSION_CODEC = "SET compression_codec=%s;"
AVRO_COMPRESSION_CODEC = "SET avro.output.codec=%s;"
SET_DYNAMIC_PARTITION_STATEMENT = "SET hive.exec.dynamic.partition=true;"
SET_PARTITION_MODE_NONSTRICT_STATEMENT = "SET hive.exec.dynamic.partition.mode=nonstrict;"
@@ -423,6 +436,11 @@ def build_codec_enabled_statement(codec):
compression_enabled = 'false' if codec == 'none' else 'true'
return COMPRESSION_ENABLED % compression_enabled
+
+def build_impala_parquet_codec_statement(codec):
+ parquet_codec = IMPALA_PARQUET_COMPRESSION_MAP[codec]
+ return IMPALA_COMPRESSION_CODEC % parquet_codec
+
def build_insert_into_statement(insert, db_name, db_suffix, table_name, file_format,
hdfs_path, for_impala=False):
insert_hint = "/* +shuffle, clustered */" \
@@ -464,15 +482,24 @@ def build_hbase_insert(db_name, db_suffix, table_name):
return hbase_insert
def build_insert(insert, db_name, db_suffix, file_format,
- codec, compression_type, table_name, hdfs_path, create_hive=False):
+ codec, compression_type, table_name, hdfs_path, create_hive=False,
+ for_impala=False):
# HBASE inserts don't need the hive options to be set, and don't require and HDFS
# file location, so they're handled separately.
if file_format == 'hbase' and not create_hive:
return build_hbase_insert(db_name, db_suffix, table_name)
- output = build_codec_enabled_statement(codec) + "\n"
- output += build_compression_codec_statement(codec, compression_type, file_format) + "\n"
+ output = ''
+ if not for_impala:
+ # If this is not for Impala, then generate the hive codec statements
+ output += build_codec_enabled_statement(codec) + "\n"
+ output += build_compression_codec_statement(codec, compression_type,
+ file_format) + "\n"
+ elif file_format == 'parquet':
+ # This is for Impala parquet, add the appropriate codec statement
+ output += build_impala_parquet_codec_statement(codec) + "\n"
output += build_insert_into_statement(insert, db_name, db_suffix,
- table_name, file_format, hdfs_path) + "\n"
+ table_name, file_format, hdfs_path,
+ for_impala) + "\n"
return output
def build_load_statement(load_template, db_name, db_suffix, table_name):
@@ -760,8 +787,9 @@ def generate_statements(output_name, test_vectors, sections,
hive_output.load.append(build_insert(insert_hive, db_name, db_suffix,
file_format, codec, compression_type, table_name, data_path))
elif insert:
- impala_load.load.append(build_insert_into_statement(insert, db_name,
- db_suffix, table_name, file_format, data_path, for_impala=True))
+ impala_load.load.append(build_insert(insert, db_name, db_suffix,
+ file_format, codec, compression_type, table_name, data_path,
+ for_impala=True))
else:
print 'Empty parquet/kudu load for table %s. Skipping insert generation' \
% table_name