Posted to commits@impala.apache.org by st...@apache.org on 2023/02/17 12:38:06 UTC

[impala] branch master updated: IMPALA-11081: Fix incorrect results in partition key scan

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ff7b5db60 IMPALA-11081: Fix incorrect results in partition key scan
ff7b5db60 is described below

commit ff7b5db6002ccb047262cd7118e2e11ab09ef40a
Author: zhangyifan27 <ch...@163.com>
AuthorDate: Fri Feb 3 16:43:08 2023 +0800

    IMPALA-11081: Fix incorrect results in partition key scan
    
    This patch fixes incorrect results caused by short-circuit partition
    key scan in the case where a Parquet/ORC file contains multiple
    blocks.
    
    IMPALA-8834 introduced an optimization that generates only one scan
    range per file, corresponding to the file's first block. Backends
    only issue footer ranges for Parquet/ORC files for file-metadata-only
    queries (see HdfsScanner::IssueFooterRanges()), which leads to
    incorrect results if the first block doesn't include the file footer.
    The bug is fixed by instead returning a scan range corresponding to
    the last block for Parquet/ORC files, ensuring that it contains the
    file footer.
    
    Testing:
    - Added e2e tests to verify the fix.
    
    Change-Id: I17331ed6c26a747e0509dcbaf427cd52808943b1
    Reviewed-on: http://gerrit.cloudera.org:8080/19471
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/planner/HdfsScanNode.java    | 10 ++++-
 tests/common/test_dimensions.py                    | 15 ++++++++
 tests/metadata/test_partition_metadata.py          |  9 +----
 tests/query_test/test_queries.py                   | 45 +++++++++++++++++++++-
 4 files changed, 70 insertions(+), 9 deletions(-)
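
The heart of the change is which block of a file is turned into a scan range.
A minimal sketch of the decision made in HdfsScanNode.java below, written in
illustrative Python (the helper name pick_start_block is invented for this
example; it is not Impala code):

    def pick_start_block(num_blocks, is_partition_key_scan,
                         is_footer_based_format):
        # Parquet and ORC keep their footer at the end of the file, so a
        # footer-only (partition key) scan must cover the last block.
        if is_partition_key_scan and is_footer_based_format and num_blocks > 0:
            return num_blocks - 1  # last block contains the file footer
        return 0  # otherwise scan ranges are generated from the first block on

    # A 3-block Parquet file in a partition key scan now uses only block 2;
    # before the fix the single range covered block 0, which lacks the footer.
    assert pick_start_block(3, True, True) == 2
    assert pick_start_block(3, False, True) == 0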

diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 3d8243f4d..bb61b4dda 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1382,7 +1382,15 @@ public class HdfsScanNode extends ScanNode {
     boolean fileDescMissingDiskIds = false;
     long fileMaxScanRangeBytes = 0;
     FileSystemUtil.FsType fsType = partition.getFsType();
-    for (int i = 0; i < fileDesc.getNumFileBlocks(); ++i) {
+    int i = 0;
+    if (isPartitionKeyScan_ && (partition.getFileFormat().isParquetBased()
+        || partition.getFileFormat() == HdfsFileFormat.ORC)) {
+      // IMPALA-8834 introduced the partition key scan optimization of generating
+      // one scan range for each HDFS file. With Parquet and ORC, we start with the
+      // last block to get a scan range that contains the file footer for short-circuiting.
+      i = fileDesc.getNumFileBlocks() - 1;
+    }
+    for (; i < fileDesc.getNumFileBlocks(); ++i) {
       FbFileBlock block = fileDesc.getFbFileBlock(i);
       int replicaHostCount = FileBlock.getNumReplicaHosts(block);
       if (replicaHostCount == 0) {
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 7c4c3b2b5..16a8c504c 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -27,6 +27,21 @@ from tests.util.filesystem_utils import (
 
 WORKLOAD_DIR = os.environ['IMPALA_WORKLOAD_DIR']
 
+# Map from the test dimension file_format string to the SQL "STORED AS" or "STORED BY"
+# argument.
+FILE_FORMAT_TO_STORED_AS_MAP = {
+  'text': 'TEXTFILE',
+  'seq': 'SEQUENCEFILE',
+  'rc': 'RCFILE',
+  'orc': 'ORC',
+  'parquet': 'PARQUET',
+  'hudiparquet': 'HUDIPARQUET',
+  'avro': 'AVRO',
+  'hbase': "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'",
+  'kudu': "KUDU",
+  'iceberg': "ICEBERG"
+}
+
 # Describes the configuration used to execute a single tests. Contains both the details
 # of what specific table format to target along with the exec options (num_nodes, etc)
 # to use when running the query.
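
For context on how the new shared map is consumed: most formats map to a
"STORED AS" keyword, while HBase maps to a "STORED BY" storage-handler class.
A hedged sketch of typical usage (make_create_stmt is a hypothetical helper,
not part of this patch):

    # Excerpt of the map added above; see test_dimensions.py for the full one.
    FORMAT_TO_STORED = {
        'parquet': 'PARQUET',
        'orc': 'ORC',
        'hbase': "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'",
    }

    def make_create_stmt(tbl_name, file_format, location):
        # HBase tables take a storage handler class after STORED BY; the
        # file-based formats take a format keyword after STORED AS.
        clause = 'STORED BY' if file_format == 'hbase' else 'STORED AS'
        return "CREATE TABLE %s (i INT) %s %s LOCATION '%s'" % (
            tbl_name, clause, FORMAT_TO_STORED[file_format], location)

    print(make_create_stmt('t1', 'parquet', '/test-warehouse/t1'))
    # CREATE TABLE t1 (i INT) STORED AS PARQUET LOCATION '/test-warehouse/t1'
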
diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py
index 47024b1d1..0b922e38b 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -19,14 +19,9 @@ import pytest
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfFS, SkipIfLocal
 from tests.common.test_dimensions import (create_single_exec_option_dimension,
-    create_uncompressed_text_dimension)
+    create_uncompressed_text_dimension, FILE_FORMAT_TO_STORED_AS_MAP)
 from tests.util.filesystem_utils import WAREHOUSE, FILESYSTEM_PREFIX
 
-# Map from the test dimension file_format string to the SQL "STORED AS"
-# argument.
-STORED_AS_ARGS = { 'text': 'textfile', 'parquet': 'parquet', 'avro': 'avro',
-    'seq': 'sequencefile' }
-
 # Tests specific to partition metadata.
 # TODO: Split up the DDL tests and move some of the partition-specific tests
 # here.
@@ -59,7 +54,7 @@ class TestPartitionMetadata(ImpalaTestSuite):
     # Create the table
     self.client.execute(
         "create table %s (i int) partitioned by(j int) stored as %s location '%s'"
-        % (FQ_TBL_NAME, STORED_AS_ARGS[file_format], TBL_LOCATION))
+        % (FQ_TBL_NAME, FILE_FORMAT_TO_STORED_AS_MAP[file_format], TBL_LOCATION))
 
     # Point both partitions to the same location.
     self.client.execute("alter table %s add partition (j=1) location '%s/p'"
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index db311e913..5d747fcda 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -26,7 +26,9 @@ from tests.common.skip import SkipIfEC, SkipIfCatalogV2, SkipIfNotHdfsMinicluste
 from tests.common.test_dimensions import (
    create_uncompressed_text_dimension, create_exec_option_dimension_from_dict,
    create_client_protocol_dimension, hs2_parquet_constraint,
-   extend_exec_option_dimension)
+   extend_exec_option_dimension, FILE_FORMAT_TO_STORED_AS_MAP)
+from tests.util.filesystem_utils import get_fs_path
+from subprocess import check_call
 
 class TestQueries(ImpalaTestSuite):
 
@@ -326,6 +328,47 @@ class TestPartitionKeyScans(ImpalaTestSuite):
   def test_partition_key_scans_with_joins(self, vector):
     self.run_test_case('QueryTest/partition-key-scans-with-joins', vector)
 
+
+class TestPartitionKeyScansWithMultipleBlocks(ImpalaTestSuite):
+  """Tests for queries that exercise partition key scan optimisation with data files
+  that contain multiple blocks."""
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestPartitionKeyScansWithMultipleBlocks, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format not in ('kudu', 'hbase'))
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  def _build_alltypes_multiblocks_table(self, vector, unique_database):
+    file_format = vector.get_value('table_format').file_format
+    db_suffix = vector.get_value('table_format').db_suffix()
+    src_tbl_name = 'functional' + db_suffix + '.alltypes'
+    src_tbl_loc = self._get_table_location(src_tbl_name, vector)
+    source_file = src_tbl_loc + '/year=2010/month=12/*'
+    tbl_loc = get_fs_path("/test-warehouse/%s.db/alltypes_multiblocks"
+        % (unique_database))
+    file_path = tbl_loc + "/year=2010/month=12"
+
+    check_call(['hdfs', 'dfs', '-mkdir', '-p', file_path])
+    self.client.execute("""create table if not exists %s.alltypes_multiblocks
+        like functional.alltypes stored as %s location '%s';"""
+        % (unique_database, FILE_FORMAT_TO_STORED_AS_MAP[file_format], tbl_loc))
+
+    # Set dfs.block.size to 1024 bytes so the copied file spans multiple blocks.
+    check_call(['hdfs', 'dfs', '-Ddfs.block.size=1024', '-cp', '-f', '-d',
+        source_file, file_path])
+    self.client.execute("alter table %s.alltypes_multiblocks recover partitions"
+        % (unique_database))
+
+  def test_partition_key_scans_with_multiple_blocks_table(self, vector, unique_database):
+    self._build_alltypes_multiblocks_table(vector, unique_database)
+    result = self.execute_query_expect_success(self.client,
+          "SELECT max(year) FROM %s.alltypes_multiblocks" % (unique_database))
+    assert int(result.get_data()) == 2010
+
 class TestTopNReclaimQuery(ImpalaTestSuite):
   """Test class to validate that TopN periodically reclaims tuple pool memory
    and runs with a lower memory footprint."""
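
A note on the multi-block setup in TestPartitionKeyScansWithMultipleBlocks
above: the copy passes a per-command -Ddfs.block.size=1024 override, so any
data file larger than 1024 bytes ends up spanning several HDFS blocks, which
is exactly the file shape that triggered IMPALA-11081. One way to sanity-check
the block count is sketched below; fsck output format varies across Hadoop
versions, so the parsing is illustrative only:

    from subprocess import check_output

    def count_hdfs_blocks(path):
        # 'hdfs fsck <path> -files -blocks' prints one line per block, each
        # containing a block id of the form blk_<number>.
        out = check_output(['hdfs', 'fsck', path, '-files', '-blocks'])
        return sum(1 for line in out.decode().splitlines() if 'blk_' in line)

    # A file copied with -Ddfs.block.size=1024 and larger than 1024 bytes
    # should report more than one block.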