Posted to commits@impala.apache.org by st...@apache.org on 2022/08/25 12:23:47 UTC

[impala] branch branch-4.1.1 updated (f6ee249ac -> 44dc157a2)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git


    from f6ee249ac IMPALA-11317/IMPALA-11316/IMPALA-11315: impala-shell Python 3 fixes
     new b28e133f0 IMPALA-11249: Fix add_test_dimensions() locations to call super()
     new 051c59bd8 IMPALA-9410: Support resolving ORC file columns by names
     new e535f4b8d IMPALA-11346: Migrated partitioned Iceberg tables might return ERROR when WHERE condition is used on partition column
     new 0b2f6b7f3 IMPALA-11345: Parquet Bloom filtering failure if column is added to the schema
     new 44dc157a2 IMPALA-11344: Missing slots in all cases should be allowed to be read

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/file-metadata-utils.cc                 |   1 -
 be/src/exec/hdfs-orc-scanner.cc                    |   5 +-
 be/src/exec/orc-column-readers.cc                  |  23 +-
 be/src/exec/orc-metadata-utils.cc                  |  93 ++++-
 be/src/exec/orc-metadata-utils.h                   |  11 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  72 +++-
 be/src/service/query-options.cc                    |   7 +
 be/src/service/query-options.h                     | 462 ++++++++++-----------
 common/thrift/ImpalaService.thrift                 |   3 +
 common/thrift/Query.thrift                         |   3 +
 .../queries/QueryTest/iceberg-migrated-tables.test | 146 ++++++-
 tests/authorization/test_ranger.py                 |   8 +
 tests/common/test_dimensions.py                    |   7 +
 tests/custom_cluster/test_client_ssl.py            |   1 +
 tests/query_test/test_insert.py                    |  22 +-
 tests/query_test/test_nested_types.py              |  26 +-
 tests/query_test/test_parquet_bloom_filter.py      |  21 +
 tests/query_test/test_scanners.py                  |  61 +++
 tests/shell/test_shell_client.py                   |  11 +-
 tests/shell/test_shell_commandline.py              |  10 +-
 tests/shell/test_shell_interactive.py              |  16 +-
 21 files changed, 723 insertions(+), 286 deletions(-)


[impala] 01/05: IMPALA-11249: Fix add_test_dimensions() locations to call super()

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b28e133f008e5196bf2bc7f2114cc45d0dfbe963
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Sat May 21 19:10:11 2022 -0700

    IMPALA-11249: Fix add_test_dimensions() locations to call super()
    
    The original issue is that the strict HS2 shell tests
    are not running in precommit or nightly jobs, but they
    do run in local developer environments. Investigating
    this showed that the shell tests were running with a
    weird set of test dimensions that includes
    table_format_and_file_extension. That dimension is only
    used in test_insert.py::TestInsertFileExtension.
    
    The root cause is that the shell tests and other
    locations override add_test_dimensions() without
    calling super(..., cls).add_test_dimensions(). The
    resulting behavior is not well defined, but there is
    clearly cross-talk between the tests that do this.
    
    This changes all add_test_dimensions() locations to
    call super(..., cls).add_test_dimensions() if they
    don't already. Each location has been tuned to run
    the same set of tests as before (except the shell
    tests, which now run the strict HS2 tests).
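
    As a minimal sketch of the enforced pattern (the class name below is
    hypothetical; the suite base class and dimension helper are the real
    ones used throughout these tests):

        from tests.common.impala_test_suite import ImpalaTestSuite
        from tests.common.test_dimensions import create_client_protocol_dimension

        class TestExample(ImpalaTestSuite):
          @classmethod
          def add_test_dimensions(cls):
            # Call super() first so this class starts from the base test
            # dimensions rather than from whatever an earlier test class
            # happened to leave in the matrix.
            super(TestExample, cls).add_test_dimensions()
            cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())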
    
    As part of this, several shell tests need to be
    skipped or fixed for strict HS2.
    
    Testing:
     - Ran core job
     - Ran tests locally to verify the set of tests
       didn't change.
    
    Change-Id: Ib20fd479d3b91ed0ed89a0bc5623cd2a5a458614
    Reviewed-on: http://gerrit.cloudera.org:8080/18557
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/18905
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Tested-by: Quanlong Huang <hu...@gmail.com>
---
 tests/authorization/test_ranger.py      |  8 ++++++++
 tests/custom_cluster/test_client_ssl.py |  1 +
 tests/query_test/test_insert.py         | 22 +++++++++++++++-------
 tests/query_test/test_nested_types.py   |  5 ++++-
 tests/shell/test_shell_client.py        | 11 +++++++++--
 tests/shell/test_shell_commandline.py   | 10 ++++++++--
 tests/shell/test_shell_interactive.py   | 16 +++++++++++++---
 7 files changed, 58 insertions(+), 15 deletions(-)

diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 875886b12..d0caa7b17 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -1591,6 +1591,7 @@ class TestRangerColumnMaskingComplexTypesInSelectList(CustomClusterTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
+    super(TestRangerColumnMaskingComplexTypesInSelectList, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_dimension(create_orc_dimension(cls.get_workload()))
     cls.ImpalaTestMatrix.add_constraint(lambda v:
@@ -1598,6 +1599,13 @@ class TestRangerColumnMaskingComplexTypesInSelectList(CustomClusterTestSuite):
     cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
         disable_codegen_options=[True]))
 
+  @classmethod
+  def add_custom_cluster_constraints(cls):
+    # Do not call the super() implementation, because this class needs to relax
+    # the set of constraints. The usual constraints only run on uncompressed text.
+    # This disables that constraint to let us run against only ORC.
+    return
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
     impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
diff --git a/tests/custom_cluster/test_client_ssl.py b/tests/custom_cluster/test_client_ssl.py
index 66a3fff78..4add40501 100644
--- a/tests/custom_cluster/test_client_ssl.py
+++ b/tests/custom_cluster/test_client_ssl.py
@@ -140,6 +140,7 @@ class TestClientSsl(CustomClusterTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
+    super(TestClientSsl, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
 
   @pytest.mark.execute_serially
diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py
index 401d9b960..30d75fdc3 100644
--- a/tests/query_test/test_insert.py
+++ b/tests/query_test/test_insert.py
@@ -29,7 +29,9 @@ from tests.common.skip import SkipIfABFS, SkipIfEC, SkipIfLocal, \
     SkipIfHive2, SkipIfNotHdfsMinicluster, SkipIfS3, SkipIfDockerizedCluster
 from tests.common.test_dimensions import (
     create_exec_option_dimension,
-    create_uncompressed_text_dimension)
+    create_uncompressed_text_dimension,
+    create_single_exec_option_dimension,
+    is_supported_insert_format)
 from tests.common.test_result_verifier import (
     QueryTestResult,
     parse_result_rows)
@@ -335,20 +337,26 @@ class TestInsertFileExtension(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
-    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension(
-        'table_format_and_file_extension',
-        *[('parquet', '.parq'), ('textfile', '.txt')]))
+    super(TestInsertFileExtension, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        is_supported_insert_format(v.get_value('table_format')))
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
 
   @classmethod
   def setup_class(cls):
     super(TestInsertFileExtension, cls).setup_class()
 
   def test_file_extension(self, vector, unique_database):
-    table_format = vector.get_value('table_format_and_file_extension')[0]
-    file_extension = vector.get_value('table_format_and_file_extension')[1]
+    table_format = vector.get_value('table_format').file_format
+    if table_format == 'parquet':
+      file_extension = '.parq'
+      stored_as_format = 'parquet'
+    else:
+      file_extension = '.txt'
+      stored_as_format = 'textfile'
     table_name = "{0}_table".format(table_format)
     ctas_query = "create table {0}.{1} stored as {2} as select 1".format(
-        unique_database, table_name, table_format)
+        unique_database, table_name, stored_as_format)
     self.execute_query_expect_success(self.client, ctas_query)
     for path in self.filesystem_client.ls("test-warehouse/{0}.db/{1}".format(
         unique_database, table_name)):
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index 3ca64f885..ef64e1181 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -36,7 +36,8 @@ from tests.common.skip import (
     SkipIfNotHdfsMinicluster
     )
 from tests.common.test_dimensions import (create_exec_option_dimension,
-    create_exec_option_dimension_from_dict, create_client_protocol_dimension)
+    create_exec_option_dimension_from_dict, create_client_protocol_dimension,
+    create_orc_dimension)
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_HDFS
 
@@ -185,9 +186,11 @@ class TestNestedTypesInSelectListWithBeeswax(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
+    super(TestNestedTypesInSelectListWithBeeswax, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('protocol') == 'beeswax')
+    cls.ImpalaTestMatrix.add_dimension(create_orc_dimension(cls.get_workload()))
     cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
         disable_codegen_options=[True]))
 
diff --git a/tests/shell/test_shell_client.py b/tests/shell/test_shell_client.py
index 5fa1d5caa..1388b527f 100644
--- a/tests/shell/test_shell_client.py
+++ b/tests/shell/test_shell_client.py
@@ -20,8 +20,9 @@
 
 from shell.impala_client import ImpalaBeeswaxClient, ImpalaHS2Client
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.test_dimensions import create_client_protocol_dimension
-from tests.common.test_dimensions import create_client_protocol_no_strict_dimension
+from tests.common.test_dimensions import (
+  create_client_protocol_dimension, create_client_protocol_no_strict_dimension,
+  create_uncompressed_text_dimension, create_single_exec_option_dimension)
 from util import get_impalad_host_port
 
 
@@ -34,6 +35,12 @@ class TestShellClient(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
+    super(TestShellClient, cls).add_test_dimensions()
+    # Limit to uncompressed text with default exec options
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    # Run with beeswax and HS2
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_no_strict_dimension())
 
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index cd53404c2..de81c9761 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -34,8 +34,9 @@ from tests.common.environ import ImpalaTestClusterProperties
 from tests.common.impala_service import ImpaladService
 from tests.common.impala_test_suite import ImpalaTestSuite, IMPALAD_HS2_HOST_PORT
 from tests.common.skip import SkipIf
-from tests.common.test_dimensions import create_client_protocol_dimension
-from tests.common.test_dimensions import create_client_protocol_strict_dimension
+from tests.common.test_dimensions import (
+  create_client_protocol_dimension, create_client_protocol_strict_dimension,
+  create_uncompressed_text_dimension, create_single_exec_option_dimension)
 from time import sleep, time
 from util import (get_impalad_host_port, assert_var_substitution, run_impala_shell_cmd,
                   ImpalaShell, IMPALA_SHELL_EXECUTABLE, SHELL_IS_PYTHON_2,
@@ -134,6 +135,11 @@ class TestImpalaShell(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
+    super(TestImpalaShell, cls).add_test_dimensions()
+    # Limit to uncompressed text with default exec options
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
     # Run with both beeswax and HS2 to ensure that behaviour is the same.
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_strict_dimension())
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index 2d1a4aa7a..0a6ad20b4 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -40,8 +40,9 @@ from tempfile import NamedTemporaryFile
 from tests.common.impala_service import ImpaladService
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfLocal
-from tests.common.test_dimensions import create_client_protocol_dimension
-from tests.common.test_dimensions import create_client_protocol_strict_dimension
+from tests.common.test_dimensions import (
+  create_client_protocol_dimension, create_client_protocol_strict_dimension,
+  create_uncompressed_text_dimension, create_single_exec_option_dimension)
 from tests.shell.util import get_unused_port
 from util import (assert_var_substitution, ImpalaShell, get_impalad_port, get_shell_cmd,
                   get_open_sessions_metric, IMPALA_SHELL_EXECUTABLE, spawn_shell)
@@ -165,6 +166,11 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
+    super(TestImpalaShellInteractive, cls).add_test_dimensions()
+    # Limit to uncompressed text with default exec options
+    cls.ImpalaTestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
     # Run with both beeswax and HS2 to ensure that behaviour is the same.
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_strict_dimension())
@@ -681,6 +687,8 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
 
   def test_commandline_flag_disable_live_progress(self, vector):
     """Test the command line flag disable_live_progress with live_progress."""
+    if vector.get_value('strict_hs2_protocol'):
+      pytest.skip("Live option not supported in strict hs2 mode.")
     # By default, shell option live_progress is set to True in the interactive mode.
     cmds = "set all;"
     result = run_impala_shell_interactive(vector, cmds)
@@ -704,13 +712,15 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
 
     cmds = "set all;"
     # override the default option through command line argument.
-    args = ['--strict_h2_protocol']
+    args = ['--strict_hs2_protocol']
     result = run_impala_shell_interactive(vector, cmds, shell_args=args)
     assert "\tLIVE_PROGRESS: False" in result.stdout
     assert "\tLIVE_SUMMARY: False" in result.stdout
 
   def test_live_option_configuration(self, vector):
     """Test the optional configuration file with live_progress and live_summary."""
+    if vector.get_value('strict_hs2_protocol'):
+      pytest.skip("Live option not supported in strict hs2 mode.")
     # Positive tests
     # set live_summary and live_progress as True with config file
     rcfile_path = os.path.join(QUERY_FILE_PATH, 'good_impalarc3')


[impala] 05/05: IMPALA-11344: Missing slots in all cases should be allowed to be read

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 44dc157a2c10578b82518012aa2e9aa9288dc6e5
Author: ttttttz <24...@qq.com>
AuthorDate: Wed Jun 22 11:53:28 2022 +0800

    IMPALA-11344: Missing slots in all cases should be allowed to be read
    
    When selecting only the missing fields of ORC files, and the missing
    fields include non-partition fields, the query fails with `Parse error
    in possibly corrupt ORC file: '$filename'. No columns found for this
    scan`. We should allow reading missing slots in all cases.
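
    A minimal sketch of the scenario, in the style of the regression test
    below ('client' and 'run_stmt_in_hive' stand in for the ImpalaTestSuite
    helpers; the table name is illustrative):

        tbl = "test_db.missing_field_orc"
        client.execute("create table %s (f0 int) stored as orc" % tbl)
        # Write a file that only contains f0, then pick up the new file.
        run_stmt_in_hive("insert into table %s select 1" % tbl)
        client.execute("refresh %s" % tbl)
        client.execute("alter table %s add columns (f1 int)" % tbl)
        # Selects only a field that is missing from the file. Previously
        # this failed with the parse error above; now it returns NULL.
        result = client.execute("select f1 from %s" % tbl)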
    
    Testing:
    - Added a test to test_scanners.py that ensures the query can be
      executed successfully when selecting only the missing fields of
      ORC files.
    Change-Id: I15dca47ba5f7a93bfd5fcba3cab4ac6d64459023
    Reviewed-on: http://gerrit.cloudera.org:8080/18652
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/18907
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
---
 be/src/exec/orc-column-readers.cc | 23 +--------------
 tests/query_test/test_scanners.py | 60 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 61 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/orc-column-readers.cc b/be/src/exec/orc-column-readers.cc
index 644ac325f..7c9ae072d 100644
--- a/be/src/exec/orc-column-readers.cc
+++ b/be/src/exec/orc-column-readers.cc
@@ -530,28 +530,7 @@ Status OrcStructReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch,
   }
   int num_rows_read = scratch_batch->num_tuples - scratch_batch_idx;
   if (children_.empty()) {
-    // We allow empty 'children_' for original files, because we might select the
-    // synthetic 'rowid' field which is not present in original files.
-    // We also allow empty 'children_' when we need to validate row batches of a zero slot
-    // scan. In that case 'children_' is empty and only 'row_validator_' owns an ORC
-    // vector batch (the write id batch).
-    bool valid_empty_children = scanner_->acid_original_file_ ||
-         (scanner_->row_batches_need_validation_ &&
-          scanner_->scan_node_->IsZeroSlotTableScan());
-    if (!valid_empty_children) {
-      bool only_partitions = true;
-      for (SlotDescriptor* slot : tuple_desc_->slots()) {
-        if (!scanner_->IsPartitionKeySlot(slot)) {
-          only_partitions = false;
-          break;
-        }
-      }
-      if (!only_partitions) {
-        return Status(Substitute("Parse error in possibly corrupt ORC file: '$0'. "
-            "No columns found for this scan.",
-            scanner_->filename()));
-      }
-    }
+    // We allow empty 'children_' in all cases.
     DCHECK_EQ(0, num_rows_read);
     num_rows_read = std::min(scratch_batch->capacity - scratch_batch->num_tuples,
                              NumElements() - row_idx_);
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 112b53dc0..ca15d43a2 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1734,6 +1734,66 @@ class TestOrc(ImpalaTestSuite):
 
     self.run_test_case('QueryTest/hive2-pre-gregorian-date-orc', vector, unique_database)
 
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  @SkipIfS3.hive
+  @SkipIfGCS.hive
+  @SkipIfCOS.hive
+  def test_missing_field_orc(self, unique_database):
+    # Test scanning orc files with missing fields in file meta.
+    orc_tbl_name = unique_database + ".missing_field_orc"
+    self.client.execute("create table %s (f0 int) stored as orc" % orc_tbl_name)
+    self.run_stmt_in_hive("insert into table %s select 1" % orc_tbl_name)
+    self.client.execute("refresh %s" % orc_tbl_name)
+
+    self.client.execute("alter table %s add columns(f1 int)" % orc_tbl_name)
+    result = self.client.execute("select f1 from %s " % orc_tbl_name)
+    assert result.data == ['NULL']
+
+    self.client.execute("alter table %s add columns(f2 STRUCT<s0:STRING, s1:STRING>)"
+                        % orc_tbl_name)
+    result = self.client.execute("select f2.s0 from %s " % orc_tbl_name)
+    assert result.data == ['NULL']
+
+    orc_tbl_name = unique_database + ".missing_field_full_txn_test"
+    self.client.execute("create table %s(f0 int) stored as orc "
+                        "tblproperties('transactional'='true')" % orc_tbl_name)
+    self.run_stmt_in_hive("insert into %s values(0)" % orc_tbl_name)
+    self.run_stmt_in_hive("alter table %s add columns(f1 int)" % orc_tbl_name)
+    self.run_stmt_in_hive("insert into %s values(1,1)" % orc_tbl_name)
+    self.client.execute("refresh %s" % orc_tbl_name)
+    result = self.client.execute("select f1 from %s" % orc_tbl_name)
+    assert len(result.data) == 2
+    assert '1' in result.data
+    assert 'NULL' in result.data
+
+    # TODO: add a test case for Iceberg tables once IMPALA-10542 is done.
+    # orc_tbl_name = unique_database + ".missing_field_iceberg_test"
+    # self.client.execute("create table %s (f0 int) stored as iceberg "
+    #                     "tblproperties('write.format.default' = 'orc')"
+    #                     % orc_tbl_name)
+    # self.run_stmt_in_hive("insert into %s values(0)" % orc_tbl_name)
+    # self.run_stmt_in_hive("alter table %s add columns(f1 int)" % orc_tbl_name)
+    # self.run_stmt_in_hive("insert into %s values(1,1)" % orc_tbl_name)
+    # self.client.execute("refresh %s" % orc_tbl_name)
+    # result = self.client.execute("select f1 from %s" % orc_tbl_name)
+    # assert len(result.data) == 2
+    # assert '1' in result.data
+    # assert 'NULL' in result.data
+
+    orc_tbl_name = unique_database + ".lineitem_orc_ext"
+    test_file = "/test-warehouse/tpch.lineitem_orc_def"
+    create_sql = "create external table %s like tpch_orc_def.lineitem " \
+                 "location '%s'" % (orc_tbl_name, test_file)
+    self.client.execute(create_sql)
+    self.client.execute("alter table %s add columns (new_col int)" % orc_tbl_name)
+    result = self.execute_query("select count(*) from %s where new_col is null"
+                                % orc_tbl_name)
+    assert len(result.data) == 1
+    assert '6001215' in result.data
+
 
 class TestScannerReservation(ImpalaTestSuite):
   @classmethod


[impala] 02/05: IMPALA-9410: Support resolving ORC file columns by names

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 051c59bd80dfb8291dd4ef61419db7db2c41cab4
Author: Gergely Fürnstáhl <gf...@cloudera.com>
AuthorDate: Wed Apr 20 10:34:27 2022 +0200

    IMPALA-9410: Support resolving ORC file columns by names
    
    Added a query option and an implementation to support resolving ORC
    file columns by name.
    
    Changed the secondary resolution strategy for Iceberg ORC tables to
    name-based resolution.
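
    A sketch of how the new option can be exercised (in the style of the
    test suite; 'client' stands in for an ImpalaTestSuite client, the
    table name is illustrative, and NAME is one of the
    TSchemaResolutionStrategy values alongside POSITION and FIELD_ID):

        # Resolve ORC file columns by name rather than by position.
        client.execute("set orc_schema_resolution=NAME")
        # Columns are now matched to the ORC schema by name
        # (case-insensitively, per FindChildWithName), so files whose
        # column order differs from the table still resolve correctly.
        result = client.execute("select id, name from test_db.orc_table")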
    
    Testing:
    
    Added a new test dimension for ORC tests, and added results to the
    now-working Iceberg migrated table test.
    
    Change-Id: I29562a059160c19eb58ccea76aa959d2e408f8de
    Reviewed-on: http://gerrit.cloudera.org:8080/18397
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/18906
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
    Tested-by: Quanlong Huang <hu...@gmail.com>
---
 be/src/exec/hdfs-orc-scanner.cc                    |   5 +-
 be/src/exec/orc-metadata-utils.cc                  |  93 ++++-
 be/src/exec/orc-metadata-utils.h                   |  11 +-
 be/src/service/query-options.cc                    |   7 +
 be/src/service/query-options.h                     | 462 ++++++++++-----------
 common/thrift/ImpalaService.thrift                 |   3 +
 common/thrift/Query.thrift                         |   3 +
 .../queries/QueryTest/iceberg-migrated-tables.test |   8 +-
 tests/common/test_dimensions.py                    |   7 +
 tests/query_test/test_nested_types.py              |  23 +-
 tests/query_test/test_scanners.py                  |   1 +
 11 files changed, 378 insertions(+), 245 deletions(-)

diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index cb02dac52..f6643e9b4 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -357,8 +357,9 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
   RETURN_IF_ERROR(footer_status);
 
   bool is_table_full_acid = scan_node_->hdfs_table()->IsTableFullAcid();
-  schema_resolver_.reset(new OrcSchemaResolver(*scan_node_->hdfs_table(),
-      &reader_->getType(), filename(), is_table_full_acid));
+  schema_resolver_.reset(
+      new OrcSchemaResolver(*scan_node_->hdfs_table(), &reader_->getType(), filename(),
+          is_table_full_acid, state_->query_options().orc_schema_resolution));
   bool is_file_full_acid = schema_resolver_->HasFullAcidV2Schema();
   acid_original_file_ = is_table_full_acid && !is_file_full_acid;
   if (is_table_full_acid) {
diff --git a/be/src/exec/orc-metadata-utils.cc b/be/src/exec/orc-metadata-utils.cc
index 890db5792..b08e5c61f 100644
--- a/be/src/exec/orc-metadata-utils.cc
+++ b/be/src/exec/orc-metadata-utils.cc
@@ -37,16 +37,21 @@ inline int GetFieldIdFromStr(const std::string& str) {
 }
 
 OrcSchemaResolver::OrcSchemaResolver(const HdfsTableDescriptor& tbl_desc,
-    const orc::Type* root, const char* filename, bool is_table_acid) :
-    tbl_desc_(tbl_desc), root_(root), filename_(filename),
+    const orc::Type* root, const char* filename, bool is_table_acid,
+    TSchemaResolutionStrategy::type schema_resolution)
+  : schema_resolution_strategy_(schema_resolution),
+    tbl_desc_(tbl_desc),
+    root_(root),
+    filename_(filename),
     is_table_full_acid_(is_table_acid) {
   DetermineFullAcidSchema();
-  schema_resolution_strategy_ = TSchemaResolutionStrategy::POSITION;
   if (tbl_desc_.IsIcebergTable() && root_->getSubtypeCount() > 0) {
     // Use FIELD_ID-based column resolution for Iceberg tables if possible.
     const orc::Type* first_child =  root_->getSubtype(0);
     if (first_child->hasAttributeKey(ICEBERG_FIELD_ID)) {
       schema_resolution_strategy_ = TSchemaResolutionStrategy::FIELD_ID;
+    } else {
+      schema_resolution_strategy_ = TSchemaResolutionStrategy::NAME;
     }
   }
 }
@@ -55,6 +60,8 @@ Status OrcSchemaResolver::ResolveColumn(const SchemaPath& col_path,
     const orc::Type** node, bool* pos_field, bool* missing_field) const {
   if (schema_resolution_strategy_ == TSchemaResolutionStrategy::POSITION) {
     return ResolveColumnByPosition(col_path, node, pos_field, missing_field);
+  } else if (schema_resolution_strategy_ == TSchemaResolutionStrategy::NAME) {
+    return ResolveColumnByName(col_path, node, pos_field, missing_field);
   } else if (schema_resolution_strategy_ == TSchemaResolutionStrategy::FIELD_ID) {
     return ResolveColumnByIcebergFieldId(col_path, node, pos_field, missing_field);
   } else {
@@ -162,6 +169,86 @@ Status OrcSchemaResolver::ValidateMap(const ColumnType& type,
   return Status::OK();
 }
 
+Status OrcSchemaResolver::ResolveColumnByName(const SchemaPath& col_path,
+    const orc::Type** node, bool* pos_field, bool* missing_field) const {
+  const ColumnType* table_col_type = nullptr;
+  *node = root_;
+  *pos_field = false;
+  *missing_field = false;
+  if (col_path.empty()) return Status::OK();
+  SchemaPath table_path, file_path;
+  TranslateColPaths(col_path, &table_path, &file_path);
+
+  int i = 0;
+
+  // Resolve table and file ACID differences
+  int table_idx = table_path[i];
+  int file_idx = file_path[i];
+  if (table_idx == -1 || file_idx == -1) {
+    DCHECK_NE(table_idx, file_idx);
+    if (table_idx == -1) {
+      DCHECK_EQ(*node, root_);
+      *node = (*node)->getSubtype(file_idx);
+    } else {
+      DCHECK(table_col_type == nullptr);
+      table_col_type = &tbl_desc_.col_descs()[table_idx].type();
+    }
+    i++;
+  }
+
+  for (; i < table_path.size(); ++i) {
+    table_idx = table_path[i];
+    if (table_col_type == nullptr) {
+      // non ACID table, or top level user column in ACID table
+      table_col_type = &tbl_desc_.col_descs()[table_idx].type();
+      const std::string& name = tbl_desc_.col_descs()[table_idx].name();
+      *node = FindChildWithName(*node, name);
+      if (*node == nullptr) {
+        *missing_field = true;
+        return Status::OK();
+      }
+      RETURN_IF_ERROR(ValidateType(*table_col_type, **node, table_path, i));
+      continue;
+    } else if (table_col_type->type == TYPE_STRUCT) {
+      // Resolve struct field by name.
+      DCHECK_LT(table_idx, table_col_type->field_names.size());
+      const std::string& name = table_col_type->field_names[table_idx];
+      *node = FindChildWithName(*node, name);
+    } else if (table_col_type->type == TYPE_ARRAY) {
+      if (table_idx == SchemaPathConstants::ARRAY_POS) {
+        *pos_field = true;
+        break; // return *node as the ARRAY node
+      }
+      DCHECK_EQ(table_idx, SchemaPathConstants::ARRAY_ITEM);
+      *node = (*(node))->getSubtype(table_idx);
+    } else if (table_col_type->type == TYPE_MAP) {
+      DCHECK(table_idx == SchemaPathConstants::MAP_KEY
+          || table_idx == SchemaPathConstants::MAP_VALUE);
+      // At this point we've found a MAP with a matching name. It's safe to resolve
+      // the child (key or value) by position.
+      *node = (*(node))->getSubtype(table_idx);
+    }
+    if (*node == nullptr) {
+      *missing_field = true;
+      return Status::OK();
+    }
+    table_col_type = &table_col_type->children[table_idx];
+    RETURN_IF_ERROR(ValidateType(*table_col_type, **node, table_path, i));
+  }
+  return Status::OK();
+}
+
+const orc::Type* OrcSchemaResolver::FindChildWithName(
+    const orc::Type* node, const std::string& name) const {
+  for (int i = 0; i < node->getSubtypeCount(); ++i) {
+    const orc::Type* child = node->getSubtype(i);
+    DCHECK(child != nullptr);
+    const std::string& fieldName = node->getFieldName(i);
+    if (iequals(fieldName, name)) return child;
+  }
+  return nullptr;
+}
+
 Status OrcSchemaResolver::ResolveColumnByIcebergFieldId(const SchemaPath& col_path,
     const orc::Type** node, bool* pos_field, bool* missing_field) const {
   const ColumnType* table_col_type = nullptr;
diff --git a/be/src/exec/orc-metadata-utils.h b/be/src/exec/orc-metadata-utils.h
index af23afb11..aaab7d46f 100644
--- a/be/src/exec/orc-metadata-utils.h
+++ b/be/src/exec/orc-metadata-utils.h
@@ -42,7 +42,8 @@ constexpr int CURRENT_TRANSCACTION_TYPE_ID = 5;
 class OrcSchemaResolver {
  public:
   OrcSchemaResolver(const HdfsTableDescriptor& tbl_desc, const orc::Type* root,
-      const char* filename, bool is_table_acid);
+      const char* filename, bool is_table_acid,
+      TSchemaResolutionStrategy::type schema_resolution);
 
   /// Resolve SchemaPath into orc::Type (ORC column representation)
   /// 'pos_field' is set to true if 'col_path' reference the index field of an array
@@ -94,12 +95,20 @@ class OrcSchemaResolver {
   Status ResolveColumnByPosition(const SchemaPath& col_path, const orc::Type** node,
       bool* pos_field, bool* missing_field) const;
 
+  /// Resolve column based on name.
+  Status ResolveColumnByName(const SchemaPath& col_path, const orc::Type** node,
+      bool* pos_field, bool* missing_field) const;
+
   /// Resolve column based on the Iceberg field ids. This way we will retrieve the
   /// Iceberg field ids from the HMS table via 'col_path', then find the corresponding
   /// field in the ORC file.
   Status ResolveColumnByIcebergFieldId(const SchemaPath& col_path, const orc::Type** node,
       bool* pos_field, bool* missing_field) const;
 
+  /// Finds child of 'node' whose column name matches to provided 'name'.
+  const orc::Type* FindChildWithName(
+      const orc::Type* node, const std::string& name) const;
+
   /// Finds child of 'node' that has Iceberg field id equals to 'field_id'.
   const orc::Type* FindChildWithFieldId(const orc::Type* node, const int field_id) const;
 
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index c5a1206ff..20105b5e3 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1222,6 +1222,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_test_replan(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::ORC_SCHEMA_RESOLUTION: {
+        TSchemaResolutionStrategy::type enum_type;
+        RETURN_IF_ERROR(GetThriftEnum(value, "orc schema resolution",
+            _TSchemaResolutionStrategy_VALUES_TO_NAMES, &enum_type));
+        query_options->__set_orc_schema_resolution(enum_type);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index fa9691b2c..48444a4f9 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -48,241 +48,233 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // option in the enum TImpalaQueryOptions (defined in ImpalaService.thrift)
 // plus one. Thus, the second argument to the DCHECK has to be updated every
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
-#define QUERY_OPTS_TABLE\
-  DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::LOCK_MAX_WAIT_TIME_S + 1);\
-  REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
-  QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
-  REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
-  QUERY_OPT_FN(batch_size, BATCH_SIZE, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(debug_action, DEBUG_ACTION, TQueryOptionLevel::DEVELOPMENT)\
-  REMOVED_QUERY_OPT_FN(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT)\
-  REMOVED_QUERY_OPT_FN(disable_cached_reads, DISABLE_CACHED_READS)\
-  QUERY_OPT_FN(disable_outermost_topn, DISABLE_OUTERMOST_TOPN,\
-      TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(disable_codegen, DISABLE_CODEGEN, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(explain_level, EXPLAIN_LEVEL, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(hbase_cache_blocks, HBASE_CACHE_BLOCKS, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(hbase_caching, HBASE_CACHING, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(max_errors, MAX_ERRORS, TQueryOptionLevel::ADVANCED)\
-  REMOVED_QUERY_OPT_FN(max_io_buffers, MAX_IO_BUFFERS)\
-  QUERY_OPT_FN(max_scan_range_length, MAX_SCAN_RANGE_LENGTH,\
-      TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(mem_limit, MEM_LIMIT, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(num_nodes, NUM_NODES, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(num_scanner_threads, NUM_SCANNER_THREADS, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(compression_codec, COMPRESSION_CODEC, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(parquet_file_size, PARQUET_FILE_SIZE, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(request_pool, REQUEST_POOL, TQueryOptionLevel::REGULAR)\
-  REMOVED_QUERY_OPT_FN(reservation_request_timeout, RESERVATION_REQUEST_TIMEOUT)\
-  QUERY_OPT_FN(sync_ddl, SYNC_DDL, TQueryOptionLevel::REGULAR)\
-  REMOVED_QUERY_OPT_FN(v_cpu_cores, V_CPU_CORES)\
-  REMOVED_QUERY_OPT_FN(rm_initial_mem, RM_INITIAL_MEM)\
-  QUERY_OPT_FN(query_timeout_s, QUERY_TIMEOUT_S, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(buffer_pool_limit, BUFFER_POOL_LIMIT, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(appx_count_distinct, APPX_COUNT_DISTINCT, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS, TQueryOptionLevel::REGULAR)\
-  REMOVED_QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)\
-  QUERY_OPT_FN(exec_single_node_rows_threshold, EXEC_SINGLE_NODE_ROWS_THRESHOLD,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(replica_preference, REPLICA_PREFERENCE, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(schedule_random_replica, SCHEDULE_RANDOM_REPLICA,\
-      TQueryOptionLevel::ADVANCED)\
-  REMOVED_QUERY_OPT_FN(scan_node_codegen_threshold, SCAN_NODE_CODEGEN_THRESHOLD)\
-  QUERY_OPT_FN(disable_streaming_preaggregations, DISABLE_STREAMING_PREAGGREGATIONS,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(runtime_filter_mode, RUNTIME_FILTER_MODE, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(runtime_bloom_filter_size, RUNTIME_BLOOM_FILTER_SIZE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(runtime_filter_wait_time_ms, RUNTIME_FILTER_WAIT_TIME_MS,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(disable_row_runtime_filtering, DISABLE_ROW_RUNTIME_FILTERING,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(mt_dop, MT_DOP, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(prefetch_mode, PREFETCH_MODE, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(strict_mode, STRICT_MODE, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(scratch_limit, SCRATCH_LIMIT, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(enable_expr_rewrites, ENABLE_EXPR_REWRITES, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(enable_cnf_rewrites, ENABLE_CNF_REWRITES, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(decimal_v2, DECIMAL_V2, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(parquet_dictionary_filtering, PARQUET_DICTIONARY_FILTERING,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(parquet_array_resolution, PARQUET_ARRAY_RESOLUTION,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(parquet_read_statistics, PARQUET_READ_STATISTICS,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(default_join_distribution_mode, DEFAULT_JOIN_DISTRIBUTION_MODE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(disable_codegen_rows_threshold, DISABLE_CODEGEN_ROWS_THRESHOLD,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(default_spillable_buffer_size, DEFAULT_SPILLABLE_BUFFER_SIZE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(min_spillable_buffer_size, MIN_SPILLABLE_BUFFER_SIZE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(max_row_size, MAX_ROW_SIZE, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(idle_session_timeout, IDLE_SESSION_TIMEOUT, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(compute_stats_min_sample_size, COMPUTE_STATS_MIN_SAMPLE_SIZE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(exec_time_limit_s, EXEC_TIME_LIMIT_S, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(shuffle_distinct_exprs, SHUFFLE_DISTINCT_EXPRS,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(max_mem_estimate_for_admission, MAX_MEM_ESTIMATE_FOR_ADMISSION,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(thread_reservation_limit, THREAD_RESERVATION_LIMIT,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(thread_reservation_aggregate_limit, THREAD_RESERVATION_AGGREGATE_LIMIT,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(kudu_read_mode, KUDU_READ_MODE, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(allow_erasure_coded_files, ALLOW_ERASURE_CODED_FILES,\
-      TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(timezone, TIMEZONE, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(scan_bytes_limit, SCAN_BYTES_LIMIT,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(cpu_limit_s, CPU_LIMIT_S, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(topn_bytes_limit, TOPN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(client_identifier, CLIENT_IDENTIFIER, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(resource_trace_ratio, RESOURCE_TRACE_RATIO, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(num_remote_executor_candidates, NUM_REMOTE_EXECUTOR_CANDIDATES,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(num_rows_produced_limit, NUM_ROWS_PRODUCED_LIMIT,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(\
-      planner_testcase_mode, PLANNER_TESTCASE_MODE, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(default_file_format, DEFAULT_FILE_FORMAT, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(parquet_timestamp_type, PARQUET_TIMESTAMP_TYPE,\
-      TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(parquet_read_page_index, PARQUET_READ_PAGE_INDEX,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(parquet_write_page_index, PARQUET_WRITE_PAGE_INDEX,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(parquet_page_row_count_limit, PARQUET_PAGE_ROW_COUNT_LIMIT,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(disable_hdfs_num_rows_estimate, DISABLE_HDFS_NUM_ROWS_ESTIMATE,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(default_hints_insert_statement, DEFAULT_HINTS_INSERT_STATEMENT,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(spool_query_results, SPOOL_QUERY_RESULTS,\
-      TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(default_transactional_type, DEFAULT_TRANSACTIONAL_TYPE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(statement_expression_limit, STATEMENT_EXPRESSION_LIMIT,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(max_statement_length_bytes, MAX_STATEMENT_LENGTH_BYTES,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(disable_data_cache, DISABLE_DATA_CACHE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(max_result_spooling_mem, MAX_RESULT_SPOOLING_MEM,\
-      TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(max_spilled_result_spooling_mem, MAX_SPILLED_RESULT_SPOOLING_MEM,\
-      TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(disable_hbase_num_rows_estimate, DISABLE_HBASE_NUM_ROWS_ESTIMATE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(fetch_rows_timeout_ms, FETCH_ROWS_TIMEOUT_MS,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(now_string, NOW_STRING, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(parquet_object_store_split_size, PARQUET_OBJECT_STORE_SPLIT_SIZE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(broadcast_bytes_limit, BROADCAST_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(preagg_bytes_limit, PREAGG_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(max_cnf_exprs, MAX_CNF_EXPRS, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(kudu_snapshot_read_timestamp_micros, KUDU_SNAPSHOT_READ_TIMESTAMP_MICROS,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(retry_failed_queries, RETRY_FAILED_QUERIES, TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(enabled_runtime_filter_types, ENABLED_RUNTIME_FILTER_TYPES,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(async_codegen, ASYNC_CODEGEN, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(enable_distinct_semi_join_optimization,\
-      ENABLE_DISTINCT_SEMI_JOIN_OPTIMIZATION, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(sort_run_bytes_limit, SORT_RUN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(max_fs_writers, MAX_FS_WRITERS, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(refresh_updated_hms_partitions,\
-      REFRESH_UPDATED_HMS_PARTITIONS, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(spool_all_results_for_retries, SPOOL_ALL_RESULTS_FOR_RETRIES,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(runtime_filter_error_rate, RUNTIME_FILTER_ERROR_RATE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(use_local_tz_for_unix_timestamp_conversions,\
-      USE_LOCAL_TZ_FOR_UNIX_TIMESTAMP_CONVERSIONS, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(convert_legacy_hive_parquet_utc_timestamps,\
-      CONVERT_LEGACY_HIVE_PARQUET_UTC_TIMESTAMPS, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(enable_outer_join_to_inner_transformation,\
-      ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(targeted_kudu_scan_range_length, TARGETED_KUDU_SCAN_RANGE_LENGTH,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(report_skew_limit, REPORT_SKEW_LIMIT,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(optimize_simple_limit, OPTIMIZE_SIMPLE_LIMIT,\
-      TQueryOptionLevel::REGULAR)\
-  QUERY_OPT_FN(use_dop_for_costing, USE_DOP_FOR_COSTING,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(broadcast_to_partition_factor, BROADCAST_TO_PARTITION_FACTOR,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(join_rows_produced_limit, JOIN_ROWS_PRODUCED_LIMIT,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(utf8_mode, UTF8_MODE, TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(analytic_rank_pushdown_threshold,\
-      ANALYTIC_RANK_PUSHDOWN_THRESHOLD, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(minmax_filter_threshold, MINMAX_FILTER_THRESHOLD,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(minmax_filtering_level, MINMAX_FILTERING_LEVEL,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(compute_column_minmax_stats, COMPUTE_COLUMN_MINMAX_STATS,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(show_column_minmax_stats, SHOW_COLUMN_MINMAX_STATS,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(default_ndv_scale, DEFAULT_NDV_SCALE, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(kudu_replica_selection, KUDU_REPLICA_SELECTION,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(delete_stats_in_truncate, DELETE_STATS_IN_TRUNCATE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(parquet_bloom_filtering, PARQUET_BLOOM_FILTERING,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(minmax_filter_sorted_columns, MINMAX_FILTER_SORTED_COLUMNS,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(minmax_filter_fast_code_path, MINMAX_FILTER_FAST_CODE_PATH,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(enable_kudu_transaction, ENABLE_KUDU_TRANSACTION,\
-      TQueryOptionLevel::DEVELOPMENT)\
-  QUERY_OPT_FN(minmax_filter_partition_columns, MINMAX_FILTER_PARTITION_COLUMNS,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(parquet_bloom_filter_write, PARQUET_BLOOM_FILTER_WRITE,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(orc_read_statistics, ORC_READ_STATISTICS,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(enable_async_ddl_execution, ENABLE_ASYNC_DDL_EXECUTION,\
-      TQueryOptionLevel::ADVANCED) \
-  QUERY_OPT_FN(enable_async_load_data_execution, ENABLE_ASYNC_LOAD_DATA_EXECUTION,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(parquet_late_materialization_threshold,\
-      PARQUET_LATE_MATERIALIZATION_THRESHOLD, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(parquet_dictionary_runtime_filter_entry_limit,\
-      PARQUET_DICTIONARY_RUNTIME_FILTER_ENTRY_LIMIT, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(abort_java_udf_on_exception,\
-      ABORT_JAVA_UDF_ON_EXCEPTION, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(orc_async_read, ORC_ASYNC_READ, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(runtime_in_list_filter_entry_limit,\
-      RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(enable_replan, ENABLE_REPLAN,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(test_replan, TEST_REPLAN,\
-      TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(lock_max_wait_time_s, LOCK_MAX_WAIT_TIME_S, TQueryOptionLevel::REGULAR)\
-  ;
+#define QUERY_OPTS_TABLE                                                                 \
+  DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                                 \
+      TImpalaQueryOptions::ORC_SCHEMA_RESOLUTION + 1);                                   \
+  REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
+  QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)               \
+  REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)             \
+  QUERY_OPT_FN(batch_size, BATCH_SIZE, TQueryOptionLevel::DEVELOPMENT)                   \
+  QUERY_OPT_FN(debug_action, DEBUG_ACTION, TQueryOptionLevel::DEVELOPMENT)               \
+  REMOVED_QUERY_OPT_FN(default_order_by_limit, DEFAULT_ORDER_BY_LIMIT)                   \
+  REMOVED_QUERY_OPT_FN(disable_cached_reads, DISABLE_CACHED_READS)                       \
+  QUERY_OPT_FN(                                                                          \
+      disable_outermost_topn, DISABLE_OUTERMOST_TOPN, TQueryOptionLevel::DEVELOPMENT)    \
+  QUERY_OPT_FN(disable_codegen, DISABLE_CODEGEN, TQueryOptionLevel::REGULAR)             \
+  QUERY_OPT_FN(explain_level, EXPLAIN_LEVEL, TQueryOptionLevel::REGULAR)                 \
+  QUERY_OPT_FN(hbase_cache_blocks, HBASE_CACHE_BLOCKS, TQueryOptionLevel::ADVANCED)      \
+  QUERY_OPT_FN(hbase_caching, HBASE_CACHING, TQueryOptionLevel::ADVANCED)                \
+  QUERY_OPT_FN(max_errors, MAX_ERRORS, TQueryOptionLevel::ADVANCED)                      \
+  REMOVED_QUERY_OPT_FN(max_io_buffers, MAX_IO_BUFFERS)                                   \
+  QUERY_OPT_FN(                                                                          \
+      max_scan_range_length, MAX_SCAN_RANGE_LENGTH, TQueryOptionLevel::DEVELOPMENT)      \
+  QUERY_OPT_FN(mem_limit, MEM_LIMIT, TQueryOptionLevel::REGULAR)                         \
+  QUERY_OPT_FN(num_nodes, NUM_NODES, TQueryOptionLevel::DEVELOPMENT)                     \
+  QUERY_OPT_FN(num_scanner_threads, NUM_SCANNER_THREADS, TQueryOptionLevel::REGULAR)     \
+  QUERY_OPT_FN(compression_codec, COMPRESSION_CODEC, TQueryOptionLevel::REGULAR)         \
+  QUERY_OPT_FN(parquet_file_size, PARQUET_FILE_SIZE, TQueryOptionLevel::ADVANCED)        \
+  QUERY_OPT_FN(request_pool, REQUEST_POOL, TQueryOptionLevel::REGULAR)                   \
+  REMOVED_QUERY_OPT_FN(reservation_request_timeout, RESERVATION_REQUEST_TIMEOUT)         \
+  QUERY_OPT_FN(sync_ddl, SYNC_DDL, TQueryOptionLevel::REGULAR)                           \
+  REMOVED_QUERY_OPT_FN(v_cpu_cores, V_CPU_CORES)                                         \
+  REMOVED_QUERY_OPT_FN(rm_initial_mem, RM_INITIAL_MEM)                                   \
+  QUERY_OPT_FN(query_timeout_s, QUERY_TIMEOUT_S, TQueryOptionLevel::REGULAR)             \
+  QUERY_OPT_FN(buffer_pool_limit, BUFFER_POOL_LIMIT, TQueryOptionLevel::ADVANCED)        \
+  QUERY_OPT_FN(appx_count_distinct, APPX_COUNT_DISTINCT, TQueryOptionLevel::ADVANCED)    \
+  QUERY_OPT_FN(disable_unsafe_spills, DISABLE_UNSAFE_SPILLS, TQueryOptionLevel::REGULAR) \
+  REMOVED_QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)                       \
+  QUERY_OPT_FN(exec_single_node_rows_threshold, EXEC_SINGLE_NODE_ROWS_THRESHOLD,         \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS,               \
+      TQueryOptionLevel::REGULAR)                                                        \
+  QUERY_OPT_FN(replica_preference, REPLICA_PREFERENCE, TQueryOptionLevel::ADVANCED)      \
+  QUERY_OPT_FN(                                                                          \
+      schedule_random_replica, SCHEDULE_RANDOM_REPLICA, TQueryOptionLevel::ADVANCED)     \
+  REMOVED_QUERY_OPT_FN(scan_node_codegen_threshold, SCAN_NODE_CODEGEN_THRESHOLD)         \
+  QUERY_OPT_FN(disable_streaming_preaggregations, DISABLE_STREAMING_PREAGGREGATIONS,     \
+      TQueryOptionLevel::REGULAR)                                                        \
+  QUERY_OPT_FN(runtime_filter_mode, RUNTIME_FILTER_MODE, TQueryOptionLevel::REGULAR)     \
+  QUERY_OPT_FN(                                                                          \
+      runtime_bloom_filter_size, RUNTIME_BLOOM_FILTER_SIZE, TQueryOptionLevel::ADVANCED) \
+  QUERY_OPT_FN(runtime_filter_wait_time_ms, RUNTIME_FILTER_WAIT_TIME_MS,                 \
+      TQueryOptionLevel::REGULAR)                                                        \
+  QUERY_OPT_FN(disable_row_runtime_filtering, DISABLE_ROW_RUNTIME_FILTERING,             \
+      TQueryOptionLevel::REGULAR)                                                        \
+  QUERY_OPT_FN(                                                                          \
+      max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS, TQueryOptionLevel::ADVANCED)     \
+  QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8,             \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION,   \
+      TQueryOptionLevel::REGULAR)                                                        \
+  QUERY_OPT_FN(mt_dop, MT_DOP, TQueryOptionLevel::REGULAR)                               \
+  QUERY_OPT_FN(                                                                          \
+      s3_skip_insert_staging, S3_SKIP_INSERT_STAGING, TQueryOptionLevel::REGULAR)        \
+  QUERY_OPT_FN(                                                                          \
+      runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE, TQueryOptionLevel::ADVANCED)     \
+  QUERY_OPT_FN(                                                                          \
+      runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE, TQueryOptionLevel::ADVANCED)     \
+  QUERY_OPT_FN(prefetch_mode, PREFETCH_MODE, TQueryOptionLevel::ADVANCED)                \
+  QUERY_OPT_FN(strict_mode, STRICT_MODE, TQueryOptionLevel::DEVELOPMENT)                 \
+  QUERY_OPT_FN(scratch_limit, SCRATCH_LIMIT, TQueryOptionLevel::REGULAR)                 \
+  QUERY_OPT_FN(enable_expr_rewrites, ENABLE_EXPR_REWRITES, TQueryOptionLevel::ADVANCED)  \
+  QUERY_OPT_FN(enable_cnf_rewrites, ENABLE_CNF_REWRITES, TQueryOptionLevel::ADVANCED)    \
+  QUERY_OPT_FN(decimal_v2, DECIMAL_V2, TQueryOptionLevel::DEVELOPMENT)                   \
+  QUERY_OPT_FN(parquet_dictionary_filtering, PARQUET_DICTIONARY_FILTERING,               \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(                                                                          \
+      parquet_array_resolution, PARQUET_ARRAY_RESOLUTION, TQueryOptionLevel::REGULAR)    \
+  QUERY_OPT_FN(                                                                          \
+      parquet_read_statistics, PARQUET_READ_STATISTICS, TQueryOptionLevel::ADVANCED)     \
+  QUERY_OPT_FN(default_join_distribution_mode, DEFAULT_JOIN_DISTRIBUTION_MODE,           \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(disable_codegen_rows_threshold, DISABLE_CODEGEN_ROWS_THRESHOLD,           \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(default_spillable_buffer_size, DEFAULT_SPILLABLE_BUFFER_SIZE,             \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(                                                                          \
+      min_spillable_buffer_size, MIN_SPILLABLE_BUFFER_SIZE, TQueryOptionLevel::ADVANCED) \
+  QUERY_OPT_FN(max_row_size, MAX_ROW_SIZE, TQueryOptionLevel::REGULAR)                   \
+  QUERY_OPT_FN(idle_session_timeout, IDLE_SESSION_TIMEOUT, TQueryOptionLevel::REGULAR)   \
+  QUERY_OPT_FN(compute_stats_min_sample_size, COMPUTE_STATS_MIN_SAMPLE_SIZE,             \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(exec_time_limit_s, EXEC_TIME_LIMIT_S, TQueryOptionLevel::REGULAR)         \
+  QUERY_OPT_FN(                                                                          \
+      shuffle_distinct_exprs, SHUFFLE_DISTINCT_EXPRS, TQueryOptionLevel::ADVANCED)       \
+  QUERY_OPT_FN(max_mem_estimate_for_admission, MAX_MEM_ESTIMATE_FOR_ADMISSION,           \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(                                                                          \
+      thread_reservation_limit, THREAD_RESERVATION_LIMIT, TQueryOptionLevel::REGULAR)    \
+  QUERY_OPT_FN(thread_reservation_aggregate_limit, THREAD_RESERVATION_AGGREGATE_LIMIT,   \
+      TQueryOptionLevel::REGULAR)                                                        \
+  QUERY_OPT_FN(kudu_read_mode, KUDU_READ_MODE, TQueryOptionLevel::ADVANCED)              \
+  QUERY_OPT_FN(allow_erasure_coded_files, ALLOW_ERASURE_CODED_FILES,                     \
+      TQueryOptionLevel::DEVELOPMENT)                                                    \
+  QUERY_OPT_FN(timezone, TIMEZONE, TQueryOptionLevel::REGULAR)                           \
+  QUERY_OPT_FN(scan_bytes_limit, SCAN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)          \
+  QUERY_OPT_FN(cpu_limit_s, CPU_LIMIT_S, TQueryOptionLevel::DEVELOPMENT)                 \
+  QUERY_OPT_FN(topn_bytes_limit, TOPN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)          \
+  QUERY_OPT_FN(client_identifier, CLIENT_IDENTIFIER, TQueryOptionLevel::ADVANCED)        \
+  QUERY_OPT_FN(resource_trace_ratio, RESOURCE_TRACE_RATIO, TQueryOptionLevel::ADVANCED)  \
+  QUERY_OPT_FN(num_remote_executor_candidates, NUM_REMOTE_EXECUTOR_CANDIDATES,           \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(                                                                          \
+      num_rows_produced_limit, NUM_ROWS_PRODUCED_LIMIT, TQueryOptionLevel::ADVANCED)     \
+  QUERY_OPT_FN(                                                                          \
+      planner_testcase_mode, PLANNER_TESTCASE_MODE, TQueryOptionLevel::DEVELOPMENT)      \
+  QUERY_OPT_FN(default_file_format, DEFAULT_FILE_FORMAT, TQueryOptionLevel::REGULAR)     \
+  QUERY_OPT_FN(                                                                          \
+      parquet_timestamp_type, PARQUET_TIMESTAMP_TYPE, TQueryOptionLevel::DEVELOPMENT)    \
+  QUERY_OPT_FN(                                                                          \
+      parquet_read_page_index, PARQUET_READ_PAGE_INDEX, TQueryOptionLevel::ADVANCED)     \
+  QUERY_OPT_FN(                                                                          \
+      parquet_write_page_index, PARQUET_WRITE_PAGE_INDEX, TQueryOptionLevel::ADVANCED)   \
+  QUERY_OPT_FN(parquet_page_row_count_limit, PARQUET_PAGE_ROW_COUNT_LIMIT,               \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(disable_hdfs_num_rows_estimate, DISABLE_HDFS_NUM_ROWS_ESTIMATE,           \
+      TQueryOptionLevel::REGULAR)                                                        \
+  QUERY_OPT_FN(default_hints_insert_statement, DEFAULT_HINTS_INSERT_STATEMENT,           \
+      TQueryOptionLevel::REGULAR)                                                        \
+  QUERY_OPT_FN(spool_query_results, SPOOL_QUERY_RESULTS, TQueryOptionLevel::DEVELOPMENT) \
+  QUERY_OPT_FN(default_transactional_type, DEFAULT_TRANSACTIONAL_TYPE,                   \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(statement_expression_limit, STATEMENT_EXPRESSION_LIMIT,                   \
+      TQueryOptionLevel::REGULAR)                                                        \
+  QUERY_OPT_FN(max_statement_length_bytes, MAX_STATEMENT_LENGTH_BYTES,                   \
+      TQueryOptionLevel::REGULAR)                                                        \
+  QUERY_OPT_FN(disable_data_cache, DISABLE_DATA_CACHE, TQueryOptionLevel::ADVANCED)      \
+  QUERY_OPT_FN(                                                                          \
+      max_result_spooling_mem, MAX_RESULT_SPOOLING_MEM, TQueryOptionLevel::DEVELOPMENT)  \
+  QUERY_OPT_FN(max_spilled_result_spooling_mem, MAX_SPILLED_RESULT_SPOOLING_MEM,         \
+      TQueryOptionLevel::DEVELOPMENT)                                                    \
+  QUERY_OPT_FN(disable_hbase_num_rows_estimate, DISABLE_HBASE_NUM_ROWS_ESTIMATE,         \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(                                                                          \
+      fetch_rows_timeout_ms, FETCH_ROWS_TIMEOUT_MS, TQueryOptionLevel::ADVANCED)         \
+  QUERY_OPT_FN(now_string, NOW_STRING, TQueryOptionLevel::DEVELOPMENT)                   \
+  QUERY_OPT_FN(parquet_object_store_split_size, PARQUET_OBJECT_STORE_SPLIT_SIZE,         \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT) \
+  QUERY_OPT_FN(                                                                          \
+      broadcast_bytes_limit, BROADCAST_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)         \
+  QUERY_OPT_FN(preagg_bytes_limit, PREAGG_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)      \
+  QUERY_OPT_FN(max_cnf_exprs, MAX_CNF_EXPRS, TQueryOptionLevel::ADVANCED)                \
+  QUERY_OPT_FN(kudu_snapshot_read_timestamp_micros, KUDU_SNAPSHOT_READ_TIMESTAMP_MICROS, \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(retry_failed_queries, RETRY_FAILED_QUERIES, TQueryOptionLevel::REGULAR)   \
+  QUERY_OPT_FN(enabled_runtime_filter_types, ENABLED_RUNTIME_FILTER_TYPES,               \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(async_codegen, ASYNC_CODEGEN, TQueryOptionLevel::DEVELOPMENT)             \
+  QUERY_OPT_FN(enable_distinct_semi_join_optimization,                                   \
+      ENABLE_DISTINCT_SEMI_JOIN_OPTIMIZATION, TQueryOptionLevel::ADVANCED)               \
+  QUERY_OPT_FN(sort_run_bytes_limit, SORT_RUN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)  \
+  QUERY_OPT_FN(max_fs_writers, MAX_FS_WRITERS, TQueryOptionLevel::ADVANCED)              \
+  QUERY_OPT_FN(refresh_updated_hms_partitions, REFRESH_UPDATED_HMS_PARTITIONS,           \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(spool_all_results_for_retries, SPOOL_ALL_RESULTS_FOR_RETRIES,             \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(                                                                          \
+      runtime_filter_error_rate, RUNTIME_FILTER_ERROR_RATE, TQueryOptionLevel::ADVANCED) \
+  QUERY_OPT_FN(use_local_tz_for_unix_timestamp_conversions,                              \
+      USE_LOCAL_TZ_FOR_UNIX_TIMESTAMP_CONVERSIONS, TQueryOptionLevel::ADVANCED)          \
+  QUERY_OPT_FN(convert_legacy_hive_parquet_utc_timestamps,                               \
+      CONVERT_LEGACY_HIVE_PARQUET_UTC_TIMESTAMPS, TQueryOptionLevel::ADVANCED)           \
+  QUERY_OPT_FN(enable_outer_join_to_inner_transformation,                                \
+      ENABLE_OUTER_JOIN_TO_INNER_TRANSFORMATION, TQueryOptionLevel::ADVANCED)            \
+  QUERY_OPT_FN(targeted_kudu_scan_range_length, TARGETED_KUDU_SCAN_RANGE_LENGTH,         \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(report_skew_limit, REPORT_SKEW_LIMIT, TQueryOptionLevel::ADVANCED)        \
+  QUERY_OPT_FN(optimize_simple_limit, OPTIMIZE_SIMPLE_LIMIT, TQueryOptionLevel::REGULAR) \
+  QUERY_OPT_FN(use_dop_for_costing, USE_DOP_FOR_COSTING, TQueryOptionLevel::ADVANCED)    \
+  QUERY_OPT_FN(broadcast_to_partition_factor, BROADCAST_TO_PARTITION_FACTOR,             \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(                                                                          \
+      join_rows_produced_limit, JOIN_ROWS_PRODUCED_LIMIT, TQueryOptionLevel::ADVANCED)   \
+  QUERY_OPT_FN(utf8_mode, UTF8_MODE, TQueryOptionLevel::DEVELOPMENT)                     \
+  QUERY_OPT_FN(analytic_rank_pushdown_threshold, ANALYTIC_RANK_PUSHDOWN_THRESHOLD,       \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(                                                                          \
+      minmax_filter_threshold, MINMAX_FILTER_THRESHOLD, TQueryOptionLevel::ADVANCED)     \
+  QUERY_OPT_FN(                                                                          \
+      minmax_filtering_level, MINMAX_FILTERING_LEVEL, TQueryOptionLevel::ADVANCED)       \
+  QUERY_OPT_FN(compute_column_minmax_stats, COMPUTE_COLUMN_MINMAX_STATS,                 \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(                                                                          \
+      show_column_minmax_stats, SHOW_COLUMN_MINMAX_STATS, TQueryOptionLevel::ADVANCED)   \
+  QUERY_OPT_FN(default_ndv_scale, DEFAULT_NDV_SCALE, TQueryOptionLevel::ADVANCED)        \
+  QUERY_OPT_FN(                                                                          \
+      kudu_replica_selection, KUDU_REPLICA_SELECTION, TQueryOptionLevel::ADVANCED)       \
+  QUERY_OPT_FN(                                                                          \
+      delete_stats_in_truncate, DELETE_STATS_IN_TRUNCATE, TQueryOptionLevel::ADVANCED)   \
+  QUERY_OPT_FN(                                                                          \
+      parquet_bloom_filtering, PARQUET_BLOOM_FILTERING, TQueryOptionLevel::ADVANCED)     \
+  QUERY_OPT_FN(minmax_filter_sorted_columns, MINMAX_FILTER_SORTED_COLUMNS,               \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(minmax_filter_fast_code_path, MINMAX_FILTER_FAST_CODE_PATH,               \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(                                                                          \
+      enable_kudu_transaction, ENABLE_KUDU_TRANSACTION, TQueryOptionLevel::DEVELOPMENT)  \
+  QUERY_OPT_FN(minmax_filter_partition_columns, MINMAX_FILTER_PARTITION_COLUMNS,         \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(parquet_bloom_filter_write, PARQUET_BLOOM_FILTER_WRITE,                   \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(orc_read_statistics, ORC_READ_STATISTICS, TQueryOptionLevel::ADVANCED)    \
+  QUERY_OPT_FN(enable_async_ddl_execution, ENABLE_ASYNC_DDL_EXECUTION,                   \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(enable_async_load_data_execution, ENABLE_ASYNC_LOAD_DATA_EXECUTION,       \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(parquet_late_materialization_threshold,                                   \
+      PARQUET_LATE_MATERIALIZATION_THRESHOLD, TQueryOptionLevel::ADVANCED)               \
+  QUERY_OPT_FN(parquet_dictionary_runtime_filter_entry_limit,                            \
+      PARQUET_DICTIONARY_RUNTIME_FILTER_ENTRY_LIMIT, TQueryOptionLevel::ADVANCED)        \
+  QUERY_OPT_FN(abort_java_udf_on_exception, ABORT_JAVA_UDF_ON_EXCEPTION,                 \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(orc_async_read, ORC_ASYNC_READ, TQueryOptionLevel::ADVANCED)              \
+  QUERY_OPT_FN(runtime_in_list_filter_entry_limit, RUNTIME_IN_LIST_FILTER_ENTRY_LIMIT,   \
+      TQueryOptionLevel::ADVANCED)                                                       \
+  QUERY_OPT_FN(enable_replan, ENABLE_REPLAN, TQueryOptionLevel::ADVANCED)                \
+  QUERY_OPT_FN(test_replan, TEST_REPLAN, TQueryOptionLevel::ADVANCED)                    \
+  QUERY_OPT_FN(lock_max_wait_time_s, LOCK_MAX_WAIT_TIME_S, TQueryOptionLevel::REGULAR)   \
+  QUERY_OPT_FN(orc_schema_resolution, ORC_SCHEMA_RESOLUTION, TQueryOptionLevel::REGULAR);
 
 /// Enforce practical limits on some query options to avoid undesired query state.
 static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 8a118d241..60844d6c6 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -733,6 +733,9 @@ enum TImpalaQueryOptions {
 
   // Maximum wait time on HMS ACID lock in seconds.
   LOCK_MAX_WAIT_TIME_S = 145
+
+  // Determines how to resolve ORC files' schemas. Valid values are "position" and "name".
+  ORC_SCHEMA_RESOLUTION = 146;
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 4d45ad1c8..3e4726c14 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -591,6 +591,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   146: optional i32 lock_max_wait_time_s = 300
+
+  // See comment in ImpalaService.thrift
+  147: optional TSchemaResolutionStrategy orc_schema_resolution = 0;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
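
The new option is set like any other query option. A minimal usage sketch
from a shell session (the table and column names below are hypothetical):

    -- Resolve ORC columns by name instead of by position.
    set ORC_SCHEMA_RESOLUTION=name;
    select id, name from orc_tbl;
    -- Restore the default, position-based resolution.
    set ORC_SCHEMA_RESOLUTION=position;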
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
index db40813e5..d1549c529 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
@@ -74,10 +74,12 @@ BIGINT, DOUBLE, DECIMAL
 # * Partition FLOAT column to DOUBLE
 # * Partition DECIMAL(5,3) column to DECIMAL(8,3)
 # * Non-partition column has been moved to end of the schema
-# Currently this fails due to IMPALA-9410
 select * from functional_parquet.iceberg_legacy_partition_schema_evolution_orc
----- CATCH
-Parse error in possibly corrupt ORC file
+---- RESULTS
+1,1.100000023841858,2.718,2
+1,1.100000023841858,3.141,1
+---- TYPES
+BIGINT, DOUBLE, DECIMAL, INT
 ====
 ---- QUERY
 # Read only the partition columns.
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 4ac912b47..5a5788aac 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -164,6 +164,13 @@ def hs2_text_constraint(v):
           v.get_value('table_format').file_format == 'text' and
           v.get_value('table_format').compression_codec == 'none')
 
+
+def orc_schema_resolution_constraint(v):
+  """Constraint that allows multiple orc_schema_resolution values only for ORC files."""
+  file_format = v.get_value('table_format').file_format
+  orc_schema_resolution = v.get_value('orc_schema_resolution')
+  return file_format == 'orc' or orc_schema_resolution == 0
+
 # Common sets of values for the exec option vectors
 ALL_BATCH_SIZES = [0]
 
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index ef64e1181..23e46e6e5 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -37,7 +37,7 @@ from tests.common.skip import (
     )
 from tests.common.test_dimensions import (create_exec_option_dimension,
     create_exec_option_dimension_from_dict, create_client_protocol_dimension,
-    create_orc_dimension)
+    create_orc_dimension, orc_schema_resolution_constraint)
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_HDFS
 
@@ -48,6 +48,13 @@ class TestNestedTypes(ImpalaTestSuite):
   def get_workload(self):
     return 'functional-query'
 
+  @staticmethod
+  def orc_schema_resolution_constraint(vector):
+    """Constraint that allows multiple orc_schema_resolution values only for ORC files."""
+    file_format = vector.get_value('table_format').file_format
+    orc_schema_resolution = vector.get_value('orc_schema_resolution')
+    return file_format == 'orc' or orc_schema_resolution == 0
+
   @classmethod
   def add_test_dimensions(cls):
     super(TestNestedTypes, cls).add_test_dimensions()
@@ -55,6 +62,8 @@ class TestNestedTypes(ImpalaTestSuite):
         v.get_value('table_format').file_format in ['parquet', 'orc'])
     cls.ImpalaTestMatrix.add_dimension(
         ImpalaTestDimension('mt_dop', 0, 2))
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+    cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
 
   def test_scanner_basic(self, vector):
     """Queries that do not materialize arrays."""
@@ -135,6 +144,8 @@ class TestNestedStructsInSelectList(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('protocol') == 'hs2')
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+    cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
 
   def test_struct_in_select_list(self, vector, unique_database):
     """Queries where a struct column is in the select list"""
@@ -170,6 +181,8 @@ class TestNestedTArraysInSelectList(ImpalaTestSuite):
         create_exec_option_dimension_from_dict({
             'disable_codegen': ['False', 'True']}))
     cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+    cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
 
   def test_array_in_select_list(self, vector, unique_database):
     """Queries where an array column is in the select list"""
@@ -215,6 +228,8 @@ class TestComputeStatsWithNestedTypes(ImpalaTestSuite):
     super(TestComputeStatsWithNestedTypes, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format in ['parquet', 'orc'])
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+    cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
 
   def test_compute_stats_with_structs(self, vector):
     """COMPUTE STATS and SHOW COLUMN STATS for tables with structs"""
@@ -232,6 +247,8 @@ class TestZippingUnnest(ImpalaTestSuite):
     super(TestZippingUnnest, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format in ['parquet', 'orc'])
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+    cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
 
   def test_zipping_unnest_in_from_clause(self, vector):
     """Queries where zipping unnest is executed by providing UNNEST() in the from clause.
@@ -261,6 +278,8 @@ class TestNestedTypesNoMtDop(ImpalaTestSuite):
     super(TestNestedTypesNoMtDop, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format in ['parquet', 'orc'])
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+    cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
 
   def test_tpch(self, vector):
     """Queries over the larger nested TPCH dataset."""
@@ -818,6 +837,8 @@ class TestMaxNestingDepth(ImpalaTestSuite):
     super(TestMaxNestingDepth, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format in ['parquet', 'orc'])
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
+    cls.ImpalaTestMatrix.add_constraint(orc_schema_resolution_constraint)
 
   def test_max_nesting_depth(self, vector, unique_database):
     """Tests that Impala can scan Parquet and ORC files having complex types of
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 53a15f0ae..112b53dc0 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1477,6 +1477,7 @@ class TestOrc(ImpalaTestSuite):
     super(TestOrc, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(
       lambda v: v.get_value('table_format').file_format == 'orc')
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('orc_schema_resolution', 0, 1))
 
   @SkipIfS3.hdfs_block_size
   @SkipIfGCS.hdfs_block_size


[impala] 04/05: IMPALA-11345: Parquet Bloom filtering failure if column is added to the schema

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0b2f6b7f35ab8de31d27631d2b693915848c99a5
Author: Daniel Becker <da...@cloudera.com>
AuthorDate: Mon Jul 25 16:22:27 2022 +0200

    IMPALA-11345: Parquet Bloom filtering failure if column is added to the
    schema
    
    If a new column was added to an existing table with existing data and
    Parquet Bloom filtering was turned ON, queries having an equality
    conjunct on the new column failed.
    
    This was because the old Parquet data files did not have the new
    column in their schema, so the scanner could not find a column for the
    conjunct. This was treated as an error and the query failed.
    
    After this patch, this situation is no longer treated as an error and the
    conjunct is simply disregarded for Bloom filtering in the files that
    lack the new column.
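    
    A minimal SQL sketch of the failing scenario (the table and column
    names are illustrative; it mirrors the regression test added below):
    
        create table t (id int) stored as parquet;
        insert into t values (1), (2), (3);
        alter table t add columns (name string);
        set PARQUET_BLOOM_FILTERING=true;
        -- Before this patch, the old file, which lacks 'name', made this
        -- query fail; now the conjunct is simply skipped for Bloom
        -- filtering in that file.
        select * from t where name = 'Lily';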
    
    Testing:
     - added the test
       TestParquetBloomFilter::test_parquet_bloom_filtering_schema_change in
       tests/query_test/test_parquet_bloom_filter.py that checks that a
       query as described above does not fail.
    
    Merge conflicts:
     - hdfs-parquet-scanner.cc removes usage of NeedDataInFile().
    
    Change-Id: Ief3e6b6358d3dff3abe5beeda752033a7e8e16a6
    Reviewed-on: http://gerrit.cloudera.org:8080/18779
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/18888
    Tested-by: Quanlong Huang <hu...@gmail.com>
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
---
 be/src/exec/parquet/hdfs-parquet-scanner.cc   | 72 ++++++++++++++++++++++++---
 tests/query_test/test_parquet_bloom_filter.py | 21 ++++++++
 2 files changed, 87 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index d8058c875..04b1fddaa 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1924,14 +1924,40 @@ Status HdfsParquetScanner::ReadToBuffer(uint64_t offset, uint8_t* buffer, uint64
   return Status::OK();
 }
 
+void LogMissingFields(google::LogSeverity log_level, const std::string& text_before,
+    const std::string& text_after, const std::unordered_set<std::string>& paths) {
+  stringstream s;
+  s << text_before;
+  s << "[";
+  size_t i = 0;
+  for (const std::string& path : paths) {
+    s << path;
+    if (i + 1 < paths.size()) {
+      s << ", ";
+    }
+    i++;
+  }
+
+  s << "]. ";
+  s << text_after;
+  VLOG(log_level) << s.str();
+}
+
 // Create a map from column index to EQ conjuncts for Bloom filtering.
 Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
   // EQ conjuncts are represented as a LE and a GE conjunct with the same
   // value. This map is used to pair them to form EQ conjuncts.
-  // The value is a vector because there may be multiple GE or LE conjuncts on a column.
+  // The value is a set because there may be multiple GE or LE conjuncts on a column.
   unordered_map<int, std::unordered_set<std::pair<std::string, const Literal*>>>
       conjunct_halves;
 
+  // Slot paths for which no data is found in the file. This is expected if, for
+  // example, the path is a partition column, and unexpected if, for example, the
+  // column was added to the table schema after the current file was written and
+  // therefore the file does not contain it.
+  std::unordered_set<std::string> unexpected_missing_fields;
+  std::unordered_set<std::string> expected_missing_fields;
+
   for (ScalarExprEvaluator* eval : stats_conjunct_evals_) {
     const ScalarExpr& expr = eval->root();
     const string& function_name = expr.function_name();
@@ -1983,13 +2009,24 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
       }
 
       if (missing_field) {
-        if (file_metadata_utils_.IsValuePartitionCol(slot_desc)) continue;
-
-        return Status(Substitute(
-            "Unable to find SchemaNode for path '$0' in the schema of file '$1'.",
-            PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()), filename()));
+        if (!file_metadata_utils_.IsValuePartitionCol(slot_desc)) {
+          // If a column is added to the schema of an existing table, the schemas of the
+          // old parquet data files do not contain the new column: see IMPALA-11345. This
+          // is not an error, we simply disregard this column in Bloom filtering in this
+          // scanner.
+          unexpected_missing_fields.emplace(
+              PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()));
+        } else {
+          // If the data is not expected to be in the file, we disregard the conjuncts for
+          // the purposes of Bloom filtering.
+          expected_missing_fields.emplace(
+              PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()));
+        }
+        continue;
       }
 
+      DCHECK(node != nullptr);
+
       if (!IsParquetBloomFilterSupported(node->element->type, child_slot_ref->type())) {
         continue;
       }
@@ -2029,6 +2066,29 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
     }
   }
 
+  // Log expected and unexpected missing fields.
+  if (!unexpected_missing_fields.empty()) {
+    LogMissingFields(google::WARNING,
+        Substitute(
+          "Unable to find SchemaNode for the following paths in the schema of "
+          "file '$0': ",
+          filename()),
+        "This may be because the column was added to the table schema after "
+        "this file was written. Disregarding conjuncts on these paths for the "
+        "purpose of Parquet Bloom filtering in this file.",
+        unexpected_missing_fields);
+  }
+
+  if (!expected_missing_fields.empty()) {
+    LogMissingFields(google::INFO,
+        Substitute(
+          "Data for the following paths is not expected to be present in file '$0': ",
+          filename()),
+        "Disregarding conjuncts on these paths for the purpose of Parquet Bloom filtering "
+        "in this file.",
+        expected_missing_fields);
+  }
+
   return Status::OK();
 }
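
As context for the pairing logic above: the planner hands the scanner an
equality stats conjunct as a LE and a GE conjunct with the same literal, so
a predicate such as (hypothetical column name)

    where col = 5

arrives as 'col <= 5' and 'col >= 5', and CreateColIdx2EqConjunctMap()
re-pairs the two halves to recover the EQ conjunct used for Parquet Bloom
filtering.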
 
diff --git a/tests/query_test/test_parquet_bloom_filter.py b/tests/query_test/test_parquet_bloom_filter.py
index eda93999b..11af6d28d 100644
--- a/tests/query_test/test_parquet_bloom_filter.py
+++ b/tests/query_test/test_parquet_bloom_filter.py
@@ -95,6 +95,27 @@ class TestParquetBloomFilter(ImpalaTestSuite):
     vector.get_value('exec_option')['parquet_bloom_filtering'] = False
     self.run_test_case('QueryTest/parquet-bloom-filter-disabled', vector, unique_database)
 
+  def test_parquet_bloom_filtering_schema_change(self, vector, unique_database):
+    """ Regression test for IMPALA-11345. Tests that the query does not fail when a new
+    column is added to the table schema but the old Parquet files do not contain it and
+    therefore no column is found for a conjunct while preparing Bloom filtering. """
+    vector.get_value('exec_option')['parquet_bloom_filtering'] = True
+
+    tbl_name = 'changed_schema'
+
+    stmts = [
+      'create table {db}.{tbl} (id INT) stored as parquet',
+      'insert into {db}.{tbl} values (1),(2),(3)',
+      'alter table {db}.{tbl} add columns (name STRING)',
+      'insert into {db}.{tbl} values (4, "James")',
+      'select * from {db}.{tbl} where name in ("Lily")'
+    ]
+
+    for stmt in stmts:
+      self.execute_query_expect_success(self.client,
+          stmt.format(db=str(unique_database), tbl=tbl_name),
+          vector.get_value('exec_option'))
+
   def test_write_parquet_bloom_filter(self, vector, unique_database, tmpdir):
     # Get Bloom filters from the first row group of file PARQUET_TEST_FILE.
     reference_col_to_bloom_filter = self._get_first_row_group_bloom_filters(


[impala] 03/05: IMPALA-11346: Migrated partitioned Iceberg tables might return ERROR when WHERE condition is used on partition column

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.1.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e535f4b8dd1f8c1b3f6911cb99efe2a96a7e441f
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Thu Jun 9 17:21:28 2022 +0200

    IMPALA-11346: Migrated partitioned Iceberg tables might return ERROR when WHERE condition is used on partition column
    
    Identity-partitioned columns are not necessarily stored in the data
    files. For example, when a legacy partitioned table is migrated to
    Iceberg without rewriting the data files, the partition columns won't
    be present in the files.
    
    The Parquet scanner performs a few optimizations to eliminate row
    groups, e.g. filtering based on stats, Bloom filters, etc. When a
    column that has a predicate on it is not present in the data file, it
    is assumed that the whole row group doesn't pass the filtering
    criteria.
    
    But for Iceberg, some files might contain partition columns while
    other files don't, so we need to prepare the scanners to handle such
    cases.
    
    The ORC scanner doesn't have as many such optimizations, so it didn't
    run into this issue.
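    
    A minimal sketch of the affected query shape (the table and column
    names are illustrative):
    
        -- 'p' is an identity partition column of a migrated Iceberg
        -- table; legacy data files written before the migration do not
        -- store it, so a missing column must not be taken to mean that
        -- the row group fails the predicate.
        select * from iceberg_migrated_tbl where p = 1;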
    
    Testing:
     * e2e tests
    
    Merge conflicts due to missing 23d09638d:
     * file-metadata-utils.cc resolves trivial conflicts
     * hdfs-parquet-scanner.cc removes usage of NeedDataInFile()
    
    Change-Id: Ie706317888981f634d792fb570f3eab1ec11a4f4
    Reviewed-on: http://gerrit.cloudera.org:8080/18605
    Reviewed-by: Csaba Ringhofer <cs...@cloudera.com>
    Reviewed-by: Tamas Mate <tm...@apache.org>
    Reviewed-by: <li...@sensorsdata.cn>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/18887
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
    Tested-by: Quanlong Huang <hu...@gmail.com>
---
 be/src/exec/file-metadata-utils.cc                 |   1 -
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |   4 +
 .../queries/QueryTest/iceberg-migrated-tables.test | 138 +++++++++++++++++++++
 3 files changed, 142 insertions(+), 1 deletion(-)

diff --git a/be/src/exec/file-metadata-utils.cc b/be/src/exec/file-metadata-utils.cc
index 708b4f5c5..2a9230996 100644
--- a/be/src/exec/file-metadata-utils.cc
+++ b/be/src/exec/file-metadata-utils.cc
@@ -114,7 +114,6 @@ Tuple* FileMetadataUtils::CreateTemplateTuple(MemPool* mem_pool) {
 bool FileMetadataUtils::IsValuePartitionCol(const SlotDescriptor* slot_desc) {
   DCHECK(context_ != nullptr);
   DCHECK(file_desc_ != nullptr);
-  if (slot_desc->parent() != scan_node_->tuple_desc()) return false;
   if (slot_desc->col_pos() < scan_node_->num_partition_keys()) {
     return true;
   }
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index f79e02b91..d8058c875 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -566,6 +566,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
     RETURN_IF_ERROR(ResolveSchemaForStatFiltering(slot_desc, &missing_field, &node));
 
     if (missing_field) {
+      if (file_metadata_utils_.IsValuePartitionCol(slot_desc)) continue;
       // We are selecting a column that is not in the file. We would set its slot to NULL
       // during the scan, so any predicate would evaluate to false. Return early. NULL
       // comparisons cannot happen here, since predicates with NULL literals are filtered
@@ -707,6 +708,7 @@ Status HdfsParquetScanner::EvaluateOverlapForRowGroup(
     RETURN_IF_ERROR(ResolveSchemaForStatFiltering(slot_desc, &missing_field, &node));
 
     if (missing_field) {
+      if (file_metadata_utils_.IsValuePartitionCol(slot_desc)) continue;
       // We are selecting a column that is not in the file. We would set its slot to NULL
       // during the scan, so any predicate would evaluate to false. Return early. NULL
       // comparisons cannot happen here, since predicates with NULL literals are filtered
@@ -1981,6 +1983,8 @@ Status HdfsParquetScanner::CreateColIdx2EqConjunctMap() {
       }
 
       if (missing_field) {
+        if (file_metadata_utils_.IsValuePartitionCol(slot_desc)) continue;
+
         return Status(Substitute(
             "Unable to find SchemaNode for path '$0' in the schema of file '$1'.",
             PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()), filename()));
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
index d1549c529..ba278fcd4 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-migrated-tables.test
@@ -11,6 +11,54 @@ select * from functional_parquet.iceberg_alltypes_part
 INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
 ====
 ---- QUERY
+# Queries with WHERE clauses
+select * from functional_parquet.iceberg_alltypes_part
+where i = 1;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part
+where i = 3;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part
+where p_int = 1;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part
+where p_int = 2;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part
+where p_bool = true;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part
+where p_bool = false;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
 # Read only the partition columns.
 select p_bool, p_int, p_bigint, p_float,
        p_double, p_decimal, p_date, p_string
@@ -33,6 +81,54 @@ select * from functional_parquet.iceberg_alltypes_part_orc
 INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
 ====
 ---- QUERY
+# Queries with WHERE clauses
+select * from functional_parquet.iceberg_alltypes_part_orc
+where i = 1;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc
+where i = 3;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc
+where p_int = 1;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc
+where p_int = 2;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc
+where p_bool = true;
+---- RESULTS
+1,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+2,true,1,11,1.100000023841858,2.222,123.321,2022-02-22,'impala'
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc
+where p_bool = false;
+---- RESULTS
+---- TYPES
+INT, BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, DECIMAL, DATE, STRING
+====
+---- QUERY
 # Read only the partition columns.
 select p_bool, p_int, p_bigint, p_float,
        p_double, p_decimal, p_date, p_string
@@ -58,6 +154,23 @@ select * from functional_parquet.iceberg_legacy_partition_schema_evolution
 BIGINT, DOUBLE, DECIMAL, INT
 ====
 ---- QUERY
+select * from functional_parquet.iceberg_legacy_partition_schema_evolution
+where p_int_long = 1;
+---- RESULTS
+1,1.100000023841858,2.718,2
+1,1.100000023841858,3.141,1
+---- TYPES
+BIGINT, DOUBLE, DECIMAL, INT
+====
+---- QUERY
+select * from functional_parquet.iceberg_legacy_partition_schema_evolution
+where p_dec_dec = 2.718;
+---- RESULTS
+1,1.100000023841858,2.718,2
+---- TYPES
+BIGINT, DOUBLE, DECIMAL, INT
+====
+---- QUERY
 # Read only the partition columns.
 select p_int_long, p_float_double, p_dec_dec
 from functional_parquet.iceberg_legacy_partition_schema_evolution;
@@ -82,6 +195,23 @@ select * from functional_parquet.iceberg_legacy_partition_schema_evolution_orc
 BIGINT, DOUBLE, DECIMAL, INT
 ====
 ---- QUERY
+select * from functional_parquet.iceberg_legacy_partition_schema_evolution_orc
+where p_int_long = 1;
+---- RESULTS
+1,1.100000023841858,2.718,2
+1,1.100000023841858,3.141,1
+---- TYPES
+BIGINT, DOUBLE, DECIMAL, INT
+====
+---- QUERY
+select * from functional_parquet.iceberg_legacy_partition_schema_evolution_orc
+where p_dec_dec = 2.718;
+---- RESULTS
+1,1.100000023841858,2.718,2
+---- TYPES
+BIGINT, DOUBLE, DECIMAL, INT
+====
+---- QUERY
 # Read only the partition columns.
 select p_int_long, p_float_double, p_dec_dec
 from functional_parquet.iceberg_legacy_partition_schema_evolution_orc;
@@ -106,3 +236,11 @@ select * from only_part_cols;
 ---- TYPES
 INT, STRING
 ====
+---- QUERY
+select * from only_part_cols
+where i = 2 and s = 's'
+---- RESULTS
+2,'s'
+---- TYPES
+INT, STRING
+====