You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by lingchanhu <li...@163.com> on 2020/11/18 01:49:40 UTC
flink1.11 TableEnvironment 不支持注册 Aggregate Function?
*flink1.11*
在TableEnvironment环境中注册并使用自定义的Aggregate
Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment 注册和使用则是正常,这应该说明自定义的函数是ok的)
org.apache.flink.table.api.TableException: Aggregate functions are not
updated to the new type system yet.
at
org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
at
java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
at
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
at
org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
at
org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
at
org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
at
com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)
*// 以下是代码*
// main
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(envSettings);
// 注册source table, jdbc table source
tEnv.executeSql("CREATE TABLE wx_event_log (....) with
('connect.type'='jdbc'),....");
// 注册sink table,csv table sink
tEnv.executeSql("CREATE TABLE wx_data_statistics (....) with
('connect.type'='filesystem','format.type'='csv',.....)");
// 注册agg function
tEnv.createTemporarySystemFunction("firSendMsgFunc",new FirstSendMsgFunc());
Table table2 = tEnv.sqlQuery("select from_user,create_time from wx_event_log
where msg_type='text' and create_time between '2020-03-20' and
'2020-03-21'");
table2.groupBy($("from_user"))
.aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today"))
.select($("from_user"),$("first_send_msg_today"))
.executeInsert("wx_data_statistics");
// 自定义agg function类
public class FirstSendMsgFunc extends
AggregateFunction<LocalDateTime,CountDTO> {
public void accumulate(CountDTO acc, LocalDateTime createTime) {
if (acc.getDateTime() == null) {
acc.setDateTime(createTime);
} else if (acc.getDateTime().isAfter(createTime)) {
acc.setDateTime(createTime);
}
}
@Override
public LocalDateTime getValue(CountDTO acc) {
return acc.getDateTime();
}
@Override
public CountDTO createAccumulator() {
return new CountDTO();
}
}
// accumulate pojo 类
public class CountDTO implements Serializable {
private Integer count;
private LocalDateTime dateTime;
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public LocalDateTime getDateTime() {
return dateTime;
}
public void setDateTime(LocalDateTime dateTime) {
this.dateTime = dateTime;
}
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/
statementset下source怎么完全复用
Posted by Jeff <zi...@126.com>.
请问一下,flink 1.11 statement set 怎么复用同一个source呢? 希望同一个job里不同sink使用完全相同的数据,不是默认的用hash分流,这个有地方设置么?
Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?
Posted by Jark Wu <im...@gmail.com>.
Btw, 1.12 版本 TableEnvironment#createTemporarySystemFunction 接口支持
AggregateFunction了。
On Wed, 18 Nov 2020 at 10:34, Jark Wu <im...@gmail.com> wrote:
> 1.11 版本上 TableEnvironment#createTemporarySystemFunction 接口暂时还不支持
> AggregateFunction。
> 你说 StreamTableEnvironment 可以,我估计你用的是
> StreamTableEnvironment#registerFunction, 这个是支持 AggregateFunction 的。
>
> Best,
> Jark
>
>
> On Wed, 18 Nov 2020 at 09:49, lingchanhu <li...@163.com> wrote:
>
>> *flink1.11*
>> 在TableEnvironment环境中注册并使用自定义的Aggregate
>> Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment
>> 注册和使用则是正常,这应该说明自定义的函数是ok的)
>>
>> org.apache.flink.table.api.TableException: Aggregate functions are not
>> updated to the new type system yet.
>> at
>>
>> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
>> at
>>
>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>> at
>>
>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
>> at
>> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
>> at
>>
>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>> at
>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>> at
>>
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>> at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at
>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at
>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>> at
>>
>> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
>> at
>>
>> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
>> at java.util.function.Function.lambda$andThen$1(Function.java:88)
>> at
>>
>> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
>> at
>>
>> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
>> at
>>
>> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
>> at
>>
>> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
>> at
>>
>> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
>> at
>>
>> com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
>> at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)
>>
>> *// 以下是代码*
>> // main
>> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inBatchMode()
>> .build();
>>
>> TableEnvironment tEnv = TableEnvironment.create(envSettings);
>>
>> // 注册source table, jdbc table source
>> tEnv.executeSql("CREATE TABLE wx_event_log (....) with
>> ('connect.type'='jdbc'),....");
>>
>> // 注册sink table,csv table sink
>> tEnv.executeSql("CREATE TABLE wx_data_statistics (....) with
>> ('connect.type'='filesystem','format.type'='csv',.....)");
>>
>> // 注册agg function
>> tEnv.createTemporarySystemFunction("firSendMsgFunc",new
>> FirstSendMsgFunc());
>>
>> Table table2 = tEnv.sqlQuery("select from_user,create_time from
>> wx_event_log
>> where msg_type='text' and create_time between '2020-03-20' and
>> '2020-03-21'");
>>
>> table2.groupBy($("from_user"))
>>
>>
>> .aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today"))
>> .select($("from_user"),$("first_send_msg_today"))
>> .executeInsert("wx_data_statistics");
>>
>>
>> // 自定义agg function类
>> public class FirstSendMsgFunc extends
>> AggregateFunction<LocalDateTime,CountDTO> {
>>
>> public void accumulate(CountDTO acc, LocalDateTime createTime) {
>> if (acc.getDateTime() == null) {
>> acc.setDateTime(createTime);
>> } else if (acc.getDateTime().isAfter(createTime)) {
>> acc.setDateTime(createTime);
>> }
>> }
>>
>> @Override
>> public LocalDateTime getValue(CountDTO acc) {
>> return acc.getDateTime();
>> }
>>
>> @Override
>> public CountDTO createAccumulator() {
>> return new CountDTO();
>> }
>> }
>>
>> // accumulate pojo 类
>> public class CountDTO implements Serializable {
>>
>> private Integer count;
>>
>> private LocalDateTime dateTime;
>>
>> public Integer getCount() {
>> return count;
>> }
>>
>> public void setCount(Integer count) {
>> this.count = count;
>> }
>>
>> public LocalDateTime getDateTime() {
>> return dateTime;
>> }
>>
>> public void setDateTime(LocalDateTime dateTime) {
>> this.dateTime = dateTime;
>> }
>> }
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>
Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?
Posted by lingchanhu <li...@163.com>.
感谢,已经解决了!
BR,
lingchanhu
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?
Posted by Jark Wu <im...@gmail.com>.
通过 StreamTableEnvironmentImpl 构造函数直接构造一个 isStreamingMode = false
的 StreamTableEnvironmentImpl。
然后就可以在这个上面调用 registerFunction 了。
On Wed, 18 Nov 2020 at 10:40, lingchanhu <li...@163.com> wrote:
> 非常感谢,如果flink1.11 目前不支持的话,那对于这种场景的使用有什么建议么?想要批处理数据,其中又要用到自定义的agg function?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?
Posted by lingchanhu <li...@163.com>.
非常感谢,如果flink1.11 目前不支持的话,那对于这种场景的使用有什么建议么?想要批处理数据,其中又要用到自定义的agg function?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.11 TableEnvironment 不支持注册 Aggregate Function?
Posted by Jark Wu <im...@gmail.com>.
1.11 版本上 TableEnvironment#createTemporarySystemFunction 接口暂时还不支持
AggregateFunction。
你说 StreamTableEnvironment 可以,我估计你用的是
StreamTableEnvironment#registerFunction, 这个是支持 AggregateFunction 的。
Best,
Jark
On Wed, 18 Nov 2020 at 09:49, lingchanhu <li...@163.com> wrote:
> *flink1.11*
> 在TableEnvironment环境中注册并使用自定义的Aggregate
> Function时,报出以下错误。下面贴有代码(若是在StreamTableEnvironment
> 注册和使用则是正常,这应该说明自定义的函数是ok的)
>
> org.apache.flink.table.api.TableException: Aggregate functions are not
> updated to the new type system yet.
> at
>
> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:152)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.getOptionalTypeInference(ResolveCallByArgumentsRule.java:183)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:112)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule$ResolvingCallVisitor.visit(ResolveCallByArgumentsRule.java:89)
> at
>
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
> at
>
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.lambda$apply$0(ResolveCallByArgumentsRule.java:83)
> at
> java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
> at
>
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
>
> org.apache.flink.table.expressions.resolver.rules.ResolveCallByArgumentsRule.apply(ResolveCallByArgumentsRule.java:84)
> at
>
> org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$1(ExpressionResolver.java:211)
> at java.util.function.Function.lambda$andThen$1(Function.java:88)
> at
>
> org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:178)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.getAggregate(OperationTreeBuilder.java:651)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:616)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder$ExtractAliasAndAggregate.visit(OperationTreeBuilder.java:598)
> at
>
> org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
> at
>
> org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
> at
>
> org.apache.flink.table.operations.utils.OperationTreeBuilder.aggregate(OperationTreeBuilder.java:511)
> at
>
> org.apache.flink.table.api.internal.TableImpl$AggregatedTableImpl.select(TableImpl.java:685)
> at
>
> com.ideacom.flink.demo.example.BatchTableExample.demo(BatchTableExample.java:48)
> at com.ideacom.flink.demo.TableSqlJob.main(TableSqlJob.java:36)
>
> *// 以下是代码*
> // main
> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inBatchMode()
> .build();
>
> TableEnvironment tEnv = TableEnvironment.create(envSettings);
>
> // 注册source table, jdbc table source
> tEnv.executeSql("CREATE TABLE wx_event_log (....) with
> ('connect.type'='jdbc'),....");
>
> // 注册sink table,csv table sink
> tEnv.executeSql("CREATE TABLE wx_data_statistics (....) with
> ('connect.type'='filesystem','format.type'='csv',.....)");
>
> // 注册agg function
> tEnv.createTemporarySystemFunction("firSendMsgFunc",new
> FirstSendMsgFunc());
>
> Table table2 = tEnv.sqlQuery("select from_user,create_time from
> wx_event_log
> where msg_type='text' and create_time between '2020-03-20' and
> '2020-03-21'");
>
> table2.groupBy($("from_user"))
>
>
> .aggregate(call("firSendMsgFunc",$("create_time")).as("first_send_msg_today"))
> .select($("from_user"),$("first_send_msg_today"))
> .executeInsert("wx_data_statistics");
>
>
> // 自定义agg function类
> public class FirstSendMsgFunc extends
> AggregateFunction<LocalDateTime,CountDTO> {
>
> public void accumulate(CountDTO acc, LocalDateTime createTime) {
> if (acc.getDateTime() == null) {
> acc.setDateTime(createTime);
> } else if (acc.getDateTime().isAfter(createTime)) {
> acc.setDateTime(createTime);
> }
> }
>
> @Override
> public LocalDateTime getValue(CountDTO acc) {
> return acc.getDateTime();
> }
>
> @Override
> public CountDTO createAccumulator() {
> return new CountDTO();
> }
> }
>
> // accumulate pojo 类
> public class CountDTO implements Serializable {
>
> private Integer count;
>
> private LocalDateTime dateTime;
>
> public Integer getCount() {
> return count;
> }
>
> public void setCount(Integer count) {
> this.count = count;
> }
>
> public LocalDateTime getDateTime() {
> return dateTime;
> }
>
> public void setDateTime(LocalDateTime dateTime) {
> this.dateTime = dateTime;
> }
> }
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>