Posted to user@flink.apache.org by izual <iz...@163.com> on 2020/01/27 03:12:27 UTC

SideOutput Exception: "Output Tag must not be null"

I followed the docs [1] and SideOutputITCase.scala (a unit test case from flink-master), but encountered an exception:

Caused by: java.lang.IllegalArgumentException: OutputTag must not be null.


Code snippet, implemented in Scala:

```
  private final val backupOutputTag = OutputTag[String]("backup")

  val result = dataStream.assignAscendingTimestamps(_._3)
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .sum(1)
    .process(new ProcessFunction[(String, Int, Long), (String, Int, Long)] {
      override def processElement(value: (String, Int, Long), ctx: ProcessFunction[(String, Int, Long), (String, Int, Long)]#Context, out: Collector[(String, Int, Long)]): Unit = {
        out.collect(value)
        ctx.output(backupOutputTag, s"backup:${value}")
      }
    })
```

In my opinion, the reason is that `backupOutputTag` was created on the JobManager, while the `ctx.output(backupOutputTag, ...)` call runs on the TaskManager, so `backupOutputTag` would be null there.

But the doc example shows that this approach works. What is the correct usage in Scala?

Flink version: 1.9.1

I posted a question here but got no response: http://apache-flink.147419.n8.nabble.com/sideoutput-sql-state-td1533.html

Hoping for a response. Thanks.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/side_output.html




Re: SideOutput Exception: "Output Tag must not be null"

Posted by Arvid Heise <ar...@ververica.com>.
Hi Izual,

It seems the code example is not complete. I'm assuming backupOutputTag
is actually a field within your application class.

If you look at the examples, you will notice that backupOutputTag should be
defined within the method that defines your topology and not on the
wrapping object.
So drop the private modifier and move the definition inside the function.
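
A minimal sketch of what that could look like, assuming Flink 1.9.x with the Scala DataStream API. The object name SideOutputSketch, the sample elements, and the job name are made up for illustration; the pipeline itself is the one from the question. One plausible explanation, not verified against your setup, is that a field on the wrapping object is reached through the enclosing instance when the function runs on the TaskManager, whereas a local val is captured by the anonymous ProcessFunction and serialized together with it.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Hypothetical wrapper object; only the placement of backupOutputTag matters.
object SideOutputSketch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Made-up sample input: (key, count, event timestamp in ms).
    val dataStream: DataStream[(String, Int, Long)] =
      env.fromElements(("a", 1, 1000L), ("a", 2, 2000L), ("b", 3, 3000L))

    // Local val inside the method that builds the topology,
    // not a (private) field on the wrapping object.
    val backupOutputTag = OutputTag[String]("backup")

    val result = dataStream
      .assignAscendingTimestamps(_._3)
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .sum(1)
      .process(new ProcessFunction[(String, Int, Long), (String, Int, Long)] {
        override def processElement(
            value: (String, Int, Long),
            ctx: ProcessFunction[(String, Int, Long), (String, Int, Long)]#Context,
            out: Collector[(String, Int, Long)]): Unit = {
          // Main output: forward the aggregated record unchanged.
          out.collect(value)
          // Side output: emit a string copy under the tag defined above.
          ctx.output(backupOutputTag, s"backup:${value}")
        }
      })

    // Read the side output with the same tag instance.
    result.getSideOutput(backupOutputTag).print()
    result.print()

    env.execute("side-output-sketch")
  }
}
```

The same tag is passed to both ctx.output and getSideOutput, so it has to be in scope for the whole topology definition, which is another reason to keep it in that method.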