You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Zhanchun Zhang (Jira)" <ji...@apache.org> on 2019/11/05 03:31:00 UTC
[jira] [Comment Edited] (FLINK-14591) Execute
PlannerBase#mergeParameters every time of calling PlannerBase#translate
method
[ https://issues.apache.org/jira/browse/FLINK-14591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967203#comment-16967203 ]
Zhanchun Zhang edited comment on FLINK-14591 at 11/5/19 3:30 AM:
-----------------------------------------------------------------
Hi [~zhongwei] [~jark], I'm willing to fix this issue, can you assign it to me.
Thanks ~
was (Author: dillon.):
Hi [~zhongwei][~jark], I'm willing to fix this issue, can you assign it to me.
Thanks ~
> Execute PlannerBase#mergeParameters every time of calling PlannerBase#translate method
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-14591
> URL: https://issues.apache.org/jira/browse/FLINK-14591
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Wei Zhong
> Priority: Minor
>
> In current implementation of blink planner, the method "PlannerBase#mergeParameter" will be called by "PlannerBase#translate" method to merge the configuration inside TableConfig into global job parameters:
> {code:scala}
> override def translate(
> modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
> if (modifyOperations.isEmpty) {
> return List.empty[Transformation[_]]
> }
> mergeParameters()
> val relNodes = modifyOperations.map(translateToRel)
> val optimizedRelNodes = optimize(relNodes)
> val execNodes = translateToExecNodePlan(optimizedRelNodes)
> translateToPlan(execNodes)
> }
> {code}
> This translate method is called in every important moment, e.g. execute, toDataStream, insertInto, etc.
> But as shown above, there is a chance that the method return directly and not call the "mergeParameters".
> In fact if we set some configurations between the "Table#insertInto" method and "TableEnvironment#execute" method, these configurations will not be merged into global job parameters because the "mergeParameters" method is not called:
> {code:scala}
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance.useBlinkPlanner.build)
> ...
> ...
> val result = ...
> val sink = TestSinkUtil.configureSink(result, new TestingAppendTableSink)
> tEnv.registerTableSink("MySink", sink)
> tEnv.getConfig.getConfiguration.setString("jobparam1", "value1")
> result.insertInto("MySink")
>
> // the "jobparam2" configuration will loss
> tEnv.getConfig.getConfiguration.setString("jobparam2", "value2")
> tEnv.execute("test")
> val jobConfig = env.getConfig.getGlobalJobParameters.toMap
>
> assertTrue(jobConfig.get("jobparam1")=="value1")
> // this assertion will fail:
> assertTrue(jobConfig.get("jobparam2")=="value2"){code}
> This may bring some confusion to the user. It will be great if we can fix this problem.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)