Posted to user@beam.apache.org by Ori Popowski <or...@gmail.com> on 2022/02/01 13:54:42 UTC
Using custom validation logic on PipelineOptions
Hi all!
I am extending PipelineOptions and I have getters/setters for several
properties which are passed to my job from Dataflow. The parameters are
indeed passed correctly and I can access them from my pipeline.
However, I'd like to apply some custom validation logic to my parameters
that cannot be expressed by a metadata file with regexes or by the
@Validation.Required annotation.
I tried to add the following code:
    val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().create().as(classOf[JobOptions])
    if (options.getUserId().get() == "…" && …) {
      throw new IllegalArgumentException("oops")
    }
Unfortunately, I am getting this exception when I am staging the pipeline
to Dataflow with mvn -Pdataflow-runner compile exec:java …:
    [info] running (fork) walkme.Main --runner=DataflowRunner --project=… --stagingLocation=gs://…/staging --templateLocation=gs://…/template --region=europe-west3
    [error] Exception in thread "main" java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=userId, default=null}
    [error] at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:254)
    [error] at walkme.Main$.main(Main.scala:13)
    [error] at walkme.Main.main(Main.scala)
What is the recommended way to achieve what I described?
Thanks!
Re: Using custom validation logic on PipelineOptions
Posted by Robert Bradshaw <ro...@google.com>.
I would highly recommend using Flex Templates [1], which avoid the
surprises and idiosyncrasies of ValueProviders and give you full
flexibility to do what you need (including complex validation) in your main
program.
[1]
https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
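As a rough illustration of what validation in the main program can look like
once the options are plain values rather than ValueProviders, here is a
minimal, self-contained Java sketch. It does not use Beam: parse() is a
hypothetical stand-in for PipelineOptionsFactory, the rule inside validate()
is invented for the example (the real condition is not given in this thread),
and the sample bucket path is made up. Only the option names userId and
reportPath come from the thread.

```java
import java.util.HashMap;
import java.util.Map;

final class LaunchTimeValidation {

    /** Hypothetical helper: collects --key=value arguments into a map. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> opts = new HashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (arg.startsWith("--") && eq > 2) {
                opts.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return opts;
    }

    /** Invented cross-field rule, the kind a single regex cannot express. */
    static void validate(Map<String, String> opts) {
        String userId = opts.get("userId");
        String reportPath = opts.get("reportPath");
        if (userId == null || reportPath == null) {
            throw new IllegalArgumentException("userId and reportPath are required");
        }
        if (userId.startsWith("test-") && !reportPath.contains("/test/")) {
            throw new IllegalArgumentException("test users may only read test reports");
        }
    }

    public static void main(String[] args) {
        // Sample arguments (assumed values) so the sketch runs on its own.
        String[] sample = {"--userId=test-1", "--reportPath=gs://bucket/test/report.csv"};
        Map<String, String> opts = parse(sample);
        validate(opts); // would throw here, before any pipeline is built
        System.out.println("options accepted");
    }
}
```

The point of the sketch is only the ordering: the check runs on concrete
values before the pipeline is constructed, so a bad parameter fails the
launch instead of surfacing later inside a worker.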
On Wed, Feb 2, 2022 at 3:54 AM Ori Popowski <or...@gmail.com> wrote:
> [...]
Re: Using custom validation logic on PipelineOptions
Posted by Ori Popowski <or...@gmail.com>.
Tried that. It doesn't work. Unfortunately Dataflow happily ignores
everything that happens outside the pipeline itself. I even threw an
exception that was ignored:
The Job Name is indeed "test" as you can see in the screenshot. It doesn't
matter what I do, it just ignores it. Even logger.error and
System.err.println.
    def main(args: Array[String]): Unit = {
      PipelineOptionsFactory.register(classOf[JobOptions])
      val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().create().as(classOf[JobOptions])

      if (options.getJobName == "test") {
        throw new RuntimeException("oops")
      }

      val pipeline = Pipeline.create(options)

      if (options.getJobName == "test") {
        throw new RuntimeException("oops")
      }

      val updater = new AerospikeUpdater

      pipeline
        .apply("Read report", TextIO.read().from(options.getReportPath))
        .apply("Update Aerospike", ParDo.of(new UpdateAerospike(updater)))

      if (options.getJobName == "test") {
        throw new RuntimeException("oops")
      }

      pipeline.run()
    }
[image: image.png]
On Tue, Feb 1, 2022 at 7:05 PM Luke Cwik <lc...@google.com> wrote:
> [...]
Re: Using custom validation logic on PipelineOptions
Posted by Luke Cwik <lc...@google.com>.
Validating at pipeline construction time makes sense unless you are
building a template. In this case you're validating that user is set while
building a template, so you're getting an exception. If user is necessary
then you can do something like:
    if (!options.getUser().isAccessible()) {
      throw new IllegalArgumentException("Required property user is unset.");
    } else if (options.getUser().get() != X) {
      throw new IllegalArgumentException("User property does not satisfy X");
    }
If user is optional then:
    if (options.getUser().isAccessible() && options.getUser().get() != X) {
      throw new IllegalArgumentException("User property does not satisfy X");
    }
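To make the behaviour of the optional check concrete, here is a minimal,
self-contained Java sketch. It does not use Beam: ProviderStandIn is a
hypothetical interface that only mirrors the get() and isAccessible() methods
of ValueProvider, fixed() and deferred() are made-up helpers imitating a
statically supplied value and a template parameter at construction time, and
the "must not be empty" rule is a placeholder for whatever X is.

```java
/** Stand-in mirroring ValueProvider's get() and isAccessible() (assumption). */
interface ProviderStandIn<T> {
    T get();
    boolean isAccessible();
}

final class ConstructionTimeCheck {

    /** Imitates a value that is already known at construction time. */
    static <T> ProviderStandIn<T> fixed(T value) {
        return new ProviderStandIn<T>() {
            @Override public T get() { return value; }
            @Override public boolean isAccessible() { return true; }
        };
    }

    /** Imitates a template parameter before launch: reading it is illegal. */
    static <T> ProviderStandIn<T> deferred(String name) {
        return new ProviderStandIn<T>() {
            @Override public T get() {
                throw new IllegalStateException("Value only available at runtime: " + name);
            }
            @Override public boolean isAccessible() { return false; }
        };
    }

    /** Optional-parameter variant: returns true only if the check actually ran. */
    static boolean validateIfAccessible(ProviderStandIn<String> userId) {
        if (!userId.isAccessible()) {
            return false; // template build: nothing can be checked yet
        }
        String value = userId.get();
        if (value == null || value.isEmpty()) { // placeholder rule
            throw new IllegalArgumentException("userId must not be empty");
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(validateIfAccessible(fixed("alice")));     // prints true
        System.out.println(validateIfAccessible(deferred("userId"))); // prints false
    }
}
```

Note what the second call shows: when a template is being built the check is
skipped rather than failed, so this pattern only fails fast for non-template
runs.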
On Tue, Feb 1, 2022 at 6:17 AM Ori Popowski <or...@gmail.com> wrote:
> [...]
Re: Using custom validation logic on PipelineOptions
Posted by Ori Popowski <or...@gmail.com>.
Thanks.
I am trying a fail-fast approach here, so I thought the best thing is to
validate the options before starting the pipeline.
I think validating the options in processElements on each element is
wasteful since the validation code will run many times instead of just
once. The options are not available for DoFn's @Setup method - only for
@StartBundle. Is there any better place to validate the options except for
@StartBundle?
Thanks
On Tue, Feb 1, 2022 at 4:10 PM Chris Soujon <ch...@doit-intl.com> wrote:
> [...]
Re: Using custom validation logic on PipelineOptions
Posted by Chris Soujon <ch...@doit-intl.com>.
Hi Ori,
The error message hints that you are trying to use a ValueProvider during
pipeline construction. You should only call the .get() method from code
that runs during pipeline execution, e.g. in a lambda for MapElements or
in a DoFn's @ProcessElement method.
Check the Dataflow docs on this topic here [1]. It also spells out your
error message.
Best,
Chris
[1]
https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#about-runtime-parameters-and-the-valueprovider-interface
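As a sketch of deferring the read to execution time without re-running the
validation for every element, the following self-contained Java example
guards the check with a per-instance flag. It does not use Beam: RuntimeParam
is a hypothetical stand-in for the get() method of ValueProvider,
ValidatingStep is only shaped like a DoFn, and the "must not be empty" rule
and the validations counter are invented for the demonstration.

```java
/** Stand-in for ValueProvider's get() at execution time (assumption). */
interface RuntimeParam<T> {
    T get();
}

/** Shaped like a DoFn, but deliberately not the Beam class. */
final class ValidatingStep {
    private final RuntimeParam<String> userId;
    private boolean validated = false; // per-instance guard
    int validations = 0;               // demo-only counter

    ValidatingStep(RuntimeParam<String> userId) {
        this.userId = userId;
    }

    /** Called once per element; the parameter is validated on the first call only. */
    String process(String element) {
        if (!validated) {
            String id = userId.get(); // read during execution, not construction
            if (id == null || id.isEmpty()) { // placeholder rule
                throw new IllegalArgumentException("userId must not be empty");
            }
            validations++;
            validated = true;
        }
        return userId.get() + ":" + element;
    }

    public static void main(String[] args) {
        ValidatingStep step = new ValidatingStep(() -> "alice");
        System.out.println(step.process("a")); // prints alice:a
        System.out.println(step.process("b")); // prints alice:b
        System.out.println("validations run: " + step.validations); // prints validations run: 1
    }
}
```

The trade-off of this approach is that a bad parameter is only detected after
the job has started, so it is fail-early rather than fail-before-launch.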
On Tue, Feb 1, 2022 at 2:54 PM Ori Popowski <or...@gmail.com> wrote:
> [...]
--
Chris Soujon <ch...@doit-intl.com>
DoiT International <https://www.doit-intl.com/>
Staff Cloud Architect
EMEA North