Posted to commits@impala.apache.org by kw...@apache.org on 2016/08/19 23:42:39 UTC

[2/2] incubator-impala git commit: IMPALA-3962: Clean up scratch tuple batch on scan failures

IMPALA-3962: Clean up scratch tuple batch on scan failures

The parquet scanner doesn't clean up 'scratch_batch_' properly,
which causes it to process a partially filled scratch_batch_
if any of the column readers fails. This change cleans up the
scratch batch if any of the parquet column readers fails.
The cleanup involves freeing the mem_pool of scratch_batch_
and setting the number of tuples in scratch_batch_ to 0.
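
A minimal sketch of the cleanup idea, with simplified stand-in types
(ScratchBatch/MemPool here are illustrative, not Impala's real classes;
the actual logic lives in HdfsParquetScanner::AssembleRows() below):

  // Hedged sketch: when any column reader fails, free the memory backing
  // the partially filled scratch batch and reset its tuple count so stale
  // tuples are never handed to the rest of the scanner.
  #include <memory>
  #include <vector>

  struct MemPool {
    std::vector<std::unique_ptr<char[]>> chunks;
    void FreeAll() { chunks.clear(); }  // release all allocated chunks
  };

  struct ScratchBatch {
    MemPool pool;
    int num_tuples = 0;
    MemPool* mem_pool() { return &pool; }
  };

  void CleanupScratchBatch(ScratchBatch* scratch_batch, bool* skip_row_group) {
    scratch_batch->mem_pool()->FreeAll();
    scratch_batch->num_tuples = 0;
    *skip_row_group = true;  // the current row group is abandoned
  }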

This change also extends the debug actions to emulate the behavior
of exceeding the query's memory limit.
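
For illustration, a hedged sketch of how a debug-action hook can fabricate
this failure (simplified stand-ins; the real branch is added to
ExecNode::ExecDebugAction() in the diff below and calls
mem_tracker()->MemLimitExceeded() instead):

  // Sketch only: dispatch on the configured debug action and return an error
  // status for MEM_LIMIT_EXCEEDED, mimicking the query running out of memory.
  #include <string>

  enum class DebugAction { NONE, WAIT, FAIL, INJECT_ERROR_LOG, MEM_LIMIT_EXCEEDED };

  struct Status {
    std::string msg;  // empty means OK
    bool ok() const { return msg.empty(); }
    static Status OK() { return Status(); }
  };

  Status ExecDebugActionSketch(DebugAction action) {
    switch (action) {
      case DebugAction::FAIL:
        return Status{"Debug Action: FAIL"};
      case DebugAction::MEM_LIMIT_EXCEEDED:
        return Status{"Debug Action: MEM_LIMIT_EXCEEDED"};
      default:
        return Status::OK();
    }
  }

In the tests the action is selected through the DEBUG_ACTION query option,
formatted as '<node id>:<phase>:<action>', e.g. '0:GETNEXT:MEM_LIMIT_EXCEEDED'
(see the test_failpoints.py change below); the node id 0 is only an example.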

Change-Id: If1e27a1517d09ccaabdae1492b7e1fbf661ae3e5
Reviewed-on: http://gerrit.cloudera.org:8080/3991
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2aa86309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2aa86309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2aa86309

Branch: refs/heads/master
Commit: 2aa86309d15cba8a965c83d81308e32df899ec8a
Parents: f613dcd
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Aug 15 00:19:54 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Aug 19 22:37:28 2016 +0000

----------------------------------------------------------------------
 be/src/exec/exec-node.cc              |  3 +++
 be/src/exec/hdfs-parquet-scanner.cc   | 28 +++++++++++++++-------------
 be/src/exec/hdfs-scan-node.cc         |  4 ++++
 be/src/exec/hdfs-scan-node.h          |  5 +++++
 be/src/exec/hdfs-scanner.cc           |  1 -
 be/src/exec/hdfs-scanner.h            |  5 +++++
 be/src/exec/parquet-column-readers.cc | 27 +++++++++++++++++++++++++++
 be/src/exec/parquet-column-readers.h  |  5 +++++
 common/thrift/PlanNodes.thrift        |  3 ++-
 tests/failure/test_failpoints.py      | 17 ++++++++++-------
 10 files changed, 76 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index aeceb6c..937c6f2 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -437,6 +437,9 @@ Status ExecNode::ExecDebugAction(TExecNodePhase::type phase, RuntimeState* state
         ErrorMsg(TErrorCode::INTERNAL_ERROR, "Debug Action: INJECT_ERROR_LOG"));
     return Status::OK();
   }
+  if (debug_action_ == TDebugAction::MEM_LIMIT_EXCEEDED) {
+    mem_tracker()->MemLimitExceeded(state, "Debug Action: MEM_LIMIT_EXCEEDED");
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 613761b..ee5f4d9 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -343,11 +343,11 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch, bool* eos) {
   }
 
   while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) {
+    // Transfer resources and clear streams if there is any leftover from the previous
+    // row group. We will create new streams for the next row group.
+    FlushRowGroupResources(row_batch);
+    context_->ClearStreams();
     if (!advance_row_group_) {
-      // End of the previous row group. Transfer resources and clear streams because
-      // we will create new streams for the next row group.
-      FlushRowGroupResources(row_batch);
-      context_->ClearStreams();
       Status status =
           ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
       if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
@@ -501,17 +501,19 @@ Status HdfsParquetScanner::AssembleRows(
             scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_,
             scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
       }
-      if (UNLIKELY(!continue_execution)) {
-        *skip_row_group = true;
-        return Status::OK();
-      }
       // Check that all column readers populated the same number of values.
-      if (c != 0 && UNLIKELY(last_num_tuples != scratch_batch_->num_tuples)) {
-        parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' "
-            "had $2 remaining values but expected $3", filename(),
-            col_reader->schema_element().name, last_num_tuples,
-            scratch_batch_->num_tuples));
+      bool num_tuples_mismatch = c != 0 && last_num_tuples != scratch_batch_->num_tuples;
+      if (UNLIKELY(!continue_execution || num_tuples_mismatch)) {
+        // Skipping this row group. Free up all the resources with this row group.
+        scratch_batch_->mem_pool()->FreeAll();
+        scratch_batch_->num_tuples = 0;
         *skip_row_group = true;
+        if (num_tuples_mismatch) {
+          parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' "
+              "had $2 remaining values but expected $3", filename(),
+              col_reader->schema_element().name, last_num_tuples,
+              scratch_batch_->num_tuples));
+        }
         return Status::OK();
       }
       last_num_tuples = scratch_batch_->num_tuples;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 1421c0b..6077f99 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -451,6 +451,10 @@ void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
   scan_node_pool_->AcquireData(pool, false);
 }
 
+Status HdfsScanNode::TriggerDebugAction() {
+  return ExecDebugAction(TExecNodePhase::GETNEXT, runtime_state_);
+}
+
 Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   runtime_state_ = state;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 05ee50e..ae38856 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -317,6 +317,7 @@ class HdfsScanNode : public ScanNode {
 
  protected:
   friend class ScannerContext;
+  friend class HdfsScanner;
 
   RuntimeState* runtime_state_;
 
@@ -571,6 +572,10 @@ class HdfsScanNode : public ScanNode {
   /// true if all filters arrived within the time limit (as measured from the time of
   /// RuntimeFilterBank::RegisterFilter()), false otherwise.
   bool WaitForRuntimeFilters(int32_t time_ms);
+
+  /// Calls ExecDebugAction(). Returns the status based on the debug action specified
+  /// for the query.
+  Status TriggerDebugAction();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 6b4373e..81cd1f0 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -669,4 +669,3 @@ void HdfsScanner::ReportColumnParseError(const SlotDescriptor* desc,
     if (state_->abort_on_error() && parse_status_.ok()) parse_status_ = Status(ss.str());
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index a5e3b51..53711ab 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -378,6 +378,11 @@ class HdfsScanner {
   /// This is called from WriteAlignedTuples.
   bool ReportTupleParseError(FieldLocation* fields, uint8_t* errors, int row_idx);
 
+  /// Triggers debug action of the scan node. This is currently used by parquet column
+  /// readers to exercise various failure paths in parquet scanner. Returns the status
+  /// returned by the scan node's TriggerDebugAction().
+  Status TriggerDebugAction() { return scan_node_->TriggerDebugAction(); }
+
   /// Utility function to append an error message for an invalid row.  This is called
   /// from ReportTupleParseError()
   /// row_idx is the index of the row in the current batch.  Subclasses should override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index c7e3e17..48acb27 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -54,6 +54,15 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
 // Also, this limit is in place to prevent impala from reading corrupt parquet files.
 DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
 
+// Trigger debug action on every 128 tuples produced. This is useful in exercising the
+// failure or cancellation path.
+#ifndef NDEBUG
+#define DEBUG_ACTION_TRIGGER (127)
+#define SHOULD_TRIGGER_DEBUG_ACTION(x) ((x & DEBUG_ACTION_TRIGGER) == 0)
+#else
+#define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
+#endif
+
 namespace impala {
 
 const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate "
@@ -330,6 +339,9 @@ class ScalarColumnReader : public BaseScalarColumnReader {
       }
       val_count += ret_val_count;
       num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+      if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
+        continue_execution &= TriggerDebugAction();
+      }
     }
     *num_values = val_count;
     return continue_execution;
@@ -630,6 +642,15 @@ class BoolColumnReader : public BaseScalarColumnReader {
   BitReader bool_values_;
 };
 
+bool ParquetColumnReader::TriggerDebugAction() {
+  Status status = parent_->TriggerDebugAction();
+  if (!status.ok()) {
+    if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status);
+    return false;
+  }
+  return true;
+}
+
 bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
     int tuple_size, uint8_t* tuple_mem, int* num_values) {
   int val_count = 0;
@@ -645,6 +666,9 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
     if (pos_slot_desc_ != NULL) ReadPosition(tuple);
     continue_execution = ReadValue(pool, tuple);
     ++val_count;
+    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
+      continue_execution &= TriggerDebugAction();
+    }
   }
   *num_values = val_count;
   return continue_execution;
@@ -658,6 +682,9 @@ bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
     Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
     continue_execution = ReadNonRepeatedValue(pool, tuple);
     ++val_count;
+    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
+      continue_execution &= TriggerDebugAction();
+    }
   }
   *num_values = val_count;
   return continue_execution;
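
The SHOULD_TRIGGER_DEBUG_ACTION checks above fire the hook once every 128
values by masking the running count; the result is routed from
ParquetColumnReader::TriggerDebugAction() through HdfsScanner and
HdfsScanNode back to ExecNode::ExecDebugAction(), so the same DEBUG_ACTION
query option drives the per-value failure injection. A standalone sketch of
the mask trick (hypothetical names, debug builds only):

  // Sketch: with a mask of 127, (count & 127) == 0 holds exactly when count
  // is a multiple of 128, so the debug hook runs once per 128 values read.
  #include <cstdint>
  #include <iostream>

  constexpr uint32_t kTriggerMask = 127;  // mirrors DEBUG_ACTION_TRIGGER

  inline bool ShouldTriggerDebugAction(uint32_t val_count) {
  #ifndef NDEBUG
    return (val_count & kTriggerMask) == 0;
  #else
    (void)val_count;
    return false;  // compiled out of release builds
  #endif
  }

  int main() {
    int triggers = 0;
    for (uint32_t val_count = 1; val_count <= 1024; ++val_count) {
      if (ShouldTriggerDebugAction(val_count)) ++triggers;
    }
    std::cout << "triggered " << triggers << " times\n";  // prints 8 in debug builds
    return 0;
  }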

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 8435e71..e1a3061 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -287,6 +287,11 @@ class ParquetColumnReader {
     // rep_level_ is always valid and equal to 0 if col not in collection.
     if (max_rep_level() == 0) rep_level_ = 0;
   }
+
+  /// Trigger debug action. Returns false if the debug action deems that the
+  /// parquet column reader should halt execution. In which case, 'parse_status_'
+  /// is also updated.
+  bool TriggerDebugAction();
 };
 
 /// Reader for a single column from the parquet file.  It's associated with a

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 43b64d2..3e31120 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -64,7 +64,8 @@ enum TExecNodePhase {
 enum TDebugAction {
   WAIT,
   FAIL,
-  INJECT_ERROR_LOG
+  INJECT_ERROR_LOG,
+  MEM_LIMIT_EXCEEDED,
 }
 
 // Preference for replica selection

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index d43bfd7..6eac663 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -29,20 +29,25 @@ from tests.common.skip import SkipIf, SkipIfS3, SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import TestDimension
 
-FAILPOINT_ACTION = ['FAIL', 'CANCEL']
+FAILPOINT_ACTION = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
 FAILPOINT_LOCATION = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'CLOSE']
+# Map debug actions to their corresponding query options' values.
+FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT',
+                        'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED'}
 
 # The goal of this query is to use all of the node types.
 # TODO: This query could be simplified a bit...
 QUERY = """
-select a.int_col, count(b.int_col) int_sum from functional_hbase.alltypesagg a
+select a.int_col, count(b.int_col) int_sum, count(l.l_shipdate)
+from functional_hbase.alltypesagg a, tpch_nested_parquet.customer c, c.c_orders.o_lineitems l
 join
   (select * from alltypes
    where year=2009 and month=1 order by int_col limit 2500
    union all
    select * from alltypes
    where year=2009 and month=2 limit 3000) b
-on (a.int_col = b.int_col)
+on (a.int_col = b.int_col) and (a.int_col = c.c_custkey)
+where c.c_mktsegment = 'BUILDING'
 group by a.int_col
 order by int_sum
 """
@@ -109,7 +114,6 @@ class TestFailpoints(ImpalaTestSuite):
         lambda v: (v.get_value('location') != 'PREPARE_SCANNER' or
             v.get_value('target_node')[0] == 'SCAN HDFS'))
 
-
   def test_failpoints(self, vector):
     query = QUERY
     node_type, node_ids = vector.get_value('target_node')
@@ -117,14 +121,13 @@ class TestFailpoints(ImpalaTestSuite):
     location = vector.get_value('location')
 
     for node_id in node_ids:
-      debug_action = '%d:%s:%s' % (node_id, location,
-                                   'WAIT' if action == 'CANCEL' else 'FAIL')
+      debug_action = '%d:%s:%s' % (node_id, location, FAILPOINT_ACTION_MAP[action])
       LOG.info('Current dubug action: SET DEBUG_ACTION=%s' % debug_action)
       vector.get_value('exec_option')['debug_action'] = debug_action
 
       if action == 'CANCEL':
         self.__execute_cancel_action(query, vector)
-      elif action == 'FAIL':
+      elif action == 'FAIL' or action == 'MEM_LIMIT_EXCEEDED':
         self.__execute_fail_action(query, vector)
       else:
         assert 0, 'Unknown action: %s' % action