Posted to commits@impala.apache.org by ta...@apache.org on 2017/02/01 17:53:58 UTC

[1/3] incubator-impala git commit: [DOCS] List SCHEDULE_RANDOM_REPLICA in alphabetical order.

Repository: incubator-impala
Updated Branches:
  refs/heads/master a4eb4705c -> e3566ac04


[DOCS] List SCHEDULE_RANDOM_REPLICA in alphabetical order.

Change-Id: I1e5ad55b588b668c4b4dfc700f375e39b2453e28
Reviewed-on: http://gerrit.cloudera.org:8080/5835
Reviewed-by: Lars Volker <lv...@cloudera.com>
Reviewed-by: John Russell <jr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/446d95ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/446d95ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/446d95ee

Branch: refs/heads/master
Commit: 446d95eed38d4aae38235ead0a0f41f3323999c8
Parents: a4eb470
Author: John Russell <jr...@cloudera.com>
Authored: Tue Jan 31 11:08:47 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Jan 31 22:12:40 2017 +0000

----------------------------------------------------------------------
 docs/impala.ditamap | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/446d95ee/docs/impala.ditamap
----------------------------------------------------------------------
diff --git a/docs/impala.ditamap b/docs/impala.ditamap
index 172319e..10876ba 100644
--- a/docs/impala.ditamap
+++ b/docs/impala.ditamap
@@ -205,7 +205,6 @@ under the License.
           <topicref rev="2.6.0 IMPALA-3286" href="topics/impala_prefetch_mode.xml"/>
           <topicref href="topics/impala_query_timeout_s.xml"/>
           <topicref href="topics/impala_request_pool.xml"/>
-          <topicref rev="2.5.0" href="topics/impala_schedule_random_replica.xml"/>
           <topicref rev="2.7.0" href="topics/impala_replica_preference.xml"/>
           <topicref audience="hidden" href="topics/impala_rm_initial_mem.xml"/>
           <topicref href="topics/impala_reservation_request_timeout.xml"/>
@@ -217,6 +216,7 @@ under the License.
           <topicref rev="2.6.0" href="topics/impala_s3_skip_insert_staging.xml"/>
           <topicref rev="2.5.0" href="topics/impala_scan_node_codegen_threshold.xml"/>
           <topicref rev="2.8.0 IMPALA-3671" href="topics/impala_scratch_limit.xml"/>
+          <topicref rev="2.5.0" href="topics/impala_schedule_random_replica.xml"/>
           <!-- This option is for internal use only and might go away without ever being documented. -->
           <!-- <topicref href="topics/impala_seq_compression_mode.xml"/> -->
           <topicref href="topics/impala_support_start_over.xml"/>


[2/3] incubator-impala git commit: IMPALA-3989: Display skew warning for poorly formatted Parquet files

Posted by ta...@apache.org.
IMPALA-3989: Display skew warning for poorly formatted Parquet files

Parquet files are scanned at the granularity of row groups. Each row
group belongs to one or more splits, and each split is scanned by a
separate Parquet scanner.

If some row groups span multiple splits, some scanners will most likely
end up doing remote reads while others perform no scans at all. This
contributes to skew across the cluster, where scans are distributed
unevenly.
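A minimal sketch of how this skew arises, assuming the midpoint rule used in the diff below (a row group is scanned by the scanner whose split contains the row group's midpoint). Row groups are modeled as hypothetical (start, end) byte ranges and splits as (offset, length) pairs; the sizes are made up for illustration and this is not Impala code.

```python
def row_group_mid_offset(start, end):
    # Same arithmetic as the return statement of GetRowGroupMidOffset() in the diff.
    return start + (end - start) // 2


def assign_row_groups(row_groups, splits):
    """Map each split to the indices of the row groups it would scan.

    row_groups: list of (start, end) byte offsets, end exclusive (assumed layout).
    splits: list of (offset, length) pairs, one per scanner.
    A row group goes to the split whose range [offset, offset + length)
    contains the row group's midpoint.
    """
    assignment = [[] for _ in splits]
    for rg_idx, (start, end) in enumerate(row_groups):
        mid = row_group_mid_offset(start, end)
        for split_idx, (offset, length) in enumerate(splits):
            if offset <= mid < offset + length:
                assignment[split_idx].append(rg_idx)
                break
    return assignment


# Three 100-byte splits (hypothetical sizes).
splits = [(0, 100), (100, 100), (200, 100)]
# Aligned: one row group per split, so every scanner gets work.
print(assign_row_groups([(0, 100), (100, 200), (200, 300)], splits))  # [[0], [1], [2]]
# Misaligned: a single row group spans all three splits.
print(assign_row_groups([(0, 300)], splits))  # [[], [0], []]
```

In the misaligned case, only the middle scanner reads anything, and it has to fetch bytes belonging to the other two splits, which are likely stored on other nodes.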

This change adds a counter (NumScannersWithNoReads) to the scan node's
runtime profile to track the number of Parquet scanners that end up
doing no reads because of poor file formatting.
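The condition under which a scanner bumps the counter can be sketched as follows, based on the NextRowGroup() changes in the diff: the scanner starts at the first row group, finds no non-empty row group whose midpoint lies in its split, and skipped at least one non-empty row group that still overlaps its split. This is a simplified model, not the scanner itself: row groups are hypothetical (start, end, num_rows) tuples, the file-level num_rows check is omitted, and the overlap test uses the standard half-open-interval form instead of the three-clause check in CheckRowGroupOverlapsSplit() (the two agree for non-empty ranges).

```python
def scanner_has_no_reads(row_groups, split):
    """Return True if this modeled scanner would increment NumScannersWithNoReads.

    row_groups: list of (start, end, num_rows), byte offsets with end exclusive.
    split: (offset, length) of the scanner's split.
    """
    offset, length = split
    split_end = offset + length
    misaligned_row_group_skipped = False
    for start, end, num_rows in row_groups:
        if num_rows == 0:
            continue  # Empty row groups are skipped without counting as misaligned.
        mid = start + (end - start) // 2
        if offset <= mid < split_end:
            return False  # This scanner owns a row group, so it does read data.
        # The midpoint lies in another split; record whether we still overlap it.
        if offset < end and split_end > start:
            misaligned_row_group_skipped = True
    return misaligned_row_group_skipped


# One row group spanning three hypothetical 100-byte splits.
row_groups = [(0, 300, 1000)]
splits = [(0, 100), (100, 100), (200, 100)]
flags = [scanner_has_no_reads(row_groups, s) for s in splits]
print(flags, sum(flags))  # [True, False, True] 2
```

A split that does not overlap any row group at all (for example, one covering only footer bytes) is not counted, because no misaligned row group was skipped.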

Change-Id: Ibf48d978383d73efdade733a892e795ebd53c76a
Reviewed-on: http://gerrit.cloudera.org:8080/5400
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8f59ce9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8f59ce9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8f59ce9d

Branch: refs/heads/master
Commit: 8f59ce9dfc636cc9f6f03ca9f5ee289ca7cca602
Parents: 446d95e
Author: Attila Jeges <at...@cloudera.com>
Authored: Fri Dec 2 17:20:16 2016 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Jan 31 22:17:13 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 56 +++++++++++++++++++++++++++-----
 be/src/exec/hdfs-parquet-scanner.h  |  4 +++
 tests/query_test/test_scanners.py   | 56 ++++++++++++++++++++++++++++++++
 3 files changed, 107 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8f59ce9d/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 31439f2..1af711e 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -158,6 +158,7 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
       process_footer_timer_stats_(NULL),
       num_cols_counter_(NULL),
       num_row_groups_counter_(NULL),
+      num_scanners_with_no_reads_counter_(NULL),
       codegend_process_scratch_batch_fn_(NULL) {
   assemble_rows_timer_.Stop();
 }
@@ -170,6 +171,8 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
       ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
   num_row_groups_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
+  num_scanners_with_no_reads_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", TUnit::UNIT);
   process_footer_timer_stats_ =
       ADD_SUMMARY_STATS_TIMER(
           scan_node_->runtime_profile(), "FooterProcessingTime");
@@ -311,6 +314,24 @@ static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) {
   return start_offset + (end_offset - start_offset) / 2;
 }
 
+// Returns true if 'row_group' overlaps with 'split_range'.
+static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group,
+    const DiskIoMgr::ScanRange* split_range) {
+  int64_t row_group_start = GetColumnStartOffset(row_group.columns[0].meta_data);
+
+  const parquet::ColumnMetaData& last_column =
+      row_group.columns[row_group.columns.size() - 1].meta_data;
+  int64_t row_group_end =
+      GetColumnStartOffset(last_column) + last_column.total_compressed_size;
+
+  int64_t split_start = split_range->offset();
+  int64_t split_end = split_start + split_range->len();
+
+  return (split_start >= row_group_start && split_start < row_group_end) ||
+      (split_end > row_group_start && split_end <= row_group_end) ||
+      (split_start <= row_group_start && split_end >= row_group_end);
+}
+
 int HdfsParquetScanner::CountScalarColumns(const vector<ParquetColumnReader*>& column_readers) {
   DCHECK(!column_readers.empty());
   int num_columns = 0;
@@ -431,6 +452,16 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
 }
 
 Status HdfsParquetScanner::NextRowGroup() {
+  const DiskIoMgr::ScanRange* split_range = static_cast<ScanRangeMetadata*>(
+      metadata_range_->meta_data())->original_split;
+  int64_t split_offset = split_range->offset();
+  int64_t split_length = split_range->len();
+
+  HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
+
+  bool start_with_first_row_group = row_group_idx_ == -1;
+  bool misaligned_row_group_skipped = false;
+
   advance_row_group_ = false;
   row_group_rows_read_ = 0;
 
@@ -442,26 +473,33 @@ Status HdfsParquetScanner::NextRowGroup() {
     parse_status_ = Status::OK();
 
     ++row_group_idx_;
-    if (row_group_idx_ >= file_metadata_.row_groups.size()) break;
+    if (row_group_idx_ >= file_metadata_.row_groups.size()) {
+      if (start_with_first_row_group && misaligned_row_group_skipped) {
+        // We started with the first row group and skipped all the row groups because
+        // they were misaligned. The execution flow won't reach this point if there is at
+        // least one non-empty row group which this scanner can process.
+        COUNTER_ADD(num_scanners_with_no_reads_counter_, 1);
+      }
+      break;
+    }
     const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
     // Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and 'select *'
     // behave consistently for corrupt files that have 'file_metadata_.num_rows == 0'
     // but some data in row groups.
-    if (row_group.num_rows == 0|| file_metadata_.num_rows == 0) continue;
+    if (row_group.num_rows == 0 || file_metadata_.num_rows == 0) continue;
 
-    const DiskIoMgr::ScanRange* split_range = static_cast<ScanRangeMetadata*>(
-        metadata_range_->meta_data())->original_split;
-    HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
     RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
         file_desc->filename, file_desc->file_length, row_group));
 
+    // A row group is processed by the scanner whose split overlaps with the row
+    // group's mid point.
     int64_t row_group_mid_pos = GetRowGroupMidOffset(row_group);
-    int64_t split_offset = split_range->offset();
-    int64_t split_length = split_range->len();
     if (!(row_group_mid_pos >= split_offset &&
         row_group_mid_pos < split_offset + split_length)) {
-      // A row group is processed by the scanner whose split overlaps with the row
-      // group's mid point. This row group will be handled by a different scanner.
+      // The mid-point does not fall within the split, this row group will be handled by a
+      // different scanner.
+      // If the row group overlaps with the split, we found a misaligned row group.
+      misaligned_row_group_skipped |= CheckRowGroupOverlapsSplit(row_group, split_range);
       continue;
     }
     COUNTER_ADD(num_row_groups_counter_, 1);
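The three-clause condition in CheckRowGroupOverlapsSplit() above appears to be equivalent, for non-empty row groups and splits, to the usual half-open-interval overlap test `split_start < row_group_end && split_end > row_group_start`. The sketch below checks that claim exhaustively over small integer offsets; the function names are hypothetical and it does not call Impala code. The equivalence does not hold for a zero-length split starting exactly at a row group's start, which is why the check is restricted to non-empty ranges.

```python
from itertools import product


def overlaps_three_clause(rg_start, rg_end, split_start, split_end):
    # Mirrors the return expression of CheckRowGroupOverlapsSplit().
    return ((split_start >= rg_start and split_start < rg_end) or
            (split_end > rg_start and split_end <= rg_end) or
            (split_start <= rg_start and split_end >= rg_end))


def overlaps_half_open(rg_start, rg_end, split_start, split_end):
    # Standard overlap test for [rg_start, rg_end) and [split_start, split_end).
    return split_start < rg_end and split_end > rg_start


def check_equivalence(limit=8):
    # Compare both predicates on every non-empty pair of ranges within [0, limit).
    for a, b, s, e in product(range(limit), repeat=4):
        if a < b and s < e:
            if overlaps_three_clause(a, b, s, e) != overlaps_half_open(a, b, s, e):
                return False
    return True


print(check_equivalence())  # True
```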

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8f59ce9d/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 15d64ec..02d7d5d 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -443,6 +443,10 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Number of row groups that need to be read.
   RuntimeProfile::Counter* num_row_groups_counter_;
 
+  /// Number of scanners that end up doing no reads because their splits don't overlap
+  /// with the midpoint of any row-group in the file.
+  RuntimeProfile::Counter* num_scanners_with_no_reads_counter_;
+
   typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*);
   /// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise.
   ProcessScratchBatchFn codegend_process_scratch_batch_fn_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8f59ce9d/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 21c0d9d..de4f826 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -41,6 +41,7 @@ from tests.common.test_result_verifier import (
     parse_result_rows)
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
+from tests.util.hdfs_util import NAMENODE
 from tests.util.get_parquet_metadata import get_parquet_metadata
 from tests.util.test_file_parser import QueryTestSectionReader
 
@@ -314,6 +315,61 @@ class TestParquet(ImpalaTestSuite):
   @SkipIfS3.hdfs_block_size
   @SkipIfIsilon.hdfs_block_size
   @SkipIfLocal.multiple_impalad
+  def test_misaligned_parquet_row_groups(self, vector):
+    """IMPALA-3989: Test that no warnings are issued when misaligned row groups are
+    encountered. Make sure that 'NumScannersWithNoReads' counters are set to the number of
+    scanners that end up doing no reads because of misaligned row groups.
+    """
+    # functional.parquet.alltypes is well-formatted. 'NumScannersWithNoReads' counters are
+    # set to 0.
+    table_name = 'functional_parquet.alltypes'
+    self._misaligned_parquet_row_groups_helper(table_name, 7300)
+    # lineitem_multiblock_parquet/000000_0 is ill-formatted but every scanner reads some
+    # row groups. 'NumScannersWithNoReads' counters are set to 0.
+    table_name = 'functional_parquet.lineitem_multiblock'
+    self._misaligned_parquet_row_groups_helper(table_name, 20000)
+    # lineitem_sixblocks.parquet is ill-formatted but every scanner reads some row groups.
+    # 'NumScannersWithNoReads' counters are set to 0.
+    table_name = 'functional_parquet.lineitem_sixblocks'
+    self._misaligned_parquet_row_groups_helper(table_name, 40000)
+    # Scanning lineitem_one_row_group.parquet finds two scan ranges that end up doing no
+    # reads because the file is poorly formatted.
+    table_name = 'functional_parquet.lineitem_multiblock_one_row_group'
+    self._misaligned_parquet_row_groups_helper(
+        table_name, 40000, num_scanners_with_no_reads=2)
+
+  def _misaligned_parquet_row_groups_helper(
+      self, table_name, rows_in_table, num_scanners_with_no_reads=0, log_prefix=None):
+    """Checks if executing a query logs any warnings and if there are any scanners that
+    end up doing no reads. 'log_prefix' specifies the prefix of the expected warning.
+    'num_scanners_with_no_reads' indicates the expected number of scanners that don't read
+    anything because the underlying file is poorly formatted
+    """
+    query = 'select * from %s' % table_name
+    result = self.client.execute(query)
+    assert len(result.data) == rows_in_table
+    assert (not result.log and not log_prefix) or \
+        (log_prefix and result.log.startswith(log_prefix))
+
+    runtime_profile = str(result.runtime_profile)
+    num_scanners_with_no_reads_list = re.findall(
+        'NumScannersWithNoReads: ([0-9]*)', runtime_profile)
+
+    # This will fail if the number of impalads != 3
+    # The fourth fragment is the "Averaged Fragment"
+    assert len(num_scanners_with_no_reads_list) == 4
+
+    # Calculate the total number of scan ranges that ended up not reading anything because
+    # an underlying file was poorly formatted.
+    # Skip the Averaged Fragment; it comes first in the runtime profile.
+    total = 0
+    for n in num_scanners_with_no_reads_list[1:]:
+      total += int(n)
+    assert total == num_scanners_with_no_reads
+
+  @SkipIfS3.hdfs_block_size
+  @SkipIfIsilon.hdfs_block_size
+  @SkipIfLocal.multiple_impalad
   def test_multiple_blocks(self, vector):
     # For IMPALA-1881. The table functional_parquet.lineitem_multiblock has 3 blocks, so
     # each impalad should read 1 scan range.


[3/3] incubator-impala git commit: IMPALA-4808: old hash join can reference invalid memory

Posted by ta...@apache.org.
IMPALA-4808: old hash join can reference invalid memory

The bug was that 'probe_row_exists' could be true even if
there was no current probe row. The node can get into this
state if it takes the branch at line 390.
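The effect of the one-line fix can be sketched by evaluating the old and new predicates over the possible states. ProbeState is a hypothetical stand-in for the two HashJoinNode values the predicate reads (probe_side_eos_ and probe_batch_->num_rows()); the sketch does not model how GetNext() reaches each state.

```python
from dataclasses import dataclass


@dataclass
class ProbeState:
    # Hypothetical stand-ins for probe_side_eos_ and probe_batch_->num_rows().
    probe_side_eos: bool
    probe_batch_num_rows: int


def probe_row_exists_old(s):
    # Predicate before the fix.
    return not s.probe_side_eos or s.probe_batch_num_rows > 0


def probe_row_exists_new(s):
    # Predicate after the fix: only a non-empty probe batch has a current row.
    return s.probe_batch_num_rows > 0


for eos in (False, True):
    for rows in (0, 1):
        s = ProbeState(eos, rows)
        print(s, probe_row_exists_old(s), probe_row_exists_new(s))
```

The two predicates disagree only when the probe side has not reached eos but the current probe batch is empty. The old predicate reports a probe row in that state even though none exists, which is the condition described above.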

I tried to reproduce the crash a few times but was unable
to.

Change-Id: Ic068bbc3e029264d1ce814d286e372391639fa68
Reviewed-on: http://gerrit.cloudera.org:8080/5850
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e3566ac0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e3566ac0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e3566ac0

Branch: refs/heads/master
Commit: e3566ac048a122a8c6c88ac696a3cb1f9f31b2fc
Parents: 8f59ce9
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jan 26 12:04:39 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 1 07:56:56 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hash-join-node.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e3566ac0/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index 77807f6..ecde742 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -350,7 +350,7 @@ Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos
 
     // If a probe row exists at this point, check whether we need to output the current
     // probe row before getting a new probe batch. (IMPALA-2440)
-    bool probe_row_exists = !probe_side_eos_ || probe_batch_->num_rows() > 0;
+    bool probe_row_exists = probe_batch_->num_rows() > 0;
     if (match_all_probe_ && !matched_probe_ && probe_row_exists) {
       int row_idx = out_batch->AddRow();
       TupleRow* out_row = out_batch->GetRow(row_idx);