Posted to user@flink.apache.org by Dan Hill <qu...@gmail.com> on 2021/12/05 20:43:02 UTC

Any way to require .uid(...) calls?

Hi.

I want to make sure we keep consistent uids on my Flink operators.  Is
there a way to require uids?  It's pretty easy to add operators and not
have explicit uids on them.

Also, I noticed an issue (Flink v1.12.3) where a filter operator does not
chain when it's between a ProcessFunction and a cogroup window operator.  I
can't get a uid set on this map.  I've tried a few variations and haven't
been able to chain it.



[image: no-uid.png]

RE: Any way to require .uid(...) calls?

Posted by Schwalbe Matthias <Ma...@viseca.ch>.
Hi Dan,

In case you also want to keep automatic UID assignment, we do something like this (scala):

// Requires: import scala.collection.JavaConverters._
// jobName, forceOperatorUid, logger and env are defined in the enclosing class.
override def run(args: ApplicationArguments): Unit = {
  require(jobName != null, "a specific jobName needs to be configured, if hosted in Spring Boot, configure 'flink.job.name' in application.yaml !")

  // Report at error level when uids are enforced, at warn level otherwise.
  def report(message: String): Unit =
    if (forceOperatorUid) logger.error(message) else logger.warn(message)

  // Second argument false: do not clear the transformations,
  // so that env.execute(...) below still sees the job.
  val graph = env.getStreamGraph(jobName, false)

  // Collect every operator that has no explicit uid(...).
  val nodesWithoutUid = graph.getStreamNodes.asScala.filter(_.getTransformationUID == null)
  val missingUid = nodesWithoutUid.nonEmpty
  for (node <- nodesWithoutUid) {
    val message = s"Operator[${node.getId}: ${node.getOperatorName}]: Missing uid(...) for state migration"
    println(message)
    report(message)
  }

  // Log the execution plan, more prominently if a uid is missing.
  val exPlan = env.getExecutionPlan
  val planMessage = s"job execution plan: \n$exPlan"
  if (missingUid) report(planMessage) else logger.info(planMessage)

  println()
  println()
  println("job execution plan:")
  println()
  println(exPlan)
  println()
  println()

  // Refuse to start the job when uids are enforced and at least one is missing.
  if (forceOperatorUid) {
    require(!missingUid, s"Job not executed because of configuration parameter: flink.job.forceOperatorUid: $forceOperatorUid (for state migration)")
  }

  env.execute(jobName)
}


That also gives us a little more explicit diagnostics.

Hope this helps 😊

Thias
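
Editor's sketch: the warn-or-fail decision from the Scala snippet above, restated in Java without any Flink dependency so it can be run on its own. OperatorInfo and UidCheck are hypothetical stand-ins introduced for illustration only; in a real job the id, name and uid would come from the StreamNode objects of the stream graph, as in the Scala code. This is a minimal sketch of the idea, not the poster's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for an operator in a job graph; not a Flink class. */
final class OperatorInfo {
    final int id;
    final String name;
    final String uid; // null when no explicit uid(...) was set

    OperatorInfo(int id, String name, String uid) {
        this.id = id;
        this.name = name;
        this.uid = uid;
    }
}

/** Hypothetical helper that mirrors the check in the Scala snippet. */
final class UidCheck {
    /** Returns one diagnostic line per operator that lacks an explicit uid. */
    static List<String> findMissingUids(List<OperatorInfo> operators) {
        List<String> problems = new ArrayList<>();
        for (OperatorInfo op : operators) {
            if (op.uid == null) {
                problems.add("Operator[" + op.id + ": " + op.name + "]: Missing uid(...)");
            }
        }
        return problems;
    }

    /**
     * Throws when enforcement is on and at least one uid is missing;
     * otherwise returns the diagnostics so the caller can log them as warnings.
     */
    static List<String> validate(List<OperatorInfo> operators, boolean forceOperatorUid) {
        List<String> problems = findMissingUids(operators);
        if (forceOperatorUid && !problems.isEmpty()) {
            throw new IllegalStateException("Job not executed, missing uids: " + problems);
        }
        return problems;
    }

    public static void main(String[] args) {
        List<OperatorInfo> operators = Arrays.asList(
                new OperatorInfo(1, "Source", "source"),
                new OperatorInfo(2, "Filter", null));
        // Lenient mode: only report the operators that lack a uid.
        for (String problem : validate(operators, false)) {
            System.out.println(problem);
        }
    }
}
```

Keeping the lookup (findMissingUids) separate from the policy (validate) lets the same diagnostics serve both the strict and the lenient mode.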






Re: Any way to require .uid(...) calls?

Posted by Dan Hill <qu...@gmail.com>.
Thanks!


Re: Any way to require .uid(...) calls?

Posted by Chesnay Schepler <ch...@apache.org>.
I'm not sure if there is a configuration option for doing so, but the
generation of UIDs can be disabled via
ExecutionConfig#disableAutoGeneratedUIDs, which would fail a job if not
all operators have a UID.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableAutoGeneratedUIDs();
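
Editor's note on the open question of a configuration option: Flink's configuration reference lists a boolean option named pipeline.auto-generate-uids (default true) that corresponds to this ExecutionConfig setting. The fragment below is a sketch of how it might look in flink-conf.yaml. It assumes the Flink version in use supports the option, so check the configuration page for your release before relying on it.

```yaml
# Assumption: the option exists in the Flink version in use.
# With auto-generated uids disabled, a job that contains an operator
# without an explicit uid(...) is rejected instead of being started.
pipeline.auto-generate-uids: false
```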
