Posted to user-zh@flink.apache.org by 赢峰 <si...@163.com> on 2022/05/22 14:35:46 UTC

Exception when using emitUpdateWithRetract in a custom TableAggregateFunction


My custom TableAggregateFunction emits its results through emitUpdateWithRetract. I call it the way the documentation shows:
```
tEnv.from("stu_score")
    .groupBy($("course"))
    .flatAggregate(call(Top2RetractTableAggregateFunction.class, $("score")))
    .select($("course"), $("f0"), $("f1"))
```
With the default Blink planner, the following exception is thrown:
```
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find an implementation method 'emitValue' in class 'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction' for function 'Top2' that matches the following signature:
void emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator, org.apache.flink.util.Collector)
```
With the old planner, however, the job produces output normally:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .useOldPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
```
Is there something wrong with how I am using it?




 

Re:Re:Re:Re:Re: Exception when using emitUpdateWithRetract in a custom TableAggregateFunction

Posted by sjf0115 <si...@163.com>.
OK, thanks.
In reply to the message from "Xuyang" <xy...@163.com> of 2022-05-24 21:23:56.

Re:Re:Re:Re: Exception when using emitUpdateWithRetract in a custom TableAggregateFunction

Posted by Xuyang <xy...@163.com>.
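Hi, I debugged into the code, and this looks like a bug. With the old planner, you only need to define one of emitValue and emitUpdateWithRetract, and emitUpdateWithRetract takes priority over emitValue. The Blink planner, however, only checks whether emitValue is defined. You can report this bug in the Flink issue tracker.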
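
To make the difference described above concrete, here is a minimal plain-Java sketch of the two lookup rules as this reply describes them. It is not Flink's planner code: the class `EmitMethodLookupSketch`, the helpers `oldPlannerLookup` and `blinkPlannerLookup`, and the three toy classes are hypothetical names made up for illustration, and the sketch matches methods by name only, whereas real validation also checks the parameter types.

```java
import java.util.Arrays;

// Hypothetical model of the two lookup rules described in the reply above.
// This is NOT Flink code; it only illustrates the described behaviour.
class EmitMethodLookupSketch {

    // Toy stand-ins for user-defined table aggregate functions.
    static class OnlyRetract {
        public void emitUpdateWithRetract(Object acc, Object out) {}
    }

    static class OnlyValue {
        public void emitValue(Object acc, Object out) {}
    }

    static class Both {
        public void emitValue(Object acc, Object out) {}
        public void emitUpdateWithRetract(Object acc, Object out) {}
    }

    // True if the class exposes a public method with the given name.
    static boolean hasMethod(Class<?> c, String name) {
        return Arrays.stream(c.getMethods()).anyMatch(m -> m.getName().equals(name));
    }

    // Old planner, as described: either method is enough,
    // and emitUpdateWithRetract wins when both are present.
    static String oldPlannerLookup(Class<?> c) {
        if (hasMethod(c, "emitUpdateWithRetract")) {
            return "emitUpdateWithRetract";
        }
        if (hasMethod(c, "emitValue")) {
            return "emitValue";
        }
        throw new IllegalStateException("No emit method in " + c.getSimpleName());
    }

    // Blink planner, as described: only emitValue is looked for.
    static String blinkPlannerLookup(Class<?> c) {
        if (hasMethod(c, "emitValue")) {
            return "emitValue";
        }
        throw new IllegalStateException(
                "Could not find an implementation method 'emitValue' in class "
                        + c.getSimpleName());
    }

    public static void main(String[] args) {
        System.out.println("old/OnlyRetract   -> " + oldPlannerLookup(OnlyRetract.class));
        System.out.println("old/Both          -> " + oldPlannerLookup(Both.class));
        System.out.println("blink/Both        -> " + blinkPlannerLookup(Both.class));
        try {
            blinkPlannerLookup(OnlyRetract.class);
        } catch (IllegalStateException e) {
            System.out.println("blink/OnlyRetract -> " + e.getMessage());
        }
    }
}
```

Under this reading, a function that defines only emitUpdateWithRetract satisfies the old-planner rule but fails the Blink-planner rule with an error like the one reported, while a function that also defines emitValue passes both checks. Whether adding emitValue is an acceptable workaround for a given job is an assumption to verify against the Flink version in use.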
In reply to the message from "sjf0115" <si...@163.com> of 2022-05-23 18:24:17.

Re:Re:Re: Exception when using emitUpdateWithRetract in a custom TableAggregateFunction

Posted by sjf0115 <si...@163.com>.
Flink version: 1.13.5




The complete code of the function is as follows:
```
import java.util.Objects;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Top2RetractTableAggregateFunction extends TableAggregateFunction<Tuple2<Long, Integer>, Top2RetractTableAggregateFunction.Top2RetractAccumulator> {
    private static final Logger LOG = LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);
    // Intermediate result (accumulator) of the Top2 aggregation:
    // before* hold the values already emitted, after* hold the current top 2
    public static class Top2RetractAccumulator {
        public long beforeFirst = 0;
        public long beforeSecond = 0;
        public long afterFirst = 0;
        public long afterSecond = 0;
    }

    // Create and initialize the Top2RetractAccumulator
    @Override
    public Top2RetractAccumulator createAccumulator() {
        LOG.info("[INFO] createAccumulator ...........................");
        Top2RetractAccumulator acc = new Top2RetractAccumulator();
        acc.beforeFirst = Integer.MIN_VALUE;
        acc.beforeSecond = Integer.MIN_VALUE;
        acc.afterFirst = Integer.MIN_VALUE;
        acc.afterSecond = Integer.MIN_VALUE;
        return acc;
    }

    // Accept an input value and fold it into the accumulator
    public void accumulate(Top2RetractAccumulator acc, Long value) {
        LOG.info("[INFO] accumulate ...........................");
        if (value > acc.afterFirst) {
            acc.afterSecond = acc.afterFirst;
            acc.afterFirst = value;
        } else if (value > acc.afterSecond) {
            acc.afterSecond = value;
        }
    }

    // Emit updates with retraction
    public void emitUpdateWithRetract(Top2RetractAccumulator acc, RetractableCollector<Tuple2<Long, Integer>> out) {
        LOG.info("[INFO] emitUpdateWithRetract ...........................");
        if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {
            // Retract the old record
            if (acc.beforeFirst != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.beforeFirst, 1));
            }
            // Emit the new record
            out.collect(Tuple2.of(acc.afterFirst, 1));
            acc.beforeFirst = acc.afterFirst;
        }
        if (!Objects.equals(acc.afterSecond, acc.beforeSecond)) {
            // Retract the old record
            if (acc.beforeSecond != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.beforeSecond, 2));
            }
            // Emit the new record
            out.collect(Tuple2.of(acc.afterSecond, 2));
            acc.beforeSecond = acc.afterSecond;
        }
    }
}
```
The complete calling code is as follows:
```
// Execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .useOldPlanner() // fails with the Blink planner, works with the old planner
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

DataStream<Row> sourceStream = env.fromElements(
        Row.of("李雷", "语文", 78),
        Row.of("韩梅梅", "语文", 50),
        Row.of("李雷", "语文", 99),
        Row.of("韩梅梅", "语文", 80),
        Row.of("李雷", "英语", 90),
        Row.of("韩梅梅", "英语", 40),
        Row.of("李雷", "英语", 98),
        Row.of("韩梅梅", "英语", 88)
);

// Register a temporary view
tEnv.createTemporaryView("stu_score", sourceStream, $("name"), $("course"), $("score"));
// Register a temporary system function
tEnv.createTemporarySystemFunction("Top2", new Top2RetractTableAggregateFunction());
// Call the function
tEnv.from("stu_score")
        .groupBy($("course"))
        .flatAggregate(call("Top2", $("score")).as("score", "rank"))
        .select($("course"), $("score"), $("rank"))
        .execute()
        .print();
```
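
To see which changes the function above is meant to produce, independent of any planner, its accumulate / emitUpdateWithRetract logic can be replayed in plain Java. The sketch below is only an approximation under stated assumptions: `Top2RetractSketch`, `Acc`, and `run` are hypothetical names, a `List<String>` stands in for Flink's RetractableCollector, and it assumes the emit method is invoked once after every input record for a single group.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java replay of the Top2 retract logic; no Flink dependency.
// Assumption: emitUpdateWithRetract runs once after each accumulated record.
class Top2RetractSketch {

    // Same fields and sentinel (Integer.MIN_VALUE) as the accumulator above.
    static class Acc {
        long beforeFirst = Integer.MIN_VALUE;
        long beforeSecond = Integer.MIN_VALUE;
        long afterFirst = Integer.MIN_VALUE;
        long afterSecond = Integer.MIN_VALUE;
    }

    // Keep the two largest values seen so far.
    static void accumulate(Acc acc, long value) {
        if (value > acc.afterFirst) {
            acc.afterSecond = acc.afterFirst;
            acc.afterFirst = value;
        } else if (value > acc.afterSecond) {
            acc.afterSecond = value;
        }
    }

    // Record "-(score,rank)" for a retraction and "+(score,rank)" for a new row.
    static void emitUpdateWithRetract(Acc acc, List<String> out) {
        if (acc.afterFirst != acc.beforeFirst) {
            if (acc.beforeFirst != Integer.MIN_VALUE) {
                out.add("-(" + acc.beforeFirst + ",1)");
            }
            out.add("+(" + acc.afterFirst + ",1)");
            acc.beforeFirst = acc.afterFirst;
        }
        if (acc.afterSecond != acc.beforeSecond) {
            if (acc.beforeSecond != Integer.MIN_VALUE) {
                out.add("-(" + acc.beforeSecond + ",2)");
            }
            out.add("+(" + acc.afterSecond + ",2)");
            acc.beforeSecond = acc.afterSecond;
        }
    }

    // Feed the scores of one group in order and collect every emitted change.
    static List<String> run(long... scores) {
        Acc acc = new Acc();
        List<String> out = new ArrayList<>();
        for (long score : scores) {
            accumulate(acc, score);
            emitUpdateWithRetract(acc, out);
        }
        return out;
    }

    public static void main(String[] args) {
        // Scores of the "语文" group from the sample data, in input order.
        System.out.println(run(78, 50, 99, 80));
    }
}
```

Only the rows whose value actually changed are retracted and re-emitted, which is the point of emitUpdateWithRetract compared with re-emitting the full top 2 on every update. The changelog that Flink itself prints uses a different format, so this is a reading aid rather than a reproduction of the job's output.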





















In reply to the message from "sjf0115" <si...@163.com> of 2022-05-23 18:21:42.

Re:Re: Exception when using emitUpdateWithRetract in a custom TableAggregateFunction

Posted by sjf0115 <si...@163.com>.
The function code is as follows:
```
public class Top2RetractTableAggregateFunction extends TableAggregateFunction<Tuple2<Long, Integer>, Top2RetractTableAggregateFunction.Top2RetractAccumulator> {
    private static final Logger LOG = LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);
    // Intermediate result (accumulator) of the Top2 aggregation
    public static class Top2RetractAccumulator {
        public long beforeFirst = 0;
        public long beforeSecond = 0;
        public long afterFirst = 0;
        public long afterSecond = 0;
    }

    // Create and initialize the Top2RetractAccumulator
    @Override
    public Top2RetractAccumulator createAccumulator() {
        LOG.info("[INFO] createAccumulator ...........................");
        Top2RetractAccumulator acc = new Top2RetractAccumulator();
        acc.beforeFirst = Integer.MIN_VALUE;
        acc.beforeSecond = Integer.MIN_VALUE;
        acc.afterFirst = Integer.MIN_VALUE;
        acc.afterSecond = Integer.MIN_VALUE;
        return acc;
    }

    // Accept an input value and fold it into the accumulator
    public void accumulate(Top2RetractAccumulator acc, Long value) {
        LOG.info("[INFO] accumulate ...........................");
        if (value > acc.afterFirst) {
            acc.afterSecond = acc.afterFirst;
            acc.afterFirst = value;
        } else if (value > acc.afterSecond) {
            acc.afterSecond = value;
        }
    }

    // Emit updates with retraction
    public void emitUpdateWithRetract(Top2RetractAccumulator acc, RetractableCollector<Tuple2<Long, Integer>> out) {
        LOG.info("[INFO] emitUpdateWithRetract ...........................");
        if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {
            // Retract the old record
            if (acc.beforeFirst != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.beforeFirst, 1));
            }
            // Emit the new record
            out.collect(Tuple2.of(acc.afterFirst, 1));
            acc.beforeFirst = acc.afterFirst;
        }
        if (!Objects.equals(acc.afterSecond, acc.beforeSecond)) {
            // Retract the old record
            if (acc.beforeSecond != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.beforeSecond, 2));
            }
            // Emit the new record
            out.collect(Tuple2.of(acc.afterSecond, 2));
            acc.beforeSecond = acc.afterSecond;
        }
    }
}
```
The complete calling code:
```
// Execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .useOldPlanner() // fails with the Blink planner, works with the old planner
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

DataStream<Row> sourceStream = env.fromElements(
        Row.of("李雷", "语文", 78),
        Row.of("韩梅梅", "语文", 50),
        Row.of("李雷", "语文", 99),
        Row.of("韩梅梅", "语文", 80),
        Row.of("李雷", "英语", 90),
        Row.of("韩梅梅", "英语", 40),
        Row.of("李雷", "英语", 98),
        Row.of("韩梅梅", "英语", 88)
);

// Register a temporary view
tEnv.createTemporaryView("stu_score", sourceStream, $("name"), $("course"), $("score"));
// Register a temporary system function
tEnv.createTemporarySystemFunction("Top2", new Top2RetractTableAggregateFunction());
// Call the function
tEnv.from("stu_score")
        .groupBy($("course"))
        .flatAggregate(call("Top2", $("score")).as("score", "rank"))
        .select($("course"), $("score"), $("rank"))
        .execute()
        .print();
```
Flink version: 1.13.5
In reply to the message from "Xuyang" <xy...@163.com> of 2022-05-23 09:55:40.

Re: Exception when using emitUpdateWithRetract in a custom TableAggregateFunction

Posted by Xuyang <xy...@163.com>.
Hi, could you post the full code of your TAF (table aggregate function)? Please also include your Flink version.




--

    Best!
    Xuyang





In reply to the message from "赢峰" <si...@163.com> of 2022-05-22 22:35:46.