Posted to issues@flink.apache.org by "Junning Liang (Jira)" <ji...@apache.org> on 2024/01/16 09:23:00 UTC
[jira] [Created] (FLINK-34112) JavaCodeSplitter OOM
Junning Liang created FLINK-34112:
-------------------------------------
Summary: JavaCodeSplitter OOM
Key: FLINK-34112
URL: https://issues.apache.org/jira/browse/FLINK-34112
Project: Flink
Issue Type: Bug
Reporter: Junning Liang
I wrote a SQL query that contains many CASE WHEN expressions, on the Flink 1.17 release. Even when the client is given 8 GB of memory, it still fails with an OOM exception:
{code:java}
16:38:51,975 ERROR org.apache.flink.client.didi.job.FlinkKubernetesJobClient [] - Failed to execute flink job.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: JavaCodeSplitter failed. This is a bug. Please file an issue.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.client.didi.job.FlinkKubernetesJobClient.internalSubmitJob(FlinkKubernetesJobClient.java:474) ~[flink-client-executor-core-1.17.0-011_pre.jar:?]
    at org.apache.flink.client.didi.job.FlinkKubernetesJobClient.submitStreamSQL(FlinkKubernetesJobClient.java:184) [flink-client-executor-core-1.17.0-011_pre.jar:?]
    at com.didichuxing.bigdata.flink.client.executor.k8s.core.component.FlinkJobK8sClientComponent.startStreamSql(FlinkJobK8sClientComponent.java:107) [flink-client-executor-core-1.17.0-011_pre.jar:?]
    at com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartFlinkJob(FlinkK8sClientExecutor.java:230) [flink-client-executor-core-1.17.0-011_pre.jar:?]
    at com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.handleStartJob(FlinkK8sClientExecutor.java:130) [flink-client-executor-core-1.17.0-011_pre.jar:?]
    at com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor.main(FlinkK8sClientExecutor.java:55) [flink-client-executor-core-1.17.0-011_pre.jar:?]
Caused by: java.lang.RuntimeException: JavaCodeSplitter failed. This is a bug. Please file an issue.
    at org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:37) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.runtime.generated.GeneratedClass.<init>(GeneratedClass.java:58) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.runtime.generated.GeneratedOperator.<init>(GeneratedOperator.java:43) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:130) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:60) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:100) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecUnion.translateToPlanInternal(CommonExecUnion.java:61) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:257) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:145) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:161) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at scala.collection.Iterator.foreach(Iterator.scala:937) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at scala.collection.Iterator.foreach$(Iterator.scala:937) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at scala.collection.IterableLike.foreach(IterableLike.scala:70) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197) ~[flink-table-planner-loader-bundle-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1873) ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:891) ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:2053) ~[flink-table-api-java-uber-1.17.0-018.jar:1.17.0-018]
    at com.didichuxing.flink.executor.CommunitySqlExecutor.execute(CommunitySqlExecutor.java:34) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at com.didichuxing.flink.executor.FlinkSqlExecutor.execute(FlinkSqlExecutor.java:51) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at com.didichuxing.flink.FlinkSqlApp.main(FlinkSqlApp.java:35) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_252]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_252]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_252]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_252]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.17.0-018.jar:1.17.0-018]
    ... 8 more
Caused by: java.lang.OutOfMemoryError: Java heap space
    at org.apache.flink.table.shaded.org.antlr.v4.runtime.ParserRuleContext.addAnyChild(ParserRuleContext.java:133) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.shaded.org.antlr.v4.runtime.ParserRuleContext.addChild(ParserRuleContext.java:139) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.shaded.org.antlr.v4.runtime.Parser.addContextToParseTree(Parser.java:617) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.shaded.org.antlr.v4.runtime.Parser.enterRule(Parser.java:629) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.primary(JavaParser.java:7760) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.expression(JavaParser.java:6954) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.expression(JavaParser.java:7020) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.parExpression(JavaParser.java:6653) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5521) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5489) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.statement(JavaParser.java:5532) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.blockStatement(JavaParser.java:5190) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
    at org.apache.flink.table.codesplit.JavaParser.block(JavaParser.java:5119) ~[flink-table-runtime-1.17.0-018.jar:1.17.0-018]
16:38:51,997 ERROR com.didichuxing.bigdata.flink.client.executor.k8s.core.FlinkK8sClientExecutor [] - start job fail!
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: JavaCodeSplitter failed. This is a bug. Please file an issue.
    ... (identical stack trace to the first log entry above)
{code}
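The stack trace shows the OOM happening while JavaCodeSplitter's ANTLR-generated JavaParser recurses through deeply nested statement/block rules: the many CASE WHEN arms in the query appear to expand into deeply nested branches in the generated Java source. The failure scales with the number of arms, so a query of the same shape can be produced at an arbitrary size. The sketch below is a hypothetical reproduction helper, not part of the actual job; the function and parameter names (`build_case_when_query`, `table`, `column`, `num_branches`) are illustrative.

```python
# Hypothetical helper: build a SELECT whose CASE expression has an
# arbitrary number of WHEN arms, mimicking the shape of the failing query.
def build_case_when_query(table: str, column: str, num_branches: int) -> str:
    """Return a SQL query with num_branches WHEN arms in one CASE expression."""
    whens = "\n".join(
        f"  WHEN coalesce({column}, '-1') = '{i}' THEN '{i}'"
        for i in range(num_branches)
    )
    # Each WHEN arm adds another nested branch to the Java source that the
    # planner generates, which is what JavaCodeSplitter later parses.
    return f"SELECT CASE\n{whens}\n  END AS category_id\nFROM {table}"

query = build_case_when_query("source_table", "product_id", 500)
```

Raising `num_branches` lets one observe how the generated-code size, and with it JavaCodeSplitter's parse-tree memory footprint, grows with the query.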
The SQL is as follows:
{code:java}
add jar 'hdfs://xxxx/realtime-netcar-udf-lastest-v1.1.3-SNAPSHOT.jar';
CREATE FUNCTION RemoveMapKeySort as 'org.didichuxing.flink.udf.RemoveMapKey2ArraySort';
set `table.exec.resource.default-parallelism` = `1`;
set `table.generated-code.max-length`=`7000`;
set `table.generated-code.allow` = `true`;
-- Source schema
CREATE TABLE source_table (
binlog_table string,
binlog_time string,
binlog_event string,
order_id string,
driver_id string,
passenger_id string,
channel string,
product_id string,
area string,
to_area string,
county string,
order_status string,
_status string,
t_order_status string,
f_order_status string,
_birth_time string,
cancelled_time string,
new_time string,
assigned_time string,
prepared_time string,
completed_time string,
departure_time string,
begun_time string,
consult_time string,
finished_time string,
estimate_time string,
_modify_time string,
start_dest_distance string,
driver_start_distance string,
distance string,
pre_total_fee string,
`type` string,
combo_type string,
complete_type string,
airport_type string,
carpool_type string,
compound_type string,
source_type string,
driver_type string,
capacity_level string,
level_type string,
require_level string,
route_type string,
strive_car_level string,
extra_type string,
extend_feature string,
is_special_price string,
carpool_price_type string,
estimate_id string,
new_lng string,
new_lat string,
assigned_lng string,
assigned_lat string,
prepared_lng string,
prepared_lat string,
begun_lng string,
begun_lat string,
finished_lng string,
finished_lat string,
dynamic_price string,
travel_id string,
passenger_count string,
combo_id string,
starting_lng string,
starting_lat string,
dest_lng string,
dest_lat string,
cap_price string,
spacious_car_alliance string,
json_extend_1 string,
p_access_key_id string
)
WITH
(
'connector.topic' = 'xxx',
'connector.startup-mode' = 'group-offsets',
'connector.type' = 'kafka',
'connector.properties.bootstrap.servers' = 'xxxx',
'connector.properties.group.id' = 'rxxx',
'connector.parallelism' = '1',
'format.type' = 'json'
);
-- Sink schema
CREATE TABLE realtime_dwd_trip_trd_order_base (
table_filter string,
table_time string,
table_event string,
order_id bigint,
driver_id bigint,
passenger_id bigint,
channel_id bigint,
channel_name string,
channel_level1 string,
channel_level2 string,
product_id bigint,
sub_product_line bigint,
level_1_product bigint,
level_2_product bigint,
level_3_product bigint,
call_city_id bigint,
to_city_id bigint,
call_city_name string,
call_county_id bigint,
call_county_name string,
region_name string,
order_status bigint,
call_time string,
cancel_time string,
answer_time string,
arrive_time string,
depart_time string,
charge_time string,
finish_time string,
call_time_minute string,
cancel_time_minute string,
answer_time_minute string,
arrive_time_minute string,
depart_time_minute string,
charge_time_minute string,
finish_time_minute string,
est_dis bigint,
est_arrive_dis bigint,
est_price_amt double,
combo_type bigint,
complete_type bigint,
airport_type bigint,
carpool_type bigint,
compound_type bigint,
source_type bigint,
driver_type bigint,
capacity_level bigint,
level_type bigint,
require_level bigint,
route_type bigint,
strive_car_level bigint,
extra_type bigint,
answer_dur bigint,
est_dur bigint,
arrive_dur bigint,
cancel_dur bigint,
driver_wait_dur bigint,
charge_dur bigint,
carpool_price_type bigint,
is_appt_flag bigint,
is_agency_call bigint,
is_anycar bigint,
is_cross_city_flag bigint,
is_call_flag bigint,
cancel_type bigint,
is_cancel_flag bigint,
is_grab_before_cannel_flag bigint,
is_grab_after_pas_cancel_flag bigint,
is_grab_after_dri_cancel_flag bigint,
is_grab_after_srvc_cancel_flag bigint,
is_reassigned_flag bigint,
is_reassign_flag bigint,
is_answer_flag bigint,
is_arrive_flag bigint,
is_begin_charge_flag bigint,
is_finish_flag bigint,
is_openapi bigint,
is_nirvana bigint,
is_ontheway bigint,
is_update_dest bigint,
is_service_agency_call bigint,
is_serial_assign bigint,
is_prepay bigint,
estimate_id string,
estimate_id_anycar string,
rank_index bigint,
is_anycar_sumtag bigint,
call_lng string,
call_lat string,
answer_lng string,
answer_lat string,
arrive_lng string,
arrive_lat string,
begin_charge_lng string,
begin_charge_lat string,
finish_lng string,
finish_lat string,
event_lng string,
event_lat string,
actual_distance bigint,
event_distance bigint,
extend_feature_value string,
distance_category int,
is_special_price int,
is_guide_scene int,
extend_feature string,
dynamic_price int,
travel_id bigint,
passenger_num bigint,
combo_id bigint,
dest_lng string,
dest_lat string,
cap_price_amt string,
spacious_car_alliance int,
pro_id int,
pro_name string,
json_extend_1 string,
access_key_id int
)
WITH
(
'connector' = 'print'
);
-- Non-anycar portion
create view source_non_anycar_table
as
select binlog_table
,binlog_time
,binlog_event
,order_id
,driver_id
,passenger_id
,channel
,source_table.product_id
,area
,to_area
,county
,order_status as order_status
,_status
,t_order_status
,f_order_status
,_birth_time
,coalesce(cancelled_time,'1971-01-01 00:00:00') as cancelled_time
,new_time
,assigned_time
,prepared_time
,completed_time
,departure_time
,begun_time
,consult_time
,finished_time
,estimate_time
,_modify_time
,cast(start_dest_distance as bigint) as start_dest_distance
,cast(driver_start_distance as bigint) as driver_start_distance
,cast(cast(distance as double)*1000 as bigint) as distance
,source_table.pre_total_fee
,`type`
,source_table.combo_type
,complete_type
,airport_type
,carpool_type
,compound_type
,source_type
,driver_type
,capacity_level
,level_type
,source_table.require_level
,route_type
,strive_car_level
,source_table.extra_type
,RemoveMapKeySort(get_json_object(extend_feature, '$.multi_require_product')) as extend_feature_value
,is_special_price
,carpool_price_type
,estimate_id
,'0' as estimate_id_anycar
,9999 as rank_index
,new_lng
,new_lat
,assigned_lng
,assigned_lat
,prepared_lng
,prepared_lat
,begun_lng
,begun_lat
,finished_lng
,finished_lat
,extend_feature
,dynamic_price
,travel_id
,passenger_count
,combo_id
,starting_lng
,starting_lat
,dest_lng
,dest_lat
,cap_price
,spacious_car_alliance
,json_extend_1
,p_access_key_id
,case
when ((coalesce(product_id,'-1') ='1' and coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and not (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in ('1','4','2','3','5','10','17','7','6','37')) then '1'
when coalesce(product_id,'-1')='1' and coalesce(combo_type,'-1') = '308' then '308'
when coalesce(product_id,'-1') in ('3','4') and coalesce(require_level,'-1')<>'900' and coalesce(combo_type,'-1')<>'314' or coalesce(product_id,'-1') in ('287') or coalesce(cast(product_id as bigint),-1) between 3000 and 3999 then '3'
when coalesce(product_id,'-1') in ('6') then '6'
when coalesce(product_id,'-1') in ('7','20') then product_id
when coalesce(product_id,'-1') in ('3','4') and coalesce(require_level,'-1')='900' then '99'
when (((coalesce(product_id,'-1') ='1' and coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in ('1','4','2','3','5','10','17','7','6','37'))) or coalesce(product_id,'-1') in ('9','22') then '9'
when coalesce(product_id,'-1') in ('11','19','12','21') then '11'
when coalesce(product_id,'-1') in ('26') then '999'
when (coalesce(product_id,'-1') in ('3','4') and coalesce(combo_type,'-1')='314') or (coalesce(product_id,'-1') in ('800','801')) then '314'
when product_id in ('38') then '38'
when coalesce(cast(channel as bigint),-1) between 1000001 and 2000000 and (coalesce(cast(product_id as bigint),-1) between 50 and 59 or coalesce(cast(product_id as bigint),-1) between 70 and 79 or coalesce(cast(product_id as bigint),-1) between 200 and 599) then '50'
when coalesce(cast(channel as bigint),-1) not between 1000001 and 2000000 and coalesce(product_id,'-1') in ('51','52','53','54','59','71','73','74','75','76','79','77') then '50'
when coalesce(cast(product_id as bigint),-1) between 50 and 59 or coalesce(cast(product_id as bigint),-1) between 70 and 79 or coalesce(cast(product_id as bigint),-1) between 200 and 599 then '51'
when coalesce(product_id,'-1') in ('700') then '700'
when coalesce(product_id,'-1') in ('81') then '81'
when coalesce(product_id,'-1') in ('83') then '327'
when coalesce(product_id,'-1') in ('15') then '15'
end as category_id
from source_table
where not (order_status in ('0','6')
and extend_feature is not null
and extend_feature like '%multi_require_product%')
and not (binlog_table='d_order_status' and order_status='5' and f_order_status is null and completed_time>'2020-01-01 00:00:00')
;
-- anycar part
create view source_anycar_table
as
select binlog_table
,binlog_time
,binlog_event
,order_id
,driver_id
,passenger_id
,channel
,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'product_id', -1) as string) as product_id
,area
,to_area
,county
,order_status
,_status
,t_order_status
,f_order_status
,_birth_time
,coalesce(cancelled_time,'1971-01-01 00:00:00') as cancelled_time
,new_time
,assigned_time
,prepared_time
,completed_time
,departure_time
,begun_time
,consult_time
,finished_time
,estimate_time
,_modify_time
,cast(start_dest_distance as bigint) as start_dest_distance
,cast(driver_start_distance as bigint) as driver_start_distance
,cast(cast(distance as double)*1000 as bigint) as distance
,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'pre_total_fee', 0) as string) as pre_total_fee
,`type`
,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'combo_type', -1) as string) as combo_type
,complete_type
,airport_type
,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'carpool_type', -1) as string) as carpool_type
,compound_type
,source_type
,driver_type
,capacity_level
,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'level_type', -1) as string) as level_type
,GET_OBJ_STR_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'require_level', '-1') as require_level
,GET_OBJ_STR_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'route_type', '-1') as route_type
,strive_car_level
,cast(GET_LONG_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'extra_type', -1) as string) as extra_type
,RemoveMapKeySort(get_json_object(extend_feature, '$.multi_require_product')) as extend_feature_value
,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'is_special_price', -1) as string) as is_special_price
,carpool_price_type
,estimate_id
,GET_OBJ_STR_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'estimate_id', '1') as estimate_id_anycar
,GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'rank', -1) as rank_index
,new_lng
,new_lat
,assigned_lng
,assigned_lat
,prepared_lng
,prepared_lat
,begun_lng
,begun_lat
,finished_lng
,finished_lat
,extend_feature
,dynamic_price
,travel_id
,passenger_count
,combo_id
,starting_lng
,starting_lat
,dest_lng
,dest_lat
,cap_price
,cast(GET_INT_FROM_MAP(JSON_STR_TO_MAP(T.any_call_info), 'spacious_car_alliance', -1) as string) as spacious_car_alliance
,json_extend_1
,p_access_key_id
,case
when ((coalesce(product_id,'-1') ='1' and coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and not (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in ('1','4','2','3','5','10','17','7','6','37')) then '1'
when coalesce(product_id,'-1')='1' and coalesce(combo_type,'-1') = '308' then '308'
when coalesce(product_id,'-1') in ('3','4') and coalesce(require_level,'-1')<>'900' and coalesce(combo_type,'-1')<>'314' or coalesce(product_id,'-1') in ('287') or coalesce(cast(product_id as bigint),-1) between 3000 and 3999 then '3'
when coalesce(product_id,'-1') in ('6') then '6'
when coalesce(product_id,'-1') in ('7','20') then product_id
when coalesce(product_id,'-1') in ('3','4') and coalesce(require_level,'-1')='900' then '99'
when (((coalesce(product_id,'-1') ='1' and coalesce(combo_type,'-1') <> '308') or coalesce(product_id,'-1') ='2') and (coalesce(require_level,'-1')='200' and coalesce(area,'-1') in ('1','4','2','3','5','10','17','7','6','37'))) or coalesce(product_id,'-1') in ('9','22') then '9'
when coalesce(product_id,'-1') in ('11','19','12','21') then '11'
when coalesce(product_id,'-1') in ('26') then '999'
when (coalesce(product_id,'-1') in ('3','4') and coalesce(combo_type,'-1')='314') or (coalesce(product_id,'-1') in ('800','801')) then '314'
when product_id in ('38') then '38'
when coalesce(cast(channel as bigint),-1) between 1000001 and 2000000 and (coalesce(cast(product_id as bigint),-1) between 50 and 59 or coalesce(cast(product_id as bigint),-1) between 70 and 79 or coalesce(cast(product_id as bigint),-1) between 200 and 599) then '50'
when coalesce(cast(channel as bigint),-1) not between 1000001 and 2000000 and coalesce(product_id,'-1') in ('51','52','53','54','59','71','73','74','75','76','79','77') then '50'
when coalesce(cast(product_id as bigint),-1) between 50 and 59 or coalesce(cast(product_id as bigint),-1) between 70 and 79 or coalesce(cast(product_id as bigint),-1) between 200 and 599 then '51'
when coalesce(product_id,'-1') in ('700') then '700'
when coalesce(product_id,'-1') in ('81') then '81'
when coalesce(product_id,'-1') in ('83') then '327'
when coalesce(product_id,'-1') in ('15') then '15'
end as category_id
from source_table
,LATERAL TABLE(JSON_ARRAY_TO_STR(RemoveMapKeySort(get_json_object(extend_feature, '$.multi_require_product')))) as T(any_call_info)
where order_status in ('0','6')
and extend_feature is not null
and extend_feature like '%multi_require_product%'
and not (binlog_table='d_order_status' and order_status='5' and f_order_status is null and completed_time>'2020-01-01 00:00:00')
;
-- Union all
create view view_merge
as
select *
from source_non_anycar_table
union all
select *
from source_anycar_table
;
-- View
create view view_01
as
select binlog_table
,binlog_time
,binlog_event
,order_id
,driver_id
,passenger_id
,channel
,'未知' as channel_name
,'未知' as channel_level1
,'未知' as channel_level2
,product_id
,case when level_type = '1' then '30'
when product_id = '3' and capacity_level = '11000' then '81'
when spacious_car_alliance = '1' then '90'
else category_id
end as sub_product_line
,area
,to_area
,'未知' as city_name
,county
,'未知' as county_name
,'未知' as region
,order_status
,_birth_time
,case when order_status = '6' and source_type <> '2' then _modify_time
when (source_type = '2' and order_status = '6') then _modify_time
when order_status = '7' and cancelled_time = '1971-01-01 00:00:00' then _modify_time
when ((complete_type not in ('7', '13') and order_status = '12') or (source_type = '2' and order_status = '0') or order_status = '9') and (order_status = '12' and complete_type = '14') then _modify_time
when order_status = '11' then _modify_time
when complete_type = '13' and order_status = '12' then _modify_time
else cancelled_time
end cancel_time
,assigned_time
,prepared_time
,departure_time
,begun_time
,finished_time
,start_dest_distance
,driver_start_distance
,pre_total_fee
,combo_type
,complete_type
,airport_type
,carpool_type
,compound_type
,source_type
,driver_type
,capacity_level
,level_type
,require_level
,route_type
,strive_car_level
,extra_type
,case when assigned_time is not null and _birth_time is not null and assigned_time > _birth_time then cast(DATE_DIFF(_birth_time,assigned_time) as string) else '0' end as answer_dur
,cast(estimate_time as bigint)*60 as est_dur
,case when assigned_time is not null and prepared_time is not null and prepared_time > assigned_time then cast(DATE_DIFF(assigned_time,prepared_time) as string) else '0' end as arrive_dur
,case when cancelled_time is not null and assigned_time is not null and cancelled_time > assigned_time then cast(DATE_DIFF(assigned_time,cancelled_time) as string)
when cancelled_time is not null and _birth_time is not null and cancelled_time > _birth_time then cast(DATE_DIFF(_birth_time ,cancelled_time) as string)
when f_order_status is not null and assigned_time is not null and order_status = '7' and _modify_time > assigned_time then cast(DATE_DIFF(assigned_time,_modify_time) as string)
when f_order_status is not null and _birth_time is not null and order_status = '7' and _modify_time > _birth_time then cast(DATE_DIFF(_birth_time,_modify_time) as string)
else '0'
end as cancel_dur
,case when begun_time is not null and prepared_time is not null and begun_time > prepared_time then cast(DATE_DIFF(prepared_time,begun_time) as string) else '0' end as driver_wait_dur
,case when finished_time is not null and begun_time is not null and finished_time > begun_time then cast(DATE_DIFF(begun_time,finished_time) as string) else '0' end as charge_dur
,cast(`type` as int) as is_appt_flag
,case when extra_type is not null and BITWISE_AND(cast(extra_type as bigint), 131072) > 0 then '1' else '0' end as is_agency_call
,case when compound_type is not null and BITWISE_AND(cast(compound_type as bigint),8192) > 0 then '1' else '0' end as is_anycar
,case when area is not null and to_area is not null and area <> to_area then '1' else '0' end as is_cross_city_flag
-- ,case when _birth_time is not null and substring(_birth_time,1,10) > '2020-01-01 00:00:00' and order_status='0' and binlog_table='d_order_base' and (driver_id = '0' or driver_id is null) and substring(_birth_time,1,16)=substring(_modify_time,1,16) and binlog_event='i' then '1' else '0' end as is_call_flag
,case when _birth_time is not null and substring(_birth_time,1,10) > '2020-01-01 00:00:00' and order_status='0' and (driver_id = '0' or driver_id is null) and substring(_birth_time,1,10)=substring(_modify_time,1,10) then '1' else '0' end as is_call_flag
,case when f_order_status is not null and source_type is not null and cancelled_time is not null and _modify_time is not null and order_status is not null and substring((case when order_status = '7' then cancelled_time else _modify_time end),1,10) > '2020-01-01 00:00:00' and order_status = '6' and source_type <> '2' and binlog_table='d_order_status' then '1'
when f_order_status is not null and source_type is not null and cancelled_time is not null and _modify_time is not null and order_status is not null and substring((case when order_status = '7' then cancelled_time else _modify_time end),1,10) > '2020-01-01 00:00:00' and ((source_type = '2' and order_status in ('6', '7')) or (source_type <> '2' and order_status = '7')) and binlog_table='d_order_status' then '2'
when f_order_status is not null and source_type is not null and cancelled_time is not null and complete_type is not null and _modify_time is not null and order_status is not null and substring((case when order_status = '7' then cancelled_time else _modify_time end),1,10) > '2020-01-01 00:00:00' and ((complete_type not in ('7', '13') and order_status = '12') or (source_type = '2' and order_status = '0') or order_status = '9') and (order_status <> '12' or complete_type <> '14') and binlog_table='d_order_status' then '3'
when f_order_status is not null and source_type is not null and cancelled_time is not null and _modify_time is not null and order_status is not null and substring((case when order_status = '7' then cancelled_time else _modify_time end),1,10) > '2020-01-01 00:00:00' and order_status = '11' and binlog_table='d_order_status' then '4'
when f_order_status is not null and source_type is not null and cancelled_time is not null and complete_type is not null and _modify_time is not null and order_status is not null and substring((case when order_status = '7' then cancelled_time else _modify_time end),1,10) > '2020-01-01 00:00:00' and complete_type = '13' and order_status = '12' and binlog_table='d_order_status' then '5'
else '0'
end as cancel_type
,case when f_order_status is not null and complete_type is not null and substring((case when order_status = '7' then cancelled_time else _modify_time end),1,10) > '2020-01-01 00:00:00' and complete_type = '7' and binlog_table='d_order_status' then '1' else '0' end as is_reassigned_flag
,case when source_type is not null and source_type = '2' and binlog_table='d_order_status' then '1' else '0' end as is_reassign_flag
,case when assigned_time is not null and substring(assigned_time,1,10) > '2020-01-01 00:00:00' and order_status='1' and f_order_status is not null and binlog_table='d_order_status' and driver_id is not null and cast(driver_id as bigint) > 0 then '1' else '0' end as is_answer_flag
,case when prepared_time is not null and substring(prepared_time,1,10) > '2020-01-01 00:00:00' and order_status='2' and f_order_status is not null and binlog_table='d_order_status' and driver_id is not null and cast(driver_id as bigint) > 0 then '1' else '0' end as is_arrive_flag
,case when begun_time is not null and substring(begun_time,1,10) > '2020-01-01 00:00:00' and order_status='4' and binlog_table='d_order_status' and f_order_status is not null and driver_id is not null and cast(driver_id as bigint) > 0 then '1' else '0' end as is_begin_charge_flag
,case when finished_time is not null and substring(finished_time,1,10) > '2020-01-01 00:00:00' and order_status = '5' and f_order_status is not null and binlog_table='d_order_status' then '1' else '0' end as is_finish_flag
,case when passenger_id is not null and passenger_id = '2624535533694' or (channel is not null and cast(channel as bigint) between 27010 and 27019) then '1' else '0' end as is_openapi
,case when product_id is not null and capacity_level is not null and product_id in ('3', '4') and capacity_level = '2000' then '1' else '0' end as is_nirvana
,case when extra_type is not null and BITWISE_AND(cast(extra_type as bigint), 33554432) > 0 then '1' else '0' end as is_ontheway
,case when extra_type is not null and BITWISE_AND(cast(extra_type as bigint), 8796093022208) > 0 then '1' else '0' end as is_update_dest
,case when extra_type is not null and BITWISE_AND(cast(extra_type as bigint), 137438953472) > 0 then '1' else '0' end as is_service_agency_call
,case when extra_type is not null and BITWISE_AND(cast(extra_type as bigint), 8388608) > 0 then '1' else '0' end as is_serial_assign
,case when extra_type is not null and BITWISE_AND(cast(extra_type as bigint), 8589934592) > 0 then '1' else '0' end as is_prepay
,is_special_price
,carpool_price_type
,estimate_id
,estimate_id_anycar
,rank_index
,starting_lng
,starting_lat
,assigned_lng
,assigned_lat
,prepared_lng
,prepared_lat
,begun_lng
,begun_lat
,finished_lng
,finished_lat
,distance
,extend_feature_value
,case
when distance<3*1000 then 0
when distance>=3*1000 and distance<6*1000 then 3
when distance>=6*1000 and distance<10*1000 then 6
when distance>=10*1000 and distance<15*1000 then 10
when distance>=15*1000 and distance<20*1000 then 15
when distance>=20*1000 then 20
end as real_distance_category
,case
when start_dest_distance<3*1000 then 0
when start_dest_distance>=3*1000 and start_dest_distance<6*1000 then 3
when start_dest_distance>=6*1000 and start_dest_distance<10*1000 then 6
when start_dest_distance>=10*1000 and start_dest_distance<15*1000 then 10
when start_dest_distance>=15*1000 and start_dest_distance<20*1000 then 15
when start_dest_distance>=20*1000 then 20
end as estimate_distance_category
,extend_feature
,case when extra_type is not null and BITWISE_AND(cast(extra_type as bigint), 268435456) > 0 then '1' else '0' end as is_guide_scene
,dynamic_price
,travel_id
,passenger_count
,combo_id
,dest_lng
,dest_lat
,cap_price
,spacious_car_alliance
,'1' as pro_id
,'未知' as pro_name
,json_extend_1
,p_access_key_id as access_key_id
from view_merge
;
-- insert into order_base_test
-- select
-- order_id,
-- driver_id,
-- passenger_id,
-- product_id,
-- sub_product_line
-- from view_01;
-- Sink
insert into realtime_dwd_trip_trd_order_base
select cast(binlog_table as string) as table_filter
,cast(binlog_time as string) as table_time
,cast(binlog_event as string) as table_event
,cast(order_id as bigint) as order_id
,cast(driver_id as bigint) as driver_id
,cast(passenger_id as bigint) as passenger_id
,cast(channel as bigint) as channel_id
,cast(channel_name as string) as channel_name
,cast(channel_level1 as string) as channel_level1
,cast(channel_level2 as string) as channel_level2
,cast(product_id as bigint) as product_id
,cast(sub_product_line as bigint) as sub_product_line
,case
when sub_product_line in ('1', '3', '6', '7', '9', '20', '99', '314', '30', '81', '700', '90','327') or (sub_product_line in ('11','12') and require_level='2200') then 110000
when sub_product_line in ('11','12') and require_level<>'2200' then 120000
when sub_product_line in ('308') then 130000
when sub_product_line in ('38') then 140000
when sub_product_line in ('50','51') then 190000
when sub_product_line = '15' then 150000
else 990000
end as level_1_product
,case
when sub_product_line in ('3','7','30','81','90') and combo_type not in ('4','302') then 110100
when sub_product_line in ('3','7') and combo_type = '4' then 110200
when sub_product_line in ('3','7') and combo_type = '302' then 110300
when sub_product_line in ('20','99') then 110400
when sub_product_line in ('1','6') then 110500
when sub_product_line in ('9') then 110600
when sub_product_line in ('11','12') and require_level<>'2200' then 120100
when sub_product_line in ('308') then 130100
when sub_product_line in ('38') then 140200
when sub_product_line in ('700') then 110800
when sub_product_line in ('314') then 110900
when sub_product_line in ('11','12') and require_level='2200' then 111000
when (cast(channel as bigint)>=1000001 and cast(channel as bigint)<=2000000 and sub_product_line = '50') or ((cast(channel as bigint)<1000001 or cast(channel as bigint) >2000000) and sub_product_line in ('50') and coalesce(product_id,'-1') <> '77') or sub_product_line = '51' then 190200
when sub_product_line in ('327') then 111200
when sub_product_line = '15' then 150100
when (cast(channel as bigint)<1000001 or cast(channel as bigint) >2000000) and sub_product_line = '50' and coalesce(product_id,'-1') = '77' then 190100
else 999900
end as level_2_product
,case when sub_product_line = '11' and require_level = '2000' and level_type = '5' then 120105
when sub_product_line = '11' and require_level = '1100' and level_type = '5' then 120106
when sub_product_line = '11' and require_level = '2200' and level_type = '7' then 111002
else
case
when sub_product_line in ('3','7') and combo_type not in ('4','302') then 110101
when sub_product_line='30' then 110102
when sub_product_line='314' then 110103
when sub_product_line in ('3','7') and combo_type='4' and carpool_type='4' then 110201
when sub_product_line in ('3','7') and combo_type='4' and carpool_type='5' then 110202
when sub_product_line in ('3','7') and require_level='600' and combo_type='4' and carpool_type in ('1','2') then 110203
when sub_product_line in ('3','7') and require_level='610' and combo_type='4' and carpool_type='2' then 110204
when sub_product_line in ('3','7') and require_level='600' and combo_type='4' and carpool_type in ('10') then 110205
when sub_product_line in ('3','7') and require_level = '4' then 110299
when sub_product_line in ('3','7') and combo_type = '302' and carpool_type = '6' then 110303
when sub_product_line in ('3','7') and combo_type = '302' and carpool_type = '3' and not (coalesce(product_id,'-1') in ('287') or (coalesce(product_id,'-1') not in ('3013') and cast(coalesce(product_id,'-1') as bigint) between 3000 and 3999)) then 110302
when sub_product_line in ('3','7') and combo_type = '302' and carpool_type not in ('3','6','8') then 110304
when sub_product_line in ('3','7') and combo_type = '302' and carpool_type = '3' then 110305
when sub_product_line in ('3','7') and combo_type = '302' and carpool_type = '8' then 110306
when sub_product_line in ('20','99') then 110401
when sub_product_line in ('1','6') and require_level='100' then 110501
when sub_product_line in ('1','6') and require_level='400' then 110502
when sub_product_line in ('1','6') and require_level='3300' then 110503
when sub_product_line in ('1','6') and require_level='200' then 110504
when sub_product_line in ('1','6') then 110599
when sub_product_line in ('9') then 110601
when (cast(channel as bigint)<1000001 or cast(channel as bigint) >2000000) and sub_product_line in ('50') and coalesce(product_id,'-1') in ('52','53','54','59') then 110721
when sub_product_line= '51' or ((cast(channel as bigint)<1000001 or cast(channel as bigint) >2000000) and sub_product_line in ('50') and coalesce(product_id,'-1') not in ('52','53','54','59','77')) then 110722
when sub_product_line = '11' and require_level='2000' then 120101
when sub_product_line = '11' and require_level = '1100' and combo_type = '4' and carpool_type = '2' then 120102
when sub_product_line = '11' and coalesce(cast(is_special_price as bigint), -1) = 1 then 120103
when sub_product_line = '11' and require_level ='1100' then 120104
when sub_product_line = '12' then 120201
when sub_product_line in ('308') then 130101
when sub_product_line in ('38') then 140201
when sub_product_line in ('81') then 110104
when sub_product_line in ('90') then 110105
when sub_product_line in ('700') then 110801
when (cast(channel as bigint)>=1000001 and cast(channel as bigint)<=2000000 and sub_product_line = '50') then 190201
when sub_product_line in ('11','12') and require_level='2200' and level_type='0' then 111001
when sub_product_line in ('11','12') and require_level='2200' and level_type='7' then 111002
when sub_product_line in ('327') then 111201
when sub_product_line = '15' then 150101
when (cast(channel as bigint)<1000001 or cast(channel as bigint) >2000000) and sub_product_line = '50' and coalesce(product_id,'-1')='77' then 190101
else 999999
end
end as level_3_product
,cast(area as bigint) as call_city_id
,cast(to_area as bigint) as to_city_id
,cast(city_name as string) as call_city_name
,cast(county as bigint) as call_county_id
,cast(county_name as string) as call_county_name
,cast(region as string) as region_name
,cast(order_status as bigint) as order_status
,cast(_birth_time as string) as call_time
,cast(cancel_time as string) as cancel_time
,cast(assigned_time as string) as answer_time
,cast(prepared_time as string) as arrive_time
,cast(departure_time as string) as depart_time
,cast(begun_time as string) as charge_time
,cast(finished_time as string) as finish_time
,cast(cast(substring(cast(TIMESTAMP_TO_MS(_birth_time, 'yyyy-MM-dd HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as call_time_minute
,cast(cast(substring(cast(TIMESTAMP_TO_MS(cancel_time, 'yyyy-MM-dd HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as cancel_time_minute
,cast(cast(substring(cast(TIMESTAMP_TO_MS(assigned_time, 'yyyy-MM-dd HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as answer_time_minute
,cast(cast(substring(cast(TIMESTAMP_TO_MS(prepared_time, 'yyyy-MM-dd HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as arrive_time_minute
,cast(cast(substring(cast(TIMESTAMP_TO_MS(departure_time, 'yyyy-MM-dd HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as depart_time_minute
,cast(cast(substring(cast(TIMESTAMP_TO_MS(begun_time, 'yyyy-MM-dd HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as charge_time_minute
,cast(cast(substring(cast(TIMESTAMP_TO_MS(finished_time, 'yyyy-MM-dd HH:mm:ss')/60 as string),0,8) as bigint)*60 as string) as finish_time_minute
,cast(start_dest_distance as bigint) as est_dis
,cast(driver_start_distance as bigint) as est_arrive_dis
,cast(pre_total_fee as double) as est_price_amt
,cast(combo_type as bigint) as combo_type
,cast(complete_type as bigint) as complete_type
,cast(airport_type as bigint) as airport_type
,cast(carpool_type as bigint) as carpool_type
,cast(compound_type as bigint) as compound_type
,cast(source_type as bigint) as source_type
,cast(driver_type as bigint) as driver_type
,cast(capacity_level as bigint) as capacity_level
,cast(level_type as bigint) as level_type
,cast(require_level as bigint) as require_level
,cast(route_type as bigint) as route_type
,cast(strive_car_level as bigint) as strive_car_level
,cast(extra_type as bigint) as extra_type
,cast(answer_dur as bigint) as answer_dur
,cast(est_dur as bigint) as est_dur
,cast(arrive_dur as bigint) as arrive_dur
,cast(cancel_dur as bigint) as cancel_dur
,cast(driver_wait_dur as bigint) as driver_wait_dur
,cast(charge_dur as bigint) as charge_dur
,cast(carpool_price_type as bigint) as carpool_price_type
,cast(is_appt_flag as bigint) as is_appt_flag
,cast(is_agency_call as bigint) as is_agency_call
,cast(is_anycar as bigint) as is_anycar
,cast(is_cross_city_flag as bigint) as is_cross_city_flag
,cast(is_call_flag as bigint) as is_call_flag
,cast(cancel_type as bigint) as cancel_type
,cast(case when cancel_type <> '0' then 1 else 0 end as bigint) as is_cancel_flag
,cast(case when cancel_type = '1' then 1 else 0 end as bigint) as is_grab_before_cancel_flag
,cast(case when order_status<>'6' and cancel_type = '2' then 1 else 0 end as bigint) as is_grab_after_pas_cancel_flag
,cast(case when order_status<>'6' and cancel_type = '3' then 1 else 0 end as bigint) as is_grab_after_dri_cancel_flag
,cast(case when order_status<>'6' and cancel_type = '4' then 1 else 0 end as bigint) as is_grab_after_srvc_cancel_flag
,cast(is_reassigned_flag as bigint) as is_reassigned_flag
,cast(is_reassign_flag as bigint) as is_reassign_flag
,cast(is_answer_flag as bigint) as is_answer_flag
,cast(is_arrive_flag as bigint) as is_arrive_flag
,cast(is_begin_charge_flag as bigint) as is_begin_charge_flag
,cast(is_finish_flag as bigint) as is_finish_flag
,cast(is_openapi as bigint) as is_openapi
,cast(is_nirvana as bigint) as is_nirvana
,cast(is_ontheway as bigint) as is_ontheway
,cast(is_update_dest as bigint) as is_update_dest
,cast(is_service_agency_call as bigint) as is_service_agency_call
,cast(is_serial_assign as bigint) as is_serial_assign
,cast(is_prepay as bigint) as is_prepay
,estimate_id
,estimate_id_anycar
,rank_index
,case when (rank_index = 0 or rank_index = 9999) then 1 else 0 end is_anycar_sumtag
,starting_lng as call_lng
,starting_lat as call_lat
,assigned_lng as answer_lng
,assigned_lat as answer_lat
,prepared_lng as arrive_lng
,prepared_lat as arrive_lat
,begun_lng as begin_charge_lng
,begun_lat as begin_charge_lat
,finished_lng as finish_lng
,finished_lat as finish_lat
,case when is_call_flag='1' then starting_lng
when is_answer_flag='1' then assigned_lng
when is_arrive_flag='1' then prepared_lng
when is_begin_charge_flag='1' then begun_lng
when is_finish_flag='1' then finished_lng
end as event_lng
,case when is_call_flag='1' then starting_lat
when is_answer_flag='1' then assigned_lat
when is_arrive_flag='1' then prepared_lat
when is_begin_charge_flag='1' then begun_lat
when is_finish_flag='1' then finished_lat
end as event_lat
,distance as actual_distance
,case when is_call_flag='1' then start_dest_distance
when is_answer_flag='1' then driver_start_distance
when is_finish_flag='1' then distance
end as event_distance
,'' as extend_feature_value
,case when is_finish_flag = '1' then real_distance_category
else estimate_distance_category
end distance_category
,cast(is_special_price as int) as is_special_price
,cast(is_guide_scene as int) as is_guide_scene
,extend_feature
,cast(dynamic_price as int) as dynamic_price
,cast(travel_id as bigint) as travel_id
,cast(passenger_count as bigint) as passenger_num
,cast(combo_id as bigint) as combo_id
,dest_lng
,dest_lat
,cap_price as cap_price_amt
,cast(spacious_car_alliance as int) as spacious_car_alliance
,cast(pro_id as int) as pro_id
,pro_name
,json_extend_1
,cast(access_key_id as int) as access_key_id
from view_01
where channel not in ('1', '20001','1500001','1510001', '1010000001')
;
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)