Posted to user@flink.apache.org by Julio Biason <ju...@azion.com> on 2018/04/02 19:53:23 UTC

Side outputs never getting consumed

Hey guys,

I have a pipeline that generates two different types of data (both extend
the same trait), and I need to save each type to a different sink.

So far, things were working with splits, but it seems that combining splits
with side outputs (used for the late data, where we'll plug in late-arrival
handling) causes errors, so I changed everything to side outputs.

To select a side output based on type, I did the following:

class MetricTypeSplitter(accountingTag:OutputTag[Metric],
analysingTag:OutputTag[Metric]) extends ProcessFunction[Metric, Metric] {

  val logger = LoggerFactory.getLogger(this.getClass)

  override def processElement(
    value:Metric,
    ctx:ProcessFunction[Metric, Metric]#Context,
    out:Collector[Metric]
  ): Unit = {
    out.collect(value)
    value match {
      case record:AccountingMetric => {
        logger.info(s"Sending ${record} to Accounting")
        ctx.output(accountingTag, record)
      }
      case record:AnalysingMetric => {
        logger.info(s"Sending ${record} to Analysis")
        ctx.output(analysingTag, record)
      }
      case _ => {
        logger.error(s"Don't know the type of ${value}")
      }
    }
  }
}

And at the end of the pipeline I add the splitter:

    pipeline
      .process(new MetricTypeSplitter(accountTag, analysisTag))

So far, this works, and I can see the logs showing which tag each metric is
being sent to. The second part, in which I capture the side output and send
the data to the sink, doesn't seem to work, though:

    pipeline
      .getSideOutput(accountTag)
      .map { tuple => AccountingSink.toRow(tuple) }.name("Accounting rows")
      .writeUsingOutputFormat(accountingSink.output).name(s"${accountingSink}")


And here is the problem: it seems .getSideOutput() never actually receives
the side output, because the logger in AccountingSink.toRow() never fires
and the data never shows up in our database (toRow() converts the Metric to
a Row and accountingSink.output returns the JDBCOutputFormat).

Any ideas what I need to do for side outputs to be actually captured?

-- 
*Julio Biason*, Software Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554

Re: Side outputs never getting consumed

Posted by Timo Walther <tw...@apache.org>.
Hi Julio,

thanks for this great example. I could reproduce it on my machine and
found the problem.

You need to store the newly created branch of your pipeline in a variable,
e.g. `val test = pipeline.process()`, in order to access the side outputs
via `test.getSideOutput(outputSimple)`. Right now your program expects a
side output from the wrong operator (namely the window operation).
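
A minimal sketch of that fix, assuming the Scala DataStream API. The Metric
case classes, the tag ids and the `split` variable name are hypothetical
stand-ins, and `pipeline` is built with fromElements here instead of the
windowed stream from the sample job:

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the Metric types used in the thread.
sealed trait Metric
case class AccountingMetric(value: Long) extends Metric
case class AnalysingMetric(value: Long) extends Metric

object SideOutputSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Tag ids are arbitrary names chosen for this sketch.
    val accountTag = OutputTag[Metric]("accounting")
    val analysisTag = OutputTag[Metric]("analysis")

    // Stands in for the stream that comes out of the window operation.
    val pipeline: DataStream[Metric] =
      env.fromElements[Metric](AccountingMetric(1L), AnalysingMetric(2L))

    // Keep a reference to the stream returned by process(): the side
    // outputs belong to this operator, not to the upstream `pipeline`.
    val split: DataStream[Metric] = pipeline.process(
      new ProcessFunction[Metric, Metric] {
        override def processElement(
          value: Metric,
          ctx: ProcessFunction[Metric, Metric]#Context,
          out: Collector[Metric]
        ): Unit = value match {
          case m: AccountingMetric => ctx.output(accountTag, m)
          case m: AnalysingMetric  => ctx.output(analysisTag, m)
        }
      })

    // Read the side outputs from `split`, not from `pipeline`.
    split.getSideOutput(accountTag).print()
    split.getSideOutput(analysisTag).print()

    env.execute("Side output sketch")
  }
}
```

Calling `pipeline.getSideOutput(accountTag)` instead would ask the upstream
operator for a tag it never emits, which matches the symptom in the original
post: the job runs, but nothing reaches the sink.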

Regards,
Timo


On 04.04.18 at 16:35, Julio Biason wrote:
> Hey Timo,
>
> To be completely honest, I _think_ they are POJOs, although I use case
> classes (because I want our data to be immutable).
>
> I wrote some sample code, which basically reflects our pipeline:
> https://github.com/jbiason/FlinkSample/blob/master/src/main/scala/net/juliobiason/SideoutputSample.scala
>
> The thing to notice is that we do the split to side outputs _after_
> the window functions -- because we want to split the results just
> before the sinks (we had a split there instead, but the job would
> sometimes crash because "splits can't be used with side outputs", or
> something along those lines). Are we correct in assuming that there
> can't be side outputs once a window is processed?



Re: Side outputs never getting consumed

Posted by Julio Biason <ju...@azion.com>.
Hey Timo,

To be completely honest, I _think_ they are POJOs, although I use case
classes (because I want our data to be immutable).

I wrote some sample code, which basically reflects our pipeline:
https://github.com/jbiason/FlinkSample/blob/master/src/main/scala/net/juliobiason/SideoutputSample.scala

The thing to notice is that we do the split to side outputs _after_ the
window functions -- because we want to split the results just before the
sinks (we had a split there instead, but the job would sometimes crash
because "splits can't be used with side outputs", or something along those
lines). Are we correct in assuming that there can't be side outputs once a
window is processed?

On Tue, Apr 3, 2018 at 10:17 AM, Timo Walther <tw...@apache.org> wrote:

> Hi Julio,
>
> I tried to reproduce your problem locally, but everything ran correctly.
> Could you share a little example job with us?
>
> This worked for me:
>
> class TestingClass {
>   var hello: Int = 0
> }
>
> class TestA extends TestingClass {
>   var test: String = _
> }
>
> def main(args: Array[String]): Unit = {
>
>   // set up the execution environment
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>   // get input data
>   val text = env.fromElements(WordCountData.WORDS: _*)
>
>   val outputTag = OutputTag[(String, Int)]("side-output")
>   val outputTag2 = OutputTag[TestingClass]("side-output2")
>
>   val counts: DataStream[(String, Int)] = text
>     // split up the lines in pairs (2-tuples) containing: (word,1)
>     .flatMap(_.toLowerCase.split("\\W+"))
>     .filter(_.nonEmpty)
>     .map((_, 1))
>     // group by the tuple field "0" and sum up tuple field "1"
>     .keyBy(0)
>     .sum(1)
>     .process(new ProcessFunction[(String, Int), (String, Int)] {
>       override def processElement(
>         value: (String, Int),
>         ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
>         out: Collector[(String, Int)]
>       ): Unit = {
>         ctx.output(outputTag, value)
>         ctx.output(outputTag2, new TestingClass)
>         ctx.output(outputTag2, new TestA)
>       }
>     })
>
>   counts.getSideOutput(outputTag).print()
>   counts.getSideOutput(outputTag2).print()
>
>   // execute program
>   env.execute("Streaming WordCount")
> }
>
>
> Are the Metric classes proper POJO types?
>
> Regards,
> Timo



Re: Side outputs never getting consumed

Posted by Timo Walther <tw...@apache.org>.
Hi Julio,

I tried to reproduce your problem locally, but everything ran correctly.
Could you share a little example job with us?

This worked for me:

// Imports this snippet relies on (WordCountData ships with the
// flink-examples-streaming module).
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.examples.wordcount.util.WordCountData
import org.apache.flink.util.Collector

class TestingClass {
  var hello: Int = 0
}

class TestA extends TestingClass {
  var test: String = _
}

def main(args: Array[String]): Unit = {

  // set up the execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // get input data
  val text = env.fromElements(WordCountData.WORDS: _*)

  val outputTag = OutputTag[(String, Int)]("side-output")
  val outputTag2 = OutputTag[TestingClass]("side-output2")

  val counts: DataStream[(String, Int)] = text
    // split up the lines in pairs (2-tuples) containing: (word,1)
    .flatMap(_.toLowerCase.split("\\W+"))
    .filter(_.nonEmpty)
    .map((_, 1))
    // group by the tuple field "0" and sum up tuple field "1"
    .keyBy(0)
    .sum(1)
    // emit every element to the side outputs; `counts` refers to this operator
    .process(new ProcessFunction[(String, Int), (String, Int)] {
      override def processElement(
        value: (String, Int),
        ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
        out: Collector[(String, Int)]
      ): Unit = {
        ctx.output(outputTag, value)
        ctx.output(outputTag2, new TestingClass)
        ctx.output(outputTag2, new TestA)
      }
    })

  counts.getSideOutput(outputTag).print()
  counts.getSideOutput(outputTag2).print()

  // execute program
  env.execute("Streaming WordCount")
}


Are the Metric classes proper POJO types?

Regards,
Timo

