You are viewing a plain text version of this content. The canonical (HTML) version is available in the commits@doris.apache.org mailing-list archive.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/10/13 14:04:31 UTC
[incubator-doris] branch master updated: Support variable arguments for UDAF (#1968)
This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7eece1e Support variable arguments for UDAF (#1968)
7eece1e is described below
commit 7eece1e9e2768b5801e0cf6182a4f6c38d8f79ef
Author: WingC <10...@qq.com>
AuthorDate: Sun Oct 13 22:04:23 2019 +0800
Support variable arguments for UDAF (#1968)
---
be/src/exprs/agg_fn.cc | 3 +-
be/src/exprs/agg_fn.h | 6 ++
be/src/exprs/new_agg_fn_evaluator.cc | 200 ++++++++++++++++++++++++++---------
3 files changed, 156 insertions(+), 53 deletions(-)
diff --git a/be/src/exprs/agg_fn.cc b/be/src/exprs/agg_fn.cc
index 8d19474..0c561fe 100644
--- a/be/src/exprs/agg_fn.cc
+++ b/be/src/exprs/agg_fn.cc
@@ -35,7 +35,8 @@ AggFn::AggFn(const TExprNode& tnode, const SlotDescriptor& intermediate_slot_des
: Expr(tnode),
is_merge_(tnode.agg_expr.is_merge_agg),
intermediate_slot_desc_(intermediate_slot_desc),
- output_slot_desc_(output_slot_desc) {
+ output_slot_desc_(output_slot_desc),
+ _vararg_start_idx(tnode.__isset.vararg_start_idx ? tnode.vararg_start_idx : -1) {
// TODO(pengyubing) arg_type_descs_ is used for codegen
// arg_type_descs_(AnyValUtil::column_type_to_type_desc(
// TypeDescriptor::from_thrift(tnode.agg_expr.arg_types))) {
diff --git a/be/src/exprs/agg_fn.h b/be/src/exprs/agg_fn.h
index 602728e..bed805d 100644
--- a/be/src/exprs/agg_fn.h
+++ b/be/src/exprs/agg_fn.h
@@ -150,6 +150,10 @@ class AggFn : public Expr {
virtual std::string DebugString() const;
static std::string DebugString(const std::vector<AggFn*>& exprs);
+ const int get_vararg_start_idx() const {
+ return _vararg_start_idx;
+ }
+
private:
friend class Expr;
friend class NewAggFnEvaluator;
@@ -178,6 +182,8 @@ private:
void* serialize_fn_ = nullptr;
void* get_value_fn_ = nullptr;
void* finalize_fn_ = nullptr;
+
+ int _vararg_start_idx;
AggFn(const TExprNode& node, const SlotDescriptor& intermediate_slot_desc,
const SlotDescriptor& output_slot_desc);
diff --git a/be/src/exprs/new_agg_fn_evaluator.cc b/be/src/exprs/new_agg_fn_evaluator.cc
index bfc7a2f..d92eabf 100644
--- a/be/src/exprs/new_agg_fn_evaluator.cc
+++ b/be/src/exprs/new_agg_fn_evaluator.cc
@@ -66,6 +66,25 @@ typedef void (*UpdateFn7)(FunctionContext*, const AnyVal&, const AnyVal&,
typedef void (*UpdateFn8)(FunctionContext*, const AnyVal&, const AnyVal&,
const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&,
const AnyVal&, AnyVal*);
+
+typedef void (*VarargUpdateFn0)(FunctionContext*, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn1)(FunctionContext*, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn2)(FunctionContext*, const AnyVal&, const AnyVal&, int num_varargs,
+ const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn3)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+ int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn4)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+ const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn5)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+ const AnyVal&, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn6)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+ const AnyVal&, const AnyVal&, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn7)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+ const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, int num_varargs, const AnyVal*, AnyVal*);
+typedef void (*VarargUpdateFn8)(FunctionContext*, const AnyVal&, const AnyVal&, const AnyVal&,
+ const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, const AnyVal&, int num_varargs,
+ const AnyVal*, AnyVal*);
+
typedef StringVal (*SerializeFn)(FunctionContext*, const StringVal&);
typedef AnyVal (*GetValueFn)(FunctionContext*, const AnyVal&);
typedef AnyVal (*FinalizeFn)(FunctionContext*, const AnyVal&);
@@ -394,58 +413,135 @@ void NewAggFnEvaluator::Update(const TupleRow* row, Tuple* dst, void* fn) {
// TODO: this part is not so good and not scalable. It can be replaced with
// codegen but we can also consider leaving it for the first few cases for
// debugging.
- switch (input_evals_.size()) {
- case 0:
- reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx_.get(), staging_intermediate_val_);
- break;
- case 1:
- reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx_.get(),
- *staging_input_vals_[0], staging_intermediate_val_);
- break;
- case 2:
- reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx_.get(),
- *staging_input_vals_[0], *staging_input_vals_[1], staging_intermediate_val_);
- break;
- case 3:
- reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx_.get(),
- *staging_input_vals_[0], *staging_input_vals_[1],
- *staging_input_vals_[2], staging_intermediate_val_);
- break;
- case 4:
- reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx_.get(),
- *staging_input_vals_[0], *staging_input_vals_[1],
- *staging_input_vals_[2], *staging_input_vals_[3], staging_intermediate_val_);
- break;
- case 5:
- reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx_.get(),
- *staging_input_vals_[0], *staging_input_vals_[1],
- *staging_input_vals_[2], *staging_input_vals_[3],
- *staging_input_vals_[4], staging_intermediate_val_);
- break;
- case 6:
- reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx_.get(),
- *staging_input_vals_[0], *staging_input_vals_[1],
- *staging_input_vals_[2], *staging_input_vals_[3],
- *staging_input_vals_[4], *staging_input_vals_[5], staging_intermediate_val_);
- break;
- case 7:
- reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx_.get(),
- *staging_input_vals_[0], *staging_input_vals_[1],
- *staging_input_vals_[2], *staging_input_vals_[3],
- *staging_input_vals_[4], *staging_input_vals_[5],
- *staging_input_vals_[6], staging_intermediate_val_);
- break;
- case 8:
- reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx_.get(),
- *staging_input_vals_[0], *staging_input_vals_[1],
- *staging_input_vals_[2], *staging_input_vals_[3],
- *staging_input_vals_[4], *staging_input_vals_[5],
- *staging_input_vals_[6], *staging_input_vals_[7],
- staging_intermediate_val_);
- break;
- default:
- DCHECK(false) << "NYI";
- }
+ if (agg_fn_.get_vararg_start_idx() == -1) {
+ switch (input_evals_.size()) {
+ case 0:
+ reinterpret_cast<UpdateFn0>(fn)(agg_fn_ctx_.get(), staging_intermediate_val_);
+ break;
+ case 1:
+ reinterpret_cast<UpdateFn1>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], staging_intermediate_val_);
+ break;
+ case 2:
+ reinterpret_cast<UpdateFn2>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ staging_intermediate_val_);
+ break;
+ case 3:
+ reinterpret_cast<UpdateFn3>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2], staging_intermediate_val_);
+ break;
+ case 4:
+ reinterpret_cast<UpdateFn4>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2], *staging_input_vals_[3],
+ staging_intermediate_val_);
+ break;
+ case 5:
+ reinterpret_cast<UpdateFn5>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2], *staging_input_vals_[3],
+ *staging_input_vals_[4], staging_intermediate_val_);
+ break;
+ case 6:
+ reinterpret_cast<UpdateFn6>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2], *staging_input_vals_[3],
+ *staging_input_vals_[4], *staging_input_vals_[5],
+ staging_intermediate_val_);
+ break;
+ case 7:
+ reinterpret_cast<UpdateFn7>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2], *staging_input_vals_[3],
+ *staging_input_vals_[4], *staging_input_vals_[5],
+ *staging_input_vals_[6], staging_intermediate_val_);
+ break;
+ case 8:
+ reinterpret_cast<UpdateFn8>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2], *staging_input_vals_[3],
+ *staging_input_vals_[4], *staging_input_vals_[5],
+ *staging_input_vals_[6], *staging_input_vals_[7],
+ staging_intermediate_val_);
+ break;
+ default:
+ DCHECK(false) << "NYI";
+ }
+ } else {
+ int num_varargs = input_evals_.size() - agg_fn_.get_vararg_start_idx();
+ const AnyVal* varargs = *(staging_input_vals_.data() + agg_fn_.get_vararg_start_idx());
+ switch (agg_fn_.get_vararg_start_idx()) {
+ case 0:
+ reinterpret_cast<VarargUpdateFn0>(fn)(agg_fn_ctx_.get(),
+ num_varargs, varargs,
+ staging_intermediate_val_);
+ break;
+ case 1:
+ reinterpret_cast<VarargUpdateFn1>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0],
+ num_varargs, varargs,
+ staging_intermediate_val_);
+ break;
+ case 2:
+ reinterpret_cast<VarargUpdateFn2>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ num_varargs, varargs,
+ staging_intermediate_val_);
+ break;
+ case 3:
+ reinterpret_cast<VarargUpdateFn3>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2],
+ num_varargs, varargs,
+ staging_intermediate_val_);
+ break;
+ case 4:
+ reinterpret_cast<VarargUpdateFn4>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2], *staging_input_vals_[3],
+ num_varargs, varargs,
+ staging_intermediate_val_);
+ break;
+ case 5:
+ reinterpret_cast<VarargUpdateFn5>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2], *staging_input_vals_[3],
+ *staging_input_vals_[4],
+ num_varargs, varargs,
+ staging_intermediate_val_);
+ break;
+ case 6:
+ reinterpret_cast<VarargUpdateFn6>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2], *staging_input_vals_[3],
+ *staging_input_vals_[4], *staging_input_vals_[5],
+ num_varargs, varargs,
+ staging_intermediate_val_);
+ break;
+ case 7:
+ reinterpret_cast<VarargUpdateFn7>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2], *staging_input_vals_[3],
+ *staging_input_vals_[4], *staging_input_vals_[5],
+ *staging_input_vals_[6],
+ num_varargs, varargs,
+ staging_intermediate_val_);
+ break;
+ case 8:
+ reinterpret_cast<VarargUpdateFn8>(fn)(agg_fn_ctx_.get(),
+ *staging_input_vals_[0], *staging_input_vals_[1],
+ *staging_input_vals_[2], *staging_input_vals_[3],
+ *staging_input_vals_[4], *staging_input_vals_[5],
+ *staging_input_vals_[6], *staging_input_vals_[7],
+ num_varargs, varargs,
+ staging_intermediate_val_);
+ break;
+ default:
+ DCHECK(false) << "NYI";
+ }
+ }
SetDstSlot(staging_intermediate_val_, slot_desc, dst);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org