Posted to issues@flink.apache.org by "Wu (Jira)" <ji...@apache.org> on 2021/01/18 01:53:00 UTC

[jira] [Commented] (FLINK-21001) Flink job is blocked while using tableEnvironment with tableFunction and join

    [ https://issues.apache.org/jira/browse/FLINK-21001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17266956#comment-17266956 ] 

Wu commented on FLINK-21001:
----------------------------

I started the job at 15:48. It remained blocked and did not continue running until 19:09. The client log is attached as client_log.txt.

> Flink job is blocked while using tableEnvironment with tableFunction and join
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-21001
>                 URL: https://issues.apache.org/jira/browse/FLINK-21001
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.11.2
>         Environment: Flink 1.11.2
>            Reporter: Wu
>            Priority: Major
>         Attachments: client_log.txt
>
>
> The code is as follows.
> {code:java}
> package com.oppo.recdata.datapipe;
> import com.oppo.recdata.datapipe.flink.transform.ExplodeDataTypeEnum;
> import com.oppo.recdata.datapipe.flink.transform.ExplodeModify;
> import com.oppo.recdata.datapipe.flink.transform.TableExplode;
> import com.oppo.recdata.datapipe.flink.transform.function.CollectMapAggregateFunction;
> import org.apache.flink.table.api.*;
> import static org.apache.flink.table.api.Expressions.row;
> /**
>  * @author wujianzhen@oppo.com
>  */
> public class BatchTable {
>     public static void main(String[] args) {
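>         // Blink planner in batch mode: the whole pipeline below is planned and executed as one bounded job.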
>         EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>         TableEnvironment tableEnv = TableEnvironment.create(settings);
>         ExplodeModify modify = new ExplodeModify(ExplodeDataTypeEnum.string, null, "&");
>         tableEnv.createTemporarySystemFunction("explode", new TableExplode(modify));
>         tableEnv.createFunction("collect_map", CollectMapAggregateFunction.class);
>         Table table = tableEnv.fromValues(
>             DataTypes.ROW(
>                 DataTypes.FIELD("buuid", DataTypes.STRING()),
>                 DataTypes.FIELD("docType", DataTypes.INT()),
>                 DataTypes.FIELD("viewTime", DataTypes.INT()),
>                 DataTypes.FIELD("subCategory", DataTypes.STRING())
>             ),
>             row("John", 1, 36, "NBA&football") // integer literals to match the declared INT fields
>         );
>         tableEnv.createTemporaryView("feeds_expose_click_profile", table);
>         Table add_profile = tableEnv.sqlQuery("select buuid, cast(docType as varchar) as docType, viewTime, subCategory from feeds_expose_click_profile where buuid is not null and docType is not null and viewTime > 0");
>         tableEnv.createTemporaryView("add_profile", add_profile);
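>         // LATERAL TABLE(explode(subCategory)) fans each row out into one row per '&'-separated category value.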
>         Table cate2Click = tableEnv.sqlQuery("select buuid, docType, viewTime, cate2 from add_profile, LATERAL TABLE(explode(subCategory)) as t(cate2) where subCategory is not null");
>         tableEnv.createTemporaryView("cate2_click", cate2Click);
>         Table cate2_detail = tableEnv.sqlQuery("select cate2, sum(viewTime) as viewTimeSum, buuid, docType from cate2_click GROUP BY buuid, cate2, docType");
>         tableEnv.createTemporaryView("user_cate2_detail", cate2_detail);
>         Table user_global_cate2 = tableEnv.sqlQuery("select 'gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue,  buuid as keyName, docType from cate2_click group by buuid, docType");
>         tableEnv.createTemporaryView("user_global_cate2", user_global_cate2);
>         Table global_user_cate2 = tableEnv.sqlQuery("select cate2 as fieldName, sum(viewTime) as fieldValue, 'guser_cate2_24h_click_sumtime' as keyName, docType from cate2_click group by cate2, docType ");
>         tableEnv.createTemporaryView("global_user_cate2",global_user_cate2);
>         Table global_user_global_cate2 = tableEnv.sqlQuery("select 'guser_gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue, 'global_feature' as keyName, docType from cate2_click group by docType");
>         tableEnv.createTemporaryView("global_user_global_cate2", global_user_global_cate2);
>         Table cate2_cs_detail = tableEnv.sqlQuery(
>             "select a.cate2 as fieldName, " +
>             "(a.viewTimeSum + 0.2) / (b.fieldValue * c.fieldValue / d.fieldValue + 0.2) as fieldValue, " +
>             "a.buuid as keyName, a.docType " +
>             "from user_cate2_detail a " +
>             "join user_global_cate2 b on a.buuid = b.keyName and a.docType = b.docType " +
>             "join global_user_cate2 c on a.cate2 = c.fieldName and a.docType = c.docType " +
>             "join global_user_global_cate2 d on a.docType = d.docType " +
>             "where a.viewTimeSum > 0 and b.fieldValue > 0 and c.fieldValue > 0 and d.fieldValue > 0");
>         tableEnv.createTemporaryView("cate2_cs_detail", cate2_cs_detail);
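>         // Collect the per-category scores into one map per docType#keyName; the where clause filters out NULL fieldValues.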
>         Table cate2Cs = tableEnv.sqlQuery(
>             "select 'cate2_24h_click_sumtimeds' as fieldName, " +
>             "collect_map(fieldName, ROUND(fieldValue, 5)) as fieldValue, " +
>             "concat(docType, '#', keyName) as keyName " +
>             "from cate2_cs_detail " +
>             "where fieldValue < 0 or fieldValue >= 0 " +
>             "group by keyName, docType");
>         cate2Cs.execute().print();
>     }
> }
> {code}
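> For triage without the com.oppo.recdata classes, below is a minimal, self-contained sketch of the same pattern: a batch TableEnvironment on the Blink planner, a custom table function applied via LATERAL TABLE, then a join against an aggregated view. The SplitFunction, the class and package names, and the view names (src, exploded, agg) are hypothetical stand-ins for TableExplode and the original pipeline, not part of the reported job; everything else uses only the Flink 1.11 Table API.
> {code:java}
> package com.example;
>
> import org.apache.flink.table.annotation.DataTypeHint;
> import org.apache.flink.table.annotation.FunctionHint;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.functions.TableFunction;
> import org.apache.flink.types.Row;
>
> import static org.apache.flink.table.api.Expressions.row;
>
> public class BatchTableRepro {
>
>     // Hypothetical stand-in for TableExplode: emits one row per '&'-separated token.
>     @FunctionHint(output = @DataTypeHint("ROW<cate2 STRING>"))
>     public static class SplitFunction extends TableFunction<Row> {
>         public void eval(String s) {
>             if (s == null) {
>                 return;
>             }
>             for (String token : s.split("&")) {
>                 collect(Row.of(token));
>             }
>         }
>     }
>
>     public static void main(String[] args) {
>         EnvironmentSettings settings =
>             EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>         TableEnvironment tableEnv = TableEnvironment.create(settings);
>         tableEnv.createTemporarySystemFunction("explode", SplitFunction.class);
>
>         Table src = tableEnv.fromValues(
>             DataTypes.ROW(
>                 DataTypes.FIELD("buuid", DataTypes.STRING()),
>                 DataTypes.FIELD("viewTime", DataTypes.INT()),
>                 DataTypes.FIELD("subCategory", DataTypes.STRING())),
>             row("John", 36, "NBA&football"));
>         tableEnv.createTemporaryView("src", src);
>
>         // Same shape as the report: explode via LATERAL TABLE, aggregate, then join the two views.
>         tableEnv.createTemporaryView("exploded", tableEnv.sqlQuery(
>             "select buuid, viewTime, cate2 from src, LATERAL TABLE(explode(subCategory)) as t(cate2)"));
>         tableEnv.createTemporaryView("agg", tableEnv.sqlQuery(
>             "select buuid, sum(viewTime) as viewTimeSum from exploded group by buuid"));
>         tableEnv.sqlQuery(
>             "select e.buuid, e.cate2, a.viewTimeSum from exploded e join agg a on e.buuid = a.buuid")
>             .execute().print();
>     }
> }
> {code}
> If this trimmed version completes while the original job hangs, the custom functions (TableExplode / CollectMapAggregateFunction) are the likely trigger; if it also blocks, the explode-aggregate-join plan shape alone reproduces the issue.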
> The client log is as follows.
> {code:java}
> "C:\Program Files\Java\jdk1.8.0_73\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA 2018.2.5\lib\idea_rt.jar=64196:D:\Program Files\JetBrains\IntelliJ IDEA 2018.2.5\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\80242151\AppData\Local\Temp\classpath403316789.jar com.oppo.recdata.datapipe.BatchTable
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.5/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/D:/lib/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.8.2/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.15/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2021-01-17 15:05:25,639 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-01-17 15:05:25,645 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-01-17 15:05:25,652 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-01-17 15:05:25,653 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-01-17 15:05:25,656 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-01-17 15:05:25,656 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> {code}


