You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/13 06:09:39 UTC

[08/10] incubator-impala git commit: IMPALA-3311: fix string data coming out of aggs in subplans

IMPALA-3311: fix string data coming out of aggs in subplans

The problem: varlen data (e.g. strings) produced by aggregations is
freed by FreeLocalAllocations() on the next GetNext() call, after the
output batch has already been passed up. This works for streaming
operators or blocking operators that copy their input, but results in
memory corruption when the output reaches non-copying blocking
operators, e.g. SubplanNode and NestedLoopJoinNode.

The fix: this patch makes the PartitionedAggregationNode copy out
produced string data if the node is in a subplan. Otherwise it calls
MarkNeedToReturn() on the output batch. Marking the batch would work
in the subplan case as well, but would likely be less efficient since
it would result in many small batches coming out of the subplan.

The patch includes a test case. However, this test only exposes the
problem with an ASAN build and the --disable_mem_pools flag, which we
don't currently have automated testing for.

Change-Id: Iada891504c261ba54f4eb8c9d7e4e5223668d7b9
Reviewed-on: http://gerrit.cloudera.org:8080/2929
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7767d300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7767d300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7767d300

Branch: refs/heads/master
Commit: 7767d300a3f018c8c8b32fa72abe5c126900a2be
Parents: cb37774
Author: Skye Wanderman-Milne <sk...@cloudera.com>
Authored: Thu May 12 17:03:12 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu May 12 23:06:36 2016 -0700

----------------------------------------------------------------------
 be/src/exec/partitioned-aggregation-node.cc     | 55 ++++++++++++++++++++
 be/src/exec/partitioned-aggregation-node.h      | 14 +++++
 be/src/exprs/agg-fn-evaluator.h                 |  1 +
 .../queries/QueryTest/nested-types-runtime.test | 16 ++++++
 .../queries/subplan_aggregation.test            | 11 ++++
 5 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 0bf51a9..b7dca61 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -360,6 +360,61 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) {
 
 Status PartitionedAggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch,
     bool* eos) {
+  int first_row_idx = row_batch->num_rows();  // Rows added below start here.
+  RETURN_IF_ERROR(GetNextInternal(state, row_batch, eos));
+  RETURN_IF_ERROR(HandleOutputStrings(row_batch, first_row_idx));  // See IMPALA-3311.
+  return Status::OK();
+}
+
+Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
+    int first_row_idx) {
+  if (!needs_finalize_ && !needs_serialize_) return Status::OK();
+  // Serialize()/Finalize() (the only cases that get here) return string data from local
+  // expr allocations in the agg fn contexts, which FreeLocalAllocations() frees on the
+  // next GetNext() call. In a subplan, copy the data out; otherwise send it up the plan
+  // tree via MarkNeedToReturn() on the whole batch. (See IMPALA-3311)
+  for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
+    const SlotDescriptor* slot_desc = aggregate_evaluators_[i]->output_slot_desc();
+    DCHECK(!slot_desc->type().IsCollectionType()) << "producing collections NYI";
+    if (!slot_desc->type().IsVarLenStringType()) continue;
+    if (IsInSubplan()) {
+      // Copy the strings into the batch's tuple data pool. In a subplan this is cheaper
+      // than MarkNeedToReturn(), which would produce many small batches.
+      RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch, first_row_idx,
+              row_batch->tuple_data_pool()));
+    } else {
+      row_batch->MarkNeedToReturn();
+      break;  // The mark covers the whole batch, so one call suffices.
+    }
+  }
+  return Status::OK();
+}
+
+Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor* slot_desc,
+    RowBatch* row_batch, int first_row_idx, MemPool* pool) {
+  DCHECK(slot_desc->type().IsVarLenStringType());
+  DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);  // GetTuple(0) below.
+  FOREACH_ROW(row_batch, first_row_idx, batch_iter) {
+    Tuple* tuple = batch_iter.Get()->GetTuple(0);
+    StringValue* sv = reinterpret_cast<StringValue*>(
+        tuple->GetSlot(slot_desc->tuple_offset()));
+    if (sv == NULL || sv->len == 0) continue;  // NOTE(review): null indicator unchecked
+    char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len));
+    if (new_ptr == NULL) {
+      Status s = Status::MemLimitExceeded();
+      s.AddDetail(Substitute("Cannot perform aggregation at node with id $0."
+              " Failed to allocate $1 output bytes.", id_, sv->len));
+      state_->SetMemLimitExceeded();
+      return s;
+    }
+    memcpy(new_ptr, sv->ptr, sv->len);
+    sv->ptr = new_ptr;  // Slot now points into 'pool'.
+  }
+  return Status::OK();
+}
+
+Status PartitionedAggregationNode::GetNextInternal(RuntimeState* state,
+    RowBatch* row_batch, bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
   RETURN_IF_CANCELLED(state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 0b94511..ab560c5 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -390,6 +390,20 @@ class PartitionedAggregationNode : public ExecNode {
   /// a temporary buffer.
   boost::scoped_ptr<BufferedTupleStream> serialize_stream_;
 
+  /// Materializes 'row_batch' (grouping or non-grouping case). Called by GetNext().
+  Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
+
+  /// Called by GetNext() after GetNextInternal(). Ensures string data referenced in
+  /// 'row_batch' lives as long as its tuples: copies it in a subplan, otherwise calls
+  /// MarkNeedToReturn() on the batch. Only rows from 'first_row_idx' on are copied.
+  Status HandleOutputStrings(RowBatch* row_batch, int first_row_idx);
+
+  /// Copies the string in 'slot_desc' (must be a var-len string slot) of each row in
+  /// 'row_batch', starting at 'first_row_idx', into 'pool' and repoints the StringValue
+  /// at the copy. Returns a mem-limit-exceeded error if 'pool' cannot allocate.
+  Status CopyStringData(const SlotDescriptor* slot_desc, RowBatch* row_batch,
+      int first_row_idx, MemPool* pool);
+
   /// Constructs singleton output tuple, allocating memory from pool.
   Tuple* ConstructSingletonOutputTuple(
       const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* pool);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/be/src/exprs/agg-fn-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h
index 98fb4a1..e9598ea 100644
--- a/be/src/exprs/agg-fn-evaluator.h
+++ b/be/src/exprs/agg-fn-evaluator.h
@@ -118,6 +118,7 @@ class AggFnEvaluator {
   const std::string& fn_name() const { return fn_.name.function_name; }
   const std::string& update_symbol() const { return fn_.aggregate_fn.update_fn_symbol; }
   const std::string& merge_symbol() const { return fn_.aggregate_fn.merge_fn_symbol; }
+  const SlotDescriptor* output_slot_desc() const { return output_slot_desc_; }  // result
 
   static std::string DebugString(const std::vector<AggFnEvaluator*>& exprs);
   std::string DebugString() const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test
index 2e38d1d..35e27c5 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-runtime.test
@@ -450,3 +450,19 @@ inner join t2.int_array a
 ---- TYPES
 bigint
 ====
+---- QUERY
+# IMPALA-3311: test string data coming out of an agg in a subplan
+select id, m from complextypestbl t,
+(select min(cast(item as string)) m from t.int_array) v
+---- RESULTS
+1,'1'
+2,'1'
+3,'NULL'
+4,'NULL'
+5,'NULL'
+6,'NULL'
+7,'NULL'
+8,'-1'
+---- TYPES
+BIGINT,STRING
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7767d300/testdata/workloads/perf-regression/queries/subplan_aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/perf-regression/queries/subplan_aggregation.test b/testdata/workloads/perf-regression/queries/subplan_aggregation.test
new file mode 100644
index 0000000..b9ea894
--- /dev/null
+++ b/testdata/workloads/perf-regression/queries/subplan_aggregation.test
@@ -0,0 +1,11 @@
+====
+---- QUERY: subplan_aggregation
+-- Description: Agg in subplan produces string output that's fed to non-trivial parent
+-- plan
+-- Target test case: Regression test for IMPALA-3311
+select c_custkey, max(m) from customer c,
+(select max(o_orderstatus) m from c.c_orders) v
+group by c_custkey order by 1 limit 1
+---- RESULTS
+---- TYPES
+====