Posted to commits@beam.apache.org by "Xu Mingmin (JIRA)" <ji...@apache.org> on 2017/08/10 00:40:00 UTC

[jira] [Commented] (BEAM-2747) accept CombineFn as UDAF

    [ https://issues.apache.org/jira/browse/BEAM-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120886#comment-16120886 ] 

Xu Mingmin commented on BEAM-2747:
----------------------------------

[~takidau] [~xumingming]
I'm stuck on a technical question about supporting CombineFn. Please advise if you see any solution.

Due to limitations in Calcite:
1. when calling `SchemaPlus.add(fnName, fn)`, a UDAF can only be passed as an `AggregateFunctionImpl`;
2. `AggregateFunctionImpl` is not extensible: its only public entry point is the static `create` method, which takes a `Class` argument.
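
Limitation 2 can be modeled without Calcite. The stdlib-only sketch below assumes a hypothetical `create(Class<?>)` factory that, like `AggregateFunctionImpl.create`, sees only a `Class` and has to instantiate it reflectively through a public no-arg constructor. `ClassOnlyFactoryDemo`, `CombineFnAdaptor` and `NoArgUdaf` are illustrative names, not Calcite or Beam APIs, and this is not Calcite's actual implementation.

```java
import java.lang.reflect.Constructor;
import java.util.function.BinaryOperator;

public class ClassOnlyFactoryDemo {
  // Hypothetical stand-in for a factory that, like AggregateFunctionImpl.create,
  // receives only a Class. Assumption: it instantiates the class reflectively
  // through a public no-arg constructor.
  public static Object create(Class<?> clazz) throws ReflectiveOperationException {
    Constructor<?> ctor = clazz.getConstructor(); // NoSuchMethodException if absent
    return ctor.newInstance();
  }

  // Hypothetical adapter: it needs the wrapped function at construction time,
  // but a Class object has no way to carry that instance along.
  public static class CombineFnAdaptor {
    private final BinaryOperator<Integer> fn;

    public CombineFnAdaptor(BinaryOperator<Integer> fn) {
      this.fn = fn;
    }

    public Integer apply(Integer a, Integer b) {
      return fn.apply(a, b);
    }
  }

  // A UDAF-style class whose behavior is fixed by the class itself.
  public static class NoArgUdaf {
    public NoArgUdaf() {}

    public Integer apply(Integer a, Integer b) {
      return a + b;
    }
  }

  // True if the Class-only factory can produce an instance of clazz.
  public static boolean canCreate(Class<?> clazz) {
    try {
      create(clazz);
      return true;
    } catch (ReflectiveOperationException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("NoArgUdaf: " + canCreate(NoArgUdaf.class));               // true
    System.out.println("CombineFnAdaptor: " + canCreate(CombineFnAdaptor.class)); // false
  }
}
```

Because a `Class` object carries no per-instance state, any CombineFn an adapter depends on would have to be fixed by the class itself rather than passed in at construction time.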

None of the solutions below works so far:
1. An adapter with a constructor like `public BeamUdafCombineFnAdaptor(CombineFn)` doesn't work, because `AggregateFunctionImpl` only takes a `Class`, not an instance, as input;
2. An abstract `BeamSqlUdaf<InputT, AccumT, OutputT>` that wraps a `CombineFn`: users extend this class and implement `public abstract CombineFn<InputT, AccumT, OutputT> getCombineFn();`, as follows:
{code}
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

// Users only implement getCombineFn(); every UDAF method delegates to the wrapped CombineFn.
public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
  public BeamSqlUdaf() {}

  public abstract CombineFn<InputT, AccumT, OutputT> getCombineFn();

  public final AccumT init() {
    return getCombineFn().createAccumulator();
  }

  public final AccumT add(AccumT accumulator, InputT input) {
    return getCombineFn().addInput(accumulator, input);
  }

  public final AccumT merge(Iterable<AccumT> accumulators) {
    return getCombineFn().mergeAccumulators(accumulators);
  }

  public final OutputT result(AccumT accumulator) {
    return getCombineFn().extractOutput(accumulator);
  }

  public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry) { ... }
}
{code}
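Calcite fails to resolve the actual types of InputT, AccumT and OutputT because of type erasure in Java (https://en.wikipedia.org/wiki/Generics_in_Java#Problems_with_type_erasure): the `init`/`add`/`merge`/`result` methods are declared in the generic base class, so reflection reports `Object` for each of their parameter and return types.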
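
The erasure problem can be reproduced with plain reflection. This sketch assumes that Calcite discovers a UDAF's methods by name through `Class.getMethods()` while skipping bridge methods; `Combiner`, `GenericUdaf` and `IntSum` are hypothetical stand-ins for `CombineFn`, the proposed `BeamSqlUdaf` and a user subclass, not the real classes.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

public class ErasureDemo {
  // Hypothetical minimal stand-in for Beam's CombineFn (not the real API).
  public interface Combiner<InputT, AccumT, OutputT> {
    AccumT create();

    AccumT add(AccumT accumulator, InputT input);

    OutputT extract(AccumT accumulator);
  }

  // Same shape as the proposed BeamSqlUdaf: final methods written against type variables.
  public abstract static class GenericUdaf<InputT, AccumT, OutputT> {
    public abstract Combiner<InputT, AccumT, OutputT> getCombiner();

    public final AccumT init() { return getCombiner().create(); }

    public final AccumT add(AccumT accumulator, InputT input) {
      return getCombiner().add(accumulator, input);
    }

    public final OutputT result(AccumT accumulator) { return getCombiner().extract(accumulator); }
  }

  // A user subclass that only binds the type arguments and supplies the combiner.
  public static class IntSum extends GenericUdaf<Integer, int[], Integer> {
    @Override
    public Combiner<Integer, int[], Integer> getCombiner() {
      return new Combiner<Integer, int[], Integer>() {
        public int[] create() { return new int[1]; }

        public int[] add(int[] acc, Integer in) { acc[0] += in; return acc; }

        public Integer extract(int[] acc) { return acc[0]; }
      };
    }
  }

  // Describes the first non-bridge public method with this name, as reflection sees it.
  public static String signature(Class<?> clazz, String name) {
    for (Method m : clazz.getMethods()) {
      if (m.getName().equals(name) && !m.isBridge()) {
        String[] params =
            Arrays.stream(m.getParameterTypes()).map(Class::getSimpleName).toArray(String[]::new);
        return m.getReturnType().getSimpleName() + " " + name + "(" + String.join(", ", params) + ")";
      }
    }
    throw new IllegalArgumentException("no public method named " + name);
  }

  public static void main(String[] args) {
    System.out.println(signature(IntSum.class, "add"));    // Object add(Object, Object)
    System.out.println(signature(IntSum.class, "result")); // Object result(Object)
  }
}
```

Even though `IntSum` binds `InputT` to `Integer` and `AccumT` to `int[]`, the inherited final methods exist only once, in the generic base class with erased signatures, so a reflective lookup never sees the concrete types.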

The only workable approach I know of is to keep the interface as it is and guide users to wrap an existing CombineFn, redeclaring each method with concrete types, like:
{code}
  // Redeclares each UDAF method with concrete types so Calcite sees int[]/Integer instead of the
  // erased Object. These overrides only compile if the base-class methods are not declared final.
  public static class SquareSum extends BeamSqlUdaf<Integer, int[], Integer> {
    public SquareSum() {}

    public CombineFn<Integer, int[], Integer> getCombineFn() {
      return Sum.ofIntegers();
    }

    public int[] init() {
      return super.init();
    }

    public int[] add(int[] accumulator, Integer input) {
      return super.add(accumulator, input);
    }

    public int[] merge(Iterable<int[]> accumulators) {
      return super.merge(accumulators);
    }

    public Integer result(int[] accumulator) {
      return super.result(accumulator);
    }
  }
{code}
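
The `SquareSum` workaround relies on the compiler emitting, for each redeclared method, a concrete-typed method plus a synthetic bridge method with the erased signature. The sketch below uses hypothetical `Combiner` and `GenericUdaf` stand-ins for `CombineFn` and `BeamSqlUdaf` (with non-final base methods) and lists both methods; that Calcite's lookup skips bridge methods and so picks up the concrete one is an assumption here, not something verified against Calcite's source.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RedeclareDemo {
  // Hypothetical minimal stand-in for Beam's CombineFn (not the real API).
  public interface Combiner<InputT, AccumT, OutputT> {
    AccumT create();

    AccumT add(AccumT accumulator, InputT input);

    OutputT extract(AccumT accumulator);
  }

  // Base methods are deliberately NOT final so that subclasses can redeclare them.
  public abstract static class GenericUdaf<InputT, AccumT, OutputT> {
    public abstract Combiner<InputT, AccumT, OutputT> getCombiner();

    public AccumT init() { return getCombiner().create(); }

    public AccumT add(AccumT accumulator, InputT input) {
      return getCombiner().add(accumulator, input);
    }

    public OutputT result(AccumT accumulator) { return getCombiner().extract(accumulator); }
  }

  // Same pattern as SquareSum: each method is redeclared with concrete types and delegates to super.
  public static class IntSum extends GenericUdaf<Integer, int[], Integer> {
    @Override
    public Combiner<Integer, int[], Integer> getCombiner() {
      return new Combiner<Integer, int[], Integer>() {
        public int[] create() { return new int[1]; }

        public int[] add(int[] acc, Integer in) { acc[0] += in; return acc; }

        public Integer extract(int[] acc) { return acc[0]; }
      };
    }

    @Override
    public int[] init() { return super.init(); }

    @Override
    public int[] add(int[] accumulator, Integer input) { return super.add(accumulator, input); }

    @Override
    public Integer result(int[] accumulator) { return super.result(accumulator); }
  }

  // Lists every public method with the given name, tagging compiler-generated bridges.
  public static List<String> describeAll(Class<?> clazz, String name) {
    List<String> out = new ArrayList<>();
    for (Method m : clazz.getMethods()) {
      if (m.getName().equals(name)) {
        String[] params =
            Arrays.stream(m.getParameterTypes()).map(Class::getSimpleName).toArray(String[]::new);
        out.add((m.isBridge() ? "bridge " : "") + m.getReturnType().getSimpleName() + " "
            + name + "(" + String.join(", ", params) + ")");
      }
    }
    Collections.sort(out); // getMethods() order is unspecified
    return out;
  }

  public static void main(String[] args) {
    // [bridge Object add(Object, Object), int[] add(int[], Integer)]
    System.out.println(describeAll(IntSum.class, "add"));
    IntSum udaf = new IntSum();
    int[] acc = udaf.init();
    acc = udaf.add(acc, 3);
    acc = udaf.add(acc, 4);
    System.out.println(udaf.result(acc)); // 7
  }
}
```

The non-bridge `add` carries `int[]` and `Integer`, which is the type information a reflective lookup needs; the price is that every subclass must repeat the four delegating methods by hand.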

> accept CombineFn as UDAF
> ------------------------
>
>                 Key: BEAM-2747
>                 URL: https://issues.apache.org/jira/browse/BEAM-2747
>             Project: Beam
>          Issue Type: Sub-task
>          Components: dsl-sql
>            Reporter: Xu Mingmin
>            Assignee: Xu Mingmin
>              Labels: dsl_sql_merge, dsl_sql_review
>
> refer to https://docs.google.com/document/d/1VlAqKLhQua6YpR42KuziIYHTB1FfQTi16Mh88MEYBsU/edit?disco=AAAABTmjhgo


