Posted to user@beam.apache.org by Rajath BK <ra...@gmail.com> on 2023/10/20 13:04:34 UTC

Indeterminate behavior while accessing side inputs

Hi,
   We are using a few side inputs to access data that is used to filter out
elements in the pipelines. The source of the side input data is a Redis
instance in Google Cloud Memorystore.
The jobs are deployed via the Dataflow service APIs, using generated
pipeline templates.

The side inputs are generated as below:

PCollectionView<Map<String, String>> targetCountryByJourneyId = p
    .apply("Generate a sequence for extracting resource_id->targetCountry map",
        GenerateSequence.from(0).withRate(1,
            Duration.standardSeconds(Long.parseLong(options.getRulesRefreshDuration().get()))))
    .apply("Get all rules from Redis for resourceId->targetCountry map",
        ParDo.of(new GetAllRulesFromRedis(Integer.parseInt(options.getRedisPort().get()),
            options.getRedisHost().get(), statsdHost, statsdPort)))
    .apply("Transform all Rules into journeyId->targetCountry map",
        ParDo.of(new TransformRulesAsJourneyIdToMetadataMap(statsdHost, statsdPort,
            Rule.TARGET_COUNTRY)))
    .apply("Window journeyId->targetCountry map",
        Window.<Map<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())
    .apply("singleton view of journeyId -> targetCountry map", View.asSingleton());
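
The intent is the usual slowly-changing side input pattern: GenerateSequence
emits a tick every rulesRefreshDuration seconds, each tick re-reads the rules
from Redis, and each freshly built map fires a new pane that is meant to
replace the previous contents of the singleton view.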

The view is accessed as below:

PCollection<UserData> qualifiedUser = userValidatedEntry
    .apply("Eliminate user by target country",
        ParDo.of(new EliminateUserByTargetCountry(
                options.getProjectNameForBT().get(),
                options.getInstanceNameForBT().get(),
                bigTableAppProfileId,
                targetCountryByJourneyId,
                statsdHost,
                statsdPort))
            .withSideInputs(targetCountryByJourneyId));
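
Inside the DoFn, the view is read with the standard side-input API. A
simplified sketch of the relevant part (the constructor arguments are
trimmed, and getJourneyId() stands in for our actual accessor on UserData):

static class EliminateUserByTargetCountry extends DoFn<UserData, UserData> {
  private final PCollectionView<Map<String, String>> targetCountryByJourneyId;

  EliminateUserByTargetCountry(PCollectionView<Map<String, String>> view) {
    this.targetCountryByJourneyId = view;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Reads the most recently fired pane of the singleton view.
    Map<String, String> targetCountries = c.sideInput(targetCountryByJourneyId);
    String targetCountry = targetCountries.get(c.element().getJourneyId());
    if (targetCountry != null) {
      // Element qualifies; otherwise it is filtered out.
      c.output(c.element());
    }
  }
}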

The pipelines are run on Google Cloud Dataflow, and we have been noticing
two kinds of issues:

1. Despite the side input's window triggering as soon as the first element
occurs, we are seeing the error below. The strange thing is that the errors
do not occur on all the workers in the job, and we have also seen the issue
disappear after a new deployment, only to resurface later.

java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
at org.apache.beam.sdk.transforms.View$SingletonCombineFn.apply(View.java:434)
at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:524)
at org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.addInput(Combine.java:493)
at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:2051)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:119)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:613)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
at org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:43)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
at org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:137)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1450)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:165)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1125)
at org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:133)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

2. Secondly, we have noticed the side input sometimes containing no data
even though the underlying data exists. Since this side input view is a map,
when this occurs we see an empty map with no keys, which breaks our code
when it accesses the map.

Has anyone faced similar issues? It would greatly help if someone could shed
some light on this seemingly non-deterministic behavior of side inputs
created as singletons.

-Thanks in advance
Rajath

Re: Indeterminate behavior while accessing side inputs

Posted by Bruno Volpato via user <us...@beam.apache.org>.
Hi Rajath,

I think you need to guarantee that the PCollection has a single element per
window, so that you can use .asSingleton().

A possible approach is reducing to the latest element in the window prior to
.asSingleton():

PCollectionView<Map<String, String>> targetCountryByJourneyId = p
    .apply("Generate a sequence for extracting resource_id->targetCountry map",
        GenerateSequence.from(0).withRate(1,
            Duration.standardSeconds(Long.parseLong(options.getRulesRefreshDuration().get()))))
    .apply("Get all rules from Redis for resourceId->targetCountry map",
        ParDo.of(new GetAllRulesFromRedis(Integer.parseInt(options.getRedisPort().get()),
            options.getRedisHost().get(), statsdHost, statsdPort)))
    .apply("Transform all Rules into journeyId->targetCountry map",
        ParDo.of(new TransformRulesAsJourneyIdToMetadataMap(statsdHost, statsdPort,
            Rule.TARGET_COUNTRY)))
    .apply("Window journeyId->targetCountry map",
        Window.<Map<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())
    .apply("Combine to Latest", Latest.globally())   // <-- the new step
    .apply("singleton view of journeyId -> targetCountry map", View.asSingleton());
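
Latest.globally() keeps only the element with the latest event-time
timestamp, so each fired pane reduces to exactly one map and the
single-element precondition of View.asSingleton() holds.

For your second issue (the view being read before its first pane has fired),
View.asSingleton() also accepts a default value, so early reads see an empty
map instead of failing. A sketch, not tested:

        .apply("singleton view of journeyId -> targetCountry map",
            View.<Map<String, String>>asSingleton()
                .withDefaultValue(new HashMap<>()));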


Best,
Bruno
