Posted to commits@beam.apache.org by "Xu Mingmin (JIRA)" <ji...@apache.org> on 2017/08/10 00:40:00 UTC
[jira] [Commented] (BEAM-2747) accept CombineFn as UDAF
[ https://issues.apache.org/jira/browse/BEAM-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120886#comment-16120886 ]
Xu Mingmin commented on BEAM-2747:
----------------------------------
[~takidau] [~xumingming]
I'm stuck on a technical problem with supporting CombineFn. Please advise on any possible solution.
This is due to limitations in Calcite:
1. the UDAF passed to `SchemaPlus.add(fnName, fn)` can only be an `AggregateFunctionImpl`;
2. `AggregateFunctionImpl` is not extensible; its only public entry point is the static `create` method, which takes a `Class` argument.
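To illustrate why a `Class`-only entry point rules out passing in a configured object, here is a stdlib-only sketch. `register` is a hypothetical stand-in for a `create(Class)`-style factory, not Calcite's code; like such a factory it receives only the class, and the sketch assumes it builds instances through a public no-argument constructor. All class names are hypothetical.

```java
import java.util.function.IntBinaryOperator;

public class ClassOnlyRegistration {

  // Hypothetical stand-in for a create(Class)-style factory: it receives only
  // the class, never a configured instance, so it must build one itself.
  // Assumption for this sketch: it does so via a public no-arg constructor.
  public static Object register(Class<?> udafClass) {
    try {
      return udafClass.getConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException(
          "cannot instantiate " + udafClass.getSimpleName() + " from its Class alone", e);
    }
  }

  // Carries its behaviour as constructor state, like an adapter wrapping a
  // CombineFn instance; it has no no-arg constructor.
  public static class OperatorAdapter {
    private final IntBinaryOperator op;
    public OperatorAdapter(IntBinaryOperator op) { this.op = op; }
    public int apply(int a, int b) { return op.applyAsInt(a, b); }
  }

  // Behaviour is fixed by the class itself, so the class is all that is needed.
  public static class SumUdaf {
    public SumUdaf() {}
    public int apply(int a, int b) { return a + b; }
  }

  public static void main(String[] args) {
    SumUdaf sum = (SumUdaf) register(SumUdaf.class);
    System.out.println(sum.apply(2, 3)); // prints 5

    try {
      register(OperatorAdapter.class);
    } catch (IllegalArgumentException e) {
      // The IntBinaryOperator given to the adapter's constructor has no way
      // to travel through a Class argument.
      System.out.println(e.getMessage());
    }
  }
}
```

Whatever the adapter's constructor argument was, it cannot survive a round trip through `Class`, which is the root of the first failed approach below.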
None of the solutions below works so far:
1. An adapter with a constructor like `public BeamUdafCombineFnAdaptor(CombineFn)` doesn't work, because `AggregateFunctionImpl` only takes a `Class`, not an instance, as input;
2. An abstract BeamSqlUdaf<InputT, AccumT, OutputT> that wraps a CombineFn; users extend this class and implement `public abstract CombineFn<InputT, AccumT, OutputT> getCombineFn();`, like:
{code}
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

// Users only supply the CombineFn; every UDAF method delegates to it.
public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
  public BeamSqlUdaf() {}

  public abstract CombineFn<InputT, AccumT, OutputT> getCombineFn();

  public final AccumT init() { return getCombineFn().createAccumulator(); }
  public final AccumT add(AccumT accumulator, InputT input) { return getCombineFn().addInput(accumulator, input); }
  public final AccumT merge(Iterable<AccumT> accumulators) { return getCombineFn().mergeAccumulators(accumulators); }
  public final OutputT result(AccumT accumulator) { return getCombineFn().extractOutput(accumulator); }

  public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry) { ... }
}
{code}
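Calcite fails to get the actual types of InputT, AccumT and OutputT because of type erasure in Java (https://en.wikipedia.org/wiki/Generics_in_Java#Problems_with_type_erasure): `init`/`add`/`merge`/`result` are declared only on the generic base class, so their method signatures are erased to `Object`.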
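The erasure problem can be reproduced without Calcite. The stdlib-only sketch below (class names `GenericUdaf` and `IntSumUdaf` are hypothetical) looks up `add` by name, roughly as a reflection-based UDAF loader might, and prints the parameter types it sees. Because `add` is declared only on the generic base class, the subclass's type arguments do not appear in that method's signature. They are still recorded in the class's generic-superclass metadata, but a loader that reads method signatures would not consult it.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;

public class ErasureDemo {

  // Hypothetical generic base class: add() is declared in terms of type variables.
  public abstract static class GenericUdaf<InputT, AccumT, OutputT> {
    public abstract AccumT init();
    public AccumT add(AccumT accumulator, InputT input) { return accumulator; }
  }

  // Subclass that fixes the type arguments but does not redeclare add().
  public static class IntSumUdaf extends GenericUdaf<Integer, int[], Integer> {
    public int[] init() { return new int[1]; }
  }

  // Finds the public, non-bridge method named "add", as a name-based loader might.
  public static Class<?>[] addParameterTypes(Class<?> udafClass) {
    for (Method m : udafClass.getMethods()) {
      if (m.getName().equals("add") && !m.isBridge()) {
        return m.getParameterTypes();
      }
    }
    throw new IllegalArgumentException("no add method");
  }

  public static void main(String[] args) {
    // The inherited add() only exposes its erased signature.
    System.out.println(Arrays.toString(addParameterTypes(IntSumUdaf.class)));
    // prints [class java.lang.Object, class java.lang.Object]

    // The type arguments survive only in the generic superclass metadata.
    ParameterizedType superType = (ParameterizedType) IntSumUdaf.class.getGenericSuperclass();
    System.out.println(Arrays.toString(superType.getActualTypeArguments()));
  }
}
```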
The only workable approach I know of is to keep the interface as it is and guide users to wrap an existing CombineFn, redeclaring each method with concrete types, like:
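{code}
// Redeclares each method with concrete types so that the method signatures
// carry Integer/int[] instead of the erased Object. These overrides only
// compile if the base-class init/add/merge/result methods are not final.
public static class SquareSum extends BeamSqlUdaf<Integer, int[], Integer> {
  public SquareSum() {}
  // Note: Sum.ofIntegers() sums the inputs as-is; it does not square them.
  public CombineFn<Integer, int[], Integer> getCombineFn() { return Sum.ofIntegers(); }
  public int[] init() { return super.init(); }
  public int[] add(int[] accumulator, Integer input) { return super.add(accumulator, input); }
  public int[] merge(Iterable<int[]> accumulators) { return super.merge(accumulators); }
  public Integer result(int[] accumulator) { return super.result(accumulator); }
}
{code}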
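Why the redeclared methods help can be checked with the same kind of reflection. In this stdlib-only sketch, all names are hypothetical and a `BiFunction` stands in for the wrapped CombineFn. The subclass redeclares `add` with concrete types and simply delegates to `super.add`. The compiler then emits the concrete method plus a synthetic bridge method with the erased signature, so a loader that skips bridge methods finds `add(int[], Integer)`.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.function.BiFunction;

public class OverrideDemo {

  // Hypothetical generic base class; add() is non-final so it can be redeclared.
  public abstract static class DelegatingUdaf<InputT, AccumT> {
    // Stand-in for getCombineFn(): supplies the actual accumulation logic.
    public abstract BiFunction<AccumT, InputT, AccumT> adder();

    public AccumT add(AccumT accumulator, InputT input) {
      return adder().apply(accumulator, input);
    }
  }

  public static class IntSumUdaf extends DelegatingUdaf<Integer, int[]> {
    @Override
    public BiFunction<int[], Integer, int[]> adder() {
      return (acc, in) -> { acc[0] += in; return acc; };
    }

    // Redeclared only to expose concrete parameter types; it just delegates.
    @Override
    public int[] add(int[] accumulator, Integer input) {
      return super.add(accumulator, input);
    }
  }

  // Finds the public, non-bridge method named "add", as a name-based loader might.
  public static Class<?>[] addParameterTypes(Class<?> udafClass) {
    for (Method m : udafClass.getMethods()) {
      if (m.getName().equals("add") && !m.isBridge()) {
        return m.getParameterTypes();
      }
    }
    throw new IllegalArgumentException("no add method");
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(addParameterTypes(IntSumUdaf.class)));
    // prints [class [I, class java.lang.Integer]
    int[] acc = new IntSumUdaf().add(new int[1], 4);
    System.out.println(acc[0]); // prints 4
  }
}
```

The cost of this design is the boilerplate visible in `SquareSum`: every UDAF method has to be restated per subclass purely to defeat erasure, even though the logic lives entirely in the wrapped function.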
> accept CombineFn as UDAF
> ------------------------
>
> Key: BEAM-2747
> URL: https://issues.apache.org/jira/browse/BEAM-2747
> Project: Beam
> Issue Type: Sub-task
> Components: dsl-sql
> Reporter: Xu Mingmin
> Assignee: Xu Mingmin
> Labels: dsl_sql_merge, dsl_sql_review
>
> refer to https://docs.google.com/document/d/1VlAqKLhQua6YpR42KuziIYHTB1FfQTi16Mh88MEYBsU/edit?disco=AAAABTmjhgo
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)