Posted to user@beam.apache.org by Ori Popowski <or...@gmail.com> on 2022/02/01 13:54:42 UTC

Using custom validation logic on PipelineOptions

Hi all!

I am extending PipelineOptions and I have getters/setters for several
properties which are passed to my job from Dataflow. The parameters are
indeed passed correctly and I can access them from my pipeline.

However, I'd like to apply some custom logic to my parameters that cannot
be expressed with regexes in a template metadata file or with the
@Validation.Required annotation.

I tried to add the following code:

val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().create().as(classOf[JobOptions])

if (options.getUserId().get() == "…" && …) {
    throw new IllegalArgumentException("oops")
}

Unfortunately, I am getting this exception when I am staging the pipeline
to Dataflow with mvn -Pdataflow-runner compile exec:java …:

[info] running (fork) walkme.Main --runner=DataflowRunner --project=… --stagingLocation=gs://…/staging --templateLocation=gs://…/template --region=europe-west3
[error] Exception in thread "main" java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=userId, default=null}
[error]         at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:254)
[error]         at walkme.Main$.main(Main.scala:13)
[error]         at walkme.Main.main(Main.scala)

What is the recommended way to achieve what I described?

Thanks!

Re: Using custom validation logic on PipelineOptions

Posted by Robert Bradshaw <ro...@google.com>.
I would highly recommend using Flex Templates [1], which avoid all the
surprises and idiosyncrasies of ValueProviders and give you full
flexibility to do what you need (including complex validation) in your main
program.

[1]
https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
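A minimal sketch of what this enables, assuming the job is moved to a Flex Template: the launcher runs main() with the actual launch parameters, so options can be plain values (for example String getters instead of ValueProvider<String>) and can be checked before Pipeline.create(options), making a bad value fail the launch. The class name JobOptionsValidator and both rules below are hypothetical placeholders, because the real rule in this thread is elided. The sketch is plain Java so it compiles without Beam.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical validator for plain (non-ValueProvider) option values,
// intended to be called from main() before the pipeline is created.
// Both rules are illustrative placeholders, not the thread's real rule.
final class JobOptionsValidator {
    // Collects every problem instead of stopping at the first one.
    static List<String> problems(String userId, String reportPath) {
        List<String> errors = new ArrayList<>();
        if (userId == null || userId.isEmpty()) {
            errors.add("userId is required");
        }
        if (reportPath == null || !reportPath.startsWith("gs://")) {
            errors.add("reportPath must be a gs:// path");
        }
        return errors;
    }

    // Fails fast with all problems in a single message.
    static void validate(String userId, String reportPath) {
        List<String> errors = problems(userId, reportPath);
        if (!errors.isEmpty()) {
            throw new IllegalArgumentException("Invalid options: " + String.join("; ", errors));
        }
    }
}
```

In main you would call something like JobOptionsValidator.validate(options.getUserId(), options.getReportPath()) right after building the options, assuming both getters return String. Reporting every problem at once saves a launch attempt per bad parameter.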

On Wed, Feb 2, 2022 at 3:54 AM Ori Popowski <or...@gmail.com> wrote:

> Tried that. It doesn't work. Unfortunately Dataflow happily ignores
> everything that happens outside the pipeline itself.
> [...]

Re: Using custom validation logic on PipelineOptions

Posted by Ori Popowski <or...@gmail.com>.
Tried that. It doesn't work. Unfortunately, Dataflow happily ignores
everything that happens outside the pipeline itself. I even threw an
exception, and it was ignored:

The job name is indeed "test", as you can see in the screenshot. No matter
what I do, it is ignored, even logger.error and System.err.println.

  def main(args: Array[String]): Unit = {
    PipelineOptionsFactory.register(classOf[JobOptions])
    val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().create().as(classOf[JobOptions])

    if (options.getJobName == "test") {
      throw new RuntimeException("oops")
    }

    val pipeline = Pipeline.create(options)

    if (options.getJobName == "test") {
      throw new RuntimeException("oops")
    }

    val updater = new AerospikeUpdater

    pipeline
      .apply("Read report", TextIO.read().from(options.getReportPath))
      .apply("Update Aerospike", ParDo.of(new UpdateAerospike(updater)))

    if (options.getJobName == "test") {
      throw new RuntimeException("oops")
    }

    pipeline.run()
  }

[image: image.png]



On Tue, Feb 1, 2022 at 7:05 PM Luke Cwik <lc...@google.com> wrote:

> Validating at pipeline construction time makes sense unless you are
> building a template.
> [...]

Re: Using custom validation logic on PipelineOptions

Posted by Luke Cwik <lc...@google.com>.
Validating at pipeline construction time makes sense unless you are
building a template. In your case, you're validating that user is set, and
you're getting an exception because a template's runtime parameter has no
value while the pipeline is being constructed. If user is necessary, you
can do something like:

// X is a placeholder for the value user must match.
if (!options.getUser().isAccessible()) {
  throw new IllegalArgumentException("Required property user is unset.");
} else if (!X.equals(options.getUser().get())) {
  throw new IllegalArgumentException("User property does not satisfy X");
}

If user is optional, then:

if (options.getUser().isAccessible() && !X.equals(options.getUser().get())) {
  throw new IllegalArgumentException("User property does not satisfy X");
}
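The two checks above can be exercised without Dataflow in a small sketch. StringProvider is a hypothetical stand-in for Beam's ValueProvider<String> that models only isAccessible() and get(): known() behaves like a value supplied at construction time, and deferred() behaves like a template runtime parameter whose get() throws an IllegalStateException modeled on the one in this thread. The predicate satisfiesX stands in for the placeholder X.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for Beam's ValueProvider<String>; only the two
// methods used by the checks (isAccessible() and get()) are modeled.
interface StringProvider {
    boolean isAccessible();
    String get();
}

final class UserOptionCheck {
    // A value already known while the pipeline is being constructed.
    static StringProvider known(String value) {
        return new StringProvider() {
            public boolean isAccessible() { return true; }
            public String get() { return value; }
        };
    }

    // A template runtime parameter: not accessible at construction time.
    static StringProvider deferred(String propertyName) {
        return new StringProvider() {
            public boolean isAccessible() { return false; }
            public String get() {
                throw new IllegalStateException(
                    "Value only available at runtime, but accessed from a non-runtime context: "
                        + propertyName);
            }
        };
    }

    // "user is necessary": reject when unset or when the value fails the rule.
    static void checkRequired(StringProvider user, Predicate<String> satisfiesX) {
        if (!user.isAccessible()) {
            throw new IllegalArgumentException("Required property user is unset.");
        } else if (!satisfiesX.test(user.get())) {
            throw new IllegalArgumentException("User property does not satisfy X");
        }
    }

    // "user is optional": only check a value that is available now.
    static void checkOptional(StringProvider user, Predicate<String> satisfiesX) {
        if (user.isAccessible() && !satisfiesX.test(user.get())) {
            throw new IllegalArgumentException("User property does not satisfy X");
        }
    }
}
```

One consequence worth keeping in mind: while a classic template is being built, a runtime parameter is never accessible, so the required form always rejects it and the optional form silently skips the check. Neither form can catch a bad value that is only supplied when the template is launched.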


On Tue, Feb 1, 2022 at 6:17 AM Ori Popowski <or...@gmail.com> wrote:

> Thanks.
>
> I am trying a fail-fast approach here, so I thought the best thing is to
> validate the options before starting the pipeline.
> [...]

Re: Using custom validation logic on PipelineOptions

Posted by Ori Popowski <or...@gmail.com>.
Thanks.

I am trying a fail-fast approach here, so I thought the best thing would be
to validate the options before starting the pipeline.

Validating the options in @ProcessElement seems wasteful, since the
validation code would run once per element instead of just once. The
options are not available in a DoFn's @Setup method, only in @StartBundle.
Is there a better place to validate the options than @StartBundle?

Thanks
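One common way to get validate-once behavior, sketched here under assumptions, is to pass the ValueProvider itself into the DoFn's constructor and check it once per DoFn instance, on first use, since .get() is allowed at execution time. ValidateOnceFn below is a hypothetical plain-Java stand-in for such a DoFn: Supplier<String> plays the role of ValueProvider<String>, and process() plays the role of the @ProcessElement method. A plain boolean flag is enough because Beam does not invoke a single DoFn instance from several threads at once.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for a DoFn that receives a ValueProvider<String>
// through its constructor and validates it once per instance.
final class ValidateOnceFn {
    private final Supplier<String> user;        // plays the role of ValueProvider<String>
    private final Predicate<String> satisfiesX; // placeholder for the real rule
    private boolean validated;                  // per-instance flag, like a DoFn field
    private int validationCount;                // only for illustration/testing

    ValidateOnceFn(Supplier<String> user, Predicate<String> satisfiesX) {
        this.user = user;
        this.satisfiesX = satisfiesX;
    }

    // Runs the check only until it has succeeded once on this instance.
    private void ensureValid() {
        if (validated) {
            return;
        }
        validationCount++;
        String value = user.get(); // at execution time, this is where .get() is allowed
        if (!satisfiesX.test(value)) {
            throw new IllegalArgumentException("User property does not satisfy X: " + value);
        }
        validated = true;
    }

    // Plays the role of @ProcessElement: only the first element pays for validation.
    String process(String element) {
        ensureValid();
        return element;
    }

    int validationCount() {
        return validationCount;
    }
}
```

Note that a failure here surfaces on the workers after the job has started (failing work items are typically retried before the job fails), so this is a cheaper runtime check rather than true fail-fast validation at launch.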

On Tue, Feb 1, 2022 at 4:10 PM Chris Soujon <ch...@doit-intl.com> wrote:

> Hi Ori,
>
> The error message hints that you are trying to use a ValueProvider during
> pipeline construction.
> [...]

Re: Using custom validation logic on PipelineOptions

Posted by Chris Soujon <ch...@doit-intl.com>.
Hi Ori,

The error message indicates that you are calling ValueProvider.get() during
pipeline construction. You should only call .get() from code that runs
during pipeline execution, e.g. in a lambda passed to MapElements or in a
DoFn's @ProcessElement method.

Check the Dataflow docs on this topic [1]; they also spell out your error
message.

Best,
Chris


[1]
https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#about-runtime-parameters-and-the-valueprovider-interface

On Tue, Feb 1, 2022 at 2:54 PM Ori Popowski <or...@gmail.com> wrote:

> Hi all!
>
> I am extending PipelineOptions and I have getters/setters for several
> properties which are passed to my job from Dataflow.
> [...]


-- 
Chris Soujon <ch...@doit-intl.com>
Staff Cloud Architect, EMEA North
DoiT International <https://www.doit-intl.com/>