You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/08/25 12:23:47 UTC
[impala] branch branch-4.1.1 updated (f6ee249ac -> 44dc157a2)
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a change to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git
from f6ee249ac IMPALA-11317/IMPALA-11316/IMPALA-11315: impala-shell Python 3 fixes
new b28e133f0 IMPALA-11249: Fix add_test_dimensions() locations to call super()
new 051c59bd8 IMPALA-9410: Support resolving ORC file columns by names
new e535f4b8d IMPALA-11346: Migrated partitioned Iceberg tables might return ERROR when WHERE condition is used on partition column
new 0b2f6b7f3 IMPALA-11345: Parquet Bloom filtering failure if column is added to the schema
new 44dc157a2 IMPALA-11344: Missing slots in all cases should be allowed to be read
The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
be/src/exec/file-metadata-utils.cc | 1 -
be/src/exec/hdfs-orc-scanner.cc | 5 +-
be/src/exec/orc-column-readers.cc | 23 +-
be/src/exec/orc-metadata-utils.cc | 93 ++++-
be/src/exec/orc-metadata-utils.h | 11 +-
be/src/exec/parquet/hdfs-parquet-scanner.cc | 72 +++-
be/src/service/query-options.cc | 7 +
be/src/service/query-options.h | 462 ++++++++++-----------
common/thrift/ImpalaService.thrift | 3 +
common/thrift/Query.thrift | 3 +
.../queries/QueryTest/iceberg-migrated-tables.test | 146 ++++++-
tests/authorization/test_ranger.py | 8 +
tests/common/test_dimensions.py | 7 +
tests/custom_cluster/test_client_ssl.py | 1 +
tests/query_test/test_insert.py | 22 +-
tests/query_test/test_nested_types.py | 26 +-
tests/query_test/test_parquet_bloom_filter.py | 21 +
tests/query_test/test_scanners.py | 61 +++
tests/shell/test_shell_client.py | 11 +-
tests/shell/test_shell_commandline.py | 10 +-
tests/shell/test_shell_interactive.py | 16 +-
21 files changed, 723 insertions(+), 286 deletions(-)
[impala] 01/05: IMPALA-11249: Fix add_test_dimensions() locations to call super()
Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git
commit b28e133f008e5196bf2bc7f2114cc45d0dfbe963
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Sat May 21 19:10:11 2022 -0700
IMPALA-11249: Fix add_test_dimensions() locations to call super()
The original issue is that the strict HS2 shell tests
are not running in precommit or nightly jobs, but they
do run in local developer environments. Investigating
this showed that the shell tests were running with a
weird set of test dimensions that includes
table_format_and_file_extension. That dimension is only
used in test_insert.py::TestInsertFileExtension.
What is happening is that the shell tests and other
locations are running add_test_dimensions() without
calling super(..., cls).add_test_dimensions(). The
behavior is unclear, but there is clearly cross-talk
between the different tests that do this.
This changes all add_test_dimensions() locations to
call super(..., cls).add_test_dimensions() if they
don't already. Each location has been tuned to run
the same set of tests as before (except the shell
tests which now run the strict HS2 tests).
As part of this, several shell tests need to be
skipped or fixed for strict HS2.
Testing:
- Ran core job
- Ran tests locally to verify the set of tests
didn't change.
Change-Id: Ib20fd479d3b91ed0ed89a0bc5623cd2a5a458614
Reviewed-on: http://gerrit.cloudera.org:8080/18557
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/18905
Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
Tested-by: Quanlong Huang <hu...@gmail.com>
---
tests/authorization/test_ranger.py | 8 ++++++++
tests/custom_cluster/test_client_ssl.py | 1 +
tests/query_test/test_insert.py | 22 +++++++++++++++-------
tests/query_test/test_nested_types.py | 5 ++++-
tests/shell/test_shell_client.py | 11 +++++++++--
tests/shell/test_shell_commandline.py | 10 ++++++++--
tests/shell/test_shell_interactive.py | 16 +++++++++++++---
7 files changed, 58 insertions(+), 15 deletions(-)
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 875886b12..d0caa7b17 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -1591,6 +1591,7 @@ class TestRangerColumnMaskingComplexTypesInSelectList(CustomClusterTestSuite):
@classmethod
def add_test_dimensions(cls):
+ super(TestRangerColumnMaskingComplexTypesInSelectList, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
cls.ImpalaTestMatrix.add_dimension(create_orc_dimension(cls.get_workload()))
cls.ImpalaTestMatrix.add_constraint(lambda v:
@@ -1598,6 +1599,13 @@ class TestRangerColumnMaskingComplexTypesInSelectList(CustomClusterTestSuite):
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
disable_codegen_options=[True]))
+ @classmethod
+ def add_custom_cluster_constraints(cls):
+ # Do not call the super() implementation, because this class needs to relax
+ # the set of constraints. The usual constraints only run on uncompressed text.
+ # This disables that constraint to let us run against only ORC.
+ return
+
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
diff --git a/tests/custom_cluster/test_client_ssl.py b/tests/custom_cluster/test_client_ssl.py
index 66a3fff78..4add40501 100644
--- a/tests/custom_cluster/test_client_ssl.py
+++ b/tests/custom_cluster/test_client_ssl.py
@@ -140,6 +140,7 @@ class TestClientSsl(CustomClusterTestSuite):
@classmethod
def add_test_dimensions(cls):
+ super(TestClientSsl, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
@pytest.mark.execute_serially
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index 401d9b960..30d75fdc3 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -29,7 +29,9 @@ from tests.common.skip import SkipIfABFS, SkipIfEC, SkipIfLocal, \
SkipIfHive2, SkipIfNotHdfsMinicluster, SkipIfS3, SkipIfDockerizedCluster
from tests.common.test_dimensions import (
create_exec_option_dimension,
- create_uncompressed_text_dimension)
+ create_uncompressed_text_dimension,
+ create_single_exec_option_dimension,
+ is_supported_insert_format)
from tests.common.test_result_verifier import (
QueryTestResult,
parse_result_rows)
@@ -335,20 +337,26 @@ class TestInsertFileExtension(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
- cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(
- 'table_format_and_file_extension',
- *[('parquet', '.parq'), ('textfile', '.txt')]))
+ super(TestInsertFileExtension, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_constraint(lambda v:
+ is_supported_insert_format(v.get_value('table_format')))
+ cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
@classmethod
def setup_class(cls):
super(TestInsertFileExtension, cls).setup_class()
def test_file_extension(self, vector, unique_database):
- table_format = vector.get_value('table_format_and_file_extension')[0]
- file_extension = vector.get_value('table_format_and_file_extension')[1]
+ table_format = vector.get_value('table_format').file_format
+ if table_format == 'parquet':
+ file_extension = '.parq'
+ stored_as_format = 'parquet'
+ else:
+ file_extension = '.txt'
+ stored_as_format = 'textfile'
table_name = "{0}_table".format(table_format)
ctas_query = "create table {0}.{1} stored as {2} as select 1".format(
- unique_database, table_name, table_format)
+ unique_database, table_name, stored_as_format)
self.execute_query_expect_success(self.client, ctas_query)
for path in self.filesystem_client.ls("test-warehouse/{0}.db/{1}".format(
unique_database, table_name)):
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index 3ca64f885..ef64e1181 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -36,7 +36,8 @@ from tests.common.skip import (
SkipIfNotHdfsMinicluster
)
from tests.common.test_dimensions import (create_exec_option_dimension,
- create_exec_option_dimension_from_dict, create_client_protocol_dimension)
+ create_exec_option_dimension_from_dict, create_client_protocol_dimension,
+ create_orc_dimension)
from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_HDFS
@@ -185,9 +186,11 @@ class TestNestedTypesInSelectListWithBeeswax(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
+ super(TestNestedTypesInSelectListWithBeeswax, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('protocol') == 'beeswax')
+ cls.ImpalaTestMatrix.add_dimension(create_orc_dimension(cls.get_workload()))
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
disable_codegen_options=[True]))
diff --git a/tests/shell/test_shell_client.py b/tests/shell/test_shell_client.py
index 5fa1d5caa..1388b527f 100644
--- a/tests/shell/test_shell_client.py
+++ b/tests/shell/test_shell_client.py
@@ -20,8 +20,9 @@
from shell.impala_client import ImpalaBeeswaxClient, ImpalaHS2Client
from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.test_dimensions import create_client_protocol_dimension
-from tests.common.test_dimensions import create_client_protocol_no_strict_dimension
+from tests.common.test_dimensions import (
+ create_client_protocol_dimension, create_client_protocol_no_strict_dimension,
+ create_uncompressed_text_dimension, create_single_exec_option_dimension)
from util import get_impalad_host_port
@@ -34,6 +35,12 @@ class TestShellClient(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
+ super(TestShellClient, cls).add_test_dimensions()
+ # Limit to uncompressed text with default exec options
+ cls.ImpalaTestMatrix.add_dimension(
+ create_uncompressed_text_dimension(cls.get_workload()))
+ cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+ # Run with beeswax and HS2
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_no_strict_dimension())
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index cd53404c2..de81c9761 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -34,8 +34,9 @@ from tests.common.environ import ImpalaTestClusterProperties
from tests.common.impala_service import ImpaladService
from tests.common.impala_test_suite import ImpalaTestSuite, IMPALAD_HS2_HOST_PORT
from tests.common.skip import SkipIf
-from tests.common.test_dimensions import create_client_protocol_dimension
-from tests.common.test_dimensions import create_client_protocol_strict_dimension
+from tests.common.test_dimensions import (
+ create_client_protocol_dimension, create_client_protocol_strict_dimension,
+ create_uncompressed_text_dimension, create_single_exec_option_dimension)
from time import sleep, time
from util import (get_impalad_host_port, assert_var_substitution, run_impala_shell_cmd,
ImpalaShell, IMPALA_SHELL_EXECUTABLE, SHELL_IS_PYTHON_2,
@@ -134,6 +135,11 @@ class TestImpalaShell(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
+ super(TestImpalaShell, cls).add_test_dimensions()
+ # Limit to uncompressed text with default exec options
+ cls.ImpalaTestMatrix.add_dimension(
+ create_uncompressed_text_dimension(cls.get_workload()))
+ cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
# Run with both beeswax and HS2 to ensure that behaviour is the same.
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_strict_dimension())
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index 2d1a4aa7a..0a6ad20b4 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -40,8 +40,9 @@ from tempfile import NamedTemporaryFile
from tests.common.impala_service import ImpaladService
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfLocal
-from tests.common.test_dimensions import create_client_protocol_dimension
-from tests.common.test_dimensions import create_client_protocol_strict_dimension
+from tests.common.test_dimensions import (
+ create_client_protocol_dimension, create_client_protocol_strict_dimension,
+ create_uncompressed_text_dimension, create_single_exec_option_dimension)
from tests.shell.util import get_unused_port
from util import (assert_var_substitution, ImpalaShell, get_impalad_port, get_shell_cmd,
get_open_sessions_metric, IMPALA_SHELL_EXECUTABLE, spawn_shell)
@@ -165,6 +166,11 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
+ super(TestImpalaShellInteractive, cls).add_test_dimensions()
+ # Limit to uncompressed text with default exec options
+ cls.ImpalaTestMatrix.add_dimension(
+ create_uncompressed_text_dimension(cls.get_workload()))
+ cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
# Run with both beeswax and HS2 to ensure that behaviour is the same.
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_strict_dimension())
@@ -681,6 +687,8 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
def test_commandline_flag_disable_live_progress(self, vector):
"""Test the command line flag disable_live_progress with live_progress."""
+ if vector.get_value('strict_hs2_protocol'):
+ pytest.skip("Live option not supported in strict hs2 mode.")
# By default, shell option live_progress is set to True in the interactive mode.
cmds = "set all;"
result = run_impala_shell_interactive(vector, cmds)
@@ -704,13 +712,15 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
cmds = "set all;"
# override the default option through command line argument.
- args = ['--strict_h2_protocol']
+ args = ['--strict_hs2_protocol']
result = run_impala_shell_interactive(vector, cmds, shell_args=args)
assert "\tLIVE_PROGRESS: False" in result.stdout
assert "\tLIVE_SUMMARY: False" in result.stdout
def test_live_option_configuration(self, vector):
"""Test the optional configuration file with live_progress and live_summary."""
+ if vector.get_value('strict_hs2_protocol'):
+ pytest.skip("Live option not supported in strict hs2 mode.")
# Positive tests
# set live_summary and live_progress as True with config file
rcfile_path = os.path.join(QUERY_FILE_PATH, 'good_impalarc3')
[impala] 05/05: IMPALA-11344: Missing slots in all cases should be allowed to be read
Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 44dc157a2c10578b82518012aa2e9aa9288dc6e5
Author: ttttttz <24...@qq.com>
AuthorDate: Wed Jun 22 11:53:28 2022 +0800
IMPALA-11344: Missing slots in all cases should be allowed to be read
When selecting only the missing fields of ORC files and the missing fields
contain non-partition fields, the query will fail due to `Parse error in
possibly corrupt ORC file: '$filename'. No columns found for this scan`.
We should allow reading missing slots in all cases.
Testing:
- Added a test to test_scanners.py that ensures the query can be
executed successfully when selecting only the missing fields of
ORC files.
Change-Id: I15dca47ba5f7a93bfd5fcba3cab4ac6d64459023
Reviewed-on: http://gerrit.cloudera.org:8080/18652
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/18907
Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
---
be/src/exec/orc-column-readers.cc | 23 +--------------
tests/query_test/test_scanners.py | 60 +++++++++++++++++++++++++++++++++++++++
2 files changed, 61 insertions(+), 22 deletions(-)
diff --git a/be/src/exec/orc-column-readers.cc b/be/src/exec/orc-column-readers.cc
index 644ac325f..7c9ae072d 100644
--- a/be/src/exec/orc-column-readers.cc
+++ b/be/src/exec/orc-column-readers.cc
@@ -530,28 +530,7 @@ Status OrcStructReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch,
}
int num_rows_read = scratch_batch->num_tuples - scratch_batch_idx;
if (children_.empty()) {
- // We allow empty 'children_' for original files, because we might select the
- // synthetic 'rowid' field which is not present in original files.
- // We also allow empty 'children_' when we need to validate row batches of a zero slot
- // scan. In that case 'children_' is empty and only 'row_validator_' owns an ORC
- // vector batch (the write id batch).
- bool valid_empty_children = scanner_->acid_original_file_ ||
- (scanner_->row_batches_need_validation_ &&
- scanner_->scan_node_->IsZeroSlotTableScan());
- if (!valid_empty_children) {
- bool only_partitions = true;
- for (SlotDescriptor* slot : tuple_desc_->slots()) {
- if (!scanner_->IsPartitionKeySlot(slot)) {
- only_partitions = false;
- break;
- }
- }
- if (!only_partitions) {
- return Status(Substitute("Parse error in possibly corrupt ORC file: '$0'. "
- "No columns found for this scan.",
- scanner_->filename()));
- }
- }
+ // We allow empty 'children_' in all cases.
DCHECK_EQ(0, num_rows_read);
num_rows_read = std::min(scratch_batch->capacity - scratch_batch->num_tuples,
NumElements() - row_idx_);
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 112b53dc0..ca15d43a2 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1734,6 +1734,66 @@ class TestOrc(ImpalaTestSuite):
self.run_test_case('QueryTest/hive2-pre-gregorian-date-orc', vector, unique_database)
+ @SkipIfABFS.hive
+ @SkipIfADLS.hive
+ @SkipIfIsilon.hive
+ @SkipIfLocal.hive
+ @SkipIfS3.hive
+ @SkipIfGCS.hive
+ @SkipIfCOS.hive
+ def test_missing_field_orc(self, unique_database):
+ # Test scanning orc files with missing fields in file meta.
+ orc_tbl_name = unique_database + ".missing_field_orc"
+ self.client.execute("create table %s (f0 int) stored as orc" % orc_tbl_name)
+ self.run_stmt_in_hive("insert into table %s select 1" % orc_tbl_name)
+ self.client.execute("refresh %s" % orc_tbl_name)
+
+ self.client.execute("alter table %s add columns(f1 int)" % orc_tbl_name)
+ result = self.client.execute("select f1 from %s " % orc_tbl_name)
+ assert result.data == ['NULL']
+
+ self.client.execute("alter table %s add columns(f2 STRUCT<s0:STRING, s1:STRING>)"
+ % orc_tbl_name)
+ result = self.client.execute("select f2.s0 from %s " % orc_tbl_name)
+ assert result.data == ['NULL']
+
+ orc_tbl_name = unique_database + ".missing_field_full_txn_test"
+ self.client.execute("create table %s(f0 int) stored as orc "
+ "tblproperties('transactional'='true')" % orc_tbl_name)
+ self.run_stmt_in_hive("insert into %s values(0)" % orc_tbl_name)
+ self.run_stmt_in_hive("alter table %s add columns(f1 int)" % orc_tbl_name)
+ self.run_stmt_in_hive("insert into %s values(1,1)" % orc_tbl_name)
+ self.client.execute("refresh %s" % orc_tbl_name)
+ result = self.client.execute("select f1 from %s" % orc_tbl_name)
+ assert len(result.data) == 2
+ assert '1' in result.data
+ assert 'NULL' in result.data
+
+ # TODO: add a test case for Iceberg tables once IMPALA-10542 is done.
+ # orc_tbl_name = unique_database + ".missing_field_iceberg_test"
+ # self.client.execute("create table %s (f0 int) stored as iceberg "
+ # "tblproperties('write.format.default' = 'orc')"
+ # % orc_tbl_name)
+ # self.run_stmt_in_hive("insert into %s values(0)" % orc_tbl_name)
+ # self.run_stmt_in_hive("alter table %s add columns(f1 int)" % orc_tbl_name)
+ # self.run_stmt_in_hive("insert into %s values(1,1)" % orc_tbl_name)
+ # self.client.execute("refresh %s" % orc_tbl_name)
+ # result = self.client.execute("select f1 from %s" % orc_tbl_name)
+ # assert len(result.data) == 2
+ # assert '1' in result.data
+ # assert 'NULL' in result.data
+
+ orc_tbl_name = unique_database + ".lineitem_orc_ext"
+ test_file = "/test-warehouse/tpch.lineitem_orc_def"
+ create_sql = "create external table %s like tpch_orc_def.lineitem " \
+ "location '%s'" % (orc_tbl_name, test_file)
+ self.client.execute(create_sql)
+ self.client.execute("alter table %s add columns (new_col int)" % orc_tbl_name)
+ result = self.execute_query("select count(*) from %s where new_col is null"
+ % orc_tbl_name)
+ assert len(result.data) == 1
+ assert '6001215' in result.data
+
class TestScannerReservation(ImpalaTestSuite):
@classmethod
[impala] 02/05: IMPALA-9410: Support resolving ORC file columns by names
Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 051c59bd80dfb8291dd4ef61419db7db2c41cab4
Author: Gergely Fürnstáhl <gf...@cloudera.com>
AuthorDate: Wed Apr 20 10:34:27 2022 +0200
IMPALA-9410: Support resolving ORC file columns by names
Added a query option and an implementation for resolving ORC file columns by
name.
Changed the secondary (fallback) resolution strategy for Iceberg ORC tables,
used when the file has no Iceberg field IDs, to name-based resolution.
Testing:
Added a new test dimension for ORC tests, and added results to the now-working
Iceberg migrated-tables test.
Change-Id: I29562a059160c19eb58ccea76aa959d2e408f8de
Reviewed-on: http://gerrit.cloudera.org:8080/18397
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/18906
Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
Tested-by: Quanlong Huang <hu...@gmail.com>
---
be/src/exec/hdfs-orc-scanner.cc | 5 +-
be/src/exec/orc-metadata-utils.cc | 93 ++++-
be/src/exec/orc-metadata-utils.h | 11 +-
be/src/service/query-options.cc | 7 +
be/src/service/query-options.h | 462 ++++++++++-----------
common/thrift/ImpalaService.thrift | 3 +
common/thrift/Query.thrift | 3 +
.../queries/QueryTest/iceberg-migrated-tables.test | 8 +-
tests/common/test_dimensions.py | 7 +
tests/query_test/test_nested_types.py | 23 +-
tests/query_test/test_scanners.py | 1 +
11 files changed, 378 insertions(+), 245 deletions(-)
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index cb02dac52..f6643e9b4 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -357,8 +357,9 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
RETURN_IF_ERROR(footer_status);
bool is_table_full_acid = scan_node_->hdfs_table()->IsTableFullAcid();
- schema_resolver_.reset(new OrcSchemaResolver(*scan_node_->hdfs_table(),
- &reader_->getType(), filename(), is_table_full_acid));
+ schema_resolver_.reset(
+ new OrcSchemaResolver(*scan_node_->hdfs_table(), &reader_->getType(), filename(),
+ is_table_full_acid, state_->query_options().orc_schema_resolution));
bool is_file_full_acid = schema_resolver_->HasFullAcidV2Schema();
acid_original_file_ = is_table_full_acid && !is_file_full_acid;
if (is_table_full_acid) {
diff --git a/be/src/exec/orc-metadata-utils.cc b/be/src/exec/orc-metadata-utils.cc
index 890db5792..b08e5c61f 100644
--- a/be/src/exec/orc-metadata-utils.cc
+++ b/be/src/exec/orc-metadata-utils.cc
@@ -37,16 +37,21 @@ inline int GetFieldIdFromStr(const std::string& str) {
}
OrcSchemaResolver::OrcSchemaResolver(const HdfsTableDescriptor& tbl_desc,
- const orc::Type* root, const char* filename, bool is_table_acid) :
- tbl_desc_(tbl_desc), root_(root), filename_(filename),
+ const orc::Type* root, const char* filename, bool is_table_acid,
+ TSchemaResolutionStrategy::type schema_resolution)
+ : schema_resolution_strategy_(schema_resolution),
+ tbl_desc_(tbl_desc),
+ root_(root),
+ filename_(filename),
is_table_full_acid_(is_table_acid) {
DetermineFullAcidSchema();
- schema_resolution_strategy_ = TSchemaResolutionStrategy::POSITION;
if (tbl_desc_.IsIcebergTable() && root_->getSubtypeCount() > 0) {
// Use FIELD_ID-based column resolution for Iceberg tables if possible.
const orc::Type* first_child = root_->getSubtype(0);
if (first_child->hasAttributeKey(ICEBERG_FIELD_ID)) {
schema_resolution_strategy_ = TSchemaResolutionStrategy::FIELD_ID;
+ } else {
+ schema_resolution_strategy_ = TSchemaResolutionStrategy::NAME;
}
}
}
@@ -55,6 +60,8 @@ Status OrcSchemaResolver::ResolveColumn(const SchemaPath& col_path,
const orc::Type** node, bool* pos_field, bool* missing_field) const {
if (schema_resolution_strategy_ == TSchemaResolutionStrategy::POSITION) {
return ResolveColumnByPosition(col_path, node, pos_field, missing_field);
+ } else if (schema_resolution_strategy_ == TSchemaResolutionStrategy::NAME) {
+ return ResolveColumnByName(col_path, node, pos_field, missing_field);
} else if (schema_resolution_strategy_ == TSchemaResolutionStrategy::FIELD_ID) {
return ResolveColumnByIcebergFieldId(col_path, node, pos_field, missing_field);
} else {
@@ -162,6 +169,86 @@ Status OrcSchemaResolver::ValidateMap(const ColumnType& type,
return Status::OK();
}
+Status OrcSchemaResolver::ResolveColumnByName(const SchemaPath& col_path,
+ const orc::Type** node, bool* pos_field, bool* missing_field) const {
+ const ColumnType* table_col_type = nullptr;
+ *node = root_;
+ *pos_field = false;
+ *missing_field = false;
+ if (col_path.empty()) return Status::OK();
+ SchemaPath table_path, file_path;
+ TranslateColPaths(col_path, &table_path, &file_path);
+
+ int i = 0;
+
+ // Resolve table and file ACID differences
+ int table_idx = table_path[i];
+ int file_idx = file_path[i];
+ if (table_idx == -1 || file_idx == -1) {
+ DCHECK_NE(table_idx, file_idx);
+ if (table_idx == -1) {
+ DCHECK_EQ(*node, root_);
+ *node = (*node)->getSubtype(file_idx);
+ } else {
+ DCHECK(table_col_type == nullptr);
+ table_col_type = &tbl_desc_.col_descs()[table_idx].type();
+ }
+ i++;
+ }
+
+ for (; i < table_path.size(); ++i) {
+ table_idx = table_path[i];
+ if (table_col_type == nullptr) {
+ // non ACID table, or top level user column in ACID table
+ table_col_type = &tbl_desc_.col_descs()[table_idx].type();
+ const std::string& name = tbl_desc_.col_descs()[table_idx].name();
+ *node = FindChildWithName(*node, name);
+ if (*node == nullptr) {
+ *missing_field = true;
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(ValidateType(*table_col_type, **node, table_path, i));
+ continue;
+ } else if (table_col_type->type == TYPE_STRUCT) {
+ // Resolve struct field by name.
+ DCHECK_LT(table_idx, table_col_type->field_names.size());
+ const std::string& name = table_col_type->field_names[table_idx];
+ *node = FindChildWithName(*node, name);
+ } else if (table_col_type->type == TYPE_ARRAY) {
+ if (table_idx == SchemaPathConstants::ARRAY_POS) {
+ *pos_field = true;
+ break; // return *node as the ARRAY node
+ }
+ DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM);
+ *node = (*(node))->getSubtype(table_idx);
+ } else if (table_col_type->type == TYPE_MAP) {
+ DCHECK(table_idx == SchemaPathConstants::MAP_KEY
+ || table_idx == SchemaPathConstants::MAP_VALUE);
+ // At this point we've found a MAP with a matching name. It's safe to resolve
+ // the child (key or value) by position.
+ *node = (*(node))->getSubtype(table_idx);
+ }
+ if (*node == nullptr) {
+ *missing_field = true;
+ return Status::OK();
+ }
+ table_col_type = &table_col_type->children[table_idx];
+ RETURN_IF_ERROR(ValidateType(*table_col_type, **node, table_path, i));
+ }
+ return Status::OK();
+}
+
+const orc::Type* OrcSchemaResolver::FindChildWithName(
+ const orc::Type* node, const std::string& name) const {
+ for (int i = 0; i < node->getSubtypeCount(); ++i) {
+ const orc::Type* child = node->getSubtype(i);
+ DCHECK(child != nullptr);
+ const std::string& fieldName = node->getFieldName(i);
+ if (iequals(fieldName, name)) return child;
+ }
+ return nullptr;
+}
+
Status OrcSchemaResolver::ResolveColumnByIcebergFieldId(const SchemaPath& col_path,
const orc::Type** node, bool* pos_field, bool* missing_field) const {
const ColumnType* table_col_type = nullptr;
diff --git a/be/src/exec/orc-metadata-utils.h b/be/src/exec/orc-metadata-utils.h
index af23afb11..aaab7d46f 100644
--- a/be/src/exec/orc-metadata-utils.h
+++ b/be/src/exec/orc-metadata-utils.h
@@ -42,7 +42,8 @@ constexpr int CURRENT_TRANSCACTION_TYPE_ID = 5;
class OrcSchemaResolver {
public:
OrcSchemaResolver(const HdfsTableDescriptor& tbl_desc, const orc::Type* root,
- const char* filename, bool is_table_acid);
+ const char* filename, bool is_table_acid,
+ TSchemaResolutionStrategy::type schema_resolution);
/// Resolve SchemaPath into orc::Type (ORC column representation)
/// 'pos_field' is set to true if 'col_path' reference the index field of an array
@@ -94,12 +95,20 @@ class OrcSchemaResolver {
Status ResolveColumnByPosition(const SchemaPath& col_path, const orc::Type** node,
bool* pos_field, bool* missing_field) const;
+ /// Resolve column based on name.
+ Status ResolveColumnByName(const SchemaPath& col_path, const orc::Type** node,
+ bool* pos_field, bool* missing_field) const;
+
/// Resolve column based on the Iceberg field ids. This way we will retrieve the
/// Iceberg field ids from the HMS table via 'col_path', then find the corresponding
/// field in the ORC file.
Status ResolveColumnByIcebergFieldId(const SchemaPath& col_path, const orc::Type** node,
bool* pos_field, bool* missing_field) const;
+ /// Finds child of 'node' whose column name matches to provided 'name'.
+ const orc::Type* FindChildWithName(
+ const orc::Type* node, const std::string& name) const;
+
/// Finds child of 'node' that has Iceberg field id equals to 'field_id'.
const orc::Type* FindChildWithFieldId(const orc::Type* node, const int field_id) const;
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index c5a1206ff..20105b5e3 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1222,6 +1222,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_test_replan(IsTrue(value));
break;
}
+ case TImpalaQueryOptions::ORC_SCHEMA_RESOLUTION: {
+ TSchemaResolutionStrategy::type enum_type;
+ RETURN_IF_ERROR(GetThriftEnum(value, "orc schema resolution",
+ _TSchemaResolutionStrategy_VALUES_TO_NAMES, &enum_type));
+ query_options->__set_orc_schema_resolution(enum_type);
+ break;
+ }
default:
if (IsRemovedQueryOption(key)) {
LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index fa9691b2c..48444a4f9 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -48,241 +48,233 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// option in the enum TImpalaQueryOptions (defined in ImpalaService.thrift)
// plus one. Thus, the second argument to the DCHECK has to be updated every
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
-#define QUERY_OPTS_TABLE\
- DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
- TImpalaQueryOptions::LOCK_MAX_WAIT_TIME_S + 1);\
- REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
- QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
- REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
- QUERY_OPT_FN(batch_size, BATCH_SIZE, TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(debug_action, DEBUG_ACTION, TQueryOptionLevel::DEVELOPMENT)\
- REMOVED_QUERY_OPT_FN(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT)\
- REMOVED_QUERY_OPT_FN(disable_cached_reads, DISABLE_CACHED_READS)\
- QUERY_OPT_FN(disable_outermost_topn, DISABLE_OUTERMOST_TOPN,\
- TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(disable_codegen, DISABLE_CODEGEN, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(explain_level, EXPLAIN_LEVEL, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(hbase_cache_blocks, HBASE_CACHE_BLOCKS, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(hbase_caching, HBASE_CACHING, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(max_errors, MAX_ERRORS, TQueryOptionLevel::ADVANCED)\
- REMOVED_QUERY_OPT_FN(max_io_buffers, MAX_IO_BUFFERS)\
- QUERY_OPT_FN(max_scan_range_length, MAX_SCAN_RANGE_LENGTH,\
- TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(mem_limit, MEM_LIMIT, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(num_nodes, NUM_NODES, TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(num_scanner_threads, NUM_SCANNER_THREADS, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(compression_codec, COMPRESSION_CODEC, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(parquet_file_size, PARQUET_FILE_SIZE, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(request_pool, REQUEST_POOL, TQueryOptionLevel::REGULAR)\
- REMOVED_QUERY_OPT_FN(reservation_request_timeout, RESERVATION_REQUEST_TIMEOUT)\
- QUERY_OPT_FN(sync_ddl, SYNC_DDL, TQueryOptionLevel::REGULAR)\
- REMOVED_QUERY_OPT_FN(v_cpu_cores, V_CPU_CORES)\
- REMOVED_QUERY_OPT_FN(rm_initial_mem, RM_INITIAL_MEM)\
- QUERY_OPT_FN(query_timeout_s, QUERY_TIMEOUT_S, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(buffer_pool_limit, BUFFER_POOL_LIMIT, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(appx_count_distinct, APPX_COUNT_DISTINCT, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS, TQueryOptionLevel::REGULAR)\
- REMOVED_QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)\
- QUERY_OPT_FN(exec_single_node_rows_threshold, EXEC_SINGLE_NODE_ROWS_THRESHOLD,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(replica_preference, REPLICA_PREFERENCE, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(schedule_random_replica, SCHEDULE_RANDOM_REPLICA,\
- TQueryOptionLevel::ADVANCED)\
- REMOVED_QUERY_OPT_FN(scan_node_codegen_threshold, SCAN_NODE_CODEGEN_THRESHOLD)\
- QUERY_OPT_FN(disable_streaming_preaggregations, DISABLE_STREAMING_PREAGGREGATIONS,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(runtime_filter_mode, RUNTIME_FILTER_MODE, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(runtime_bloom_filter_size, RUNTIME_BLOOM_FILTER_SIZE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(runtime_filter_wait_time_ms, RUNTIME_FILTER_WAIT_TIME_MS,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(disable_row_runtime_filtering, DISABLE_ROW_RUNTIME_FILTERING,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(mt_dop, MT_DOP, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(prefetch_mode, PREFETCH_MODE, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(strict_mode, STRICT_MODE, TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(scratch_limit, SCRATCH_LIMIT, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(enable_expr_rewrites, ENABLE_EXPR_REWRITES, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(enable_cnf_rewrites, ENABLE_CNF_REWRITES, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(decimal_v2, DECIMAL_V2, TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(parquet_dictionary_filtering, PARQUET_DICTIONARY_FILTERING,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(parquet_array_resolution, PARQUET_ARRAY_RESOLUTION,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(parquet_read_statistics, PARQUET_READ_STATISTICS,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(default_join_distribution_mode, DEFAULT_JOIN_DISTRIBUTION_MODE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(disable_codegen_rows_threshold, DISABLE_CODEGEN_ROWS_THRESHOLD,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(default_spillable_buffer_size, DEFAULT_SPILLABLE_BUFFER_SIZE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(min_spillable_buffer_size, MIN_SPILLABLE_BUFFER_SIZE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(max_row_size, MAX_ROW_SIZE, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(idle_session_timeout, IDLE_SESSION_TIMEOUT, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(compute_stats_min_sample_size, COMPUTE_STATS_MIN_SAMPLE_SIZE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(exec_time_limit_s, EXEC_TIME_LIMIT_S, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(shuffle_distinct_exprs, SHUFFLE_DISTINCT_EXPRS,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(max_mem_estimate_for_admission, MAX_MEM_ESTIMATE_FOR_ADMISSION,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(thread_reservation_limit, THREAD_RESERVATION_LIMIT,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(thread_reservation_aggregate_limit, THREAD_RESERVATION_AGGREGATE_LIMIT,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(kudu_read_mode, KUDU_READ_MODE, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(allow_erasure_coded_files, ALLOW_ERASURE_CODED_FILES,\
- TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(timezone, TIMEZONE, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(scan_bytes_limit, SCAN_BYTES_LIMIT,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(cpu_limit_s, CPU_LIMIT_S, TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(topn_bytes_limit, TOPN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(client_identifier, CLIENT_IDENTIFIER, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(resource_trace_ratio, RESOURCE_TRACE_RATIO, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(num_remote_executor_candidates, NUM_REMOTE_EXECUTOR_CANDIDATES,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(num_rows_produced_limit, NUM_ROWS_PRODUCED_LIMIT,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(\
- planner_testcase_mode, PLANNER_TESTCASE_MODE, TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(default_file_format, DEFAULT_FILE_FORMAT, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(parquet_timestamp_type, PARQUET_TIMESTAMP_TYPE,\
- TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(parquet_read_page_index, PARQUET_READ_PAGE_INDEX,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(parquet_write_page_index, PARQUET_WRITE_PAGE_INDEX,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(parquet_page_row_count_limit, PARQUET_PAGE_ROW_COUNT_LIMIT,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(disable_hdfs_num_rows_estimate, DISABLE_HDFS_NUM_ROWS_ESTIMATE,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(default_hints_insert_statement, DEFAULT_HINTS_INSERT_STATEMENT,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(spool_query_results, SPOOL_QUERY_RESULTS,\
- TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(default_transactional_type, DEFAULT_TRANSACTIONAL_TYPE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(statement_expression_limit, STATEMENT_EXPRESSION_LIMIT,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(max_statement_length_bytes, MAX_STATEMENT_LENGTH_BYTES,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(disable_data_cache, DISABLE_DATA_CACHE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(max_result_spooling_mem, MAX_RESULT_SPOOLING_MEM,\
- TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(max_spilled_result_spooling_mem, MAX_SPILLED_RESULT_SPOOLING_MEM,\
- TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(disable_hbase_num_rows_estimate, DISABLE_HBASE_NUM_ROWS_ESTIMATE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(fetch_rows_timeout_ms, FETCH_ROWS_TIMEOUT_MS,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(now_string, NOW_STRING, TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(parquet_object_store_split_size, PARQUET_OBJECT_STORE_SPLIT_SIZE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(broadcast_bytes_limit, BROADCAST_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(preagg_bytes_limit, PREAGG_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(max_cnf_exprs, MAX_CNF_EXPRS, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(kudu_snapshot_read_timestamp_micros, KUDU_SNAPSHOT_READ_TIMESTAMP_MICROS,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(retry_failed_queries, RETRY_FAILED_QUERIES, TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(enabled_runtime_filter_types, ENABLED_RUNTIME_FILTER_TYPES,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(async_codegen, ASYNC_CODEGEN, TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(enable_distinct_semi_join_optimization,\
- ENABLE_DISTINCT_SEMI_JOIN_OPTIMIZATION, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(sort_run_bytes_limit, SORT_RUN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(max_fs_writers, MAX_FS_WRITERS, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(refresh_updated_hms_partitions,\
- REFRESH_UPDATED_HMS_PARTITIONS, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(spool_all_results_for_retries, SPOOL_ALL_RESULTS_FOR_RETRIES,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(runtime_filter_error_rate, RUNTIME_FILTER_ERROR_RATE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(use_local_tz_for_unix_timestamp_conversions,\
- USE_LOCAL_TZ_FOR_UNIX_TIMESTAMP_CONVERSIONS, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(convert_legacy_hive_parquet_utc_timestamps,\
- CONVERT_LEGACY_HIVE_PARQUET_UTC_TIMESTAMPS, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(enable_outer_join_to_inner_transformation,\
- ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(targeted_kudu_scan_range_length, TARGETED_KUDU_SCAN_RANGE_LENGTH,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(report_skew_limit, REPORT_SKEW_LIMIT,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(optimize_simple_limit, OPTIMIZE_SIMPLE_LIMIT,\
- TQueryOptionLevel::REGULAR)\
- QUERY_OPT_FN(use_dop_for_costing, USE_DOP_FOR_COSTING,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(broadcast_to_partition_factor, BROADCAST_TO_PARTITION_FACTOR,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(join_rows_produced_limit, JOIN_ROWS_PRODUCED_LIMIT,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(utf8_mode, UTF8_MODE, TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(analytic_rank_pushdown_threshold,\
- ANALYTIC_RANK_PUSHDOWN_THRESHOLD, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(minmax_filter_threshold, MINMAX_FILTER_THRESHOLD,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(minmax_filtering_level, MINMAX_FILTERING_LEVEL,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(compute_column_minmax_stats, COMPUTE_COLUMN_MINMAX_STATS,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(show_column_minmax_stats, SHOW_COLUMN_MINMAX_STATS,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(default_ndv_scale, DEFAULT_NDV_SCALE, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(kudu_replica_selection, KUDU_REPLICA_SELECTION,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(delete_stats_in_truncate, DELETE_STATS_IN_TRUNCATE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(parquet_bloom_filtering, PARQUET_BLOOM_FILTERING,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(minmax_filter_sorted_columns, MINMAX_FILTER_SORTED_COLUMNS,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(minmax_filter_fast_code_path, MINMAX_FILTER_FAST_CODE_PATH,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(enable_kudu_transaction, ENABLE_KUDU_TRANSACTION,\
- TQueryOptionLevel::DEVELOPMENT)\
- QUERY_OPT_FN(minmax_filter_partition_columns, MINMAX_FILTER_PARTITION_COLUMNS,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(parquet_bloom_filter_write, PARQUET_BLOOM_FILTER_WRITE,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(orc_read_statistics, ORC_READ_STATISTICS,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(enable_async_ddl_execution, ENABLE_ASYNC_DDL_EXECUTION,\
- TQueryOptionLevel::ADVANCED) \
- QUERY_OPT_FN(enable_async_load_data_execution, ENABLE_ASYNC_LOAD_DATA_EXECUTION,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(parquet_late_materialization_threshold,\
- PARQUET_LATE_MATERIALIZATION_THRESHOLD, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(parquet_dictionary_runtime_filter_entry_limit,\
- PARQUET_DICTIONARY_RUNTIME_FILTER_ENTRY_LIMIT, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(abort_java_udf_on_exception,\
- ABORT_JAVA_UDF_ON_EXCEPTION, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(orc_async_read, ORC_ASYNC_READ, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(runtime_in_list_filter_entry_limit,\
- RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT, TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(enable_replan, ENABLE_REPLAN,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(test_replan, TEST_REPLAN,\
- TQueryOptionLevel::ADVANCED)\
- QUERY_OPT_FN(lock_max_wait_time_s, LOCK_MAX_WAIT_TIME_S, TQueryOptionLevel::REGULAR)\
- ;
+#define QUERY_OPTS_TABLE \
+ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \
+ TImpalaQueryOptions::ORC_SCHEMA_RESOLUTION + 1); \
+ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
+ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \
+ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \
+ QUERY_OPT_FN(batch_size, BATCH_SIZE, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(debug_action, DEBUG_ACTION, TQueryOptionLevel::DEVELOPMENT) \
+ REMOVED_QUERY_OPT_FN(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT) \
+ REMOVED_QUERY_OPT_FN(disable_cached_reads, DISABLE_CACHED_READS) \
+ QUERY_OPT_FN( \
+ disable_outermost_topn, DISABLE_OUTERMOST_TOPN, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(disable_codegen, DISABLE_CODEGEN, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(explain_level, EXPLAIN_LEVEL, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(hbase_cache_blocks, HBASE_CACHE_BLOCKS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(hbase_caching, HBASE_CACHING, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(max_errors, MAX_ERRORS, TQueryOptionLevel::ADVANCED) \
+ REMOVED_QUERY_OPT_FN(max_io_buffers, MAX_IO_BUFFERS) \
+ QUERY_OPT_FN( \
+ max_scan_range_length, MAX_SCAN_RANGE_LENGTH, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(mem_limit, MEM_LIMIT, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(num_nodes, NUM_NODES, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(num_scanner_threads, NUM_SCANNER_THREADS, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(compression_codec, COMPRESSION_CODEC, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(parquet_file_size, PARQUET_FILE_SIZE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(request_pool, REQUEST_POOL, TQueryOptionLevel::REGULAR) \
+ REMOVED_QUERY_OPT_FN(reservation_request_timeout, RESERVATION_REQUEST_TIMEOUT) \
+ QUERY_OPT_FN(sync_ddl, SYNC_DDL, TQueryOptionLevel::REGULAR) \
+ REMOVED_QUERY_OPT_FN(v_cpu_cores, V_CPU_CORES) \
+ REMOVED_QUERY_OPT_FN(rm_initial_mem, RM_INITIAL_MEM) \
+ QUERY_OPT_FN(query_timeout_s, QUERY_TIMEOUT_S, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(buffer_pool_limit, BUFFER_POOL_LIMIT, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(appx_count_distinct, APPX_COUNT_DISTINCT, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS, TQueryOptionLevel::REGULAR) \
+ REMOVED_QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE) \
+ QUERY_OPT_FN(exec_single_node_rows_threshold, EXEC_SINGLE_NODE_ROWS_THRESHOLD, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS, \
+ TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(replica_preference, REPLICA_PREFERENCE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ schedule_random_replica, SCHEDULE_RANDOM_REPLICA, TQueryOptionLevel::ADVANCED) \
+ REMOVED_QUERY_OPT_FN(scan_node_codegen_threshold, SCAN_NODE_CODEGEN_THRESHOLD) \
+ QUERY_OPT_FN(disable_streaming_preaggregations, DISABLE_STREAMING_PREAGGREGATIONS, \
+ TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(runtime_filter_mode, RUNTIME_FILTER_MODE, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN( \
+ runtime_bloom_filter_size, RUNTIME_BLOOM_FILTER_SIZE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(runtime_filter_wait_time_ms, RUNTIME_FILTER_WAIT_TIME_MS, \
+ TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(disable_row_runtime_filtering, DISABLE_ROW_RUNTIME_FILTERING, \
+ TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN( \
+ max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION, \
+ TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(mt_dop, MT_DOP, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN( \
+ s3_skip_insert_staging, S3_SKIP_INSERT_STAGING, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN( \
+ runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(prefetch_mode, PREFETCH_MODE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(strict_mode, STRICT_MODE, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(scratch_limit, SCRATCH_LIMIT, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(enable_expr_rewrites, ENABLE_EXPR_REWRITES, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(enable_cnf_rewrites, ENABLE_CNF_REWRITES, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(decimal_v2, DECIMAL_V2, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(parquet_dictionary_filtering, PARQUET_DICTIONARY_FILTERING, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ parquet_array_resolution, PARQUET_ARRAY_RESOLUTION, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN( \
+ parquet_read_statistics, PARQUET_READ_STATISTICS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(default_join_distribution_mode, DEFAULT_JOIN_DISTRIBUTION_MODE, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(disable_codegen_rows_threshold, DISABLE_CODEGEN_ROWS_THRESHOLD, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(default_spillable_buffer_size, DEFAULT_SPILLABLE_BUFFER_SIZE, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ min_spillable_buffer_size, MIN_SPILLABLE_BUFFER_SIZE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(max_row_size, MAX_ROW_SIZE, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(idle_session_timeout, IDLE_SESSION_TIMEOUT, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(compute_stats_min_sample_size, COMPUTE_STATS_MIN_SAMPLE_SIZE, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(exec_time_limit_s, EXEC_TIME_LIMIT_S, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN( \
+ shuffle_distinct_exprs, SHUFFLE_DISTINCT_EXPRS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(max_mem_estimate_for_admission, MAX_MEM_ESTIMATE_FOR_ADMISSION, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ thread_reservation_limit, THREAD_RESERVATION_LIMIT, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(thread_reservation_aggregate_limit, THREAD_RESERVATION_AGGREGATE_LIMIT, \
+ TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(kudu_read_mode, KUDU_READ_MODE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(allow_erasure_coded_files, ALLOW_ERASURE_CODED_FILES, \
+ TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(timezone, TIMEZONE, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(scan_bytes_limit, SCAN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(cpu_limit_s, CPU_LIMIT_S, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(topn_bytes_limit, TOPN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(client_identifier, CLIENT_IDENTIFIER, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(resource_trace_ratio, RESOURCE_TRACE_RATIO, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(num_remote_executor_candidates, NUM_REMOTE_EXECUTOR_CANDIDATES, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ num_rows_produced_limit, NUM_ROWS_PRODUCED_LIMIT, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ planner_testcase_mode, PLANNER_TESTCASE_MODE, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(default_file_format, DEFAULT_FILE_FORMAT, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN( \
+ parquet_timestamp_type, PARQUET_TIMESTAMP_TYPE, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN( \
+ parquet_read_page_index, PARQUET_READ_PAGE_INDEX, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ parquet_write_page_index, PARQUET_WRITE_PAGE_INDEX, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(parquet_page_row_count_limit, PARQUET_PAGE_ROW_COUNT_LIMIT, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(disable_hdfs_num_rows_estimate, DISABLE_HDFS_NUM_ROWS_ESTIMATE, \
+ TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(default_hints_insert_statement, DEFAULT_HINTS_INSERT_STATEMENT, \
+ TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(spool_query_results, SPOOL_QUERY_RESULTS, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(default_transactional_type, DEFAULT_TRANSACTIONAL_TYPE, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(statement_expression_limit, STATEMENT_EXPRESSION_LIMIT, \
+ TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(max_statement_length_bytes, MAX_STATEMENT_LENGTH_BYTES, \
+ TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(disable_data_cache, DISABLE_DATA_CACHE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ max_result_spooling_mem, MAX_RESULT_SPOOLING_MEM, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(max_spilled_result_spooling_mem, MAX_SPILLED_RESULT_SPOOLING_MEM, \
+ TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(disable_hbase_num_rows_estimate, DISABLE_HBASE_NUM_ROWS_ESTIMATE, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ fetch_rows_timeout_ms, FETCH_ROWS_TIMEOUT_MS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(now_string, NOW_STRING, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(parquet_object_store_split_size, PARQUET_OBJECT_STORE_SPLIT_SIZE, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN( \
+ broadcast_bytes_limit, BROADCAST_BYTES_LIMIT, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(preagg_bytes_limit, PREAGG_BYTES_LIMIT, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(max_cnf_exprs, MAX_CNF_EXPRS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(kudu_snapshot_read_timestamp_micros, KUDU_SNAPSHOT_READ_TIMESTAMP_MICROS, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(retry_failed_queries, RETRY_FAILED_QUERIES, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(enabled_runtime_filter_types, ENABLED_RUNTIME_FILTER_TYPES, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(async_codegen, ASYNC_CODEGEN, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(enable_distinct_semi_join_optimization, \
+ ENABLE_DISTINCT_SEMI_JOIN_OPTIMIZATION, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(sort_run_bytes_limit, SORT_RUN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(max_fs_writers, MAX_FS_WRITERS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(refresh_updated_hms_partitions, REFRESH_UPDATED_HMS_PARTITIONS, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(spool_all_results_for_retries, SPOOL_ALL_RESULTS_FOR_RETRIES, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ runtime_filter_error_rate, RUNTIME_FILTER_ERROR_RATE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(use_local_tz_for_unix_timestamp_conversions, \
+ USE_LOCAL_TZ_FOR_UNIX_TIMESTAMP_CONVERSIONS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(convert_legacy_hive_parquet_utc_timestamps, \
+ CONVERT_LEGACY_HIVE_PARQUET_UTC_TIMESTAMPS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(enable_outer_join_to_inner_transformation, \
+ ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(targeted_kudu_scan_range_length, TARGETED_KUDU_SCAN_RANGE_LENGTH, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(report_skew_limit, REPORT_SKEW_LIMIT, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(optimize_simple_limit, OPTIMIZE_SIMPLE_LIMIT, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(use_dop_for_costing, USE_DOP_FOR_COSTING, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(broadcast_to_partition_factor, BROADCAST_TO_PARTITION_FACTOR, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ join_rows_produced_limit, JOIN_ROWS_PRODUCED_LIMIT, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(utf8_mode, UTF8_MODE, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(analytic_rank_pushdown_threshold, ANALYTIC_RANK_PUSHDOWN_THRESHOLD, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ minmax_filter_threshold, MINMAX_FILTER_THRESHOLD, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ minmax_filtering_level, MINMAX_FILTERING_LEVEL, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(compute_column_minmax_stats, COMPUTE_COLUMN_MINMAX_STATS, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ show_column_minmax_stats, SHOW_COLUMN_MINMAX_STATS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(default_ndv_scale, DEFAULT_NDV_SCALE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ kudu_replica_selection, KUDU_REPLICA_SELECTION, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ delete_stats_in_truncate, DELETE_STATS_IN_TRUNCATE, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ parquet_bloom_filtering, PARQUET_BLOOM_FILTERING, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(minmax_filter_sorted_columns, MINMAX_FILTER_SORTED_COLUMNS, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(minmax_filter_fast_code_path, MINMAX_FILTER_FAST_CODE_PATH, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN( \
+ enable_kudu_transaction, ENABLE_KUDU_TRANSACTION, TQueryOptionLevel::DEVELOPMENT) \
+ QUERY_OPT_FN(minmax_filter_partition_columns, MINMAX_FILTER_PARTITION_COLUMNS, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(parquet_bloom_filter_write, PARQUET_BLOOM_FILTER_WRITE, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(orc_read_statistics, ORC_READ_STATISTICS, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(enable_async_ddl_execution, ENABLE_ASYNC_DDL_EXECUTION, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(enable_async_load_data_execution, ENABLE_ASYNC_LOAD_DATA_EXECUTION, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(parquet_late_materialization_threshold, \
+ PARQUET_LATE_MATERIALIZATION_THRESHOLD, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(parquet_dictionary_runtime_filter_entry_limit, \
+ PARQUET_DICTIONARY_RUNTIME_FILTER_ENTRY_LIMIT, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(abort_java_udf_on_exception, ABORT_JAVA_UDF_ON_EXCEPTION, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(orc_async_read, ORC_ASYNC_READ, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(runtime_in_list_filter_entry_limit, RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT, \
+ TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(enable_replan, ENABLE_REPLAN, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(test_replan, TEST_REPLAN, TQueryOptionLevel::ADVANCED) \
+ QUERY_OPT_FN(lock_max_wait_time_s, LOCK_MAX_WAIT_TIME_S, TQueryOptionLevel::REGULAR) \
+ QUERY_OPT_FN(orc_schema_resolution, ORC_SCHEMA_RESOLUTION, TQueryOptionLevel::REGULAR);
/// Enforce practical limits on some query options to avoid undesired query state.
static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 8a118d241..60844d6c6 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -733,6 +733,9 @@ enum TImpalaQueryOptions {
// Maximum wait time on HMS ACID lock in seconds.
LOCK_MAX_WAIT_TIME_S = 145
+
+ // Determines how to resolve ORC files' schemas. Valid values are "position" and "name".
+ ORC_SCHEMA_RESOLUTION = 146;
}
// The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 4d45ad1c8..3e4726c14 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -591,6 +591,9 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift
146: optional i32 lock_max_wait_time_s = 300
+
+ // See comment in ImpalaService.thrift
+ 147: optional TSchemaResolutionStrategy orc_schema_resolution = 0;
}
// Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
index db40813e5..d1549c529 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
@@ -74,10 +74,12 @@ BIGINT, DOUBLE, DECIMAL
# * Partition FLOAT column to DOUBLE
# * Partition DECIMAL(5,3) column to DECIMAL(8,3)
# * Non-partition column has been moved to end of the schema
-# Currently this fails due to IMPALA-9410
select * from functional_parquet.iceberg_legacy_partition_schema_evolution_orc
----- CATCH
-Parse error in possibly corrupt ORC file
+---- RESULTS
+1,1.100000023841858,2.718,2
+1,1.100000023841858,3.141,1
+---- TYPES
+BIGINT, DOUBLE, DECIMAL, INT
====
---- QUERY
# Read only the partition columns.
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 4ac912b47..5a5788aac 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -164,6 +164,13 @@ def hs2_text_constraint(v):
v.get_value('table_format').file_format == 'text' and
v.get_value('table_format').compression_codec == 'none')
+
+def orc_schema_resolution_constraint(v):
+ """Allow a non-zero orc_schema_resolution dimension value only for ORC table formats."""
+ file_format = v.get_value('table_format').file_format
+ orc_schema_resolution = v.get_value('orc_schema_resolution')
+ return file_format == 'orc' or orc_schema_resolution == 0
+
# Common sets of values for the exec option vectors
ALL_BATCH_SIZES = [0]
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index ef64e1181..23e46e6e5 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -37,7 +37,7 @@ from tests.common.skip import (
)
from tests.common.test_dimensions import (create_exec_option_dimension,
create_exec_option_dimension_from_dict, create_client_protocol_dimension,
- create_orc_dimension)
+ create_orc_dimension, orc_schema_resolution_constraint)
from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_HDFS
@@ -48,6 +48,13 @@ class TestNestedTypes(ImpalaTestSuite):
def get_workload(self):
return 'functional-query'
+ @staticmethod
+ def orc_schema_resolution_constraint(vector):
+ """Same check as the imported orc_schema_resolution_constraint; NOTE(review): looks unused."""
+ file_format = vector.get_value('table_format').file_format
+ orc_schema_resolution = vector.get_value('orc_schema_resolution')
+ return file_format == 'orc' or orc_schema_resolution == 0
+
@classmethod
def add_test_dimensions(cls):
super(TestNestedTypes, cls).add_test_dimensions()
@@ -55,6 +62,8 @@ class TestNestedTypes(ImpalaTestSuite):
v.get_value('table_format').file_format in ['parquet', 'orc'])
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('mt_dop', 0, 2))
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+ cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
def test_scanner_basic(self, vector):
"""Queries that do not materialize arrays."""
@@ -135,6 +144,8 @@ class TestNestedStructsInSelectList(ImpalaTestSuite):
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('protocol') == 'hs2')
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+ cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
def test_struct_in_select_list(self, vector, unique_database):
"""Queries where a struct column is in the select list"""
@@ -170,6 +181,8 @@ class TestNestedTArraysInSelectList(ImpalaTestSuite):
create_exec_option_dimension_from_dict({
'disable_codegen': ['False', 'True']}))
cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+ cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
def test_array_in_select_list(self, vector, unique_database):
"""Queries where an array column is in the select list"""
@@ -215,6 +228,8 @@ class TestComputeStatsWithNestedTypes(ImpalaTestSuite):
super(TestComputeStatsWithNestedTypes, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format in ['parquet', 'orc'])
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+ cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
def test_compute_stats_with_structs(self, vector):
"""COMPUTE STATS and SHOW COLUMN STATS for tables with structs"""
@@ -232,6 +247,8 @@ class TestZippingUnnest(ImpalaTestSuite):
super(TestZippingUnnest, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format in ['parquet', 'orc'])
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+ cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
def test_zipping_unnest_in_from_clause(self, vector):
"""Queries where zipping unnest is executed by providing UNNEST() in the from clause.
@@ -261,6 +278,8 @@ class TestNestedTypesNoMtDop(ImpalaTestSuite):
super(TestNestedTypesNoMtDop, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format in ['parquet', 'orc'])
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+ cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
def test_tpch(self, vector):
"""Queries over the larger nested TPCH dataset."""
@@ -818,6 +837,8 @@ class TestMaxNestingDepth(ImpalaTestSuite):
super(TestMaxNestingDepth, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format in ['parquet', 'orc'])
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+ cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
def test_max_nesting_depth(self, vector, unique_database):
"""Tests that Impala can scan Parquet and ORC files having complex types of
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 53a15f0ae..112b53dc0 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1477,6 +1477,7 @@ class TestOrc(ImpalaTestSuite):
super(TestOrc, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format == 'orc')
+ cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
@SkipIfS3.hdfs_block_size
@SkipIfGCS.hdfs_block_size
[impala] 04/05: IMPALA-11345: Parquet Bloom filtering failure if column is added to the schema
Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0b2f6b7f35ab8de31d27631d2b693915848c99a5
Author: Daniel Becker <da...@cloudera.com>
AuthorDate: Mon Jul 25 16:22:27 2022 +0200
IMPALA-11345: Parquet Bloom filtering failure if column is added to the
schema
If a new column was added to an existing table with existing data and
Parquet Bloom filtering was turned ON, queries having an equality
conjunct on the new column failed.
This was because the old Parquet data files did not have the new column
in their schema, so the scanner could not find a column for the
conjunct. This was treated as an error and the query failed.
After this patch this situation is no longer treated as an error and the
conjunct is simply disregarded for Bloom filtering in the files that
lack the new column.
Testing:
- added the test
TestParquetBloomFilter::test_parquet_bloom_filtering_schema_change in
tests/query_test/test_parquet_bloom_filter.py that checks that a
query as described above does not fail.
Merge conflicts:
- hdfs-parquet-scanner.cc removes usage of NeedDataInFile().
Change-Id: Ief3e6b6358d3dff3abe5beeda752033a7e8e16a6
Reviewed-on: http://gerrit.cloudera.org:8080/18779
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/18888
Tested-by: Quanlong Huang <hu...@gmail.com>
Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
---
be/src/exec/parquet/hdfs-parquet-scanner.cc | 72 ++++++++++++++++++++++++---
tests/query_test/test_parquet_bloom_filter.py | 21 ++++++++
2 files changed, 87 insertions(+), 6 deletions(-)
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index d8058c875..04b1fddaa 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1924,14 +1924,40 @@ Status HdfsParquetScanner::ReadToBuffer(uint64_t offset, uint8_t* buffer, uint64
return Status::OK();
}
+void LogMissingFields(google::LogSeverity log_level, const std::string& text_before,
+ const std::string& text_after, const std::unordered_set<std::string>& paths) {
+ stringstream s;
+ s << text_before;
+ s << "[";
+ size_t i = 0;
+ for (const std::string& path : paths) {
+ s << path;
+ if (i + 1 < paths.size()) {
+ s << ", ";
+ }
+ i++;
+ }
+
+ s << "]. ";
+ s << text_after;
+ VLOG(log_level) << s.str();
+}
+
// Create a map from column index to EQ conjuncts for Bloom filtering.
Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
// EQ conjuncts are represented as a LE and a GE conjunct with the same
// value. This map is used to pair them to form EQ conjuncts.
- // The value is a vector because there may be multiple GE or LE conjuncts on a column.
+ // The value is a set because there may be multiple GE or LE conjuncts on a column.
unordered_map<int, std::unordered_set<std::pair<std::string, const Literal*>>>
conjunct_halves;
+ // Slot paths for which no data is found in the file. It is expected for example if it
+ // is a partition column and unexpected for example if the column was added to the table
+ // schema after the current file was written and therefore the current file does
+ // not have the column.
+ std::unordered_set<std::string> unexpected_missing_fields;
+ std::unordered_set<std::string> expected_missing_fields;
+
for (ScalarExprEvaluator* eval : stats_conjunct_evals_) {
const ScalarExpr& expr = eval->root();
const string& function_name = expr.function_name();
@@ -1983,13 +2009,24 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
}
if (missing_field) {
- if (file_metadata_utils_.IsValuePartitionCol(slot_desc)) continue;
-
- return Status(Substitute(
- "Unable to find SchemaNode for path '$0' in the schema of file '$1'.",
- PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()), filename()));
+ if (!file_metadata_utils_.IsValuePartitionCol(slot_desc)) {
+ // If a column is added to the schema of an existing table, the schemas of the
+ // old parquet data files do not contain the new column: see IMPALA-11345. This
+ // is not an error, we simply disregard this column in Bloom filtering in this
+ // scanner.
+ unexpected_missing_fields.emplace(
+ PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()));
+ } else {
+ // If the data is not expected to be in the file, we disregard the conjuncts for
+ // the purposes of Bloom filtering.
+ expected_missing_fields.emplace(
+ PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()));
+ }
+ continue;
}
+ DCHECK(node != nullptr);
+
if (!IsParquetBloomFilterSupported(node->element->type, child_slot_ref->type())) {
continue;
}
@@ -2029,6 +2066,29 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
}
}
+ // Log expected and unexpected missing fields.
+ if (!unexpected_missing_fields.empty()) {
+ LogMissingFields(google::WARNING,
+ Substitute(
+ "Unable to find SchemaNode for the following paths in the schema of "
+ "file '$0': ",
+ filename()),
+ "This may be because the column may have been added to the table schema after "
+ "writing this file. Disregarding conjuncts on this path for the purpose of "
+ "Parquet Bloom filtering in this file.",
+ unexpected_missing_fields);
+ }
+
+ if (!expected_missing_fields.empty()) {
+ LogMissingFields(google::INFO,
+ Substitute(
+ "Data for the following paths is not expected to be present in file '$0': ",
+ filename()),
+ "Disregarding conjuncts on this path for the purpose of Parquet Bloom filtering "
+ "in this file.",
+ expected_missing_fields);
+ }
+
return Status::OK();
}
diff --git a/tests/query_test/test_parquet_bloom_filter.py b/tests/query_test/test_parquet_bloom_filter.py
index eda93999b..11af6d28d 100644
--- a/tests/query_test/test_parquet_bloom_filter.py
+++ b/tests/query_test/test_parquet_bloom_filter.py
@@ -95,6 +95,27 @@ class TestParquetBloomFilter(ImpalaTestSuite):
vector.get_value('exec_option')['parquet_bloom_filtering'] = False
self.run_test_case('QueryTest/parquet-bloom-filter-disabled', vector, unique_database)
+ def test_parquet_bloom_filtering_schema_change(self, vector, unique_database):
+ """ Regression test for IMPALA-11345. Tests that the query does not fail when a new
+ column is added to the table schema but the old Parquet files do not contain it and
+ therefore no column is found for a conjunct while preparing Bloom filtering. """
+ vector.get_value('exec_option')['parquet_bloom_filtering'] = True
+
+ tbl_name = 'changed_schema'
+
+ stmts = [
+ 'create table {db}.{tbl} (id INT) stored as parquet',
+ 'insert into {db}.{tbl} values (1),(2),(3)',
+ 'alter table {db}.{tbl} add columns (name STRING)',
+ 'insert into {db}.{tbl} values (4, "James")',
+ 'select * from {db}.{tbl} where name in ("Lily")'
+ ]
+
+ for stmt in stmts:
+ self.execute_query_expect_success(self.client,
+ stmt.format(db=str(unique_database), tbl=tbl_name),
+ vector.get_value('exec_option'))
+
def test_write_parquet_bloom_filter(self, vector, unique_database, tmpdir):
# Get Bloom filters from the first row group of file PARQUET_TEST_FILE.
reference_col_to_bloom_filter = self._get_first_row_group_bloom_filters(
[impala] 03/05: IMPALA-11346: Migrated partitioned Iceberg tables might return ERROR when WHERE condition is used on partition column
Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git
commit e535f4b8dd1f8c1b3f6911cb99efe2a96a7e441f
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Thu Jun 9 17:21:28 2022 +0200
IMPALA-11346: Migrated partitioned Iceberg tables might return ERROR when WHERE condition is used on partition column
Identity-partitioned columns are not necessarily stored in the data
files. E.g. when we migrate a legacy partitioned table to Iceberg
without rewriting the data files, the partition columns won't be
present in the files.
The Parquet scanner does a few optimizations to eliminate row groups,
e.g. filtering based on stats, Bloom filters, etc. When a column that
has a predicate on it is not present in the data file, it is assumed
that the whole row group doesn't pass the filtering criteria.
But for Iceberg, some files might contain partition columns while
other files don't, so we need to prepare the scanners to handle
such cases.
The ORC scanner doesn't have that many optimizations, so it didn't
run into this issue.
Testing:
* e2e tests
Merge conflicts due to missing 23d09638d:
* file-metadata-utils.cc resolves trivial conflicts
* hdfs-parquet-scanner.cc removes usage of NeedDataInFile()
Change-Id: Ie706317888981f634d792fb570f3eab1ec11a4f4
Reviewed-on: http://gerrit.cloudera.org:8080/18605
Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
Reviewed-by: Tamas Mate <tm...@apache.org>
Reviewed-by: <li...@sensorsdata.cn>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/18887
Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
Tested-by: Quanlong Huang <hu...@gmail.com>
---
be/src/exec/file-metadata-utils.cc | 1 -
be/src/exec/parquet/hdfs-parquet-scanner.cc | 4 +
.../queries/QueryTest/iceberg-migrated-tables.test | 138 +++++++++++++++++++++
3 files changed, 142 insertions(+), 1 deletion(-)
diff --git a/be/src/exec/file-metadata-utils.cc b/be/src/exec/file-metadata-utils.cc
index 708b4f5c5..2a9230996 100644
--- a/be/src/exec/file-metadata-utils.cc
+++ b/be/src/exec/file-metadata-utils.cc
@@ -114,7 +114,6 @@ Tuple* FileMetadataUtils::CreateTemplateTuple(MemPool* mem_pool) {
bool FileMetadataUtils::IsValuePartitionCol(const SlotDescriptor* slot_desc) {
DCHECK(context_ != nullptr);
DCHECK(file_desc_ != nullptr);
- if (slot_desc->parent() != scan_node_->tuple_desc()) return false;
if (slot_desc->col_pos() < scan_node_->num_partition_keys()) {
return true;
}
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index f79e02b91..d8058c875 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -566,6 +566,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
RETURN_IF_ERROR(ResolveSchemaForStatFiltering(slot_desc, &missing_field, &node));
if (missing_field) {
+ if (file_metadata_utils_.IsValuePartitionCol(slot_desc)) continue;
// We are selecting a column that is not in the file. We would set its slot to NULL
// during the scan, so any predicate would evaluate to false. Return early. NULL
// comparisons cannot happen here, since predicates with NULL literals are filtered
@@ -707,6 +708,7 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
RETURN_IF_ERROR(ResolveSchemaForStatFiltering(slot_desc, &missing_field, &node));
if (missing_field) {
+ if (file_metadata_utils_.IsValuePartitionCol(slot_desc)) continue;
// We are selecting a column that is not in the file. We would set its slot to NULL
// during the scan, so any predicate would evaluate to false. Return early. NULL
// comparisons cannot happen here, since predicates with NULL literals are filtered
@@ -1981,6 +1983,8 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
}
if (missing_field) {
+ if (file_metadata_utils_.IsValuePartitionCol(slot_desc)) continue;
+
return Status(Substitute(
"Unable to find SchemaNode for path '$0' in the schema of file '$1'.",
PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()), filename()));
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
index d1549c529..ba278fcd4 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
@@ -11,6 +11,54 @@ select * from functional_parquet.iceberg_alltypes_part
INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
====
---- QUERY
+# Queries with WHERE clauses
+select * from functional_parquet.iceberg_alltypes_part
+where i = 1;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part
+where i = 3;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part
+where p_int = 1;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part
+where p_int = 2;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part
+where p_bool = true;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part
+where p_bool = false;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
# Read only the partition columns.
select p_bool, p_int, p_bigint, p_float,
p_double, p_decimal, p_date, p_string
@@ -33,6 +81,54 @@ select * from functional_parquet.iceberg_alltypes_part_orc
INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
====
---- QUERY
+# Queries with WHERE clauses
+select * from functional_parquet.iceberg_alltypes_part_orc
+where i = 1;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc
+where i = 3;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc
+where p_int = 1;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc
+where p_int = 2;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc
+where p_bool = true;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc
+where p_bool = false;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
# Read only the partition columns.
select p_bool, p_int, p_bigint, p_float,
p_double, p_decimal, p_date, p_string
@@ -58,6 +154,23 @@ select * from functional_parquet.iceberg_legacy_partition_schema_evolution
BIGINT, DOUBLE, DECIMAL, INT
====
---- QUERY
+select * from functional_parquet.iceberg_legacy_partition_schema_evolution
+where p_int_long = 1;
+---- RESULTS
+1,1.100000023841858,2.718,2
+1,1.100000023841858,3.141,1
+---- TYPES
+BIGINT, DOUBLE, DECIMAL, INT
+====
+---- QUERY
+select * from functional_parquet.iceberg_legacy_partition_schema_evolution
+where p_dec_dec = 2.718;
+---- RESULTS
+1,1.100000023841858,2.718,2
+---- TYPES
+BIGINT, DOUBLE, DECIMAL, INT
+====
+---- QUERY
# Read only the partition columns.
select p_int_long, p_float_double, p_dec_dec
from functional_parquet.iceberg_legacy_partition_schema_evolution;
@@ -82,6 +195,23 @@ select * from functional_parquet.iceberg_legacy_partition_schema_evolution_orc
BIGINT, DOUBLE, DECIMAL, INT
====
---- QUERY
+select * from functional_parquet.iceberg_legacy_partition_schema_evolution_orc
+where p_int_long = 1;
+---- RESULTS
+1,1.100000023841858,2.718,2
+1,1.100000023841858,3.141,1
+---- TYPES
+BIGINT, DOUBLE, DECIMAL, INT
+====
+---- QUERY
+select * from functional_parquet.iceberg_legacy_partition_schema_evolution_orc
+where p_dec_dec = 2.718;
+---- RESULTS
+1,1.100000023841858,2.718,2
+---- TYPES
+BIGINT, DOUBLE, DECIMAL, INT
+====
+---- QUERY
# Read only the partition columns.
select p_int_long, p_float_double, p_dec_dec
from functional_parquet.iceberg_legacy_partition_schema_evolution_orc;
@@ -106,3 +236,11 @@ select * from only_part_cols;
---- TYPES
INT, STRING
====
+---- QUERY
+select * from only_part_cols
+where i = 2 and s = 's'
+---- RESULTS
+2,'s'
+---- TYPES
+INT, STRING
+====