Posted to user@flink.apache.org by Satish Chandra Gupta <sc...@gmail.com> on 2016/10/15 03:25:52 UTC

How to debug why Flink makes and executes only partial plan

Hi,

In my Flink program, after a couple of map, union, and connect operations, I have a
final filter and a sink. Something like this (after abstracting out
details):

val filteredEvents: DataStream[NotificationEvent]
  = allThisStuffWorking
      .name("filtered_users")

filteredEvents
  .filter(x => check(x.f1, x.f2, someStuff)) //BUG
  .addSink(new NotificationSinkFunction(notifier))
  .name("send_notification")

The check function returns a Boolean and does not access anything other
than the parameters passed to it. Here is the relevant part of
NotificationSinkFunction:

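import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.slf4j.{Logger, LoggerFactory}

class NotificationSinkFunction(notifier: Notifier)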
      extends SinkFunction[NotificationEvent] {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  def invoke(event: NotificationEvent): Unit = {
    LOG.info("Log this notification detail")
    notifier.send(event.f1, event.f2) //BUG
  }
}
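One thing that may be worth ruling out (an assumption, not something established in this thread) is serialization. Flink user functions such as SinkFunction extend java.io.Serializable and are shipped to the workers with Java serialization. In Scala, a constructor parameter like notifier becomes a field of the class only once a method references it. So commenting out notifier.send(...) also removes notifier from what gets serialized. The sketch runs without Flink: Notifier, SinkWithSend, SinkWithoutSend, and trySerialize are hypothetical stand-ins, not Flink API.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the real Notifier; deliberately NOT Serializable.
class Notifier {
  def send(user: String, count: Int): Unit = println(s"sending to $user: $count")
}

// Hypothetical sink that never uses `notifier` in a method,
// so Scala keeps no field for it and it is not serialized.
class SinkWithoutSend(notifier: Notifier) extends Serializable {
  def invoke(user: String, count: Int): Unit = ()
}

// Hypothetical sink that references `notifier` inside a method,
// so Scala stores it as a field and it is serialized with the sink.
class SinkWithSend(notifier: Notifier) extends Serializable {
  def invoke(user: String, count: Int): Unit = notifier.send(user, count)
}

object SerializationCheck {
  // Returns None if obj survives Java serialization, or the exception otherwise.
  def trySerialize(obj: AnyRef): Option[Throwable] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); None }
    catch { case e: NotSerializableException => Some(e) }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val notifier = new Notifier
    println(trySerialize(new SinkWithoutSend(notifier))) // None
    println(trySerialize(new SinkWithSend(notifier)))    // a Some wrapping NotSerializableException
  }
}
```

If a check like this fails for the real NotificationSinkFunction, making Notifier serializable or creating it inside the sink rather than passing it in are the usual directions to explore.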

If I comment out the lines marked with //BUG, the Flink pipeline works
and prints the log messages, and Flink shows this execution plan at the
end:

filtered_users -> Sink: send_notification

[image: Inline image 1]


But with either of the two lines marked //BUG in place, Flink makes and
executes the plan only up to filtered_users and does not print the log
message.

[image: Inline image 2]

How can I figure out what is wrong with the check function or the notifier's
send function that prevents Flink from making the full plan? What are the
typical mistakes leading to this?

Thanks,
+satish

Re: How to debug why Flink makes and executes only partial plan

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
Are we talking about the Plan View in the JobManager dashboard? If so, I
would expect there to be only one "box" for the combination of filter and
sink, because they are chained together to avoid sending data between them.

For debugging, could you change check() to always return true and see
whether you then get your messages from the sink?
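The always-true experiment separates two cases: check() rejecting every event, or the problem lying elsewhere in the job. Here is a minimal sketch of the idea, assuming plain Scala collections in place of a DataStream so that it runs without Flink. NotificationEvent's fields, check, someStuff, and runPipeline are hypothetical stand-ins for the real program.

```scala
// Hypothetical event type standing in for the real NotificationEvent.
final case class NotificationEvent(f1: String, f2: Int)

object FilterDebug {
  // Hypothetical predicate standing in for the real check(...).
  def check(user: String, count: Int, threshold: Int): Boolean =
    user.nonEmpty && count >= threshold

  val someStuff: Int = 10

  val events: List[NotificationEvent] = List(
    NotificationEvent("alice", 3),
    NotificationEvent("bob", 12),
    NotificationEvent("carol", 7)
  )

  // The predicate under test, and the debugging replacement that lets everything through.
  val realPredicate: NotificationEvent => Boolean = x => check(x.f1, x.f2, someStuff)
  val alwaysTrue: NotificationEvent => Boolean = _ => true

  // Stand-in for filter + sink: collect the messages the sink would have logged.
  def runPipeline(p: NotificationEvent => Boolean): List[String] =
    events.filter(p).map(e => s"notify ${e.f1} (${e.f2})")

  def main(args: Array[String]): Unit = {
    println(runPipeline(realPredicate)) // only events that pass check(...)
    println(runPipeline(alwaysTrue))    // every event reaches the "sink"
  }
}
```

In the actual job, the equivalent swap is `.filter(_ => true)` in place of `.filter(x => check(x.f1, x.f2, someStuff))`. If the sink's log line then appears, check() is probably returning false for every event rather than breaking the plan.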

Cheers,
Aljoscha
