You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/02/17 12:38:06 UTC
[impala] branch master updated: IMPALA-11081: Fix incorrect results in partition key scan
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ff7b5db60 IMPALA-11081: Fix incorrect results in partition key scan
ff7b5db60 is described below
commit ff7b5db6002ccb047262cd7118e2e11ab09ef40a
Author: zhangyifan27 <ch...@163.com>
AuthorDate: Fri Feb 3 16:43:08 2023 +0800
IMPALA-11081: Fix incorrect results in partition key scan
This patch fixes incorrect results caused by the short-circuit partition
key scan when a Parquet/ORC file contains multiple blocks.
IMPALA-8834 introduced an optimization that generates only one scan
range per file, corresponding to the file's first block. Backends
only issue footer ranges for Parquet/ORC files in file-metadata-only
queries (see HdfsScanner::IssueFooterRanges()), which leads to
incorrect results if the first block doesn't include the file footer.
This bug is fixed by returning a scan range corresponding to the last
block of each Parquet/ORC file, to make sure it contains the file footer.
Testing:
- Added e2e tests to verify the fix.
Change-Id: I17331ed6c26a747e0509dcbaf427cd52808943b1
Reviewed-on: http://gerrit.cloudera.org:8080/19471
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../org/apache/impala/planner/HdfsScanNode.java | 10 ++++-
tests/common/test_dimensions.py | 15 ++++++++
tests/metadata/test_partition_metadata.py | 9 +----
tests/query_test/test_queries.py | 45 +++++++++++++++++++++-
4 files changed, 70 insertions(+), 9 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 3d8243f4d..bb61b4dda 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1382,7 +1382,15 @@ public class HdfsScanNode extends ScanNode {
boolean fileDescMissingDiskIds = false;
long fileMaxScanRangeBytes = 0;
FileSystemUtil.FsType fsType = partition.getFsType();
- for (int i = 0; i < fileDesc.getNumFileBlocks(); ++i) {
+ int i = 0;
+ if (isPartitionKeyScan_ && (partition.getFileFormat().isParquetBased()
+ || partition.getFileFormat() == HdfsFileFormat.ORC)) {
+ // IMPALA-8834 introduced the optimization for partition key scan by generating
+ // one scan range for each HDFS file. With Parquet and ORC, we start with the last
+ // block is to get a scan range that contains a file footer for short-circuiting.
+ i = fileDesc.getNumFileBlocks() - 1;
+ }
+ for (; i < fileDesc.getNumFileBlocks(); ++i) {
FbFileBlock block = fileDesc.getFbFileBlock(i);
int replicaHostCount = FileBlock.getNumReplicaHosts(block);
if (replicaHostCount == 0) {
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index 7c4c3b2b5..16a8c504c 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -27,6 +27,21 @@ from tests.util.filesystem_utils import (
WORKLOAD_DIR = os.environ['IMPALA_WORKLOAD_DIR']
+# Map from the test dimension file_format string to the SQL "STORED AS" or "STORED BY"
+# argument.
+FILE_FORMAT_TO_STORED_AS_MAP = {
+ 'text': 'TEXTFILE',
+ 'seq': 'SEQUENCEFILE',
+ 'rc': 'RCFILE',
+ 'orc': 'ORC',
+ 'parquet': 'PARQUET',
+ 'hudiparquet': 'HUDIPARQUET',
+ 'avro': 'AVRO',
+ 'hbase': "'org.apache.hadoop.hive.hbase.HBaseStorageHandler'",
+ 'kudu': "KUDU",
+ 'iceberg': "ICEBERG"
+}
+
# Describes the configuration used to execute a single tests. Contains both the details
# of what specific table format to target along with the exec options (num_nodes, etc)
# to use when running the query.
diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py
index 47024b1d1..0b922e38b 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -19,14 +19,9 @@ import pytest
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfFS, SkipIfLocal
from tests.common.test_dimensions import (create_single_exec_option_dimension,
- create_uncompressed_text_dimension)
+ create_uncompressed_text_dimension, FILE_FORMAT_TO_STORED_AS_MAP)
from tests.util.filesystem_utils import WAREHOUSE, FILESYSTEM_PREFIX
-# Map from the test dimension file_format string to the SQL "STORED AS"
-# argument.
-STORED_AS_ARGS = { 'text': 'textfile', 'parquet': 'parquet', 'avro': 'avro',
- 'seq': 'sequencefile' }
-
# Tests specific to partition metadata.
# TODO: Split up the DDL tests and move some of the partition-specific tests
# here.
@@ -59,7 +54,7 @@ class TestPartitionMetadata(ImpalaTestSuite):
# Create the table
self.client.execute(
"create table %s (i int) partitioned by(j int) stored as %s location '%s'"
- % (FQ_TBL_NAME, STORED_AS_ARGS[file_format], TBL_LOCATION))
+ % (FQ_TBL_NAME, FILE_FORMAT_TO_STORED_AS_MAP[file_format], TBL_LOCATION))
# Point both partitions to the same location.
self.client.execute("alter table %s add partition (j=1) location '%s/p'"
diff --git a/tests/query_test/test_queries.py b/tests/query_test/test_queries.py
index db311e913..5d747fcda 100644
--- a/tests/query_test/test_queries.py
+++ b/tests/query_test/test_queries.py
@@ -26,7 +26,9 @@ from tests.common.skip import SkipIfEC, SkipIfCatalogV2, SkipIfNotHdfsMinicluste
from tests.common.test_dimensions import (
create_uncompressed_text_dimension, create_exec_option_dimension_from_dict,
create_client_protocol_dimension, hs2_parquet_constraint,
- extend_exec_option_dimension)
+ extend_exec_option_dimension, FILE_FORMAT_TO_STORED_AS_MAP)
+from tests.util.filesystem_utils import get_fs_path
+from subprocess import check_call
class TestQueries(ImpalaTestSuite):
@@ -326,6 +328,47 @@ class TestPartitionKeyScans(ImpalaTestSuite):
def test_partition_key_scans_with_joins(self, vector):
self.run_test_case('QueryTest/partition-key-scans-with-joins', vector)
+
+class TestPartitionKeyScansWithMultipleBlocks(ImpalaTestSuite):
+ """Tests for queries that exercise partition key scan optimisation with data files
+ that contain multiple blocks."""
+ @classmethod
+ def add_test_dimensions(cls):
+ super(TestPartitionKeyScansWithMultipleBlocks, cls).add_test_dimensions()
+ cls.ImpalaTestMatrix.add_constraint(lambda v:
+ v.get_value('table_format').file_format not in ('kudu', 'hbase'))
+
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ def _build_alltypes_multiblocks_table(self, vector, unique_database):
+ file_format = vector.get_value('table_format').file_format
+ db_suffix = vector.get_value('table_format').db_suffix()
+ src_tbl_name = 'functional' + db_suffix + '.alltypes'
+ src_tbl_loc = self._get_table_location(src_tbl_name, vector)
+ source_file = src_tbl_loc + '/year=2010/month=12/*'
+ tbl_loc = get_fs_path("/test-warehouse/%s.db/alltypes_multiblocks"
+ % (unique_database))
+ file_path = tbl_loc + "/year=2010/month=12"
+
+ check_call(['hdfs', 'dfs', '-mkdir', '-p', file_path])
+ self.client.execute("""create table if not exists %s.alltypes_multiblocks
+ like functional.alltypes stored as %s location '%s';"""
+ % (unique_database, FILE_FORMAT_TO_STORED_AS_MAP[file_format], tbl_loc))
+
+ # set block size to 1024 so the target file occupies multiple blocks
+ check_call(['hdfs', 'dfs', '-Ddfs.block.size=1024', '-cp', '-f', '-d',
+ source_file, file_path])
+ self.client.execute("alter table %s.alltypes_multiblocks recover partitions"
+ % (unique_database))
+
+ def test_partition_key_scans_with_multiple_blocks_table(self, vector, unique_database):
+ self._build_alltypes_multiblocks_table(vector, unique_database)
+ result = self.execute_query_expect_success(self.client,
+ "SELECT max(year) FROM %s.alltypes_multiblocks" % (unique_database))
+ assert int(result.get_data()) == 2010
+
class TestTopNReclaimQuery(ImpalaTestSuite):
"""Test class to validate that TopN periodically reclaims tuple pool memory
and runs with a lower memory footprint."""