Posted to issues@flink.apache.org by "Wenlong Lyu (JIRA)" <ji...@apache.org> on 2017/04/13 09:20:41 UTC
[jira] [Assigned] (FLINK-6298) Local execution is not setting RuntimeContext for RichOutputFormat
[ https://issues.apache.org/jira/browse/FLINK-6298?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenlong Lyu reassigned FLINK-6298:
----------------------------------
Assignee: Wenlong Lyu
> Local execution is not setting RuntimeContext for RichOutputFormat
> ------------------------------------------------------------------
>
> Key: FLINK-6298
> URL: https://issues.apache.org/jira/browse/FLINK-6298
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.1.0, 1.2.0
> Reporter: Mateusz Zakarczemny
> Assignee: Wenlong Lyu
>
> RuntimeContext is never set in RichOutputFormat. I tested this in local execution; RichMapFunction is set up correctly.
> The following code never prints "//////Context set in RichOutputFormat":
> {code}
> import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
> import org.apache.flink.api.common.io.RichOutputFormat
> import org.apache.flink.api.scala._
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> object Startup {
>   def main(args: Array[String]): Unit = {
>     val mapFunction = new RichMapFunction[String, String] {
>       def open(taskNumber: Int, numTasks: Int) { getRuntimeContext }
>       def map(event: String) = { event }
>       override def setRuntimeContext(t: RuntimeContext) = {
>         println("//////Context set in RichMapFunction")
>         super.setRuntimeContext(t)
>       }
>     }
>     val outputFormat = new RichOutputFormat[String] {
>       override def setRuntimeContext(t: RuntimeContext) = {
>         println("//////Context set in RichOutputFormat")
>         super.setRuntimeContext(t)
>       }
>       def open(taskNumber: Int, numTasks: Int) {}
>       def writeRecord(event: String) {
>         println(event)
>       }
>       def configure(parameters: Configuration): Unit = {}
>       def close(): Unit = {}
>     }
>     val see = StreamExecutionEnvironment.getExecutionEnvironment
>     val eventsStream = see.fromElements[String]("A", "B", "C").map(mapFunction)
>     eventsStream.writeUsingOutputFormat(outputFormat)
>     see.execute("test-job")
>   }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)