Posted to issues@flink.apache.org by "Zhanchun Zhang (Jira)" <ji...@apache.org> on 2019/11/05 03:31:00 UTC

[jira] [Comment Edited] (FLINK-14591) Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method

    [ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967203#comment-16967203 ] 

Zhanchun Zhang edited comment on FLINK-14591 at 11/5/19 3:30 AM:
-----------------------------------------------------------------

Hi [~zhongwei] [~jark], I'd like to fix this issue. Could you assign it to me?
Thanks ~


was (Author: dillon.):
Hi [~zhongwei][~jark], I'm willing to fix this issue, can you assign it to me. 
Thanks ~

>  Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-14591
>                 URL: https://issues.apache.org/jira/browse/FLINK-14591
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Wei Zhong
>            Priority: Minor
>
> In the current implementation of the blink planner, the "PlannerBase#translate" method calls "PlannerBase#mergeParameters" to merge the configuration inside TableConfig into the global job parameters:
> {code:scala}
>   override def translate(
>       modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
>     if (modifyOperations.isEmpty) {
>       return List.empty[Transformation[_]]
>     }
>     mergeParameters()
>     val relNodes = modifyOperations.map(translateToRel)
>     val optimizedRelNodes = optimize(relNodes)
>     val execNodes = translateToExecNodePlan(optimizedRelNodes)
>     translateToPlan(execNodes)
>   }
> {code}
> This translate method is called at every key point, e.g. execute, toDataStream, insertInto, etc.
> But as shown above, when "modifyOperations" is empty the method returns directly without calling "mergeParameters".
> As a result, if we set some configurations between the "Table#insertInto" call and the "TableEnvironment#execute" call, these configurations will not be merged into the global job parameters, because "mergeParameters" is not called:
> {code:scala}
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance.useBlinkPlanner.build)
>     ...
>     ...
>     val result = ...
>     val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
>     tEnv.registerTableSink("MySink", sink)
>     tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
>     result.insertInto("MySink")
>     
>     // the "jobparam2" configuration will be lost
>     tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
>     tEnv.execute("test")
>     val jobConfig = env.getConfig.getGlobalJobParameters.toMap
>     
>     assertTrue(jobConfig.get("jobparam1") == "value1")
>     // this assertion will fail:
>     assertTrue(jobConfig.get("jobparam2") == "value2")
> {code}
> This may confuse users. It would be great if we could fix this problem.
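> One possible direction, shown only as a minimal sketch, is to call "mergeParameters" before the empty check so that an empty call to translate still merges the configuration. The sketch below does not use Flink: "SketchPlanner", its two mutable maps, and the string-based operations are hypothetical stand-ins for PlannerBase, TableConfig, the global job parameters, and ModifyOperation. It also assumes that the later call reaches translate with an empty operation list.
> {code:scala}
> import scala.collection.mutable
>
> // Hypothetical stand-in for PlannerBase; not the real Flink class.
> class SketchPlanner(
>     tableConfig: mutable.Map[String, String],
>     globalJobParameters: mutable.Map[String, String]) {
>
>   // Copies every table config entry into the global job parameters.
>   private def mergeParameters(): Unit =
>     globalJobParameters ++= tableConfig
>
>   // Merges first, then checks for an empty list, so an empty call still merges.
>   def translate(modifyOperations: List[String]): List[String] = {
>     mergeParameters()
>     if (modifyOperations.isEmpty) List.empty[String]
>     else modifyOperations.map(op => s"transformation($op)")
>   }
> }
>
> object SketchPlannerDemo {
>   def main(args: Array[String]): Unit = {
>     val tableConfig = mutable.Map[String, String]()
>     val jobParams = mutable.Map[String, String]()
>     val planner = new SketchPlanner(tableConfig, jobParams)
>
>     // Set before a non-empty translate call (models insertInto).
>     tableConfig("jobparam1") = "value1"
>     planner.translate(List("insertInto MySink"))
>
>     // Set before an empty translate call (models execute, by assumption).
>     tableConfig("jobparam2") = "value2"
>     planner.translate(Nil)
>
>     assert(jobParams.get("jobparam1").contains("value1"))
>     assert(jobParams.get("jobparam2").contains("value2"))
>   }
> }
> {code}
> Whether the merge belongs in translate itself or in its callers is a design choice for the actual fix; the sketch only illustrates the ordering.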
>  


