You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Seshadri Raghunathan <se...@gmail.com> on 2017/04/19 22:54:17 UTC

how to enforce dependency among transforms in a piepline

Hi,

I am trying to do something like below  :

PCollection<String> pc1=<some collection>;

pc1.apply(TextIO.Write.to("gs://BucketA/fileA.txt"))

PCollection<String> pc2=TextIO.Read.from("gs://BucketA/fileA.txt")

When I execute the above in a pipeline, I get an error when I try to read
from the GS bucket-

java.lang.IllegalStateException: Unable to find any files matching
StaticValueProvider{value=gs://beamoutput/test105A.txt}

Is there a mechanism to ensure that the read above does not happen before
the write ?

Thanks,
Seshadri

Re: how to enforce dependency among transforms in a piepline

Posted by Davor Bonaci <da...@apache.org>.
Unfortunately, adding a dependency from TextIO.Write to TextIO.Read is not
something that can be done via Beam APIs today. Stay tuned, however -- this
is something we are interested in improving, and a lot of the groundwork
has already been done.

As a workaround, you can pipe input to TextIO.Write as an input of the step
currently consuming the output of TextIO.Read.

Also, the pipeline construction exception you are seeing can be worked
around by disabling validation on the TextIO.Read step; that, however,
would make the pipeline start executing, but wouldn't guarantee correct
results.

On Wed, Apr 19, 2017 at 3:54 PM, Seshadri Raghunathan <se...@gmail.com>
wrote:

> Hi,
>
> I am trying to do something like below  :
>
> PCollection<String> pc1=<some collection>;
>
> pc1.apply(TextIO.Write.to("gs://BucketA/fileA.txt"))
>
> PCollection<String> pc2=TextIO.Read.from("gs://BucketA/fileA.txt")
>
> When I execute the above in a pipeline, I get an error when I try to read
> from the GS bucket-
>
> java.lang.IllegalStateException: Unable to find any files matching
> StaticValueProvider{value=gs://beamoutput/test105A.txt}
>
> Is there a mechanism to ensure that the read above does not happen before
> the write ?
>
> Thanks,
> Seshadri
>
>