Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/12/10 14:36:00 UTC
[jira] [Closed] (FLINK-16919) Parameter 'jobName' does not take effect in BatchTableEnvUtil
[ https://issues.apache.org/jira/browse/FLINK-16919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu closed FLINK-16919.
---------------------------
Resolution: Invalid
This method has been removed.
> Parameter 'jobName' does not take effect in BatchTableEnvUtil
> -------------------------------------------------------------
>
> Key: FLINK-16919
> URL: https://issues.apache.org/jira/browse/FLINK-16919
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Zhanchun Zhang
> Priority: Major
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> {code:java}
> def collect[T](
>     tEnv: TableEnvironment,
>     table: Table,
>     sink: CollectTableSink[T],
>     jobName: Option[String]): Seq[T] = { // jobName was not used
>   val typeSerializer = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
>     .asInstanceOf[TypeInformation[T]]
>     .createSerializer(tEnv.asInstanceOf[TableEnvironmentImpl]
>       .getPlanner.asInstanceOf[PlannerBase].getExecEnv.getConfig)
>   val id = new AbstractID().toString
>   sink.init(typeSerializer.asInstanceOf[TypeSerializer[T]], id)
>   val sinkName = UUID.randomUUID().toString
>   tEnv.registerTableSink(sinkName, sink)
>   tEnv.insertInto(s"`$sinkName`", table)
>   val res = tEnv.execute(jobName.getOrElse("test"))
>   val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
>   SerializedListAccumulator.deserializeList(accResult, typeSerializer)
> }
> {code}
> As the code above shows, the 'jobName' parameter is not used.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)