You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Mateusz Zakarczemny (JIRA)" <ji...@apache.org> on 2017/04/11 18:02:42 UTC
[jira] [Created] (FLINK-6298) Local execution is not setting
RuntimeContext for RichOutputFormat
Mateusz Zakarczemny created FLINK-6298:
------------------------------------------
Summary: Local execution is not setting RuntimeContext for RichOutputFormat
Key: FLINK-6298
URL: https://issues.apache.org/jira/browse/FLINK-6298
Project: Flink
Issue Type: Bug
Components: DataStream API
Affects Versions: 1.2.0, 1.1.0
Reporter: Mateusz Zakarczemny
The RuntimeContext is never set on a RichOutputFormat. I tested this in local execution. The RichMapFunction in the same job is set up correctly.
The following code never prints "//////Context set in RichOutputFormat", although it does print the corresponding RichMapFunction message:
{code}
import org.apache.flink.api.common.functions.{RichMapFunction, RuntimeContext}
import org.apache.flink.api.common.io.RichOutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
 * Minimal reproduction for FLINK-6298.
 *
 * Builds a three-element stream, passes it through a rich map function and
 * writes it with a rich output format. Both override `setRuntimeContext` to
 * print a marker line, so the console output shows which of the two actually
 * receives a runtime context. Per the report, only the RichMapFunction marker
 * appears in local execution.
 */
object Startup {

  /**
   * Assembles and runs the job.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val mapFunction = new RichMapFunction[String, String] {
      // NOTE(review): this two-argument `open` has the same shape as the one
      // declared on the output format below. It is presumably NOT the
      // RichFunction lifecycle hook (which takes a Configuration) and so is
      // likely never invoked by the runtime -- confirm. Kept unchanged so the
      // reproduction stays faithful to the report.
      def open(taskNumber: Int, numTasks: Int): Unit = { getRuntimeContext }

      // Identity mapping: records are forwarded untouched.
      override def map(event: String): String = event

      // Prints a marker so it is visible when the context gets injected.
      override def setRuntimeContext(t: RuntimeContext): Unit = {
        println("//////Context set in RichMapFunction")
        super.setRuntimeContext(t)
      }
    }

    val outputFormat = new RichOutputFormat[String] {
      // Marker for the output format; the report says this line never shows up.
      override def setRuntimeContext(t: RuntimeContext): Unit = {
        println("//////Context set in RichOutputFormat")
        super.setRuntimeContext(t)
      }

      override def open(taskNumber: Int, numTasks: Int): Unit = {}

      // Echoes every record to stdout.
      override def writeRecord(event: String): Unit = {
        println(event)
      }

      override def configure(parameters: Configuration): Unit = {}

      override def close(): Unit = {}
    }

    val see = StreamExecutionEnvironment.getExecutionEnvironment
    val eventsStream = see.fromElements[String]("A", "B", "C").map(mapFunction)
    eventsStream.writeUsingOutputFormat(outputFormat)
    see.execute("test-job")
  }
}
{code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)