You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/25 20:16:10 UTC

[23/33] incubator-impala git commit: IMPALA-4120: Incorrect results with LEAD() analytic function

IMPALA-4120: Incorrect results with LEAD() analytic function

This change fixes a memory management problem with LEAD()/LAG()
analytic functions which led to incorrect results. In particular,
the update functions specified for these analytic functions only
make a shallow copy of StringVal (i.e. copying only the pointer
and the length of the string) without copying the string itself.
This may lead to problems if the string is created by UDFs
which do local allocations whose buffers may be freed and reused
before the result tuple is copied out. This change fixes the problem
above by allocating a buffer in the Init() functions of these
analytic functions to track the intermediate value. In addition,
when the value is copied out in GetValue(), it will be copied into
the MemPool belonging to the AnalyticEvalNode and attached to the
outgoing row batches. This change also fixes a missing free of
local allocations in QueryMaintenance().

Change-Id: I85bb1745232d8dd383a6047c86019c6378ab571f
Reviewed-on: http://gerrit.cloudera.org:8080/4740
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/51268c05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/51268c05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/51268c05

Branch: refs/heads/hadoop-next
Commit: 51268c053ffe41dc1aa9f1b250878113d4225258
Parents: 4808527
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Sep 26 16:04:27 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Oct 22 07:39:37 2016 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc               | 88 +++++++++++++-------
 be/src/exec/analytic-eval-node.h                | 28 ++++---
 be/src/exprs/agg-fn-evaluator.h                 | 27 +++---
 be/src/exprs/aggregate-functions-ir.cc          | 11 ++-
 be/src/udf/udf.cc                               |  3 +-
 .../org/apache/impala/catalog/BuiltinsDb.java   |  4 +-
 .../queries/QueryTest/analytic-fns.test         | 14 ++++
 7 files changed, 120 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 21a0805..06b0467 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -359,7 +359,7 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
     }
   }
 
-  Status status = Status::OK();
+  Status status;
   // Buffer the entire input row to be returned later with the analytic eval results.
   if (UNLIKELY(!input_stream_->AddRow(row, &status))) {
     // AddRow returns false if an error occurs (available via status()) or there is
@@ -377,45 +377,62 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
     }
   }
   DCHECK(status.ok());
-  return status;
+  return Status::OK();
 }
 
-void AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
+Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
   VLOG_ROW << id() << " AddResultTuple idx=" << stream_idx;
   DCHECK(curr_tuple_ != NULL);
-  Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(),
-      curr_tuple_pool_.get());
+  MemPool* cur_tuple_pool = curr_tuple_pool_.get();
+  Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(), cur_tuple_pool);
 
   AggFnEvaluator::GetValue(evaluators_, fn_ctxs_, curr_tuple_, result_tuple);
+  // Copy any string data in 'result_tuple' into 'cur_tuple_pool_'.
+  for (const SlotDescriptor* slot_desc : result_tuple_desc_->slots()) {
+    if (!slot_desc->type().IsVarLenStringType()) continue;
+    StringValue* sv = reinterpret_cast<StringValue*>(
+        result_tuple->GetSlot(slot_desc->tuple_offset()));
+    if (sv == NULL || sv->len == 0) continue;
+    char* new_ptr = reinterpret_cast<char*>(cur_tuple_pool->TryAllocate(sv->len));
+    if (UNLIKELY(new_ptr == NULL)) {
+      return cur_tuple_pool->mem_tracker()->MemLimitExceeded(NULL,
+          "Failed to allocate memory for analytic function's result.", sv->len);
+    }
+    memcpy(new_ptr, sv->ptr, sv->len);
+    sv->ptr = new_ptr;
+  }
+
   DCHECK_GT(stream_idx, last_result_idx_);
   result_tuples_.push_back(pair<int64_t, Tuple*>(stream_idx, result_tuple));
   last_result_idx_ = stream_idx;
   VLOG_ROW << id() << " Added result tuple, final state: " << DebugStateString(true);
+  return Status::OK();
 }
 
-inline void AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition,
+inline Status AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition,
     int64_t stream_idx, TupleRow* row) {
   // The analytic fns are finalized after the previous row if we found a new partition
   // or the window is a RANGE and the order by exprs changed. For ROWS windows we do not
   // need to compare the current row to the previous row.
   VLOG_ROW << id() << " TryAddResultTupleForPrevRow partition=" << next_partition
            << " idx=" << stream_idx;
-  if (fn_scope_ == ROWS) return;
-  if (next_partition || (fn_scope_ == RANGE && window_.__isset.window_end &&
-      !PrevRowCompare(order_by_eq_expr_ctx_))) {
-    AddResultTuple(stream_idx - 1);
+  if (fn_scope_ != ROWS && (next_partition || (fn_scope_ == RANGE &&
+      window_.__isset.window_end && !PrevRowCompare(order_by_eq_expr_ctx_)))) {
+    RETURN_IF_ERROR(AddResultTuple(stream_idx - 1));
   }
+  return Status::OK();
 }
 
-inline void AnalyticEvalNode::TryAddResultTupleForCurrRow(int64_t stream_idx,
+inline Status AnalyticEvalNode::TryAddResultTupleForCurrRow(int64_t stream_idx,
     TupleRow* row) {
   VLOG_ROW << id() << " TryAddResultTupleForCurrRow idx=" << stream_idx;
   // We only add results at this point for ROWS windows (unless unbounded following)
-  if (fn_scope_ != ROWS || !window_.__isset.window_end) return;
-
   // Nothing to add if the end offset is before the start of the partition.
-  if (stream_idx - rows_end_offset_ < curr_partition_idx_) return;
-  AddResultTuple(stream_idx - rows_end_offset_);
+  if (fn_scope_ == ROWS && window_.__isset.window_end &&
+      stream_idx - rows_end_offset_ >= curr_partition_idx_) {
+    RETURN_IF_ERROR(AddResultTuple(stream_idx - rows_end_offset_));
+  }
+  return Status::OK();
 }
 
 inline void AnalyticEvalNode::TryRemoveRowsBeforeWindow(int64_t stream_idx) {
@@ -435,21 +452,27 @@ inline void AnalyticEvalNode::TryRemoveRowsBeforeWindow(int64_t stream_idx) {
   window_tuples_.pop_front();
 }
 
-inline void AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
+inline Status AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
     int64_t prev_partition_idx) {
   DCHECK_LT(prev_partition_idx, partition_idx);
   // For PARTITION, RANGE, or ROWS with UNBOUNDED PRECEDING: add a result tuple for the
   // remaining rows in the partition that do not have an associated result tuple yet.
   if (fn_scope_ != ROWS || !window_.__isset.window_end) {
-    if (last_result_idx_ < partition_idx - 1) AddResultTuple(partition_idx - 1);
-    return;
+    if (last_result_idx_ < partition_idx - 1) {
+      RETURN_IF_ERROR(AddResultTuple(partition_idx - 1));
+    }
+    return Status::OK();
   }
 
   // lead() is re-written to a ROWS window with an end bound FOLLOWING. Any remaining
   // results need the default value (set by Init()). If this is the case, the start bound
   // is UNBOUNDED PRECEDING (DCHECK in Init()).
   for (int i = 0; i < evaluators_.size(); ++i) {
-    if (is_lead_fn_[i]) evaluators_[i]->Init(fn_ctxs_[i], curr_tuple_);
+    if (is_lead_fn_[i]) {
+      // Needs to call Finalize() to release resources.
+      evaluators_[i]->Finalize(fn_ctxs_[i], curr_tuple_, dummy_result_tuple_);
+      evaluators_[i]->Init(fn_ctxs_[i], curr_tuple_);
+    }
   }
 
   // If the start bound is not UNBOUNDED PRECEDING and there are still rows in the
@@ -470,13 +493,16 @@ inline void AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
       AggFnEvaluator::Remove(evaluators_, fn_ctxs_, remove_row, curr_tuple_);
       window_tuples_.pop_front();
     }
-    AddResultTuple(last_result_idx_ + 1);
+    RETURN_IF_ERROR(AddResultTuple(last_result_idx_ + 1));
   }
 
   // If there are still rows between the row with the last result (AddResultTuple() may
   // have updated last_result_idx_) and the partition boundary, add the current results
   // for the remaining rows with the same result tuple (curr_tuple_ is not modified).
-  if (last_result_idx_ < partition_idx - 1) AddResultTuple(partition_idx - 1);
+  if (last_result_idx_ < partition_idx - 1) {
+    RETURN_IF_ERROR(AddResultTuple(partition_idx - 1));
+  }
+  return Status::OK();
 }
 
 inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
@@ -523,7 +549,7 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
 
   if (fn_scope_ == ROWS && stream_idx > 0 && (!window_.__isset.window_end ||
         window_.window_end.type == TAnalyticWindowBoundaryType::FOLLOWING)) {
-    TryAddRemainingResults(stream_idx, prev_partition_stream_idx);
+    RETURN_IF_ERROR(TryAddRemainingResults(stream_idx, prev_partition_stream_idx));
   }
   window_tuples_.clear();
 
@@ -557,10 +583,10 @@ inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
       // 2) Insert the initial result tuple at first_val_null_offset_. This happens when
       //    the end bound was actually Y PRECEDING.
       if (first_val_null_offset_ != -1) {
-        AddResultTuple(curr_partition_idx_ + first_val_null_offset_ - 1);
+        RETURN_IF_ERROR(AddResultTuple(curr_partition_idx_ + first_val_null_offset_ - 1));
       }
     } else {
-      AddResultTuple(curr_partition_idx_ - rows_end_offset_ - 1);
+      RETURN_IF_ERROR(AddResultTuple(curr_partition_idx_ - rows_end_offset_ - 1));
     }
   }
   return Status::OK();
@@ -614,7 +640,7 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
   if (UNLIKELY(stream_idx == 0 && curr_child_batch_->num_rows() > 0)) {
     TupleRow* row = curr_child_batch_->GetRow(0);
     RETURN_IF_ERROR(AddRow(0, row));
-    TryAddResultTupleForCurrRow(0, row);
+    RETURN_IF_ERROR(TryAddResultTupleForCurrRow(0, row));
     prev_input_row_ = row;
     ++batch_idx;
     ++stream_idx;
@@ -647,19 +673,19 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
       // partition_by_eq_expr_ctx_ checks equality over the predicate exprs
       next_partition = !PrevRowCompare(partition_by_eq_expr_ctx_);
     }
-    TryAddResultTupleForPrevRow(next_partition, stream_idx, row);
+    RETURN_IF_ERROR(TryAddResultTupleForPrevRow(next_partition, stream_idx, row));
     if (next_partition) RETURN_IF_ERROR(InitNextPartition(state, stream_idx));
 
     // The evaluators_ are updated with the current row.
     RETURN_IF_ERROR(AddRow(stream_idx, row));
 
-    TryAddResultTupleForCurrRow(stream_idx, row);
+    RETURN_IF_ERROR(TryAddResultTupleForCurrRow(stream_idx, row));
     prev_input_row_ = row;
   }
 
   if (UNLIKELY(input_eos_ && stream_idx > curr_partition_idx_)) {
     // We need to add the results for the last row(s).
-    TryAddRemainingResults(stream_idx, curr_partition_idx_);
+    RETURN_IF_ERROR(TryAddRemainingResults(stream_idx, curr_partition_idx_));
   }
 
   // Transfer resources to prev_tuple_pool_ when enough resources have accumulated
@@ -846,7 +872,10 @@ void AnalyticEvalNode::Close(RuntimeState* state) {
     }
     evaluators_[i]->Close(state);
   }
-  for (int i = 0; i < fn_ctxs_.size(); ++i) fn_ctxs_[i]->impl()->Close();
+  for (int i = 0; i < fn_ctxs_.size(); ++i) {
+    fn_ctxs_[i]->impl()->FreeLocalAllocations();
+    fn_ctxs_[i]->impl()->Close();
+  }
 
   if (partition_by_eq_expr_ctx_ != NULL) partition_by_eq_expr_ctx_->Close(state);
   if (order_by_eq_expr_ctx_ != NULL) order_by_eq_expr_ctx_->Close(state);
@@ -878,6 +907,7 @@ Status AnalyticEvalNode::QueryMaintenance(RuntimeState* state) {
   for (int i = 0; i < evaluators_.size(); ++i) {
     ExprContext::FreeLocalAllocations(evaluators_[i]->input_expr_ctxs());
   }
+  ExprContext::FreeLocalAllocations(fn_ctxs_);
   return ExecNode::QueryMaintenance(state);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 68b5bbc..579ad5b 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -132,21 +132,24 @@ class AnalyticEvalNode : public ExecNode {
   Status AddRow(int64_t stream_idx, TupleRow* row);
 
   /// Determines if there is a window ending at the previous row, and if so, calls
-  /// AddResultTuple() with the index of the previous row in input_stream_. next_partition
-  /// indicates if the current row is the start of a new partition. stream_idx is the
-  /// index of the current input row from input_stream_.
-  void TryAddResultTupleForPrevRow(bool next_partition, int64_t stream_idx,
+  /// AddResultTuple() with the index of the previous row in 'input_stream_'.
+  /// 'next_partition' indicates if the current row is the start of a new partition.
+  /// 'stream_idx' is the index of the current input row from 'input_stream_'.
+  /// Returns an error when memory limit is exceeded.
+  Status TryAddResultTupleForPrevRow(bool next_partition, int64_t stream_idx,
       TupleRow* row);
 
   /// Determines if there is a window ending at the current row, and if so, calls
-  /// AddResultTuple() with the index of the current row in input_stream_. stream_idx is
-  /// the index of the current input row from input_stream_.
-  void TryAddResultTupleForCurrRow(int64_t stream_idx, TupleRow* row);
+  /// AddResultTuple() with the index of the current row in 'input_stream_'.
+  /// 'stream_idx' is the index of the current input row from 'input_stream_'.
+  /// Returns an error when memory limit is exceeded.
+  Status TryAddResultTupleForCurrRow(int64_t stream_idx, TupleRow* row);
 
   /// Adds additional result tuples at the end of a partition, e.g. if the end bound is
   /// FOLLOWING. partition_idx is the index into input_stream_ of the new partition,
-  /// prev_partition_idx is the index of the previous partition.
-  void TryAddRemainingResults(int64_t partition_idx, int64_t prev_partition_idx);
+  /// 'prev_partition_idx' is the index of the previous partition.
+  /// Returns an error when memory limit is exceeded.
+  Status TryAddRemainingResults(int64_t partition_idx, int64_t prev_partition_idx);
 
   /// Removes rows from curr_tuple_ (by calling AggFnEvaluator::Remove()) that are no
   /// longer in the window (i.e. they are before the window start boundary). stream_idx
@@ -159,9 +162,10 @@ class AnalyticEvalNode : public ExecNode {
   Status InitNextPartition(RuntimeState* state, int64_t stream_idx);
 
   /// Produces a result tuple with analytic function results by calling GetValue() or
-  /// Finalize() for curr_tuple_ on the evaluators_. The result tuple is stored in
-  /// result_tuples_ with the index into input_stream_ specified by stream_idx.
-  void AddResultTuple(int64_t stream_idx);
+  /// Finalize() for 'curr_tuple_' on the 'evaluators_'. The result tuple is stored in
+  /// 'result_tuples_' with the index into 'input_stream_' specified by 'stream_idx'.
+  /// Returns an error when memory limit is exceeded.
+  Status AddResultTuple(int64_t stream_idx);
 
   /// Gets the number of rows that are ready to be returned by subsequent calls to
   /// GetNextOutputBatch().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exprs/agg-fn-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h
index cde969c..b3ecda0 100644
--- a/be/src/exprs/agg-fn-evaluator.h
+++ b/be/src/exprs/agg-fn-evaluator.h
@@ -147,9 +147,11 @@ class AggFnEvaluator {
   void Finalize(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
 
   /// Puts the finalized value from Tuple* src in Tuple* dst just as Finalize() does.
-  /// However, unlike Finalize(), GetValue() does not clean up state in src. GetValue()
-  /// can be called repeatedly with the same src. Only used internally for analytic fn
-  /// builtins.
+  /// However, unlike Finalize(), GetValue() does not clean up state in src.
+  /// GetValue() can be called repeatedly with the same src. Only used internally for
+  /// analytic fn builtins. Note that StringVal result is from local allocation (which
+  /// will be freed in the next QueryMaintenance()) so it needs to be copied out if it
+  /// needs to survive beyond QueryMaintenance() (e.g. if 'dst' lives in a row batch).
   void GetValue(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
 
   /// Helper functions for calling the above functions on many evaluators.
@@ -229,10 +231,13 @@ class AggFnEvaluator {
   /// fn must be a function that implement's the UDA Update() signature.
   void Update(FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst, void* fn);
 
-  /// Sets up the arguments to call fn. This converts from the agg-expr signature,
+  /// Sets up the arguments to call 'fn'. This converts from the agg-expr signature,
   /// taking TupleRow to the UDA signature taking AnvVals. Writes the serialize/finalize
-  /// result to the given destination slot/tuple. The fn can be NULL to indicate the src
-  /// value should simply be written into the destination.
+  /// result to the given destination slot/tuple. 'fn' can be NULL to indicate the src
+  /// value should simply be written into the destination. Note that StringVal result is
+  /// from local allocation (which will be freed in the next QueryMaintenance()) so it
+  /// needs to be copied out if it needs to survive beyond QueryMaintenance() (e.g. if
+  /// 'dst' lives in a row batch).
   void SerializeOrFinalize(FunctionContext* agg_fn_ctx, Tuple* src,
       const SlotDescriptor* dst_slot_desc, Tuple* dst, void* fn);
 
@@ -265,7 +270,7 @@ inline void AggFnEvaluator::GetValue(
 }
 
 inline void AggFnEvaluator::Init(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Init(fn_ctxs[i], dst);
@@ -279,28 +284,28 @@ inline void AggFnEvaluator::Add(const std::vector<AggFnEvaluator*>& evaluators,
   }
 }
 inline void AggFnEvaluator::Remove(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Remove(fn_ctxs[i], src, dst);
   }
 }
 inline void AggFnEvaluator::Serialize(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Serialize(fn_ctxs[i], dst);
   }
 }
 inline void AggFnEvaluator::GetValue(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->GetValue(fn_ctxs[i], src, dst);
   }
 }
 inline void AggFnEvaluator::Finalize(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Finalize(fn_ctxs[i], src, dst);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exprs/aggregate-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions-ir.cc b/be/src/exprs/aggregate-functions-ir.cc
index 44b72a3..dd81005 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1583,10 +1583,19 @@ void AggregateFunctions::OffsetFnInit(FunctionContext* ctx, T* dst) {
   *dst = *static_cast<T*>(ctx->GetConstantArg(2));
 }
 
+template <>
+void AggregateFunctions::OffsetFnInit(FunctionContext* ctx, StringVal* dst) {
+  DCHECK_EQ(ctx->GetNumArgs(), 3);
+  DCHECK(ctx->IsArgConstant(1));
+  DCHECK(ctx->IsArgConstant(2));
+  DCHECK_EQ(ctx->GetArgType(0)->type, ctx->GetArgType(2)->type);
+  CopyStringVal(ctx, *static_cast<StringVal*>(ctx->GetConstantArg(2)), dst);
+}
+
 template <typename T>
 void AggregateFunctions::OffsetFnUpdate(FunctionContext* ctx, const T& src,
     const BigIntVal&, const T& default_value, T* dst) {
-  *dst = src;
+  UpdateVal(ctx, src, dst);
 }
 
 // Stamp out the templates for the types we need.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 13634dc..4ad8d8b 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -194,7 +194,8 @@ void FunctionContextImpl::Close() {
   stringstream error_ss;
   if (!debug_) {
     if (pool_->net_allocations() > 0) {
-      error_ss << "Memory leaked via FunctionContext::Allocate()";
+      error_ss << "Memory leaked via FunctionContext::Allocate() "
+               << "or FunctionContext::AllocateLocal()";
     } else if (pool_->net_allocations() < 0) {
       error_ss << "FunctionContext::Free() called on buffer that was already freed or "
                   "was not allocated.";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index fb4accc..4fcecb2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -1035,7 +1035,9 @@ public class BuiltinsDb extends Db {
           db, "lead", Lists.newArrayList(t, Type.BIGINT, t), t, t,
           prefix + OFFSET_FN_INIT_SYMBOL.get(t),
           prefix + OFFSET_FN_UPDATE_SYMBOL.get(t),
-          null, null, null));
+          null,
+          t == Type.STRING ? stringValGetValue : null,
+          t == Type.STRING ? stringValSerializeOrFinalize : null));
 
       // lead() and lag() the default offset and the default value should be
       // rewritten to call the overrides that take all parameters.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
index 93431bf..62c9a67 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
@@ -1893,3 +1893,17 @@ DECIMAL, DECIMAL
 12345,1234
 132842,1234
 ====
+---- QUERY
+# Regression test for IMPALA-4120: Invoke UDFs which do local allocations and verifies
+# that the results are copied out.
+select count(*) from (
+select
+  from_unixtime(lead(bigint_col, 1) over (order by id), 'yyyyMMddHH:mm:ss') as a,
+  lead(from_unixtime(bigint_col, 'yyyyMMddHH:mm:ss'), 1) over (order by id) AS b
+from functional.alltypes) x
+where x.a = x.b
+---- TYPES
+BIGINT
+---- RESULTS
+7299
+====