You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2016/08/19 23:42:38 UTC

[1/2] incubator-impala git commit: Add functional and targeted perf tests for joins with empty builds

Repository: incubator-impala
Updated Branches:
  refs/heads/master 1bbd667fd -> 2aa86309d


Add functional and targeted perf tests for joins with empty builds

I wrote these tests for my IMPALA-3987 patch, but other issues block
that optimisation.  These tests exercise an interesting corner case
so I split them out into a separate patch.

The functional tests exercise every join mode for nested loop join and
hash join with an empty build side. The perf test exercises hash join
with an empty build side.

Testing:
Made sure the tests passed with both partitioned and non-partitioned
hash join implementations. Ran the targeted perf query through the
single node perf run script to make sure it worked.

Change-Id: I0a68cafec32011a47c569b254979601237e7f2a5
Reviewed-on: http://gerrit.cloudera.org:8080/4051
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f613dcd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f613dcd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f613dcd0

Branch: refs/heads/master
Commit: f613dcd02da989204026999b26247a024fb199ab
Parents: 1bbd667
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Aug 17 00:35:14 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Aug 19 06:04:18 2016 +0000

----------------------------------------------------------------------
 .../queries/QueryTest/empty-build-joins.test    | 192 +++++++++++++++++++
 .../queries/QueryTest/single-node-nlj.test      |  52 +++++
 .../queries/primitive_empty_build_join_1.test   |  13 ++
 tests/query_test/test_join_queries.py           |   5 +
 4 files changed, 262 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f613dcd0/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test b/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test
new file mode 100644
index 0000000..3aa9994
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test
@@ -0,0 +1,192 @@
+====
+---- QUERY
+# Inner equi-join - executes with hash join.
+select straight_join at.id
+from alltypes at
+  inner join functional.alltypestiny att on at.id = att.id
+where att.int_col = 999
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Right equi-join - executes with hash join.
+select straight_join at.id
+from alltypes at
+  right join functional.alltypestiny att on at.id = att.id
+where att.int_col = 999
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Left equi-join - executes with hash join.
+select straight_join at.id
+from alltypes at
+  left join (
+    select * from functional.alltypestiny where int_col = 999) att on at.id = att.id
+order by at.id desc
+limit 5
+---- RESULTS
+7299
+7298
+7297
+7296
+7295
+---- TYPES
+INT
+====
+---- QUERY
+# Full outer equi-join - executes with hash join.
+select straight_join at.id
+from alltypes at
+  full outer join (
+    select * from functional.alltypestiny where int_col = 999) att on at.id = att.id
+order by at.id desc
+limit 5
+---- RESULTS
+7299
+7298
+7297
+7296
+7295
+---- TYPES
+INT
+====
+---- QUERY
+# Left semi equi-join - executes with hash join.
+select straight_join at.id
+from alltypes at
+where id in (
+  select id from functional.alltypestiny
+  where id = 999)
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Right semi equi-join - executes with hash join.
+select straight_join at.id
+from (select * from functional.alltypestiny att where int_col = 999) att
+  right semi join alltypes at on at.id = att.id
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Left NAAJ equi-join - executes with hash join.
+select straight_join at.id
+from alltypes at
+where id not in (
+  select id from functional.alltypestiny
+  where id = 999)
+order by id desc
+limit 5
+---- RESULTS
+7299
+7298
+7297
+7296
+7295
+---- TYPES
+INT
+====
+---- QUERY
+# Left anti equi-join - executes with hash join.
+select straight_join at.id
+from alltypes at
+where not exists (
+  select id from functional.alltypestiny att
+    where id = 999 and att.id = at.id)
+order by id desc
+limit 5
+---- RESULTS
+7299
+7298
+7297
+7296
+7295
+---- TYPES
+INT
+====
+---- QUERY
+# Right anti equi-join - executes with hash join.
+select straight_join at.id
+from (select * from functional.alltypestiny att where int_col = 999) att
+  right anti join alltypes at on at.id = att.id
+order by at.id desc
+limit 5
+---- RESULTS
+7299
+7298
+7297
+7296
+7295
+---- TYPES
+INT
+====
+---- QUERY
+# Inner non-equi-join - executes with nested loop join.
+select straight_join at.id
+from alltypes at
+  inner join functional.alltypestiny att on at.id < att.id
+where att.int_col = 999
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Cross join - executes with nested loop join.
+select straight_join at.id
+from alltypes at, functional.alltypestiny att
+where att.int_col = 999
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Left non-equi-join - executes with nested loop join.
+select straight_join at.id
+from alltypes at
+  left join (
+    select * from functional.alltypestiny where int_col = 999) att on at.id < att.id
+order by at.id desc
+limit 5
+---- RESULTS
+7299
+7298
+7297
+7296
+7295
+---- TYPES
+INT
+====
+---- QUERY
+# Left semi non-equi-join - executes with nested loop join.
+select straight_join at.id
+from alltypes at
+   left semi join (
+     select * from functional.alltypestiny att where int_col = 999) att on at.id < att.id
+order by at.id desc
+limit 5
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Left anti non-equi-join - executes with nested loop join.
+select straight_join at.id
+from alltypes at left anti join (
+  select * from functional.alltypestiny att
+  where id = 999) att on at.id < att.id
+order by id desc
+limit 5
+---- RESULTS
+7299
+7298
+7297
+7296
+7295
+---- TYPES
+INT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f613dcd0/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test b/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
index bb22e16..49cdf9d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test
@@ -160,3 +160,55 @@ left join functional.alltypes a2 on a2.tinyint_col >= 1
 ---- TYPES
 BIGINT
 ====
+---- QUERY
+# Right non-equi-join with empty build.
+select straight_join at.id
+from alltypes at
+  right join functional.alltypestiny att on at.id < att.id
+where att.int_col = 999
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Full outer non-equi-join with empty build.
+select straight_join at.id
+from alltypes at
+  full outer join (
+    select * from functional.alltypestiny where int_col = 999) att on at.id < att.id
+order by at.id desc
+limit 5
+---- RESULTS
+7299
+7298
+7297
+7296
+7295
+---- TYPES
+INT
+====
+---- QUERY
+# Right semi non-equi-join with empty build.
+select straight_join at.id
+from (select * from functional.alltypestiny att where int_col = 999) att
+  right semi join alltypes at on at.id < att.id
+---- RESULTS
+---- TYPES
+INT
+====
+---- QUERY
+# Right anti non-equi-join with empty build.
+select straight_join at.id
+from (select * from functional.alltypestiny att where int_col = 999) att
+  right anti join alltypes at on at.id < att.id
+order by at.id desc
+limit 5
+---- RESULTS
+7299
+7298
+7297
+7296
+7295
+---- TYPES
+INT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f613dcd0/testdata/workloads/targeted-perf/queries/primitive_empty_build_join_1.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/targeted-perf/queries/primitive_empty_build_join_1.test b/testdata/workloads/targeted-perf/queries/primitive_empty_build_join_1.test
new file mode 100644
index 0000000..ef0a47f
--- /dev/null
+++ b/testdata/workloads/targeted-perf/queries/primitive_empty_build_join_1.test
@@ -0,0 +1,13 @@
+====
+---- QUERY: primitive_empty_build_join_1
+-- Description : Join with empty build side and large probe side.
+-- Target test case : Analytic query with selective filters where evaluation of
+--   the join can be short-circuited for a dramatic speedup.
+SELECT /* +straight_join */ *
+FROM lineitem
+INNER JOIN orders ON l_orderkey = o_orderkey
+WHERE o_comment = 'no matching comments'
+---- RESULTS
+---- TYPES
+====
+

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f613dcd0/tests/query_test/test_join_queries.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_join_queries.py b/tests/query_test/test_join_queries.py
index 3acd97e..14965d8 100644
--- a/tests/query_test/test_join_queries.py
+++ b/tests/query_test/test_join_queries.py
@@ -86,6 +86,11 @@ class TestJoinQueries(ImpalaTestSuite):
     new_vector.get_value('exec_option')['num_nodes'] = 1
     self.run_test_case('QueryTest/single-node-nlj-exhaustive', new_vector)
 
+  def test_empty_build_joins(self, vector):
+    new_vector = copy(vector)
+    new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size')
+    self.run_test_case('QueryTest/empty-build-joins', new_vector)
+
 class TestTPCHJoinQueries(ImpalaTestSuite):
   # Uses the TPC-H dataset in order to have larger joins. Needed for example to test
   # the repartitioning codepaths.


[2/2] incubator-impala git commit: IMPALA-3962: Clean up scratch tuple batch on scan failures

Posted by kw...@apache.org.
IMPALA-3962: Clean up scratch tuple batch on scan failures

The parquet scanner doesn't clean up 'scratch_batch_' properly,
which causes it to process a partially filled scratch_batch_
if any of the column readers fails. This change cleans up the
scratch batch if any of the parquet column readers fails.
The cleanup involves freeing the mem_pool of scratch_batch_
and setting the number of tuples in scratch_batch_ to 0.

This change also extends debug action to emulate the behavior
of exceeding the query's memory limit.

Change-Id: If1e27a1517d09ccaabdae1492b7e1fbf661ae3e5
Reviewed-on: http://gerrit.cloudera.org:8080/3991
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2aa86309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2aa86309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2aa86309

Branch: refs/heads/master
Commit: 2aa86309d15cba8a965c83d81308e32df899ec8a
Parents: f613dcd
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Aug 15 00:19:54 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Aug 19 22:37:28 2016 +0000

----------------------------------------------------------------------
 be/src/exec/exec-node.cc              |  3 +++
 be/src/exec/hdfs-parquet-scanner.cc   | 28 +++++++++++++++-------------
 be/src/exec/hdfs-scan-node.cc         |  4 ++++
 be/src/exec/hdfs-scan-node.h          |  5 +++++
 be/src/exec/hdfs-scanner.cc           |  1 -
 be/src/exec/hdfs-scanner.h            |  5 +++++
 be/src/exec/parquet-column-readers.cc | 27 +++++++++++++++++++++++++++
 be/src/exec/parquet-column-readers.h  |  5 +++++
 common/thrift/PlanNodes.thrift        |  3 ++-
 tests/failure/test_failpoints.py      | 17 ++++++++++-------
 10 files changed, 76 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index aeceb6c..937c6f2 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -437,6 +437,9 @@ Status ExecNode::ExecDebugAction(TExecNodePhase::type phase, RuntimeState* state
         ErrorMsg(TErrorCode::INTERNAL_ERROR, "Debug Action: INJECT_ERROR_LOG"));
     return Status::OK();
   }
+  if (debug_action_ == TDebugAction::MEM_LIMIT_EXCEEDED) {
+    mem_tracker()->MemLimitExceeded(state, "Debug Action: MEM_LIMIT_EXCEEDED");
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 613761b..ee5f4d9 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -343,11 +343,11 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch, bool* eos) {
   }
 
   while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) {
+    // Transfer resources and clear streams if there is any leftover from the previous
+    // row group. We will create new streams for the next row group.
+    FlushRowGroupResources(row_batch);
+    context_->ClearStreams();
     if (!advance_row_group_) {
-      // End of the previous row group. Transfer resources and clear streams because
-      // we will create new streams for the next row group.
-      FlushRowGroupResources(row_batch);
-      context_->ClearStreams();
       Status status =
           ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
       if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
@@ -501,17 +501,19 @@ Status HdfsParquetScanner::AssembleRows(
             scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_,
             scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
       }
-      if (UNLIKELY(!continue_execution)) {
-        *skip_row_group = true;
-        return Status::OK();
-      }
       // Check that all column readers populated the same number of values.
-      if (c != 0 && UNLIKELY(last_num_tuples != scratch_batch_->num_tuples)) {
-        parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' "
-            "had $2 remaining values but expected $3", filename(),
-            col_reader->schema_element().name, last_num_tuples,
-            scratch_batch_->num_tuples));
+      bool num_tuples_mismatch = c != 0 && last_num_tuples != scratch_batch_->num_tuples;
+      if (UNLIKELY(!continue_execution || num_tuples_mismatch)) {
+        // Skipping this row group. Free up all the resources with this row group.
+        scratch_batch_->mem_pool()->FreeAll();
+        scratch_batch_->num_tuples = 0;
         *skip_row_group = true;
+        if (num_tuples_mismatch) {
+          parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' "
+              "had $2 remaining values but expected $3", filename(),
+              col_reader->schema_element().name, last_num_tuples,
+              scratch_batch_->num_tuples));
+        }
         return Status::OK();
       }
       last_num_tuples = scratch_batch_->num_tuples;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 1421c0b..6077f99 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -451,6 +451,10 @@ void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
   scan_node_pool_->AcquireData(pool, false);
 }
 
+Status HdfsScanNode::TriggerDebugAction() {
+  return ExecDebugAction(TExecNodePhase::GETNEXT, runtime_state_);
+}
+
 Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   runtime_state_ = state;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 05ee50e..ae38856 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -317,6 +317,7 @@ class HdfsScanNode : public ScanNode {
 
  protected:
   friend class ScannerContext;
+  friend class HdfsScanner;
 
   RuntimeState* runtime_state_;
 
@@ -571,6 +572,10 @@ class HdfsScanNode : public ScanNode {
   /// true if all filters arrived within the time limit (as measured from the time of
   /// RuntimeFilterBank::RegisterFilter()), false otherwise.
   bool WaitForRuntimeFilters(int32_t time_ms);
+
+  /// Calls ExecDebugAction(). Returns the status based on the debug action specified
+  /// for the query.
+  Status TriggerDebugAction();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 6b4373e..81cd1f0 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -669,4 +669,3 @@ void HdfsScanner::ReportColumnParseError(const SlotDescriptor* desc,
     if (state_->abort_on_error() && parse_status_.ok()) parse_status_ = Status(ss.str());
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index a5e3b51..53711ab 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -378,6 +378,11 @@ class HdfsScanner {
   /// This is called from WriteAlignedTuples.
   bool ReportTupleParseError(FieldLocation* fields, uint8_t* errors, int row_idx);
 
+  /// Triggers debug action of the scan node. This is currently used by parquet column
+  /// readers to exercise various failure paths in parquet scanner. Returns the status
+  /// returned by the scan node's TriggerDebugAction().
+  Status TriggerDebugAction() { return scan_node_->TriggerDebugAction(); }
+
   /// Utility function to append an error message for an invalid row.  This is called
   /// from ReportTupleParseError()
   /// row_idx is the index of the row in the current batch.  Subclasses should override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index c7e3e17..48acb27 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -54,6 +54,15 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
 // Also, this limit is in place to prevent impala from reading corrupt parquet files.
 DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");
 
+// Trigger debug action on every 128 tuples produced. This is useful in exercising the
+// failure or cancellation path.
+#ifndef NDEBUG
+#define DEBUG_ACTION_TRIGGER (127)
+#define SHOULD_TRIGGER_DEBUG_ACTION(x) ((x & DEBUG_ACTION_TRIGGER) == 0)
+#else
+#define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
+#endif
+
 namespace impala {
 
 const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate "
@@ -330,6 +339,9 @@ class ScalarColumnReader : public BaseScalarColumnReader {
       }
       val_count += ret_val_count;
       num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
+      if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
+        continue_execution &= TriggerDebugAction();
+      }
     }
     *num_values = val_count;
     return continue_execution;
@@ -630,6 +642,15 @@ class BoolColumnReader : public BaseScalarColumnReader {
   BitReader bool_values_;
 };
 
+bool ParquetColumnReader::TriggerDebugAction() {
+  Status status = parent_->TriggerDebugAction();
+  if (!status.ok()) {
+    if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status);
+    return false;
+  }
+  return true;
+}
+
 bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
     int tuple_size, uint8_t* tuple_mem, int* num_values) {
   int val_count = 0;
@@ -645,6 +666,9 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
     if (pos_slot_desc_ != NULL) ReadPosition(tuple);
     continue_execution = ReadValue(pool, tuple);
     ++val_count;
+    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
+      continue_execution &= TriggerDebugAction();
+    }
   }
   *num_values = val_count;
   return continue_execution;
@@ -658,6 +682,9 @@ bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
     Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
     continue_execution = ReadNonRepeatedValue(pool, tuple);
     ++val_count;
+    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
+      continue_execution &= TriggerDebugAction();
+    }
   }
   *num_values = val_count;
   return continue_execution;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 8435e71..e1a3061 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -287,6 +287,11 @@ class ParquetColumnReader {
     // rep_level_ is always valid and equal to 0 if col not in collection.
     if (max_rep_level() == 0) rep_level_ = 0;
   }
+
+  /// Trigger debug action. Returns false if the debug action deems that the
+  /// parquet column reader should halt execution. In which case, 'parse_status_'
+  /// is also updated.
+  bool TriggerDebugAction();
 };
 
 /// Reader for a single column from the parquet file.  It's associated with a

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 43b64d2..3e31120 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -64,7 +64,8 @@ enum TExecNodePhase {
 enum TDebugAction {
   WAIT,
   FAIL,
-  INJECT_ERROR_LOG
+  INJECT_ERROR_LOG,
+  MEM_LIMIT_EXCEEDED,
 }
 
 // Preference for replica selection

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index d43bfd7..6eac663 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -29,20 +29,25 @@ from tests.common.skip import SkipIf, SkipIfS3, SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import TestDimension
 
-FAILPOINT_ACTION = ['FAIL', 'CANCEL']
+FAILPOINT_ACTION = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
 FAILPOINT_LOCATION = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'CLOSE']
+# Map debug actions to their corresponding query options' values.
+FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT',
+                        'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED'}
 
 # The goal of this query is to use all of the node types.
 # TODO: This query could be simplified a bit...
 QUERY = """
-select a.int_col, count(b.int_col) int_sum from functional_hbase.alltypesagg a
+select a.int_col, count(b.int_col) int_sum, count(l.l_shipdate)
+from functional_hbase.alltypesagg a, tpch_nested_parquet.customer c, c.c_orders.o_lineitems l
 join
   (select * from alltypes
    where year=2009 and month=1 order by int_col limit 2500
    union all
    select * from alltypes
    where year=2009 and month=2 limit 3000) b
-on (a.int_col = b.int_col)
+on (a.int_col = b.int_col) and (a.int_col = c.c_custkey)
+where c.c_mktsegment = 'BUILDING'
 group by a.int_col
 order by int_sum
 """
@@ -109,7 +114,6 @@ class TestFailpoints(ImpalaTestSuite):
         lambda v: (v.get_value('location') != 'PREPARE_SCANNER' or
             v.get_value('target_node')[0] == 'SCAN HDFS'))
 
-
   def test_failpoints(self, vector):
     query = QUERY
     node_type, node_ids = vector.get_value('target_node')
@@ -117,14 +121,13 @@ class TestFailpoints(ImpalaTestSuite):
     location = vector.get_value('location')
 
     for node_id in node_ids:
-      debug_action = '%d:%s:%s' % (node_id, location,
-                                   'WAIT' if action == 'CANCEL' else 'FAIL')
+      debug_action = '%d:%s:%s' % (node_id, location, FAILPOINT_ACTION_MAP[action])
       LOG.info('Current dubug action: SET DEBUG_ACTION=%s' % debug_action)
       vector.get_value('exec_option')['debug_action'] = debug_action
 
       if action == 'CANCEL':
         self.__execute_cancel_action(query, vector)
-      elif action == 'FAIL':
+      elif action == 'FAIL' or action == 'MEM_LIMIT_EXCEEDED':
         self.__execute_fail_action(query, vector)
       else:
         assert 0, 'Unknown action: %s' % action