You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/10/14 23:01:46 UTC
[impala] branch master updated: IMPALA-8498: Write column index for
floating types when NaN is not present
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new f65c2a7 IMPALA-8498: Write column index for floating types when NaN is not present
f65c2a7 is described below
commit f65c2a754fe3011a17aa3bde093c117e57ffedcb
Author: norbert.luksa <no...@cloudera.com>
AuthorDate: Thu Sep 19 12:08:12 2019 +0200
IMPALA-8498: Write column index for floating types when NaN is not present
IMPALA-7304 disabled column index writing for floating point columns
until PARQUET-1222 is resolved. However, the problematic values are
only the NaNs. Therefore we can write the column index if NaNs are not
present in the data.
Testing:
* Added tests that fail if a column index is present
while the column contains NaN values.
Change-Id: Ic9d367500243c8ca142a16ebfeef6c841f013434
Reviewed-on: http://gerrit.cloudera.org:8080/14264
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/parquet/hdfs-parquet-table-writer.cc | 26 +++---
.../queries/QueryTest/stats-extrapolation.test | 14 ++--
tests/query_test/test_parquet_page_index.py | 93 +++++++++++++++++-----
3 files changed, 96 insertions(+), 37 deletions(-)
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
index 00ded5c..b6a4391 100644
--- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc
@@ -412,16 +412,11 @@ class HdfsParquetTableWriter::ColumnWriter :
plain_encoded_value_size_(
ParquetPlainEncoder::EncodedByteSize(eval->root().type())) {
DCHECK_NE(eval->root().type().type, TYPE_BOOLEAN);
- // IMPALA-7304: Don't write column index for floating-point columns until
- // PARQUET-1222 is resolved.
- if (std::is_floating_point<T>::value) valid_column_index_ = false;
}
virtual void Reset() {
BaseColumnWriter::Reset();
- // IMPALA-7304: Don't write column index for floating-point columns until
- // PARQUET-1222 is resolved.
- if (std::is_floating_point<T>::value) valid_column_index_ = false;
+ valid_column_index_ = true;
// Default to dictionary encoding. If the cardinality ends up being too high,
// it will fall back to plain.
current_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
@@ -440,6 +435,7 @@ class HdfsParquetTableWriter::ColumnWriter :
protected:
virtual bool ProcessValue(void* value, int64_t* bytes_needed) {
+ T* val = CastValue(value);
if (current_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) {
if (UNLIKELY(num_values_since_dict_size_check_ >=
DICTIONARY_DATA_PAGE_SIZE_CHECK_PERIOD)) {
@@ -447,7 +443,7 @@ class HdfsParquetTableWriter::ColumnWriter :
if (dict_encoder_->EstimatedDataEncodedSize() >= page_size_) return false;
}
++num_values_since_dict_size_check_;
- *bytes_needed = dict_encoder_->Put(*CastValue(value));
+ *bytes_needed = dict_encoder_->Put(*val);
// If the dictionary contains the maximum number of values, switch to plain
// encoding for the next page. The current page is full and must be written out.
if (UNLIKELY(*bytes_needed < 0)) {
@@ -456,23 +452,31 @@ class HdfsParquetTableWriter::ColumnWriter :
}
parent_->file_size_estimate_ += *bytes_needed;
} else if (current_encoding_ == parquet::Encoding::PLAIN) {
- T* v = CastValue(value);
*bytes_needed = plain_encoded_value_size_ < 0 ?
- ParquetPlainEncoder::ByteSize<T>(*v) :
+ ParquetPlainEncoder::ByteSize<T>(*val) :
plain_encoded_value_size_;
if (current_page_->header.uncompressed_page_size + *bytes_needed > page_size_) {
return false;
}
uint8_t* dst_ptr = values_buffer_ + current_page_->header.uncompressed_page_size;
int64_t written_len =
- ParquetPlainEncoder::Encode(*v, plain_encoded_value_size_, dst_ptr);
+ ParquetPlainEncoder::Encode(*val, plain_encoded_value_size_, dst_ptr);
DCHECK_EQ(*bytes_needed, written_len);
current_page_->header.uncompressed_page_size += written_len;
} else {
// TODO: support other encodings here
DCHECK(false);
}
- page_stats_->Update(*CastValue(value));
+ // IMPALA-8498: Write column index for floating types when NaN is not present
+ if (std::is_same<float, std::remove_cv_t<T>>::value &&
+ UNLIKELY(std::isnan(*static_cast<float*>(value)))) {
+ valid_column_index_ = false;
+ } else if (std::is_same<double, std::remove_cv_t<T>>::value &&
+ UNLIKELY(std::isnan(*static_cast<double*>(value)))) {
+ valid_column_index_ = false;
+ }
+
+ page_stats_->Update(*val);
return true;
}
diff --git a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
index eac7916..313c5b9 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test
@@ -14,7 +14,7 @@ explain select id from alltypes;
' partitions: 0/12 rows=unavailable'
' columns: unavailable'
row_regex:.* extrapolated-rows=unavailable.*
-' tuple-ids=0 row-size=4B cardinality=5.92K'
+' tuple-ids=0 row-size=4B cardinality=5.97K'
---- TYPES
STRING
====
@@ -45,8 +45,11 @@ show table stats alltypes
---- LABELS
YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
---- RESULTS
-'2009','1',-1,308,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1'
-'2009','2',-1,289,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
+'2009','1',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=1'
+'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
+'2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11'
+'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
+'2009','2',-1,290,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=2'
'2009','3',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=3'
'2009','4',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=4'
'2009','5',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=5'
@@ -54,9 +57,6 @@ YEAR, MONTH, #ROWS, EXTRAP #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION,
'2009','7',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=7'
'2009','8',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=8'
'2009','9',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=9'
-'2009','10',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=10'
-'2009','11',-1,302,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=11'
-'2009','12',-1,307,1,regex:.*B,'NOT CACHED','NOT CACHED','PARQUET','false','$NAMENODE/test-warehouse/$DATABASE.db/alltypes/year=2009/month=12'
'Total','',3650,3650,12,regex:.*B,'0B','','','',''
---- TYPES
STRING,STRING,BIGINT,BIGINT,BIGINT,STRING,STRING,STRING,STRING,STRING,STRING
@@ -221,7 +221,7 @@ explain select id from alltypes;
' partitions: 0/24 rows=unavailable'
' columns: unavailable'
row_regex:.* extrapolated-rows=unavailable.*
-' tuple-ids=0 row-size=4B cardinality=17.76K'
+' tuple-ids=0 row-size=4B cardinality=17.91K'
' in pipelines: 00(GETNEXT)'
---- TYPES
STRING
diff --git a/tests/query_test/test_parquet_page_index.py b/tests/query_test/test_parquet_page_index.py
index 40c7eb4..cd81277 100644
--- a/tests/query_test/test_parquet_page_index.py
+++ b/tests/query_test/test_parquet_page_index.py
@@ -231,11 +231,6 @@ class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
index_size = len(column_info.offset_index.page_locations)
assert index_size > 0
self._validate_page_locations(column_info.offset_index.page_locations)
- # IMPALA-7304: Impala doesn't write column index for floating-point columns
- # until PARQUET-1222 is resolved.
- if column_info.schema.type in [4, 5]:
- assert column_info.column_index is None
- continue
self._validate_null_stats(index_size, column_info)
self._validate_min_max_values(index_size, column_info)
self._validate_boundary_order(column_info)
@@ -274,8 +269,8 @@ class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
qualified_source_table = "{0}.{1}".format(unique_database, source_table)
self._validate_parquet_page_index(hdfs_path, tmpdir.join(qualified_source_table))
- def _create_string_table_with_values(self, vector, unique_database, table_name,
- values_sql):
+ def _create_table_with_values_of_type(self, col_type, vector, unique_database,
+ table_name, values_sql):
"""Creates a parquet table that has a single string column, then invokes an insert
statement on it with the 'values_sql' parameter. E.g. 'values_sql' is "('asdf')".
It returns the HDFS path for the table.
@@ -283,8 +278,8 @@ class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
qualified_table_name = "{0}.{1}".format(unique_database, table_name)
self.execute_query("drop table if exists {0}".format(qualified_table_name))
vector.get_value('exec_option')['num_nodes'] = 1
- query = ("create table {0} (str string) stored as parquet").format(
- qualified_table_name)
+ query = ("create table {0} (x {1}) stored as parquet").format(
+ qualified_table_name, col_type)
self.execute_query(query, vector.get_value('exec_option'))
self.execute_query("insert into {0} values {1}".format(qualified_table_name,
values_sql), vector.get_value('exec_option'))
@@ -345,23 +340,26 @@ class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
def test_max_string_values(self, vector, unique_database, tmpdir):
"""Test string values that are all 0xFFs or end with 0xFFs."""
+ col_type = "STRING"
# String value is all of 0xFFs but its length is less than PAGE_INDEX_TRUNCATE_LENGTH.
short_tbl = "short_tbl"
- short_hdfs_path = self._create_string_table_with_values(vector, unique_database,
- short_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
+ short_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+ unique_database, short_tbl,
+ "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH - 1))
self._validate_parquet_page_index(short_hdfs_path, tmpdir.join(short_tbl))
# String value is all of 0xFFs and its length is PAGE_INDEX_TRUNCATE_LENGTH.
fit_tbl = "fit_tbl"
- fit_hdfs_path = self._create_string_table_with_values(vector, unique_database,
- fit_tbl, "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
+ fit_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+ unique_database, fit_tbl,
+ "(rpad('', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH))
self._validate_parquet_page_index(fit_hdfs_path, tmpdir.join(fit_tbl))
# All bytes are 0xFFs and the string is longer than PAGE_INDEX_TRUNCATE_LENGTH, so we
# should not write page statistics.
too_long_tbl = "too_long_tbl"
- too_long_hdfs_path = self._create_string_table_with_values(vector, unique_database,
- too_long_tbl, "(rpad('', {0}, chr(255)))".format(
+ too_long_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+ unique_database, too_long_tbl, "(rpad('', {0}, chr(255)))".format(
PAGE_INDEX_MAX_STRING_LENGTH + 1))
row_group_indexes = self._get_row_groups_from_hdfs_folder(too_long_hdfs_path,
tmpdir.join(too_long_tbl))
@@ -373,8 +371,9 @@ class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
# Test string with value that starts with 'aaa' following with 0xFFs and its length is
# greater than PAGE_INDEX_TRUNCATE_LENGTH. Max value should be 'aab'.
aaa_tbl = "aaa_tbl"
- aaa_hdfs_path = self._create_string_table_with_values(vector, unique_database,
- aaa_tbl, "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
+ aaa_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+ unique_database, aaa_tbl,
+ "(rpad('aaa', {0}, chr(255)))".format(PAGE_INDEX_MAX_STRING_LENGTH + 1))
row_group_indexes = self._get_row_groups_from_hdfs_folder(aaa_hdfs_path,
tmpdir.join(aaa_tbl))
column = row_group_indexes[0][0]
@@ -409,8 +408,8 @@ class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
"""
vector.get_value('exec_option')['parquet_page_row_count_limit'] = 2
null_tbl = 'null_table'
- null_tbl_path = self._create_string_table_with_values(vector, unique_database,
- null_tbl, "(NULL), (NULL), ('foo'), (NULL), (NULL), (NULL)")
+ null_tbl_path = self._create_table_with_values_of_type("STRING", vector,
+ unique_database, null_tbl, "(NULL), (NULL), ('foo'), (NULL), (NULL), (NULL)")
row_group_indexes = self._get_row_groups_from_hdfs_folder(null_tbl_path,
tmpdir.join(null_tbl))
column = row_group_indexes[0][0]
@@ -435,3 +434,59 @@ class TestHdfsParquetTableIndexWriter(ImpalaTestSuite):
for column in row_group:
assert column.offset_index is None
assert column.column_index is None
+
+ def test_nan_values_for_floating_types(self, vector, unique_database, tmpdir):
+ """ IMPALA-7304: Impala doesn't write column index for floating-point columns
+ until PARQUET-1222 is resolved. This is modified by:
+ IMPALA-8498: Write column index for floating types when NaN is not present.
+ """
+ for col_type in ["float", "double"]:
+ nan_val = "(CAST('NaN' as " + col_type + "))"
+ # Table contains no NaN values.
+ no_nan_tbl = "no_nan_tbl_" + col_type
+ no_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+ unique_database, no_nan_tbl, "(1.5), (2.3), (4.5), (42.42), (3.1415), (0.0)")
+ self._validate_parquet_page_index(no_nan_hdfs_path, tmpdir.join(no_nan_tbl))
+ row_group_indexes = self._get_row_groups_from_hdfs_folder(no_nan_hdfs_path,
+ tmpdir.join(no_nan_tbl))
+ column = row_group_indexes[0][0]
+ assert column.column_index is not None
+
+ # Table contains NaN as first value.
+ first_nan_tbl = "first_nan_tbl_" + col_type
+ first_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+ unique_database, first_nan_tbl,
+ nan_val + ", (2.3), (4.5), (42.42), (3.1415), (0.0)")
+ row_group_indexes = self._get_row_groups_from_hdfs_folder(first_nan_hdfs_path,
+ tmpdir.join(first_nan_tbl))
+ column = row_group_indexes[0][0]
+ assert column.column_index is None
+
+ # Table contains NaN as last value.
+ last_nan_tbl = "last_nan_tbl_" + col_type
+ last_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+ unique_database, last_nan_tbl,
+ "(1.5), (2.3), (42.42), (3.1415), (0.0), " + nan_val)
+ row_group_indexes = self._get_row_groups_from_hdfs_folder(last_nan_hdfs_path,
+ tmpdir.join(last_nan_tbl))
+ column = row_group_indexes[0][0]
+ assert column.column_index is None
+
+ # Table contains NaN value in the middle.
+ mid_nan_tbl = "mid_nan_tbl_" + col_type
+ mid_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+ unique_database, mid_nan_tbl,
+ "(2.3), (4.5), " + nan_val + ", (42.42), (3.1415), (0.0)")
+ row_group_indexes = self._get_row_groups_from_hdfs_folder(mid_nan_hdfs_path,
+ tmpdir.join(mid_nan_tbl))
+ column = row_group_indexes[0][0]
+ assert column.column_index is None
+
+ # Table contains only NaN values.
+ only_nan_tbl = "only_nan_tbl_" + col_type
+ only_nan_hdfs_path = self._create_table_with_values_of_type(col_type, vector,
+ unique_database, only_nan_tbl, (nan_val + ", ") * 3 + nan_val)
+ row_group_indexes = self._get_row_groups_from_hdfs_folder(only_nan_hdfs_path,
+ tmpdir.join(only_nan_tbl))
+ column = row_group_indexes[0][0]
+ assert column.column_index is None