You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/02/01 17:53:59 UTC

[2/3] incubator-impala git commit: IMPALA-3989: Display skew warning for poorly formatted Parquet files

IMPALA-3989: Display skew warning for poorly formatted Parquet files

Parquet files are scanned in the granularity of row groups. Each row
group belongs to one or more splits and each split is scanned by a
separate parquet scanner.

If some row groups span multiple splits, then we will most likely end
up seeing some scanners having remote reads and some scanners not
performing scans at all. This will attribute to skew across the
cluster where distribution of scans is uneven.

This change adds a counter (NumScannersWithNoReads) to the scan node's
runtime profile to track the number of parquet scanners that end up
doing no reads becuse of poor formatting.

Change-Id: Ibf48d978383d73efdade733a892e795ebd53c76a
Reviewed-on: http://gerrit.cloudera.org:8080/5400
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8f59ce9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8f59ce9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8f59ce9d

Branch: refs/heads/master
Commit: 8f59ce9dfc636cc9f6f03ca9f5ee289ca7cca602
Parents: 446d95e
Author: Attila Jeges <at...@cloudera.com>
Authored: Fri Dec 2 17:20:16 2016 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Jan 31 22:17:13 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 56 +++++++++++++++++++++++++++-----
 be/src/exec/hdfs-parquet-scanner.h  |  4 +++
 tests/query_test/test_scanners.py   | 56 ++++++++++++++++++++++++++++++++
 3 files changed, 107 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8f59ce9d/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 31439f2..1af711e 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -158,6 +158,7 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
       process_footer_timer_stats_(NULL),
       num_cols_counter_(NULL),
       num_row_groups_counter_(NULL),
+      num_scanners_with_no_reads_counter_(NULL),
       codegend_process_scratch_batch_fn_(NULL) {
   assemble_rows_timer_.Stop();
 }
@@ -170,6 +171,8 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
       ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
   num_row_groups_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
+  num_scanners_with_no_reads_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", TUnit::UNIT);
   process_footer_timer_stats_ =
       ADD_SUMMARY_STATS_TIMER(
           scan_node_->runtime_profile(), "FooterProcessingTime");
@@ -311,6 +314,24 @@ static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) {
   return start_offset + (end_offset - start_offset) / 2;
 }
 
+// Returns true if 'row_group' overlaps with 'split_range'.
+static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group,
+    const DiskIoMgr::ScanRange* split_range) {
+  int64_t row_group_start = GetColumnStartOffset(row_group.columns[0].meta_data);
+
+  const parquet::ColumnMetaData& last_column =
+      row_group.columns[row_group.columns.size() - 1].meta_data;
+  int64_t row_group_end =
+      GetColumnStartOffset(last_column) + last_column.total_compressed_size;
+
+  int64_t split_start = split_range->offset();
+  int64_t split_end = split_start + split_range->len();
+
+  return (split_start >= row_group_start && split_start < row_group_end) ||
+      (split_end > row_group_start && split_end <= row_group_end) ||
+      (split_start <= row_group_start && split_end >= row_group_end);
+}
+
 int HdfsParquetScanner::CountScalarColumns(const vector<ParquetColumnReader*>& column_readers) {
   DCHECK(!column_readers.empty());
   int num_columns = 0;
@@ -431,6 +452,16 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
 }
 
 Status HdfsParquetScanner::NextRowGroup() {
+  const DiskIoMgr::ScanRange* split_range = static_cast<ScanRangeMetadata*>(
+      metadata_range_->meta_data())->original_split;
+  int64_t split_offset = split_range->offset();
+  int64_t split_length = split_range->len();
+
+  HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
+
+  bool start_with_first_row_group = row_group_idx_ == -1;
+  bool misaligned_row_group_skipped = false;
+
   advance_row_group_ = false;
   row_group_rows_read_ = 0;
 
@@ -442,26 +473,33 @@ Status HdfsParquetScanner::NextRowGroup() {
     parse_status_ = Status::OK();
 
     ++row_group_idx_;
-    if (row_group_idx_ >= file_metadata_.row_groups.size()) break;
+    if (row_group_idx_ >= file_metadata_.row_groups.size()) {
+      if (start_with_first_row_group && misaligned_row_group_skipped) {
+        // We started with the first row group and skipped all the row groups because
+        // they were misaligned. The execution flow won't reach this point if there is at
+        // least one non-empty row group which this scanner can process.
+        COUNTER_ADD(num_scanners_with_no_reads_counter_, 1);
+      }
+      break;
+    }
     const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
     // Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and 'select *'
     // behave consistently for corrupt files that have 'file_metadata_.num_rows == 0'
     // but some data in row groups.
-    if (row_group.num_rows == 0|| file_metadata_.num_rows == 0) continue;
+    if (row_group.num_rows == 0 || file_metadata_.num_rows == 0) continue;
 
-    const DiskIoMgr::ScanRange* split_range = static_cast<ScanRangeMetadata*>(
-        metadata_range_->meta_data())->original_split;
-    HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
     RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
         file_desc->filename, file_desc->file_length, row_group));
 
+    // A row group is processed by the scanner whose split overlaps with the row
+    // group's mid point.
     int64_t row_group_mid_pos = GetRowGroupMidOffset(row_group);
-    int64_t split_offset = split_range->offset();
-    int64_t split_length = split_range->len();
     if (!(row_group_mid_pos >= split_offset &&
         row_group_mid_pos < split_offset + split_length)) {
-      // A row group is processed by the scanner whose split overlaps with the row
-      // group's mid point. This row group will be handled by a different scanner.
+      // The mid-point does not fall within the split, this row group will be handled by a
+      // different scanner.
+      // If the row group overlaps with the split, we found a misaligned row group.
+      misaligned_row_group_skipped |= CheckRowGroupOverlapsSplit(row_group, split_range);
       continue;
     }
     COUNTER_ADD(num_row_groups_counter_, 1);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8f59ce9d/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 15d64ec..02d7d5d 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -443,6 +443,10 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Number of row groups that need to be read.
   RuntimeProfile::Counter* num_row_groups_counter_;
 
+  /// Number of scanners that end up doing no reads because their splits don't overlap
+  /// with the midpoint of any row-group in the file.
+  RuntimeProfile::Counter* num_scanners_with_no_reads_counter_;
+
   typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*);
   /// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise.
   ProcessScratchBatchFn codegend_process_scratch_batch_fn_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8f59ce9d/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 21c0d9d..de4f826 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -41,6 +41,7 @@ from tests.common.test_result_verifier import (
     parse_result_rows)
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
+from tests.util.hdfs_util import NAMENODE
 from tests.util.get_parquet_metadata import get_parquet_metadata
 from tests.util.test_file_parser import QueryTestSectionReader
 
@@ -314,6 +315,61 @@ class TestParquet(ImpalaTestSuite):
   @SkipIfS3.hdfs_block_size
   @SkipIfIsilon.hdfs_block_size
   @SkipIfLocal.multiple_impalad
+  def test_misaligned_parquet_row_groups(self, vector):
+    """IMPALA-3989: Test that no warnings are issued when misaligned row groups are
+    encountered. Make sure that 'NumScannersWithNoReads' counters are set to the number of
+    scanners that end up doing no reads because of misaligned row groups.
+    """
+    # functional.parquet.alltypes is well-formatted. 'NumScannersWithNoReads' counters are
+    # set to 0.
+    table_name = 'functional_parquet.alltypes'
+    self._misaligned_parquet_row_groups_helper(table_name, 7300)
+    # lineitem_multiblock_parquet/000000_0 is ill-formatted but every scanner reads some
+    # row groups. 'NumScannersWithNoReads' counters are set to 0.
+    table_name = 'functional_parquet.lineitem_multiblock'
+    self._misaligned_parquet_row_groups_helper(table_name, 20000)
+    # lineitem_sixblocks.parquet is ill-formatted but every scanner reads some row groups.
+    # 'NumScannersWithNoReads' counters are set to 0.
+    table_name = 'functional_parquet.lineitem_sixblocks'
+    self._misaligned_parquet_row_groups_helper(table_name, 40000)
+    # Scanning lineitem_one_row_group.parquet finds two scan ranges that end up doing no
+    # reads because the file is poorly formatted.
+    table_name = 'functional_parquet.lineitem_multiblock_one_row_group'
+    self._misaligned_parquet_row_groups_helper(
+        table_name, 40000, num_scanners_with_no_reads=2)
+
+  def _misaligned_parquet_row_groups_helper(
+      self, table_name, rows_in_table, num_scanners_with_no_reads=0, log_prefix=None):
+    """Checks if executing a query logs any warnings and if there are any scanners that
+    end up doing no reads. 'log_prefix' specifies the prefix of the expected warning.
+    'num_scanners_with_no_reads' indicates the expected number of scanners that don't read
+    anything because the underlying file is poorly formatted
+    """
+    query = 'select * from %s' % table_name
+    result = self.client.execute(query)
+    assert len(result.data) == rows_in_table
+    assert (not result.log and not log_prefix) or \
+        (log_prefix and result.log.startswith(log_prefix))
+
+    runtime_profile = str(result.runtime_profile)
+    num_scanners_with_no_reads_list = re.findall(
+        'NumScannersWithNoReads: ([0-9]*)', runtime_profile)
+
+    # This will fail if the number of impalads != 3
+    # The fourth fragment is the "Averaged Fragment"
+    assert len(num_scanners_with_no_reads_list) == 4
+
+    # Calculate the total number of scan ranges that ended up not reading anything because
+    # an underlying file was poorly formatted.
+    # Skip the Averaged Fragment; it comes first in the runtime profile.
+    total = 0
+    for n in num_scanners_with_no_reads_list[1:]:
+      total += int(n)
+    assert total == num_scanners_with_no_reads
+
+  @SkipIfS3.hdfs_block_size
+  @SkipIfIsilon.hdfs_block_size
+  @SkipIfLocal.multiple_impalad
   def test_multiple_blocks(self, vector):
     # For IMPALA-1881. The table functional_parquet.lineitem_multiblock has 3 blocks, so
     # each impalad should read 1 scan range.