Posted to dev@beam.apache.org by Ravi Kapoor <ka...@gmail.com> on 2022/06/13 15:09:39 UTC

Chained Job Graph Apache Beam | Dataflow

Hi Team,

I am currently using Beam in my project with the Dataflow runner.
I am trying to create a pipeline where the data flows from the source to
staging and then to the target, such as:

A (Source) -> B(Staging) -> C (Target)

When I create a pipeline as below:

PCollection<TableRow> table_A_records = p.apply(BigQueryIO.readTableRows()
        .from("project:dataset.table_A"));

table_A_records.apply(BigQueryIO.writeTableRows()
        .to("project:dataset.table_B")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

PCollection<TableRow> table_B_records = p.apply(BigQueryIO.readTableRows()
        .from("project:dataset.table_B"));

table_B_records.apply(BigQueryIO.writeTableRows()
        .to("project:dataset.table_C")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

p.run().waitUntilFinish();


It basically creates two parallel job graphs in Dataflow instead of the
single chained transformation I expected:
A -> B
B -> C
I need to create a data pipeline in which the data flows in a chain like:
            D
           /
A -> B -> C
           \
            E
Is there a way to achieve this chained transformation between the source and
target tables?

Thanks,
Ravi

Re: Chained Job Graph Apache Beam | Dataflow

Posted by Daniel Collins <dp...@google.com>.
The UI does not know anything about external destinations. Table C in
BigQuery is outside the Beam model. If you want to construct this pipeline
within the Beam model, you can combine the PCollections from A and B into a
single PCollection using Flatten. You can then perform whatever
modifications you want on that PCollection and write the result directly to
table D. Table C is not needed. If you want to preserve table C as an
output, you can also write the post-Flatten PCollection to table C.

-Daniel
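
The Flatten approach described above could look roughly like the sketch
below. It is only an illustration under stated assumptions: the `status`
column, the `"ACTIVE"` value, the table name `table_D`, and the class name are
hypothetical, and all tables are assumed to exist already because of
CREATE_NEVER. Note that Flatten concatenates its inputs (a union); it does not
match rows by key, so a keyed join needs a different transform.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class FlattenThenFanOut {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<TableRow> tableA =
        p.apply("ReadA", BigQueryIO.readTableRows().from("project:dataset.table_A"));
    PCollection<TableRow> tableB =
        p.apply("ReadB", BigQueryIO.readTableRows().from("project:dataset.table_B"));

    // Flatten merges both inputs into one PCollection (union, not a keyed join).
    PCollection<TableRow> combined =
        PCollectionList.of(tableA).and(tableB)
            .apply("FlattenAB", Flatten.<TableRow>pCollections());

    // Optional: keep table C as an output of the combined data.
    combined.apply("WriteC", BigQueryIO.writeTableRows()
        .to("project:dataset.table_C")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

    // Filter the same in-pipeline PCollection and write the result to table D.
    // The "status" column and "ACTIVE" value are hypothetical.
    combined
        .apply("KeepActive", Filter.by((TableRow row) -> "ACTIVE".equals(row.get("status"))))
        .apply("WriteD", BigQueryIO.writeTableRows()
            .to("project:dataset.table_D")
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

    p.run().waitUntilFinish();
  }
}
```

Because the write to C and the filter for D both hang off `combined`, the job
graph shows them as branches of one pipeline rather than as two disconnected
graphs.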

On Wed, Jun 15, 2022 at 3:27 PM Ravi Kapoor <ka...@gmail.com> wrote:

> Hi Daniel,
>
> I have a use case where I join two tables, say A and B, and write the
> joined PCollection to C.
> Then I would like to filter some records from C and put them into another
> table, say D.
> So, the pipeline in the Dataflow UI should look like this, right?
>
> A
>    \
>     C -> D
>    /
> B
>
> However, the pipeline is running the C -> D step in parallel.
> How can this pipeline run in parallel when the data has not yet been pushed
> to C by the previous step?
>
> When I ran this pipeline, table D did not get any records inserted either,
> which is to be expected.
> Can you help me with this use case?
>
> Thanks,
> Ravi
>
>
>
> On Tue, Jun 14, 2022 at 9:01 PM Daniel Collins <dp...@google.com>
> wrote:
>
>> Can you speak to what specifically you want to be different? The job
>> graph you see, with A -> B and B -> C being separate, is an accurate
>> reflection of your pipeline. table_B is outside of the Beam model; once
>> you push your data there, Dataflow has no ability to identify that no
>> manipulation of data is happening at table_B.
>>
>> If you just want to process data from A to destinations D and E, while
>> writing an intermediate output to table_B, you should just remove the read
>> from table_B and transform table_A_records again directly. If that is not
>> what you want, you would need to explain more specifically what you want
>> that is different. Is it a pure UI change? Is it a functional change?
>>
>> -Daniel
>>
>> On Tue, Jun 14, 2022 at 11:12 AM Ravi Kapoor <ka...@gmail.com>
>> wrote:
>>
>>> Team,
>>> Any update on this?

Re: Chained Job Graph Apache Beam | Dataflow

Posted by Evan Galpin <eg...@apache.org>.
It may also be helpful to explore CoGroupByKey as a way of joining data,
though depending on the shape of the data, the grouped data may not fit in
memory:
https://beam.apache.org/documentation/transforms/java/aggregation/cogroupbykey/

- Evan
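
A minimal sketch of a CoGroupByKey join for the A + B -> C case might look
like the following. The join column `id` and the class and step names are
hypothetical, the merge simply copies the fields of both rows into one
TableRow (an inner join), and table C is assumed to exist already.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

public class CoGroupByKeyJoin {
  // Keys each row by the hypothetical "id" column.
  private static PCollection<KV<String, TableRow>> keyById(
      String stepName, PCollection<TableRow> rows) {
    return rows.apply(stepName, MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(TableRow.class)))
        .via((TableRow row) -> KV.of(String.valueOf(row.get("id")), row)));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KV<String, TableRow>> keyedA = keyById("KeyA",
        p.apply("ReadA", BigQueryIO.readTableRows().from("project:dataset.table_A")));
    PCollection<KV<String, TableRow>> keyedB = keyById("KeyB",
        p.apply("ReadB", BigQueryIO.readTableRows().from("project:dataset.table_B")));

    final TupleTag<TableRow> aTag = new TupleTag<>();
    final TupleTag<TableRow> bTag = new TupleTag<>();

    // Group the rows of both inputs that share the same key.
    PCollection<KV<String, CoGbkResult>> grouped =
        KeyedPCollectionTuple.of(aTag, keyedA).and(bTag, keyedB)
            .apply("CoGroupById", CoGroupByKey.create());

    // Emit one merged row per matching (A, B) pair, i.e. an inner join.
    PCollection<TableRow> joined = grouped.apply("MergeRows", ParDo.of(
        new DoFn<KV<String, CoGbkResult>, TableRow>() {
          @ProcessElement
          public void processElement(
              @Element KV<String, CoGbkResult> element, OutputReceiver<TableRow> out) {
            for (TableRow a : element.getValue().getAll(aTag)) {
              for (TableRow b : element.getValue().getAll(bTag)) {
                TableRow merged = new TableRow();
                merged.putAll(a);
                merged.putAll(b);
                out.output(merged);
              }
            }
          }
        }));

    joined.apply("WriteC", BigQueryIO.writeTableRows()
        .to("project:dataset.table_C")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

    p.run().waitUntilFinish();
  }
}
```

The `joined` PCollection can then feed any further filtering step directly,
without reading table C back.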

On Wed, Jun 15, 2022 at 3:45 PM Bruno Volpato <br...@gmail.com>
wrote:

> Hello,
>
> I am not sure what the context behind your join is, but I just wanted to
> point out that Beam SQL [1] or the Join library extension [2] may be
> helpful in your scenario to avoid changing semantics or the need to
> orchestrate your jobs outside Beam.
>
> [1] https://beam.apache.org/documentation/dsls/sql/extensions/joins/
> [2] https://beam.apache.org/documentation/sdks/java-extensions/
>
>
> Best,
> Bruno

Re: Chained Job Graph Apache Beam | Dataflow

Posted by Bruno Volpato <br...@gmail.com>.
Hello,

I am not sure what the context behind your join is, but I just wanted to
point out that Beam SQL [1] or the Join library extension [2] may be
helpful in your scenario to avoid changing semantics or the need to
orchestrate your jobs outside Beam.

[1] https://beam.apache.org/documentation/dsls/sql/extensions/joins/
[2] https://beam.apache.org/documentation/sdks/java-extensions/


Best,
Bruno
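
As a rough illustration of the Join library route [2], the sketch below joins
A and B inside one pipeline, writes the result to C, and filters the same
PCollection into D. It assumes the Join library extension is on the classpath;
the `id` and `status` columns, the table name `table_D`, and the class and
step names are hypothetical, and the target tables are assumed to exist.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

public class JoinLibraryPipeline {
  // Keys each row by the hypothetical "id" column.
  private static PCollection<KV<String, TableRow>> keyById(
      String stepName, PCollection<TableRow> rows) {
    return rows.apply(stepName, MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(TableRow.class)))
        .via((TableRow row) -> KV.of(String.valueOf(row.get("id")), row)));
  }

  private static void writeTo(PCollection<TableRow> rows, String stepName, String table) {
    rows.apply(stepName, BigQueryIO.writeTableRows()
        .to(table)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KV<String, TableRow>> keyedA = keyById("KeyA",
        p.apply("ReadA", BigQueryIO.readTableRows().from("project:dataset.table_A")));
    PCollection<KV<String, TableRow>> keyedB = keyById("KeyB",
        p.apply("ReadB", BigQueryIO.readTableRows().from("project:dataset.table_B")));

    // Inner join on the key; each output holds the matching (A, B) pair.
    PCollection<KV<String, KV<TableRow, TableRow>>> pairs = Join.innerJoin(keyedA, keyedB);

    // Merge each pair into a single row for table C.
    PCollection<TableRow> joined = pairs.apply("MergePairs", MapElements
        .into(TypeDescriptor.of(TableRow.class))
        .via((KV<String, KV<TableRow, TableRow>> kv) -> {
          TableRow merged = new TableRow();
          merged.putAll(kv.getValue().getKey());
          merged.putAll(kv.getValue().getValue());
          return merged;
        }));

    writeTo(joined, "WriteC", "project:dataset.table_C");

    // Filter the joined rows in the pipeline instead of re-reading table C.
    writeTo(
        joined.apply("FilterForD", Filter.by((TableRow row) -> row.get("status") != null)),
        "WriteD", "project:dataset.table_D");

    p.run().waitUntilFinish();
  }
}
```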



On Wed, Jun 15, 2022 at 3:35 PM Jack McCluskey <jr...@google.com>
wrote:

> Hey Ravi,
>
> The problem you're running into is that the act of writing data to a table
> and reading from it are not joined actions in the Beam model. There's no
> connecting PCollection tying those together, so they are split and run in
> parallel. If you want to do this and need the data written to C, you should
> re-use the PCollection written to C in your filtering step instead of
> reading the data from C again. That should produce the graph you're looking
> for in a batch context.
>
> Thanks,
>
> Jack McCluskey

Re: Chained Job Graph Apache Beam | Dataflow

Posted by Jack McCluskey <jr...@google.com>.
Hey Ravi,

The problem you're running into is that the act of writing data to a table
and reading from it are not joined actions in the Beam model. There's no
connecting PCollection tying those together, so they are split and run in
parallel. If you want to do this and need the data written to C, you should
re-use the PCollection written to C in your filtering step instead of
reading the data from C again. That should produce the graph you're looking
for in a batch context.

Thanks,

Jack McCluskey
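
Applied to the A -> B -> C chain from the original post, re-using the
PCollection instead of reading the table back might look like the sketch
below. The `status` column, the table names `table_D` and `table_E`, the
filter conditions, and the `writeTo` helper are all hypothetical; the tables
are assumed to exist already because of CREATE_NEVER.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;

public class ReusePCollection {
  // Hypothetical helper that writes a PCollection to an existing table.
  private static void writeTo(PCollection<TableRow> rows, String stepName, String table) {
    rows.apply(stepName, BigQueryIO.writeTableRows()
        .to(table)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read the source table once.
    PCollection<TableRow> tableARecords =
        p.apply("ReadA", BigQueryIO.readTableRows().from("project:dataset.table_A"));

    // Staging and target outputs both come from the same PCollection,
    // so there is no read from table_B or table_C inside the pipeline.
    writeTo(tableARecords, "WriteB", "project:dataset.table_B");
    writeTo(tableARecords, "WriteC", "project:dataset.table_C");

    // Downstream outputs D and E are derived from the same PCollection.
    // The "status" column and both conditions are hypothetical.
    writeTo(
        tableARecords.apply("FilterForD", Filter.by((TableRow row) -> row.get("status") != null)),
        "WriteD", "project:dataset.table_D");
    writeTo(
        tableARecords.apply("FilterForE", Filter.by((TableRow row) -> row.get("status") == null)),
        "WriteE", "project:dataset.table_E");

    p.run().waitUntilFinish();
  }
}
```

In this shape the job graph is a single connected pipeline that fans out from
`tableARecords`; the tables appear only as sinks, so the graph will not show a
literal B -> C edge.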

On Wed, Jun 15, 2022 at 3:30 PM Ravi Kapoor <ka...@gmail.com> wrote:

> FYI
>
> On Thu, Jun 16, 2022 at 12:56 AM Ravi Kapoor <ka...@gmail.com>
> wrote:
>
>> Hi Daniel,
>>
>> I have a use case where I join two tables say A and B and write the
>> joined Collection to C.
>> Then I would like to filter some records on C and put it to another table
>> say D.
>> So, the pipeline on Dataflow UI should look like this right?
>>
>> A
>>    \
>>     C -> D
>>    /
>> B
>>
>> However, the pipeline is writing C -> D in parallel.
>> How can this pipeline run in parallel as data has not been pushed yet to
>> C by the previous pipeline?
>>
>> Even when I ran this pipeline, Table D did not get any records inserted
>> as well which is apparent.
>> Can you help me with this use case?
>>
>> Thanks,
>> Ravi
>>
>>
>>
>> On Tue, Jun 14, 2022 at 9:01 PM Daniel Collins <dp...@google.com>
>> wrote:
>>
>>> Can you speak to what specifically you want to be different? The job
>>> graph you see, with the A -> B and B-> C being separate is an accurate
>>> reflection of your pipeline. table_B is outside of the beam model, by
>>> pushing your data there, Dataflow has no ability to identify that no
>>> manipulation of data is happening at table_B.
>>>
>>> If you want to just process data from A to destinations D and E, while
>>> writing an intermediate output to table_B, you should just remove the read
>>> from table B and modify table_A_records again directly. If that is not what
>>> you want, you would need to explain more specifically what you want that is
>>> different. Is it a pure UI change? Is it a functional change?
>>>
>>> -Daniel
>>>
>>> On Tue, Jun 14, 2022 at 11:12 AM Ravi Kapoor <ka...@gmail.com>
>>> wrote:
>>>
>>>> Team,
>>>> Any update on this?
>>>>
>>>> On Mon, Jun 13, 2022 at 8:39 PM Ravi Kapoor <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Team,
>>>>>
>>>>> I am currently using Beam in my project with Dataflow Runner.
>>>>> I am trying to create a pipeline where the data flows from the
>>>>> source to staging then to target such as:
>>>>>
>>>>> A (Source) -> B(Staging) -> C (Target)
>>>>>
>>>>> When I create a pipeline as below:
>>>>>
>>>>> PCollection<TableRow> table_A_records = p.apply(BigQueryIO.readTableRows()
>>>>>         .from("project:dataset.table_A"));
>>>>>
>>>>> table_A_records.apply(BigQueryIO.writeTableRows().
>>>>>         to("project:dataset.table_B")
>>>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>>
>>>>> PCollection<TableRow> table_B_records = p.apply(BigQueryIO.readTableRows()
>>>>>         .from("project:dataset.table_B"));
>>>>> table_B_records.apply(BigQueryIO.writeTableRows().
>>>>>         to("project:dataset.table_C")
>>>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>>> p.run().waitUntilFinish();
>>>>>
>>>>>
>>>>> It basically creates two parallel job graphs in dataflow instead
>>>>> creating a transformation as expected:
>>>>> A -> B
>>>>> B -> C
>>>>> I needed to create data pipeline which flows the data in chain like:
>>>>>                      D
>>>>>                    /
>>>>> A -> B -> C
>>>>>                   \
>>>>>                     E
>>>>> Is there a way to achieve this transformation in between source and
>>>>> target tables?
>>>>>
>>>>> Thanks,
>>>>> Ravi
>>>>>
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Ravi Kapoor
>>>> +91-9818764564 <+91%2098187%2064564>
>>>> kapoorravi63@gmail.com
>>>>
>>>
>>
>> --
>> Thanks,
>> Ravi Kapoor
>> +91-9818764564 <+91%2098187%2064564>
>> kapoorravi63@gmail.com
>>
>
>
> --
> Thanks,
> Ravi Kapoor
> +91-9818764564 <+91%2098187%2064564>
> kapoorravi63@gmail.com
>

Re: Chained Job Graph Apache Beam | Dataflow

Posted by Jack McCluskey <jr...@google.com>.
Hey Ravi,

The problem you're running into is that the act of writing data to a table
and reading from it are not joined actions in the Beam model. There's no
connecting PCollection tying those together, so they are split and run in
parallel. If you want to do this and need the data written to C, you should
re-use the PCollection written to C in your filtering step instead of
reading the data from C again. That should produce the graph you're looking
for in a batch context.

Thanks,

Jack McCluskey

On Wed, Jun 15, 2022 at 3:30 PM Ravi Kapoor <ka...@gmail.com> wrote:

> FYI

Re: Chained Job Graph Apache Beam | Dataflow

Posted by Ravi Kapoor <ka...@gmail.com>.
FYI

On Thu, Jun 16, 2022 at 12:56 AM Ravi Kapoor <ka...@gmail.com> wrote:

> Hi Daniel,
>
> I have a use case where I join two tables say A and B and write the joined
> Collection to C.
> Then I would like to filter some records on C and put it to another table
> say D.
> So, the pipeline on Dataflow UI should look like this right?
>
> A
>    \
>     C -> D
>    /
> B
>
> However, the pipeline is writing C -> D in parallel.
> How can this pipeline run in parallel as data has not been pushed yet to C
> by the previous pipeline?
>
> Even when I ran this pipeline, Table D did not get any records inserted as
> well which is apparent.
> Can you help me with this use case?
>
> Thanks,
> Ravi


-- 
Thanks,
Ravi Kapoor
+91-9818764564
kapoorravi63@gmail.com

Re: Chained Job Graph Apache Beam | Dataflow

Posted by Ravi Kapoor <ka...@gmail.com>.
Hi Daniel,

I have a use case where I join two tables, say A and B, and write the joined
collection to C.
Then I would like to filter some records from C and write them to another
table, say D.
So the pipeline in the Dataflow UI should look like this, right?

A
   \
    C -> D
   /
B

However, the pipeline runs the C -> D step in parallel with the write to C.
How can this step run in parallel when the data has not yet been written to
C by the previous step?

When I ran this pipeline, table D did not get any records inserted, which
is unsurprising given the above.
Can you help me with this use case?

Thanks,
Ravi



On Tue, Jun 14, 2022 at 9:01 PM Daniel Collins <dp...@google.com> wrote:

> Can you speak to what specifically you want to be different? The job graph
> you see, with the A -> B and B-> C being separate is an accurate reflection
> of your pipeline. table_B is outside of the beam model, by pushing your
> data there, Dataflow has no ability to identify that no manipulation of
> data is happening at table_B.
>
> If you want to just process data from A to destinations D and E, while
> writing an intermediate output to table_B, you should just remove the read
> from table B and modify table_A_records again directly. If that is not what
> you want, you would need to explain more specifically what you want that is
> different. Is it a pure UI change? Is it a functional change?
>
> -Daniel

-- 
Thanks,
Ravi Kapoor
+91-9818764564
kapoorravi63@gmail.com

Re: Chained Job Graph Apache Beam | Dataflow

Posted by Daniel Collins <dp...@google.com>.
Can you say what specifically you want to be different? The job graph you
see, with A -> B and B -> C as separate branches, is an accurate reflection
of your pipeline. table_B is outside of the Beam model: once you push your
data there, Dataflow has no way to know that the data is not modified at
table_B.

If you just want to process data from A to destinations D and E while
writing an intermediate output to table_B, remove the read from table_B and
apply your further transforms to table_A_records directly. If that is not
what you want, please explain more specifically what you want to be
different. Is it a pure UI change? Is it a functional change?
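
To make the suggested shape concrete, here is a minimal plain-Java sketch
with no Beam dependency. A list stands in for table_A_records, and the
class, the fanOut helper and the even/odd rules for D and E are hypothetical
placeholders. It only illustrates that one input, read once, can feed the
staging output and both downstream destinations without anything being read
back from table_B.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java stand-in for a branching pipeline; nothing here is Beam API.
final class FanOutSketch {

    // Sends the same input rows to every named destination, keeping only the
    // rows each destination's predicate accepts. The input is never re-read.
    static <T> Map<String, List<T>> fanOut(List<T> rows,
                                           Map<String, Predicate<T>> destinations) {
        Map<String, List<T>> out = new LinkedHashMap<>();
        for (Map.Entry<String, Predicate<T>> dest : destinations.entrySet()) {
            List<T> kept = new ArrayList<>();
            for (T row : rows) {
                if (dest.getValue().test(row)) {
                    kept.add(row);
                }
            }
            out.put(dest.getKey(), kept);
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in for the records read once from table_A.
        List<Integer> tableARecords = List.of(1, 2, 3, 4, 5, 6);

        Map<String, Predicate<Integer>> destinations = new LinkedHashMap<>();
        destinations.put("table_B", r -> true);       // staging copy: every row
        destinations.put("table_D", r -> r % 2 == 0); // hypothetical rule for D
        destinations.put("table_E", r -> r % 2 != 0); // hypothetical rule for E

        System.out.println(fanOut(tableARecords, destinations));
    }
}
```

In the pipeline from the original question, the equivalent change would be
to apply the writes for B, D and E to table_A_records (or to collections
derived from it) rather than to a second BigQueryIO.readTableRows() on
table_B.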

-Daniel

On Tue, Jun 14, 2022 at 11:12 AM Ravi Kapoor <ka...@gmail.com> wrote:

> Team,
> Any update on this?

Re: Chained Job Graph Apache Beam | Dataflow

Posted by Bruno Volpato <br...@gmail.com>.
Hello Ravi,

I am not sure I follow what you are trying to do, but
BigQueryIO.writeTableRows() is a sink and returns only insertion errors.

If you already have table_A_records, why read the same data again from
BigQuery?
You could apply any intermediate transforms to table_A_records and write the
result to table_C, while still writing to the staging area (table_B).
That way, you can also take advantage of some parallelism.


Best,
Bruno





On Tue, Jun 14, 2022 at 11:12 AM Ravi Kapoor <ka...@gmail.com> wrote:

> Team,
> Any update on this?

Re: Chained Job Graph Apache Beam | Dataflow

Posted by Ravi Kapoor <ka...@gmail.com>.
Team,
Any update on this?

On Mon, Jun 13, 2022 at 8:39 PM Ravi Kapoor <ka...@gmail.com> wrote:

> Hi Team,
>
> I am currently using Beam in my project with Dataflow Runner.
> I am trying to create a pipeline where the data flows from the source to
> staging then to target such as:
>
> A (Source) -> B(Staging) -> C (Target)
>
> When I create a pipeline as below:
>
> PCollection<TableRow> table_A_records = p.apply(BigQueryIO.readTableRows()
>         .from("project:dataset.table_A"));
>
> table_A_records.apply(BigQueryIO.writeTableRows().
>         to("project:dataset.table_B")
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>
> PCollection<TableRow> table_B_records = p.apply(BigQueryIO.readTableRows()
>         .from("project:dataset.table_B"));
> table_B_records.apply(BigQueryIO.writeTableRows().
>         to("project:dataset.table_C")
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
> p.run().waitUntilFinish();
>
>
> It basically creates two parallel job graphs in Dataflow instead of
> creating a chained transformation as expected:
> A -> B
> B -> C
> I need to create a data pipeline in which the data flows in a chain like:
>                      D
>                    /
> A -> B -> C
>                   \
>                     E
> Is there a way to achieve this transformation in between source and target
> tables?
>
> Thanks,
> Ravi
>


-- 
Thanks,
Ravi Kapoor
+91-9818764564
kapoorravi63@gmail.com