You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/10/13 14:04:31 UTC

[incubator-doris] branch master updated: Support variable arguments for UDAF (#1968)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eece1e  Support variable arguments for UDAF  (#1968)
7eece1e is described below

commit 7eece1e9e2768b5801e0cf6182a4f6c38d8f79ef
Author: WingC <10...@qq.com>
AuthorDate: Sun Oct 13 22:04:23 2019 +0800

    Support variable arguments for UDAF  (#1968)
---
 be/src/exprs/agg_fn.cc               |   3 +-
 be/src/exprs/agg_fn.h                |   6 ++
 be/src/exprs/new_agg_fn_evaluator.cc | 200 ++++++++++++++++++++++++++---------
 3 files changed, 156 insertions(+), 53 deletions(-)

diff --git a/be/src/exprs/agg_fn.cc b/be/src/exprs/agg_fn.cc
index 8d19474..0c561fe 100644
--- a/be/src/exprs/agg_fn.cc
+++ b/be/src/exprs/agg_fn.cc
@@ -35,7 +35,8 @@ AggFn::AggFn(const TExprNode& tnode, const SlotDescriptor& intermediate_slot_des
   : Expr(tnode),
     is_merge_(tnode.agg_expr.is_merge_agg),
     intermediate_slot_desc_(intermediate_slot_desc),
-    output_slot_desc_(output_slot_desc) {
+    output_slot_desc_(output_slot_desc),
+    _vararg_start_idx(tnode.__isset.vararg_start_idx ? tnode.vararg_start_idx : -1) {
   // TODO(pengyubing) arg_type_descs_ is used for codegen
   //    arg_type_descs_(AnyValUtil::column_type_to_type_desc(
   //        TypeDescriptor::from_thrift(tnode.agg_expr.arg_types))) {
diff --git a/be/src/exprs/agg_fn.h b/be/src/exprs/agg_fn.h
index 602728e..bed805d 100644
--- a/be/src/exprs/agg_fn.h
+++ b/be/src/exprs/agg_fn.h
@@ -150,6 +150,10 @@ class AggFn : public Expr {
   virtual std::string DebugString() const;
   static std::string DebugString(const std::vector<AggFn*>& exprs);
  
+  int get_vararg_start_idx() const {  // -1 when the thrift node sets no vararg_start_idx
+      return _vararg_start_idx;
+  }
+
 private:
   friend class Expr;
   friend class NewAggFnEvaluator;
@@ -178,6 +182,8 @@ private:
   void* serialize_fn_ = nullptr;
   void* get_value_fn_ = nullptr;
   void* finalize_fn_ = nullptr;
+  
+  int _vararg_start_idx;
 
   AggFn(const TExprNode& node, const SlotDescriptor& intermediate_slot_desc,
       const SlotDescriptor& output_slot_desc);
diff --git a/be/src/exprs/new_agg_fn_evaluator.cc b/be/src/exprs/new_agg_fn_evaluator.cc
index bfc7a2f..d92eabf 100644
--- a/be/src/exprs/new_agg_fn_evaluator.cc
+++ b/be/src/exprs/new_agg_fn_evaluator.cc
@@ -66,6 +66,25 @@ typedef void (*UpdateFn7)(FunctionContext*, const AnyVal&, const AnyVal&,
 typedef void (*UpdateFn8)(FunctionContext*, const AnyVal&, const AnyVal&,
     const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&,
     const AnyVal&, AnyVal*);
+// VarargUpdateFnN: like UpdateFnN, but the N fixed args are followed by (num_varargs, varargs array).
+typedef void (*VarargUpdateFn0)(FunctionContext*, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn1)(FunctionContext*, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn2)(FunctionContext*, const AnyVal&, const AnyVal&, int num_varargs,
+        const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn3)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+        int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn4)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+        const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn5)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+        const AnyVal&, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn6)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+        const AnyVal&, const AnyVal&, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn7)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+        const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn8)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+        const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, int num_varargs,
+        const AnyVal*,  AnyVal*);
+
 typedef StringVal (*SerializeFn)(FunctionContext*, const StringVal&);
 typedef AnyVal (*GetValueFn)(FunctionContext*, const AnyVal&);
 typedef AnyVal (*FinalizeFn)(FunctionContext*, const AnyVal&);
@@ -394,58 +413,135 @@ void NewAggFnEvaluator::Update(const TupleRow* row, Tuple* dst, void* fn) {
   // TODO: this part is not so good and not scalable. It can be replaced with
   // codegen but we can also consider leaving it for the first few cases for
   // debugging.
-    switch (input_evals_.size()) {
-      case 0:
-        reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx_.get(), staging_intermediate_val_);
-        break;
-      case 1:
-        reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx_.get(),
-            *staging_input_vals_[0], staging_intermediate_val_);
-        break;
-      case 2:
-        reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx_.get(),
-            *staging_input_vals_[0], *staging_input_vals_[1], staging_intermediate_val_);
-        break;
-      case 3:
-        reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx_.get(),
-            *staging_input_vals_[0], *staging_input_vals_[1],
-            *staging_input_vals_[2], staging_intermediate_val_);
-        break;
-      case 4:
-        reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx_.get(),
-            *staging_input_vals_[0], *staging_input_vals_[1],
-            *staging_input_vals_[2], *staging_input_vals_[3], staging_intermediate_val_);
-        break;
-      case 5:
-        reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx_.get(),
-            *staging_input_vals_[0], *staging_input_vals_[1],
-            *staging_input_vals_[2], *staging_input_vals_[3],
-            *staging_input_vals_[4], staging_intermediate_val_);
-        break;
-      case 6:
-        reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx_.get(),
-            *staging_input_vals_[0], *staging_input_vals_[1],
-            *staging_input_vals_[2], *staging_input_vals_[3],
-            *staging_input_vals_[4], *staging_input_vals_[5], staging_intermediate_val_);
-        break;
-      case 7:
-        reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx_.get(),
-            *staging_input_vals_[0], *staging_input_vals_[1],
-            *staging_input_vals_[2], *staging_input_vals_[3],
-            *staging_input_vals_[4], *staging_input_vals_[5],
-            *staging_input_vals_[6], staging_intermediate_val_);
-        break;
-      case 8:
-        reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx_.get(),
-            *staging_input_vals_[0], *staging_input_vals_[1],
-            *staging_input_vals_[2], *staging_input_vals_[3],
-            *staging_input_vals_[4], *staging_input_vals_[5],
-            *staging_input_vals_[6], *staging_input_vals_[7],
-            staging_intermediate_val_);
-        break;
-      default:
-        DCHECK(false) << "NYI";
-    }
+  if (agg_fn_.get_vararg_start_idx() == -1) {
+      switch (input_evals_.size()) {
+          case 0:
+              reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx_.get(), staging_intermediate_val_);
+              break;
+          case 1:
+              reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx_.get(),
+                                              *staging_input_vals_[0], staging_intermediate_val_);
+              break;
+          case 2:
+              reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx_.get(),
+                                              *staging_input_vals_[0], *staging_input_vals_[1],
+                                              staging_intermediate_val_);
+              break;
+          case 3:
+              reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx_.get(),
+                                              *staging_input_vals_[0], *staging_input_vals_[1],
+                                              *staging_input_vals_[2], staging_intermediate_val_);
+              break;
+          case 4:
+              reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx_.get(),
+                                              *staging_input_vals_[0], *staging_input_vals_[1],
+                                              *staging_input_vals_[2], *staging_input_vals_[3],
+                                              staging_intermediate_val_);
+              break;
+          case 5:
+              reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx_.get(),
+                                              *staging_input_vals_[0], *staging_input_vals_[1],
+                                              *staging_input_vals_[2], *staging_input_vals_[3],
+                                              *staging_input_vals_[4], staging_intermediate_val_);
+              break;
+          case 6:
+              reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx_.get(),
+                                              *staging_input_vals_[0], *staging_input_vals_[1],
+                                              *staging_input_vals_[2], *staging_input_vals_[3],
+                                              *staging_input_vals_[4], *staging_input_vals_[5],
+                                              staging_intermediate_val_);
+              break;
+          case 7:
+              reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx_.get(),
+                                              *staging_input_vals_[0], *staging_input_vals_[1],
+                                              *staging_input_vals_[2], *staging_input_vals_[3],
+                                              *staging_input_vals_[4], *staging_input_vals_[5],
+                                              *staging_input_vals_[6], staging_intermediate_val_);
+              break;
+          case 8:
+              reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx_.get(),
+                                              *staging_input_vals_[0], *staging_input_vals_[1],
+                                              *staging_input_vals_[2], *staging_input_vals_[3],
+                                              *staging_input_vals_[4], *staging_input_vals_[5],
+                                              *staging_input_vals_[6], *staging_input_vals_[7],
+                                              staging_intermediate_val_);
+              break;
+          default:
+              DCHECK(false) << "NYI";
+      }
+  } else {
+      int num_varargs = input_evals_.size() - agg_fn_.get_vararg_start_idx();
+      const AnyVal* varargs = *(staging_input_vals_.data() + agg_fn_.get_vararg_start_idx());
+      switch (agg_fn_.get_vararg_start_idx()) {
+          case 0:
+              reinterpret_cast<VarargUpdateFn0>(fn)(agg_fn_ctx_.get(),
+                                                    num_varargs, varargs,
+                                                    staging_intermediate_val_);
+              break;
+          case 1:
+              reinterpret_cast<VarargUpdateFn1>(fn)(agg_fn_ctx_.get(),
+                                                    *staging_input_vals_[0],
+                                                    num_varargs, varargs,
+                                                    staging_intermediate_val_);
+              break;
+          case 2:
+              reinterpret_cast<VarargUpdateFn2>(fn)(agg_fn_ctx_.get(),
+                                                    *staging_input_vals_[0], *staging_input_vals_[1],
+                                                    num_varargs, varargs,
+                                                    staging_intermediate_val_);
+              break;
+          case 3:
+              reinterpret_cast<VarargUpdateFn3>(fn)(agg_fn_ctx_.get(),
+                                                    *staging_input_vals_[0], *staging_input_vals_[1],
+                                                    *staging_input_vals_[2],
+                                                    num_varargs, varargs,
+                                                    staging_intermediate_val_);
+              break;
+          case 4:
+              reinterpret_cast<VarargUpdateFn4>(fn)(agg_fn_ctx_.get(),
+                                                    *staging_input_vals_[0], *staging_input_vals_[1],
+                                                    *staging_input_vals_[2], *staging_input_vals_[3],
+                                                    num_varargs, varargs,
+                                                    staging_intermediate_val_);
+              break;
+          case 5:
+              reinterpret_cast<VarargUpdateFn5>(fn)(agg_fn_ctx_.get(),
+                                                    *staging_input_vals_[0], *staging_input_vals_[1],
+                                                    *staging_input_vals_[2], *staging_input_vals_[3],
+                                                    *staging_input_vals_[4],
+                                                    num_varargs, varargs,
+                                                    staging_intermediate_val_);
+              break;
+          case 6:
+              reinterpret_cast<VarargUpdateFn6>(fn)(agg_fn_ctx_.get(),
+                                                    *staging_input_vals_[0], *staging_input_vals_[1],
+                                                    *staging_input_vals_[2], *staging_input_vals_[3],
+                                                    *staging_input_vals_[4], *staging_input_vals_[5],
+                                                    num_varargs, varargs,
+                                                    staging_intermediate_val_);
+              break;
+          case 7:
+              reinterpret_cast<VarargUpdateFn7>(fn)(agg_fn_ctx_.get(),
+                                                    *staging_input_vals_[0], *staging_input_vals_[1],
+                                                    *staging_input_vals_[2], *staging_input_vals_[3],
+                                                    *staging_input_vals_[4], *staging_input_vals_[5],
+                                                    *staging_input_vals_[6],
+                                                    num_varargs, varargs,
+                                                    staging_intermediate_val_);
+              break;
+          case 8:
+              reinterpret_cast<VarargUpdateFn8>(fn)(agg_fn_ctx_.get(),
+                                                    *staging_input_vals_[0], *staging_input_vals_[1],
+                                                    *staging_input_vals_[2], *staging_input_vals_[3],
+                                                    *staging_input_vals_[4], *staging_input_vals_[5],
+                                                    *staging_input_vals_[6], *staging_input_vals_[7],
+                                                    num_varargs, varargs,
+                                                    staging_intermediate_val_);
+              break;
+          default:
+              DCHECK(false) << "NYI";
+      }
+  }
   SetDstSlot(staging_intermediate_val_, slot_desc, dst);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org