You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "kandy.wang" <ka...@163.com> on 2020/11/24 12:32:12 UTC

flink 自定义AggregateFunction 如何识别HyperLogLog对象?

自定义AggregateFunction 实现了UV的 HLL 近似计算,问题是 HyperLogLog 是第三方包,这个如何让flink 识别 ?
就不知道这个TypeInformation该如何写。


代码如下:
import io.airlift.slice.Slices;
import io.airlift.stats.cardinality.HyperLogLog;
import org.apache.flink.table.functions.AggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.Iterator;




public class FlinkUDAFCardinalityEstimationFunction extends AggregateFunction<Long, HyperLogLog> {


    private static final Logger LOG = LoggerFactory.getLogger(JsonArrayParseUDTF.class);


    private static final int NUMBER_OF_BUCKETS = 4096;


    @Override
    public HyperLogLog createAccumulator() {
        return HyperLogLog.newInstance(NUMBER_OF_BUCKETS);
    }


    @Override
    public Long getValue(HyperLogLog acc) {
        if(acc == null){
            return 0L;
        }
        return acc.cardinality();
    }


    public void accumulate(HyperLogLog acc, String element) {
        if(element == null){
            return;
        }
        acc.add(Slices.utf8Slice(element));
    }


    public void retract(HyperLogLog acc, byte[] element) {
        // do nothing
        LOG.info("-- retract:" + new String(element));
    }


    public void merge(HyperLogLog acc, Iterable<HyperLogLog> it) {
        Iterator<HyperLogLog> iter = it.iterator();
        while (iter.hasNext()) {
            HyperLogLog a = iter.next();
            if(a != null) {
                acc.mergeWith(a);
            }
        }
    }


    public void resetAccumulator(HyperLogLog acc) {
        acc = HyperLogLog.newInstance(NUMBER_OF_BUCKETS);
    }
}