You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2016/10/23 00:31:20 UTC

[1/2] incubator-impala git commit: IMPALA-4120: Incorrect results with LEAD() analytic function

Repository: incubator-impala
Updated Branches:
  refs/heads/master 48085274f -> ff6b450ad


IMPALA-4120: Incorrect results with LEAD() analytic function

This change fixes a memory management problem with LEAD()/LAG()
analytic functions which led to incorrect result. In particular,
the update functions specified for these analytic functions only
make a shallow copy of StringVal (i.e. copying only the pointer
and the length of the string) without copying the string itself.
This may lead to problem if the string is created from some UDFs
which do local allocations whose buffer may be freed and reused
before the result tuple is copied out. This change fixes the problem
above by allocating a buffer at the Init() functions of these
analytic functions to track the intermediate value. In addition,
when the value is copied out in GetValue(), it will be copied into
the MemPool belonging to the AnalyticEvalNode and attached to the
outgoing row batches. This change also fixes a missing free of
local allocations in QueryMaintenance().

Change-Id: I85bb1745232d8dd383a6047c86019c6378ab571f
Reviewed-on: http://gerrit.cloudera.org:8080/4740
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/51268c05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/51268c05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/51268c05

Branch: refs/heads/master
Commit: 51268c053ffe41dc1aa9f1b250878113d4225258
Parents: 4808527
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Sep 26 16:04:27 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Oct 22 07:39:37 2016 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc               | 88 +++++++++++++-------
 be/src/exec/analytic-eval-node.h                | 28 ++++---
 be/src/exprs/agg-fn-evaluator.h                 | 27 +++---
 be/src/exprs/aggregate-functions-ir.cc          | 11 ++-
 be/src/udf/udf.cc                               |  3 +-
 .../org/apache/impala/catalog/BuiltinsDb.java   |  4 +-
 .../queries/QueryTest/analytic-fns.test         | 14 ++++
 7 files changed, 120 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 21a0805..06b0467 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -359,7 +359,7 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
     }
   }
 
-  Status status = Status::OK();
+  Status status;
   // Buffer the entire input row to be returned later with the analytic eval results.
   if (UNLIKELY(!input_stream_->AddRow(row, &status))) {
     // AddRow returns false if an error occurs (available via status()) or there is
@@ -377,45 +377,62 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
     }
   }
   DCHECK(status.ok());
-  return status;
+  return Status::OK();
 }
 
-void AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
+Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
   VLOG_ROW << id() << " AddResultTuple idx=" << stream_idx;
   DCHECK(curr_tuple_ != NULL);
-  Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(),
-      curr_tuple_pool_.get());
+  MemPool* cur_tuple_pool = curr_tuple_pool_.get();
+  Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(), cur_tuple_pool);
 
   AggFnEvaluator::GetValue(evaluators_, fn_ctxs_, curr_tuple_, result_tuple);
+  // Copy any string data in 'result_tuple' into 'cur_tuple_pool_'.
+  for (const SlotDescriptor* slot_desc : result_tuple_desc_->slots()) {
+    if (!slot_desc->type().IsVarLenStringType()) continue;
+    StringValue* sv = reinterpret_cast<StringValue*>(
+        result_tuple->GetSlot(slot_desc->tuple_offset()));
+    if (sv == NULL || sv->len == 0) continue;
+    char* new_ptr = reinterpret_cast<char*>(cur_tuple_pool->TryAllocate(sv->len));
+    if (UNLIKELY(new_ptr == NULL)) {
+      return cur_tuple_pool->mem_tracker()->MemLimitExceeded(NULL,
+          "Failed to allocate memory for analytic function's result.", sv->len);
+    }
+    memcpy(new_ptr, sv->ptr, sv->len);
+    sv->ptr = new_ptr;
+  }
+
   DCHECK_GT(stream_idx, last_result_idx_);
   result_tuples_.push_back(pair<int64_t, Tuple*>(stream_idx, result_tuple));
   last_result_idx_ = stream_idx;
   VLOG_ROW << id() << " Added result tuple, final state: " << DebugStateString(true);
+  return Status::OK();
 }
 
-inline void AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition,
+inline Status AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition,
     int64_t stream_idx, TupleRow* row) {
   // The analytic fns are finalized after the previous row if we found a new partition
   // or the window is a RANGE and the order by exprs changed. For ROWS windows we do not
   // need to compare the current row to the previous row.
   VLOG_ROW << id() << " TryAddResultTupleForPrevRow partition=" << next_partition
            << " idx=" << stream_idx;
-  if (fn_scope_ == ROWS) return;
-  if (next_partition || (fn_scope_ == RANGE && window_.__isset.window_end &&
-      !PrevRowCompare(order_by_eq_expr_ctx_))) {
-    AddResultTuple(stream_idx - 1);
+  if (fn_scope_ != ROWS && (next_partition || (fn_scope_ == RANGE &&
+      window_.__isset.window_end && !PrevRowCompare(order_by_eq_expr_ctx_)))) {
+    RETURN_IF_ERROR(AddResultTuple(stream_idx - 1));
   }
+  return Status::OK();
 }
 
-inline void AnalyticEvalNode::TryAddResultTupleForCurrRow(int64_t stream_idx,
+inline Status AnalyticEvalNode::TryAddResultTupleForCurrRow(int64_t stream_idx,
     TupleRow* row) {
   VLOG_ROW << id() << " TryAddResultTupleForCurrRow idx=" << stream_idx;
   // We only add results at this point for ROWS windows (unless unbounded following)
-  if (fn_scope_ != ROWS || !window_.__isset.window_end) return;
-
   // Nothing to add if the end offset is before the start of the partition.
-  if (stream_idx - rows_end_offset_ < curr_partition_idx_) return;
-  AddResultTuple(stream_idx - rows_end_offset_);
+  if (fn_scope_ == ROWS && window_.__isset.window_end &&
+      stream_idx - rows_end_offset_ >= curr_partition_idx_) {
+    RETURN_IF_ERROR(AddResultTuple(stream_idx - rows_end_offset_));
+  }
+  return Status::OK();
 }
 
 inline void AnalyticEvalNode::TryRemoveRowsBeforeWindow(int64_t stream_idx) {
@@ -435,21 +452,27 @@ inline void AnalyticEvalNode::TryRemoveRowsBeforeWindow(int64_t stream_idx) {
   window_tuples_.pop_front();
 }
 
-inline void AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
+inline Status AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
     int64_t prev_partition_idx) {
   DCHECK_LT(prev_partition_idx, partition_idx);
   // For PARTITION, RANGE, or ROWS with UNBOUNDED PRECEDING: add a result tuple for the
   // remaining rows in the partition that do not have an associated result tuple yet.
   if (fn_scope_ != ROWS || !window_.__isset.window_end) {
-    if (last_result_idx_ < partition_idx - 1) AddResultTuple(partition_idx - 1);
-    return;
+    if (last_result_idx_ < partition_idx - 1) {
+      RETURN_IF_ERROR(AddResultTuple(partition_idx - 1));
+    }
+    return Status::OK();
   }
 
   // lead() is re-written to a ROWS window with an end bound FOLLOWING. Any remaining
   // results need the default value (set by Init()). If this is the case, the start bound
   // is UNBOUNDED PRECEDING (DCHECK in Init()).
   for (int i = 0; i < evaluators_.size(); ++i) {
-    if (is_lead_fn_[i]) evaluators_[i]->Init(fn_ctxs_[i], curr_tuple_);
+    if (is_lead_fn_[i]) {
+      // Needs to call Finalize() to release resources.
+      evaluators_[i]->Finalize(fn_ctxs_[i], curr_tuple_, dummy_result_tuple_);
+      evaluators_[i]->Init(fn_ctxs_[i], curr_tuple_);
+    }
   }
 
   // If the start bound is not UNBOUNDED PRECEDING and there are still rows in the
@@ -470,13 +493,16 @@ inline void AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
       AggFnEvaluator::Remove(evaluators_, fn_ctxs_, remove_row, curr_tuple_);
       window_tuples_.pop_front();
     }
-    AddResultTuple(last_result_idx_ + 1);
+    RETURN_IF_ERROR(AddResultTuple(last_result_idx_ + 1));
   }
 
   // If there are still rows between the row with the last result (AddResultTuple() may
   // have updated last_result_idx_) and the partition boundary, add the current results
   // for the remaining rows with the same result tuple (curr_tuple_ is not modified).
-  if (last_result_idx_ < partition_idx - 1) AddResultTuple(partition_idx - 1);
+  if (last_result_idx_ < partition_idx - 1) {
+    RETURN_IF_ERROR(AddResultTuple(partition_idx - 1));
+  }
+  return Status::OK();
 }
 
 inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
@@ -523,7 +549,7 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
 
   if (fn_scope_ == ROWS && stream_idx > 0 && (!window_.__isset.window_end ||
         window_.window_end.type == TAnalyticWindowBoundaryType::FOLLOWING)) {
-    TryAddRemainingResults(stream_idx, prev_partition_stream_idx);
+    RETURN_IF_ERROR(TryAddRemainingResults(stream_idx, prev_partition_stream_idx));
   }
   window_tuples_.clear();
 
@@ -557,10 +583,10 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
       // 2) Insert the initial result tuple at first_val_null_offset_. This happens when
       //    the end bound was actually Y PRECEDING.
       if (first_val_null_offset_ != -1) {
-        AddResultTuple(curr_partition_idx_ + first_val_null_offset_ - 1);
+        RETURN_IF_ERROR(AddResultTuple(curr_partition_idx_ + first_val_null_offset_ - 1));
       }
     } else {
-      AddResultTuple(curr_partition_idx_ - rows_end_offset_ - 1);
+      RETURN_IF_ERROR(AddResultTuple(curr_partition_idx_ - rows_end_offset_ - 1));
     }
   }
   return Status::OK();
@@ -614,7 +640,7 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
   if (UNLIKELY(stream_idx == 0 && curr_child_batch_->num_rows() > 0)) {
     TupleRow* row = curr_child_batch_->GetRow(0);
     RETURN_IF_ERROR(AddRow(0, row));
-    TryAddResultTupleForCurrRow(0, row);
+    RETURN_IF_ERROR(TryAddResultTupleForCurrRow(0, row));
     prev_input_row_ = row;
     ++batch_idx;
     ++stream_idx;
@@ -647,19 +673,19 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
       // partition_by_eq_expr_ctx_ checks equality over the predicate exprs
       next_partition = !PrevRowCompare(partition_by_eq_expr_ctx_);
     }
-    TryAddResultTupleForPrevRow(next_partition, stream_idx, row);
+    RETURN_IF_ERROR(TryAddResultTupleForPrevRow(next_partition, stream_idx, row));
     if (next_partition) RETURN_IF_ERROR(InitNextPartition(state, stream_idx));
 
     // The evaluators_ are updated with the current row.
     RETURN_IF_ERROR(AddRow(stream_idx, row));
 
-    TryAddResultTupleForCurrRow(stream_idx, row);
+    RETURN_IF_ERROR(TryAddResultTupleForCurrRow(stream_idx, row));
     prev_input_row_ = row;
   }
 
   if (UNLIKELY(input_eos_ && stream_idx > curr_partition_idx_)) {
     // We need to add the results for the last row(s).
-    TryAddRemainingResults(stream_idx, curr_partition_idx_);
+    RETURN_IF_ERROR(TryAddRemainingResults(stream_idx, curr_partition_idx_));
   }
 
   // Transfer resources to prev_tuple_pool_ when enough resources have accumulated
@@ -846,7 +872,10 @@ void AnalyticEvalNode::Close(RuntimeState* state) {
     }
     evaluators_[i]->Close(state);
   }
-  for (int i = 0; i < fn_ctxs_.size(); ++i) fn_ctxs_[i]->impl()->Close();
+  for (int i = 0; i < fn_ctxs_.size(); ++i) {
+    fn_ctxs_[i]->impl()->FreeLocalAllocations();
+    fn_ctxs_[i]->impl()->Close();
+  }
 
   if (partition_by_eq_expr_ctx_ != NULL) partition_by_eq_expr_ctx_->Close(state);
   if (order_by_eq_expr_ctx_ != NULL) order_by_eq_expr_ctx_->Close(state);
@@ -878,6 +907,7 @@ Status AnalyticEvalNode::QueryMaintenance(RuntimeState* state) {
   for (int i = 0; i < evaluators_.size(); ++i) {
     ExprContext::FreeLocalAllocations(evaluators_[i]->input_expr_ctxs());
   }
+  ExprContext::FreeLocalAllocations(fn_ctxs_);
   return ExecNode::QueryMaintenance(state);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 68b5bbc..579ad5b 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -132,21 +132,24 @@ class AnalyticEvalNode : public ExecNode {
   Status AddRow(int64_t stream_idx, TupleRow* row);
 
   /// Determines if there is a window ending at the previous row, and if so, calls
-  /// AddResultTuple() with the index of the previous row in input_stream_. next_partition
-  /// indicates if the current row is the start of a new partition. stream_idx is the
-  /// index of the current input row from input_stream_.
-  void TryAddResultTupleForPrevRow(bool next_partition, int64_t stream_idx,
+  /// AddResultTuple() with the index of the previous row in 'input_stream_'.
+  /// 'next_partition' indicates if the current row is the start of a new partition.
+  /// 'stream_idx' is the index of the current input row from 'input_stream_'.
+  /// Returns an error when memory limit is exceeded.
+  Status TryAddResultTupleForPrevRow(bool next_partition, int64_t stream_idx,
       TupleRow* row);
 
   /// Determines if there is a window ending at the current row, and if so, calls
-  /// AddResultTuple() with the index of the current row in input_stream_. stream_idx is
-  /// the index of the current input row from input_stream_.
-  void TryAddResultTupleForCurrRow(int64_t stream_idx, TupleRow* row);
+  /// AddResultTuple() with the index of the current row in 'input_stream_'.
+  /// 'stream_idx' is the index of the current input row from 'input_stream_'.
+  /// Returns an error when memory limit is exceeded.
+  Status TryAddResultTupleForCurrRow(int64_t stream_idx, TupleRow* row);
 
   /// Adds additional result tuples at the end of a partition, e.g. if the end bound is
   /// FOLLOWING. partition_idx is the index into input_stream_ of the new partition,
-  /// prev_partition_idx is the index of the previous partition.
-  void TryAddRemainingResults(int64_t partition_idx, int64_t prev_partition_idx);
+  /// 'prev_partition_idx' is the index of the previous partition.
+  /// Returns an error when memory limit is exceeded.
+  Status TryAddRemainingResults(int64_t partition_idx, int64_t prev_partition_idx);
 
   /// Removes rows from curr_tuple_ (by calling AggFnEvaluator::Remove()) that are no
   /// longer in the window (i.e. they are before the window start boundary). stream_idx
@@ -159,9 +162,10 @@ class AnalyticEvalNode : public ExecNode {
   Status InitNextPartition(RuntimeState* state, int64_t stream_idx);
 
   /// Produces a result tuple with analytic function results by calling GetValue() or
-  /// Finalize() for curr_tuple_ on the evaluators_. The result tuple is stored in
-  /// result_tuples_ with the index into input_stream_ specified by stream_idx.
-  void AddResultTuple(int64_t stream_idx);
+  /// Finalize() for 'curr_tuple_' on the 'evaluators_'. The result tuple is stored in
+  /// 'result_tuples_' with the index into 'input_stream_' specified by 'stream_idx'.
+  /// Returns an error when memory limit is exceeded.
+  Status AddResultTuple(int64_t stream_idx);
 
   /// Gets the number of rows that are ready to be returned by subsequent calls to
   /// GetNextOutputBatch().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exprs/agg-fn-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h
index cde969c..b3ecda0 100644
--- a/be/src/exprs/agg-fn-evaluator.h
+++ b/be/src/exprs/agg-fn-evaluator.h
@@ -147,9 +147,11 @@ class AggFnEvaluator {
   void Finalize(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
 
   /// Puts the finalized value from Tuple* src in Tuple* dst just as Finalize() does.
-  /// However, unlike Finalize(), GetValue() does not clean up state in src. GetValue()
-  /// can be called repeatedly with the same src. Only used internally for analytic fn
-  /// builtins.
+  /// However, unlike Finalize(), GetValue() does not clean up state in src.
+  /// GetValue() can be called repeatedly with the same src. Only used internally for
+  /// analytic fn builtins. Note that StringVal result is from local allocation (which
+  /// will be freed in the next QueryMaintenance()) so it needs to be copied out if it
+  /// needs to survive beyond QueryMaintenance() (e.g. if 'dst' lives in a row batch).
   void GetValue(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
 
   /// Helper functions for calling the above functions on many evaluators.
@@ -229,10 +231,13 @@ class AggFnEvaluator {
   /// fn must be a function that implement's the UDA Update() signature.
   void Update(FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst, void* fn);
 
-  /// Sets up the arguments to call fn. This converts from the agg-expr signature,
+  /// Sets up the arguments to call 'fn'. This converts from the agg-expr signature,
   /// taking TupleRow to the UDA signature taking AnvVals. Writes the serialize/finalize
-  /// result to the given destination slot/tuple. The fn can be NULL to indicate the src
-  /// value should simply be written into the destination.
+  /// result to the given destination slot/tuple. 'fn' can be NULL to indicate the src
+  /// value should simply be written into the destination. Note that StringVal result is
+  /// from local allocation (which will be freed in the next QueryMaintenance()) so it
+  /// needs to be copied out if it needs to survive beyond QueryMaintenance() (e.g. if
+  /// 'dst' lives in a row batch).
   void SerializeOrFinalize(FunctionContext* agg_fn_ctx, Tuple* src,
       const SlotDescriptor* dst_slot_desc, Tuple* dst, void* fn);
 
@@ -265,7 +270,7 @@ inline void AggFnEvaluator::GetValue(
 }
 
 inline void AggFnEvaluator::Init(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Init(fn_ctxs[i], dst);
@@ -279,28 +284,28 @@ inline void AggFnEvaluator::Add(const std::vector<AggFnEvaluator*>& evaluators,
   }
 }
 inline void AggFnEvaluator::Remove(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Remove(fn_ctxs[i], src, dst);
   }
 }
 inline void AggFnEvaluator::Serialize(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Serialize(fn_ctxs[i], dst);
   }
 }
 inline void AggFnEvaluator::GetValue(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->GetValue(fn_ctxs[i], src, dst);
   }
 }
 inline void AggFnEvaluator::Finalize(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Finalize(fn_ctxs[i], src, dst);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exprs/aggregate-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 44b72a3..dd81005 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1583,10 +1583,19 @@ void AggregateFunctions::OffsetFnInit(FunctionContext* ctx, T* dst) {
   *dst = *static_cast<T*>(ctx->GetConstantArg(2));
 }
 
+template <>
+void AggregateFunctions::OffsetFnInit(FunctionContext* ctx, StringVal* dst) {
+  DCHECK_EQ(ctx->GetNumArgs(), 3);
+  DCHECK(ctx->IsArgConstant(1));
+  DCHECK(ctx->IsArgConstant(2));
+  DCHECK_EQ(ctx->GetArgType(0)->type, ctx->GetArgType(2)->type);
+  CopyStringVal(ctx, *static_cast<StringVal*>(ctx->GetConstantArg(2)), dst);
+}
+
 template <typename T>
 void AggregateFunctions::OffsetFnUpdate(FunctionContext* ctx, const T& src,
     const BigIntVal&, const T& default_value, T* dst) {
-  *dst = src;
+  UpdateVal(ctx, src, dst);
 }
 
 // Stamp out the templates for the types we need.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 13634dc..4ad8d8b 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -194,7 +194,8 @@ void FunctionContextImpl::Close() {
   stringstream error_ss;
   if (!debug_) {
     if (pool_->net_allocations() > 0) {
-      error_ss << "Memory leaked via FunctionContext::Allocate()";
+      error_ss << "Memory leaked via FunctionContext::Allocate() "
+               << "or FunctionContext::AllocateLocal()";
     } else if (pool_->net_allocations() < 0) {
       error_ss << "FunctionContext::Free() called on buffer that was already freed or "
                   "was not allocated.";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index fb4accc..4fcecb2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -1035,7 +1035,9 @@ public class BuiltinsDb extends Db {
           db, "lead", Lists.newArrayList(t, Type.BIGINT, t), t, t,
           prefix + OFFSET_FN_INIT_SYMBOL.get(t),
           prefix + OFFSET_FN_UPDATE_SYMBOL.get(t),
-          null, null, null));
+          null,
+          t == Type.STRING ? stringValGetValue : null,
+          t == Type.STRING ? stringValSerializeOrFinalize : null));
 
       // lead() and lag() the default offset and the default value should be
       // rewritten to call the overrides that take all parameters.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
index 93431bf..62c9a67 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
@@ -1893,3 +1893,17 @@ DECIMAL, DECIMAL
 12345,1234
 132842,1234
 ====
+---- QUERY
+# Regression test for IMPALA-4120: Invoke UDFs which do local allocations and verifies
+# that the results are copied out.
+select count(*) from (
+select
+  from_unixtime(lead(bigint_col, 1) over (order by id), 'yyyyMMddHH:mm:ss') as a,
+  lead(from_unixtime(bigint_col, 'yyyyMMddHH:mm:ss'), 1) over (order by id) AS b
+from functional.alltypes) x
+where x.a = x.b
+---- TYPES
+BIGINT
+---- RESULTS
+7299
+====


[2/2] incubator-impala git commit: IMPALA-4285/IMPALA-4286: Fixes for Parquet scanner with MT_DOP > 0.

Posted by kw...@apache.org.
IMPALA-4285/IMPALA-4286: Fixes for Parquet scanner with MT_DOP > 0.

IMPALA-4258: The problem was that there was a reference to
HdfsScanner::batch_ hidden inside WriteEmptyTuples(). The batch_
reference is NULL when the scanner is run with MT_DOP > 1.

IMPALA-4286: When there are no scan ranges HdfsScanNodeBase::Open()
exits early without initializing the reader context. This lead to
a DCHECK in IoMgr::GetNextRange() called from HdfsScanNodeMt.
The fix is to remove that unnecessary short-circuit Open().

I combined these two bugfixes because the new basic test covers
both cases.

Testing: Added a new test_mt_dop.py test. A private code/hdfs
run passed.

Change-Id: I79c0f6fd2aeb4bc6fa5f87219a485194fef2db1b
Reviewed-on: http://gerrit.cloudera.org:8080/4767
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ff6b450a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ff6b450a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ff6b450a

Branch: refs/heads/master
Commit: ff6b450ad380ce840e18875a89d9cf98058277a3
Parents: 51268c0
Author: Alex Behm <al...@cloudera.com>
Authored: Wed Oct 19 23:27:14 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Oct 22 10:24:24 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-avro-scanner.cc                |  2 +-
 be/src/exec/hdfs-parquet-scanner.cc             |  2 +-
 be/src/exec/hdfs-rcfile-scanner.cc              |  2 +-
 be/src/exec/hdfs-scan-node-base.cc              |  2 -
 be/src/exec/hdfs-scanner.cc                     | 64 ++------------------
 be/src/exec/hdfs-scanner.h                      | 14 ++---
 be/src/exec/hdfs-sequence-scanner.cc            |  4 +-
 be/src/exec/hdfs-text-scanner.cc                |  2 +-
 .../queries/QueryTest/mt-dop.test               |  9 +++
 tests/query_test/test_mt_dop.py                 | 47 ++++++++++++++
 10 files changed, 73 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 88d6d3a..91a9d03 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -538,7 +538,7 @@ Status HdfsAvroScanner::ProcessRange() {
       int num_to_commit;
       if (scan_node_->materialized_slots().empty()) {
         // No slots to materialize (e.g. count(*)), no need to decode data
-        num_to_commit = WriteEmptyTuples(context_, tuple_row, max_tuples);
+        num_to_commit = WriteTemplateTuples(tuple_row, max_tuples);
       } else {
         if (codegend_decode_avro_data_ != NULL) {
           num_to_commit = codegend_decode_avro_data_(this, max_tuples, pool, &data,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e91a7ec..542f4cc 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -339,7 +339,7 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
     int rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
     int max_tuples = min(row_batch->capacity(), rows_remaining);
     TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
-    int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
+    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
     Status status = CommitRows(row_batch, num_to_commit);
     assemble_rows_timer_.Stop();
     RETURN_IF_ERROR(status);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index f43b2aa..012a424 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -485,7 +485,7 @@ Status HdfsRCFileScanner::ProcessRange() {
         // If there are no materialized slots (e.g. count(*) or just partition cols)
         // we can shortcircuit the parse loop
         row_pos_ += max_tuples;
-        int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples);
+        int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
         COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
         RETURN_IF_ERROR(CommitRows(num_to_commit));
         continue;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index cf6708c..957338d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -351,8 +351,6 @@ void HdfsScanNodeBase::Codegen(RuntimeState* state) {
 Status HdfsScanNodeBase::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Open(state));
 
-  if (file_descs_.empty()) return Status::OK();
-
   // Open collection conjuncts
   for (const auto& entry: conjuncts_map_) {
     // conjuncts_ are already opened in ExecNode::Open()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 0b6e8c5..3885522 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -211,66 +211,14 @@ Status HdfsScanner::CommitRows(int num_rows) {
   return Status::OK();
 }
 
-// In this code path, no slots were materialized from the input files.  The only
-// slots are from partition keys.  This lets us simplify writing out the batches.
-//   1. template_tuple_ is the complete tuple.
-//   2. Eval conjuncts against the tuple.
-//   3. If it passes, stamp out 'num_tuples' copies of it into the row_batch.
-int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) {
-  DCHECK_GT(num_tuples, 0);
-
-  if (template_tuple_ == NULL) {
-    // No slots from partitions keys or slots.  This is count(*).  Just add the
-    // number of rows to the batch.
-    row_batch->AddRows(num_tuples);
-    row_batch->CommitRows(num_tuples);
-  } else {
-    // Make a row and evaluate the row
-    int row_idx = row_batch->AddRow();
-
-    TupleRow* current_row = row_batch->GetRow(row_idx);
-    current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-    if (!EvalConjuncts(current_row)) return 0;
-    // Add first tuple
-    row_batch->CommitLastRow();
-    --num_tuples;
-
-    DCHECK_LE(num_tuples, row_batch->capacity() - row_batch->num_rows());
-
-    for (int n = 0; n < num_tuples; ++n) {
-      DCHECK(!row_batch->AtCapacity());
-      TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
-      current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-      row_batch->CommitLastRow();
-    }
-  }
-  return num_tuples;
-}
-
-// In this code path, no slots were materialized from the input files.  The only
-// slots are from partition keys.  This lets us simplify writing out the batches.
-//   1. template_tuple_ is the complete tuple.
-//   2. Eval conjuncts against the tuple.
-//   3. If it passes, stamp out 'num_tuples' copies of it into the row_batch.
-int HdfsScanner::WriteEmptyTuples(ScannerContext* context,
-    TupleRow* row, int num_tuples) {
+int HdfsScanner::WriteTemplateTuples(TupleRow* row, int num_tuples) {
   DCHECK_GE(num_tuples, 0);
-  if (num_tuples == 0) return 0;
-
-  if (template_tuple_ == NULL) {
-    // Must be conjuncts on constant exprs.
-    if (!EvalConjuncts(row)) return 0;
-    return num_tuples;
-  } else {
-    row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-    if (!EvalConjuncts(row)) return 0;
-    row = next_row(row);
+  DCHECK_EQ(scan_node_->tuple_idx(), 0);
+  DCHECK_EQ(scanner_conjunct_ctxs_->size(), 0);
+  if (num_tuples == 0 || template_tuple_ == NULL) return num_tuples;
 
-    for (int n = 1; n < num_tuples; ++n) {
-      row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
-      row = next_row(row);
-    }
-  }
+  Tuple** row_tuple = reinterpret_cast<Tuple**>(row);
+  for (int i = 0; i < num_tuples; ++i) row_tuple[i] = template_tuple_;
   return num_tuples;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 4a4d366..71efd5a 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -347,14 +347,9 @@ class HdfsScanner {
                                    scanner_conjunct_ctxs_->size(), row);
   }
 
-  /// Utility method to write out tuples when there are no materialized
-  /// fields (e.g. select count(*) or only partition keys).
-  ///   num_tuples - Total number of tuples to write out.
-  /// Returns the number of tuples added to the row batch.
-  int WriteEmptyTuples(RowBatch* row_batch, int num_tuples);
-
-  /// Write empty tuples and commit them to the context object
-  int WriteEmptyTuples(ScannerContext* context, TupleRow* tuple_row, int num_tuples);
+  /// Sets 'num_tuples' template tuples in the batch that 'row' points to. Assumes the
+  /// 'tuple_row' only has a single tuple. Returns the number of tuples set.
+  int WriteTemplateTuples(TupleRow* row, int num_tuples);
 
   /// Processes batches of fields and writes them out to tuple_row_mem.
   /// - 'pool' mempool to allocate from for auxiliary tuple memory
@@ -455,9 +450,10 @@ class HdfsScanner {
     return reinterpret_cast<Tuple*>(mem + tuple_byte_size);
   }
 
+  /// Assumes the row only has a single tuple.
   inline TupleRow* next_row(TupleRow* r) const {
     uint8_t* mem = reinterpret_cast<uint8_t*>(r);
-    return reinterpret_cast<TupleRow*>(mem + batch_->row_byte_size());
+    return reinterpret_cast<TupleRow*>(mem + sizeof(Tuple*));
   }
 
   /// Simple wrapper around scanner_conjunct_ctxs_. Used in the codegen'd version of

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index fd552be..33be362 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -214,7 +214,7 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
 
   if (scan_node_->materialized_slots().empty()) {
     // Handle case where there are no slots to materialize (e.g. count(*))
-    num_to_process = WriteEmptyTuples(context_, tuple_row, num_to_process);
+    num_to_process = WriteTemplateTuples(tuple_row, num_to_process);
     COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process);
     RETURN_IF_ERROR(CommitRows(num_to_process));
     return Status::OK();
@@ -334,7 +334,7 @@ Status HdfsSequenceScanner::ProcessRange() {
         RETURN_IF_ERROR(parse_status_);
       }
     } else {
-      add_row = WriteEmptyTuples(context_, tuple_row_mem, 1);
+      add_row = WriteTemplateTuples(tuple_row_mem, 1);
     }
 
     COUNTER_ADD(scan_node_->rows_read_counter(), 1);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index cc63408..0b048f4 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -400,7 +400,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) {
       SCOPED_TIMER(scan_node_->materialize_tuple_timer());
       // If we are doing count(*) then we return tuples only containing partition keys
       boundary_row_.Clear();
-      num_tuples_materialized = WriteEmptyTuples(context_, tuple_row_mem, *num_tuples);
+      num_tuples_materialized = WriteTemplateTuples(tuple_row_mem, *num_tuples);
     }
 
     // Save contents that are split across buffers if we are going to return this column

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
new file mode 100644
index 0000000..ac453ca
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop.test
@@ -0,0 +1,9 @@
+====
+---- QUERY
+# IMPALA-4285: Test scan with no materialized slots.
+select count(*) from alltypes
+---- RESULTS
+7300
+---- TYPES
+BIGINT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff6b450a/tests/query_test/test_mt_dop.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
new file mode 100644
index 0000000..1cd6d31
--- /dev/null
+++ b/tests/query_test/test_mt_dop.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests queries with the MT_DOP query option.
+
+import pytest
+
+from copy import deepcopy
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_vector import TestDimension
+from tests.common.test_vector import TestVector
+
+MT_DOP_VALUES = [1, 2, 8]
+
+class TestMtDop(ImpalaTestSuite):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestMtDop, cls).add_test_dimensions()
+    cls.TestMatrix.add_dimension(TestDimension('mt_dop', *MT_DOP_VALUES))
+    # IMPALA-4332: The MT scheduler does not work for Kudu or HBase tables.
+    cls.TestMatrix.add_constraint(\
+        lambda v: v.get_value('table_format').file_format != 'hbase')
+    cls.TestMatrix.add_constraint(\
+        lambda v: v.get_value('table_format').file_format != 'kudu')
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  def test_mt_dop(self, vector):
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
+    self.run_test_case('QueryTest/mt-dop', new_vector)