Posted to dev@beam.apache.org by rahul patwari <ra...@gmail.com> on 2019/07/16 13:37:38 UTC

Slowly changing lookup cache as a Table in BeamSql

Hi,

We are following [*Pattern: Slowly-changing lookup cache*] from
https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1

We have a use case to read slowly changing bounded data as a PCollection
along with the main PCollection from Kafka (windowed) and use it in a
BeamSql query.

Is it possible to design such a use case with the Beam Java SDK?

Approaches followed but not Successful:

1) GenerateSequence => GlobalWindow with data trigger => Composite
transform (which applies a Beam I/O on the
pipeline [PCollection.getPipeline()]) => Convert the resulting PCollection
to PCollection<Row> => Apply BeamSQL
Comments: The Beam I/O reads data only once, even though a long value is
generated from GenerateSequence periodically. The expectation is that
whenever a long value is generated, the Beam I/O will be used to read the
latest data. Is this because of optimizations in the DAG? Can the
optimizations be overridden?

2) The pipeline is the same as in approach 1, but instead of using a
composite transform, a DoFn is used where a for loop emits each Row of
the PCollection.
Comments: The output PCollection is unbounded, but we need a bounded
PCollection, as this PCollection is used to JOIN with the PCollection of
each window from Kafka. How can we convert an unbounded PCollection to a
bounded PCollection inside a DoFn?
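
For reference, a minimal sketch of the side-input version of the pattern we
are attempting in approach 1 (the refresh interval, the map contents, and the
class name are placeholders, and the DoFn body stands in for the actual read;
this is not the exact code we run):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

public class SlowlyChangingSideInputSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // A long is emitted every 5 minutes as a "refresh" signal in the global window.
    PCollectionView<Map<String, String>> lookupView =
        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
            .apply(
                Window.<Long>into(new GlobalWindows())
                    .triggering(
                        Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(
                ParDo.of(
                    new DoFn<Long, Map<String, String>>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        // Placeholder: re-read the slowly changing source here,
                        // e.g. with a client library call instead of a Beam I/O.
                        Map<String, String> latest = new HashMap<>();
                        c.output(latest);
                      }
                    }))
            .apply(View.asSingleton());

    // The main windowed Kafka stream would then read the latest map through
    // c.sideInput(lookupView) in a ParDo declared with .withSideInputs(lookupView).
    p.run();
  }
}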

Are there any better Approaches?

Regards,
Rahul

Re: Slowly changing lookup cache as a Table in BeamSql

Posted by rahul patwari <ra...@gmail.com>.
Hi Reza, Rui,

Can we use the [slowly changing lookup cache] approach in BeamSQL if the
source is [HDFS (or) Hive] (with changing data), where the PCollection
cannot fit into memory?
This PCollection will be JOINed with the windowed PCollection created from
reading Kafka data in BeamSQL.

Thanks and Regards,
Rahul

On Wed, Jul 17, 2019 at 3:07 AM Reza Rokni <re...@google.com> wrote:

> +1
>
> On Tue, 16 Jul 2019 at 20:36, Rui Wang <ru...@google.com> wrote:
>
>> Another approach is to let BeamSQL support it natively, as the title of
>> this thread says: "as a Table in BeamSQL".
>>
>> We might be able to define a table with properties that says this table
>> return a PCollectionView. By doing so we will have a trigger based
>> PCollectionView available in SQL rel nodes, thus SQL will be able to
>> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
>> only need to construct a table and set it to SqlTransform
>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
>> *. *
>>
>> Create a JIRA to track this idea:
>> https://jira.apache.org/jira/browse/BEAM-7758
>>
>>
>> -Rui
>>
>>
>> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <re...@google.com> wrote:
>>
>>> Hi Rahul,
>>>
>>> FYI, that patterns is also available in the Beam docs  ( with updated
>>> code example )
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>>
>>> Please note in the DoFn that feeds the View.asSingleton() you will need
>>> to manually call BigQuery using the BigQuery client.
>>>
>>> Regards
>>>
>>> Reza
>>>
>>> On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>
>>>> We have a use case to read slowly changing bounded data as a
>>>> PCollection along with the main PCollection from Kafka(windowed) and use it
>>>> in the query of BeamSql.
>>>>
>>>> Is it possible to design such a use case with Beam Java SDK?
>>>>
>>>> Approaches followed but not Successful:
>>>>
>>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>>> Transform(which applies Beam I/O on the
>>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>>> to PCollection<Row> Apply BeamSQL
>>>> Comments: Beam I/O reads data only once even though a long value is
>>>> generated from GenerateSequece with periodicity. The expectation is that
>>>> whenever a long value is generated, Beam I/O will be used to read the
>>>> latest data. Is this because of optimizations in the DAG? Can the
>>>> optimizations be overridden?
>>>>
>>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>>> the PCollection.
>>>> comments: The output PCollection is unbounded. But, we need a bounded
>>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>>> PCollection inside a DoFn?
>>>>
>>>> Are there any better Approaches?
>>>>
>>>> Regards,
>>>> Rahul
>>>>
>>>>
>>>>
>>>
>>>
>>
>
>

Re: Slowly changing lookup cache as a Table in BeamSql

Posted by Reza Rokni <re...@google.com>.
+1

On Tue, 16 Jul 2019 at 20:36, Rui Wang <ru...@google.com> wrote:

> Another approach is to let BeamSQL support it natively, as the title of
> this thread says: "as a Table in BeamSQL".
>
> We might be able to define a table with properties that says this table
> return a PCollectionView. By doing so we will have a trigger based
> PCollectionView available in SQL rel nodes, thus SQL will be able to
> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
> only need to construct a table and set it to SqlTransform
> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
> *. *
>
> Create a JIRA to track this idea:
> https://jira.apache.org/jira/browse/BEAM-7758
>
>
> -Rui
>
>
> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <re...@google.com> wrote:
>
>> Hi Rahul,
>>
>> FYI, that patterns is also available in the Beam docs  ( with updated
>> code example )
>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>
>> Please note in the DoFn that feeds the View.asSingleton() you will need
>> to manually call BigQuery using the BigQuery client.
>>
>> Regards
>>
>> Reza
>>
>> On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>
>>> We have a use case to read slowly changing bounded data as a PCollection
>>> along with the main PCollection from Kafka(windowed) and use it in the
>>> query of BeamSql.
>>>
>>> Is it possible to design such a use case with Beam Java SDK?
>>>
>>> Approaches followed but not Successful:
>>>
>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>> Transform(which applies Beam I/O on the
>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>> to PCollection<Row> Apply BeamSQL
>>> Comments: Beam I/O reads data only once even though a long value is
>>> generated from GenerateSequece with periodicity. The expectation is that
>>> whenever a long value is generated, Beam I/O will be used to read the
>>> latest data. Is this because of optimizations in the DAG? Can the
>>> optimizations be overridden?
>>>
>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>> the PCollection.
>>> comments: The output PCollection is unbounded. But, we need a bounded
>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>> PCollection inside a DoFn?
>>>
>>> Are there any better Approaches?
>>>
>>> Regards,
>>> Rahul
>>>
>>>
>>>
>>
>>
>


Re: Slowly changing lookup cache as a Table in BeamSql

Posted by Reza Rokni <re...@google.com>.
*Can we use [slowly changing lookup cache] approach if the source is [HDFS
(or) HIVE] (data is changing), where the PCollection cannot fit into Memory
in BeamSQL?*

It can depend on the runner; in streaming mode on the Dataflow runner, the
side input needs to fit into memory.

On Wed, 17 Jul 2019 at 15:31, rahul patwari <ra...@gmail.com>
wrote:

> Hi,
>
> Please add me as a contributor to the Beam Issue Tracker. I would like to
> work on this feature. My ASF Jira Username: "rahul8383"
>
> Thanks,
> Rahul
>
>
>
> On Wed, Jul 17, 2019 at 1:06 AM Rui Wang <ru...@google.com> wrote:
>
>> Another approach is to let BeamSQL support it natively, as the title of
>> this thread says: "as a Table in BeamSQL".
>>
>> We might be able to define a table with properties that says this table
>> return a PCollectionView. By doing so we will have a trigger based
>> PCollectionView available in SQL rel nodes, thus SQL will be able to
>> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
>> only need to construct a table and set it to SqlTransform
>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
>> *. *
>>
>> Create a JIRA to track this idea:
>> https://jira.apache.org/jira/browse/BEAM-7758
>>
>>
>> -Rui
>>
>>
>> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <re...@google.com> wrote:
>>
>>> Hi Rahul,
>>>
>>> FYI, that patterns is also available in the Beam docs  ( with updated
>>> code example )
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>>
>>> Please note in the DoFn that feeds the View.asSingleton() you will need
>>> to manually call BigQuery using the BigQuery client.
>>>
>>> Regards
>>>
>>> Reza
>>>
>>> On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>
>>>> We have a use case to read slowly changing bounded data as a
>>>> PCollection along with the main PCollection from Kafka(windowed) and use it
>>>> in the query of BeamSql.
>>>>
>>>> Is it possible to design such a use case with Beam Java SDK?
>>>>
>>>> Approaches followed but not Successful:
>>>>
>>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>>> Transform(which applies Beam I/O on the
>>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>>> to PCollection<Row> Apply BeamSQL
>>>> Comments: Beam I/O reads data only once even though a long value is
>>>> generated from GenerateSequece with periodicity. The expectation is that
>>>> whenever a long value is generated, Beam I/O will be used to read the
>>>> latest data. Is this because of optimizations in the DAG? Can the
>>>> optimizations be overridden?
>>>>
>>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>>> the PCollection.
>>>> comments: The output PCollection is unbounded. But, we need a bounded
>>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>>> PCollection inside a DoFn?
>>>>
>>>> Are there any better Approaches?
>>>>
>>>> Regards,
>>>> Rahul
>>>>
>>>>
>>>>
>>>
>>>
>>


Re: Slowly changing lookup cache as a Table in BeamSql

Posted by rahul patwari <ra...@gmail.com>.
Hi,

Please add me as a contributor to the Beam Issue Tracker. I would like to
work on this feature. My ASF Jira Username: "rahul8383"

Thanks,
Rahul



On Wed, Jul 17, 2019 at 1:06 AM Rui Wang <ru...@google.com> wrote:

> Another approach is to let BeamSQL support it natively, as the title of
> this thread says: "as a Table in BeamSQL".
>
> We might be able to define a table with properties that says this table
> return a PCollectionView. By doing so we will have a trigger based
> PCollectionView available in SQL rel nodes, thus SQL will be able to
> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
> only need to construct a table and set it to SqlTransform
> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
> *. *
>
> Create a JIRA to track this idea:
> https://jira.apache.org/jira/browse/BEAM-7758
>
>
> -Rui
>
>
> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <re...@google.com> wrote:
>
>> Hi Rahul,
>>
>> FYI, that patterns is also available in the Beam docs  ( with updated
>> code example )
>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>
>> Please note in the DoFn that feeds the View.asSingleton() you will need
>> to manually call BigQuery using the BigQuery client.
>>
>> Regards
>>
>> Reza
>>
>> On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>
>>> We have a use case to read slowly changing bounded data as a PCollection
>>> along with the main PCollection from Kafka(windowed) and use it in the
>>> query of BeamSql.
>>>
>>> Is it possible to design such a use case with Beam Java SDK?
>>>
>>> Approaches followed but not Successful:
>>>
>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>> Transform(which applies Beam I/O on the
>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>> to PCollection<Row> Apply BeamSQL
>>> Comments: Beam I/O reads data only once even though a long value is
>>> generated from GenerateSequece with periodicity. The expectation is that
>>> whenever a long value is generated, Beam I/O will be used to read the
>>> latest data. Is this because of optimizations in the DAG? Can the
>>> optimizations be overridden?
>>>
>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>> the PCollection.
>>> comments: The output PCollection is unbounded. But, we need a bounded
>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>> PCollection inside a DoFn?
>>>
>>> Are there any better Approaches?
>>>
>>> Regards,
>>> Rahul
>>>
>>>
>>>
>>
>>
>

Re: Slowly changing lookup cache as a Table in BeamSql

Posted by Rui Wang <ru...@google.com>.
> The fact that Bounded vs Unbounded JOIN is performed by considering
> Bounded PCollection as a Sideinput means that the Bounded PCollection
> should fit into Memory. Am I right? In that case bounded PCollection of
> Hive (or) HDFS, where data might not fit into Memory cannot be JOINED with
> Kafka?
>

My discussion above didn't involve a slowly changing data source that is
too large to fit into memory, since the [*Pattern: Slowly-changing
lookup cache*] does not focus on data size.

I don't have insight into the case in which the sources are HDFS or Hive
and contain a very large volume of slowly changing data.


>
> Does this approach have something to do with Watermark? As the computation
> might take time depending on the size of the Bounded Data, and the window
> might get expired before the result for the window is emitted.
>

Indeed, there are details of the watermark for which you have to check the
implementation to understand how it works with the core Beam primitives.
There are some high-level explanations in [1] and [2] for your reference.

I think in the Beam model you do not need to reason about the watermark when
processing data. If data is not considered too late (i.e., not later than
the GC watermark), your pipeline will definitely process it; otherwise the
data will be dropped.



[1]: https://www.youtube.com/watch?v=TWxSLmkWPm4
[2]:
https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#heading=h.7a03n7d5mf6g
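
As a small illustration of the late-data point in the Java SDK (the window
size and allowed lateness below are arbitrary, and "kafkaRows" is a
placeholder PCollection, not something from this thread):

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

// Data later than (end of its window + 10 minutes), as judged by the
// watermark, is dropped for this PCollection; anything earlier is processed.
PCollection<Row> windowed =
    kafkaRows.apply(
        Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1)))
            .withAllowedLateness(Duration.standardMinutes(10)));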

Re: Slowly changing lookup cache as a Table in BeamSql

Posted by rahul patwari <ra...@gmail.com>.
The fact that a bounded vs. unbounded JOIN is performed by treating the
bounded PCollection as a side input means that the bounded PCollection
should fit into memory. Am I right? In that case, a bounded PCollection
from Hive (or) HDFS, where the data might not fit into memory, cannot be
JOINed with Kafka?

Does this approach have anything to do with the watermark? The computation
might take time depending on the size of the bounded data, and the window
might expire before the result for the window is emitted.

Thanks,
Rahul

On Thu, Jul 18, 2019 at 10:13 PM Rui Wang <ru...@google.com> wrote:

> The idea is slowing change table is treated as a PCollectionView, which
> leads to a sideinput join implementation in which you join an unbounded
> windowed stream (Kafka) with triggering based sideinput (the slowing
> changing data). That's how it follows the pattern. If you are considering
> windowed data join windowed data case, in which one side of data is slowing
> changing, sounds like you only need to make the PCollectionView windowed by
> your need(fixed windowing for example. Also need to double check if
> sideinput can be triggered on non-global window), in this case windowing
> strategy seems has to be consistent for both sides.
>
> In BeamSQL, if one side of binary join is bounded PCollection, BeamSQL
> constructs a sideinput join already (you can check [1] for implementation
> detail). It's even more straightforward to do: one side of join is a
> PCollectionView? go sideinput join.
>
>
> [1]:
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L185
>
> -Rui
>
> On Thu, Jul 18, 2019 at 7:52 AM rahul patwari <ra...@gmail.com>
> wrote:
>
>> Hi Rui,
>>
>> I have a query about BEAM-7758.
>> If [Pattern: slowly changing lookup cache] is followed while defining and
>> constructing the lookup table and set it with SqlTransform, if any
>> aggregation (JOIN) need to be performed, say, with windowed Kafka
>> PCollection table and the lookup table, the aggregation cannot be done
>> unless both the PCollections have matching WindowFns as they are unbounded.
>> What can be done to treat the lookup table as Bounded PCollection and
>> perform aggregation with every window of Kafka's PCollection?
>>
>> Thanks,
>> Rahul
>>
>>
>> On Wed, Jul 17, 2019 at 1:06 AM Rui Wang <ru...@google.com> wrote:
>>
>>> Another approach is to let BeamSQL support it natively, as the title of
>>> this thread says: "as a Table in BeamSQL".
>>>
>>> We might be able to define a table with properties that says this table
>>> return a PCollectionView. By doing so we will have a trigger based
>>> PCollectionView available in SQL rel nodes, thus SQL will be able to
>>> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
>>> only need to construct a table and set it to SqlTransform
>>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
>>> *. *
>>>
>>> Create a JIRA to track this idea:
>>> https://jira.apache.org/jira/browse/BEAM-7758
>>>
>>>
>>> -Rui
>>>
>>>
>>> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <re...@google.com> wrote:
>>>
>>>> Hi Rahul,
>>>>
>>>> FYI, that patterns is also available in the Beam docs  ( with updated
>>>> code example )
>>>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>>>
>>>> Please note in the DoFn that feeds the View.asSingleton() you will need
>>>> to manually call BigQuery using the BigQuery client.
>>>>
>>>> Regards
>>>>
>>>> Reza
>>>>
>>>> On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>>
>>>>> We have a use case to read slowly changing bounded data as a
>>>>> PCollection along with the main PCollection from Kafka(windowed) and use it
>>>>> in the query of BeamSql.
>>>>>
>>>>> Is it possible to design such a use case with Beam Java SDK?
>>>>>
>>>>> Approaches followed but not Successful:
>>>>>
>>>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>>>> Transform(which applies Beam I/O on the
>>>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>>>> to PCollection<Row> Apply BeamSQL
>>>>> Comments: Beam I/O reads data only once even though a long value is
>>>>> generated from GenerateSequece with periodicity. The expectation is that
>>>>> whenever a long value is generated, Beam I/O will be used to read the
>>>>> latest data. Is this because of optimizations in the DAG? Can the
>>>>> optimizations be overridden?
>>>>>
>>>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>>>> the PCollection.
>>>>> comments: The output PCollection is unbounded. But, we need a bounded
>>>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>>>> PCollection inside a DoFn?
>>>>>
>>>>> Are there any better Approaches?
>>>>>
>>>>> Regards,
>>>>> Rahul
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>

Re: Slowly changing lookup cache as a Table in BeamSql

Posted by Rui Wang <ru...@google.com>.
The idea is that the slowly changing table is treated as a PCollectionView,
which leads to a side-input join implementation in which you join an
unbounded windowed stream (Kafka) with a trigger-based side input (the
slowly changing data). That's how it follows the pattern. If you are
considering the windowed-data-joins-windowed-data case, in which one side of
the data is slowly changing, it sounds like you only need to make the
PCollectionView windowed as you need (fixed windowing, for example; also
double-check whether a side input can be triggered on a non-global window).
In this case the windowing strategy seems to have to be consistent for both
sides.

In BeamSQL, if one side of a binary join is a bounded PCollection, BeamSQL
already constructs a side-input join (you can check [1] for implementation
details). It would be even more straightforward here: one side of the join
is a PCollectionView? Go with a side-input join.


[1]:
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java#L185
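
Roughly, at the SDK level such a side-input join looks like the sketch below
("kafkaRows" and "lookupView" are placeholder names, and this is a
hand-written illustration rather than the BeamJoinRel code itself):

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// kafkaRows: the unbounded, windowed main stream keyed by the join key.
// lookupView: a PCollectionView<Map<String, String>> of the slowly changing side.
PCollection<KV<String, String>> joined =
    kafkaRows.apply(
        ParDo.of(
                new DoFn<KV<String, String>, KV<String, String>>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    Map<String, String> lookup = c.sideInput(lookupView);
                    String value = lookup.get(c.element().getKey());
                    if (value != null) {
                      // Inner-join semantics: emit only when the key is present.
                      c.output(KV.of(c.element().getKey(), value));
                    }
                  }
                })
            .withSideInputs(lookupView));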

-Rui

On Thu, Jul 18, 2019 at 7:52 AM rahul patwari <ra...@gmail.com>
wrote:

> Hi Rui,
>
> I have a query about BEAM-7758.
> If [Pattern: slowly changing lookup cache] is followed while defining and
> constructing the lookup table and set it with SqlTransform, if any
> aggregation (JOIN) need to be performed, say, with windowed Kafka
> PCollection table and the lookup table, the aggregation cannot be done
> unless both the PCollections have matching WindowFns as they are unbounded.
> What can be done to treat the lookup table as Bounded PCollection and
> perform aggregation with every window of Kafka's PCollection?
>
> Thanks,
> Rahul
>
>
> On Wed, Jul 17, 2019 at 1:06 AM Rui Wang <ru...@google.com> wrote:
>
>> Another approach is to let BeamSQL support it natively, as the title of
>> this thread says: "as a Table in BeamSQL".
>>
>> We might be able to define a table with properties that says this table
>> return a PCollectionView. By doing so we will have a trigger based
>> PCollectionView available in SQL rel nodes, thus SQL will be able to
>> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
>> only need to construct a table and set it to SqlTransform
>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
>> *. *
>>
>> Create a JIRA to track this idea:
>> https://jira.apache.org/jira/browse/BEAM-7758
>>
>>
>> -Rui
>>
>>
>> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <re...@google.com> wrote:
>>
>>> Hi Rahul,
>>>
>>> FYI, that patterns is also available in the Beam docs  ( with updated
>>> code example )
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>>
>>> Please note in the DoFn that feeds the View.asSingleton() you will need
>>> to manually call BigQuery using the BigQuery client.
>>>
>>> Regards
>>>
>>> Reza
>>>
>>> On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>
>>>> We have a use case to read slowly changing bounded data as a
>>>> PCollection along with the main PCollection from Kafka(windowed) and use it
>>>> in the query of BeamSql.
>>>>
>>>> Is it possible to design such a use case with Beam Java SDK?
>>>>
>>>> Approaches followed but not Successful:
>>>>
>>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>>> Transform(which applies Beam I/O on the
>>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>>> to PCollection<Row> Apply BeamSQL
>>>> Comments: Beam I/O reads data only once even though a long value is
>>>> generated from GenerateSequece with periodicity. The expectation is that
>>>> whenever a long value is generated, Beam I/O will be used to read the
>>>> latest data. Is this because of optimizations in the DAG? Can the
>>>> optimizations be overridden?
>>>>
>>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>>> the PCollection.
>>>> comments: The output PCollection is unbounded. But, we need a bounded
>>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>>> PCollection inside a DoFn?
>>>>
>>>> Are there any better Approaches?
>>>>
>>>> Regards,
>>>> Rahul
>>>>
>>>>
>>>>
>>>
>>>
>>

Re: Slowly changing lookup cache as a Table in BeamSql

Posted by Pablo Estrada <pa...@google.com>.
I've added you as a contributor.

On Thu, Jul 18, 2019 at 7:52 AM rahul patwari <ra...@gmail.com>
wrote:

> Hi Rui,
>
> I have a query about BEAM-7758.
> If [Pattern: slowly changing lookup cache] is followed while defining and
> constructing the lookup table and set it with SqlTransform, if any
> aggregation (JOIN) need to be performed, say, with windowed Kafka
> PCollection table and the lookup table, the aggregation cannot be done
> unless both the PCollections have matching WindowFns as they are unbounded.
> What can be done to treat the lookup table as Bounded PCollection and
> perform aggregation with every window of Kafka's PCollection?
>
> Thanks,
> Rahul
>
>
> On Wed, Jul 17, 2019 at 1:06 AM Rui Wang <ru...@google.com> wrote:
>
>> Another approach is to let BeamSQL support it natively, as the title of
>> this thread says: "as a Table in BeamSQL".
>>
>> We might be able to define a table with properties that says this table
>> return a PCollectionView. By doing so we will have a trigger based
>> PCollectionView available in SQL rel nodes, thus SQL will be able to
>> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
>> only need to construct a table and set it to SqlTransform
>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
>> *. *
>>
>> Create a JIRA to track this idea:
>> https://jira.apache.org/jira/browse/BEAM-7758
>>
>>
>> -Rui
>>
>>
>> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <re...@google.com> wrote:
>>
>>> Hi Rahul,
>>>
>>> FYI, that patterns is also available in the Beam docs  ( with updated
>>> code example )
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>>
>>> Please note in the DoFn that feeds the View.asSingleton() you will need
>>> to manually call BigQuery using the BigQuery client.
>>>
>>> Regards
>>>
>>> Reza
>>>
>>> On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>
>>>> We have a use case to read slowly changing bounded data as a
>>>> PCollection along with the main PCollection from Kafka(windowed) and use it
>>>> in the query of BeamSql.
>>>>
>>>> Is it possible to design such a use case with Beam Java SDK?
>>>>
>>>> Approaches followed but not Successful:
>>>>
>>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>>> Transform(which applies Beam I/O on the
>>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>>> to PCollection<Row> Apply BeamSQL
>>>> Comments: Beam I/O reads data only once even though a long value is
>>>> generated from GenerateSequece with periodicity. The expectation is that
>>>> whenever a long value is generated, Beam I/O will be used to read the
>>>> latest data. Is this because of optimizations in the DAG? Can the
>>>> optimizations be overridden?
>>>>
>>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>>> the PCollection.
>>>> comments: The output PCollection is unbounded. But, we need a bounded
>>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>>> PCollection inside a DoFn?
>>>>
>>>> Are there any better Approaches?
>>>>
>>>> Regards,
>>>> Rahul
>>>>
>>>>
>>>>
>>>
>>>
>>

Re: Slowly changing lookup cache as a Table in BeamSql

Posted by rahul patwari <ra...@gmail.com>.
Hi Rui,

I have a query about BEAM-7758.
If [Pattern: slowly changing lookup cache] is followed while defining and
constructing the lookup table, and it is set with SqlTransform, then if any
aggregation (JOIN) needs to be performed, say, between the windowed Kafka
PCollection table and the lookup table, the aggregation cannot be done
unless both PCollections have matching WindowFns, as they are both
unbounded. What can be done to treat the lookup table as a bounded
PCollection and perform the aggregation with every window of Kafka's
PCollection?

Thanks,
Rahul


On Wed, Jul 17, 2019 at 1:06 AM Rui Wang <ru...@google.com> wrote:

> Another approach is to let BeamSQL support it natively, as the title of
> this thread says: "as a Table in BeamSQL".
>
> We might be able to define a table with properties that says this table
> return a PCollectionView. By doing so we will have a trigger based
> PCollectionView available in SQL rel nodes, thus SQL will be able to
> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
> only need to construct a table and set it to SqlTransform
> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
> *. *
>
> Create a JIRA to track this idea:
> https://jira.apache.org/jira/browse/BEAM-7758
>
>
> -Rui
>
>
> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <re...@google.com> wrote:
>
>> Hi Rahul,
>>
>> FYI, that patterns is also available in the Beam docs  ( with updated
>> code example )
>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>
>> Please note in the DoFn that feeds the View.asSingleton() you will need
>> to manually call BigQuery using the BigQuery client.
>>
>> Regards
>>
>> Reza
>>
>> On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>
>>> We have a use case to read slowly changing bounded data as a PCollection
>>> along with the main PCollection from Kafka(windowed) and use it in the
>>> query of BeamSql.
>>>
>>> Is it possible to design such a use case with Beam Java SDK?
>>>
>>> Approaches followed but not Successful:
>>>
>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>> Transform(which applies Beam I/O on the
>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>> to PCollection<Row> Apply BeamSQL
>>> Comments: Beam I/O reads data only once even though a long value is
>>> generated from GenerateSequece with periodicity. The expectation is that
>>> whenever a long value is generated, Beam I/O will be used to read the
>>> latest data. Is this because of optimizations in the DAG? Can the
>>> optimizations be overridden?
>>>
>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>> the PCollection.
>>> comments: The output PCollection is unbounded. But, we need a bounded
>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>> PCollection inside a DoFn?
>>>
>>> Are there any better Approaches?
>>>
>>> Regards,
>>> Rahul
>>>
>>>
>>>
>>
>>
>

Re: Slowly changing lookup cache as a Table in BeamSql

Posted by Rui Wang <ru...@google.com>.
Hi Rahul,

Let's go to the JIRA and discuss details there. I will update it soon to
answer your questions.

-Rui

On Fri, Jul 19, 2019 at 8:41 AM rahul patwari <ra...@gmail.com>
wrote:

> Hi Rui,
>
> I am trying to understand what exactly needs to be done for
> https://jira.apache.org/jira/browse/BEAM-7758
>
> Does the Expectation follow these statements:
>
>    - User should be able to create a virtual Table of
>    PCollectionView(like
>    https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/),
>    where [rate of GenerateSequence], [Starting value from GenerateSequence],
>    [a class which implements DoFn, which will be used to read data from a
>    source], [the view, whether as Map (or) List (or) MultiMap (or) singleton]
>    can be provided with TBLPROPERTIES.
>    - User should be able to use the table created above to do a JOIN with
>    Unbounded Data source and support other SQL operators.
>
> Thanks,
> Rahul
>
> On Wed, Jul 17, 2019 at 1:06 AM Rui Wang <ru...@google.com> wrote:
>
>> Another approach is to let BeamSQL support it natively, as the title of
>> this thread says: "as a Table in BeamSQL".
>>
>> We might be able to define a table with properties that says this table
>> return a PCollectionView. By doing so we will have a trigger based
>> PCollectionView available in SQL rel nodes, thus SQL will be able to
>> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
>> only need to construct a table and set it to SqlTransform
>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
>> *. *
>>
>> Create a JIRA to track this idea:
>> https://jira.apache.org/jira/browse/BEAM-7758
>>
>>
>> -Rui
>>
>>
>> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <re...@google.com> wrote:
>>
>>> Hi Rahul,
>>>
>>> FYI, that patterns is also available in the Beam docs  ( with updated
>>> code example )
>>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>>
>>> Please note in the DoFn that feeds the View.asSingleton() you will need
>>> to manually call BigQuery using the BigQuery client.
>>>
>>> Regards
>>>
>>> Reza
>>>
>>> On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>
>>>> We have a use case to read slowly changing bounded data as a
>>>> PCollection along with the main PCollection from Kafka(windowed) and use it
>>>> in the query of BeamSql.
>>>>
>>>> Is it possible to design such a use case with Beam Java SDK?
>>>>
>>>> Approaches followed but not Successful:
>>>>
>>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>>> Transform(which applies Beam I/O on the
>>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>>> to PCollection<Row> Apply BeamSQL
>>>> Comments: Beam I/O reads data only once even though a long value is
>>>> generated from GenerateSequece with periodicity. The expectation is that
>>>> whenever a long value is generated, Beam I/O will be used to read the
>>>> latest data. Is this because of optimizations in the DAG? Can the
>>>> optimizations be overridden?
>>>>
>>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>>> the PCollection.
>>>> comments: The output PCollection is unbounded. But, we need a bounded
>>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>>> PCollection inside a DoFn?
>>>>
>>>> Are there any better Approaches?
>>>>
>>>> Regards,
>>>> Rahul
>>>>
>>>>
>>>>
>>>
>>>
>>

Re: Slowly changing lookup cache as a Table in BeamSql

Posted by rahul patwari <ra...@gmail.com>.
Hi Rui,

I am trying to understand what exactly needs to be done for
https://jira.apache.org/jira/browse/BEAM-7758

Is the expectation along the lines of these statements:

   - The user should be able to create a virtual table backed by a
   PCollectionView (like
   https://beam.apache.org/documentation/dsls/sql/extensions/create-external-table/),
   where [the rate of GenerateSequence], [the starting value of
   GenerateSequence], [a class implementing DoFn, which will be used to read
   data from a source], and [the view type, whether Map (or) List (or)
   MultiMap (or) singleton] can be provided with TBLPROPERTIES (a rough
   sketch follows after this list).
   - The user should be able to use the table created above to do a JOIN
   with an unbounded data source, with other SQL operators also supported.
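
For example, a purely hypothetical DDL sketch of the first statement (the
TYPE and every TBLPROPERTIES key below are invented for illustration and do
not exist in Beam today):

CREATE EXTERNAL TABLE customer_lookup (
  customer_id VARCHAR,
  customer_name VARCHAR
)
TYPE 'slowlyChangingView'
TBLPROPERTIES '{
  "refreshRateSeconds": "300",
  "sequenceStart": "0",
  "readFn": "com.example.ReadCustomersFn",
  "viewType": "MAP"
}'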

Thanks,
Rahul

On Wed, Jul 17, 2019 at 1:06 AM Rui Wang <ru...@google.com> wrote:

> Another approach is to let BeamSQL support it natively, as the title of
> this thread says: "as a Table in BeamSQL".
>
> We might be able to define a table with properties that says this table
> return a PCollectionView. By doing so we will have a trigger based
> PCollectionView available in SQL rel nodes, thus SQL will be able to
> implement [*Pattern: Slowly-changing lookup cache].* By this way, users
> only need to construct a table and set it to SqlTransform
> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>
> *. *
>
> Create a JIRA to track this idea:
> https://jira.apache.org/jira/browse/BEAM-7758
>
>
> -Rui
>
>
> On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <re...@google.com> wrote:
>
>> Hi Rahul,
>>
>> FYI, that patterns is also available in the Beam docs  ( with updated
>> code example )
>> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>>
>> Please note in the DoFn that feeds the View.asSingleton() you will need
>> to manually call BigQuery using the BigQuery client.
>>
>> Regards
>>
>> Reza
>>
>> On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> we are following [*Pattern: Slowly-changing lookup cache*] from
>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>
>>> We have a use case to read slowly changing bounded data as a PCollection
>>> along with the main PCollection from Kafka(windowed) and use it in the
>>> query of BeamSql.
>>>
>>> Is it possible to design such a use case with Beam Java SDK?
>>>
>>> Approaches followed but not Successful:
>>>
>>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>>> Transform(which applies Beam I/O on the
>>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>>> to PCollection<Row> Apply BeamSQL
>>> Comments: Beam I/O reads data only once even though a long value is
>>> generated from GenerateSequece with periodicity. The expectation is that
>>> whenever a long value is generated, Beam I/O will be used to read the
>>> latest data. Is this because of optimizations in the DAG? Can the
>>> optimizations be overridden?
>>>
>>> 2) The pipeline is the same as approach 1. But, instead of using a
>>> composite transform, a DoFn is used where a for loop will emit each Row of
>>> the PCollection.
>>> comments: The output PCollection is unbounded. But, we need a bounded
>>> PCollection as this PCollection is used to JOIN with PCollection of each
>>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>>> PCollection inside a DoFn?
>>>
>>> Are there any better Approaches?
>>>
>>> Regards,
>>> Rahul
>>>
>>>
>>>
>>
>

Re: Slowly changing lookup cache as a Table in BeamSql

Posted by Rui Wang <ru...@google.com>.
Another approach is to let BeamSQL support it natively, as the title of
this thread says: "as a Table in BeamSQL".

We might be able to define a table whose properties say that the table
returns a PCollectionView. By doing so we would have a trigger-based
PCollectionView available in the SQL rel nodes, so SQL would be able to
implement [*Pattern: Slowly-changing lookup cache*]. This way, users
only need to construct such a table and set it on SqlTransform
<https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java#L186>.
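
To make the intent concrete, here is a rough sketch of the user-facing
shape this could take. Nothing below exists yet: the slowly-changing
TableProvider and the way "customer_lookup" would resolve to a
PCollectionView are exactly what BEAM-7758 needs to define; only
SqlTransform.query and PCollectionTuple are existing API, and
withTableProvider is assumed as the registration hook.

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

class SlowlyChangingTableSketch {
  // Joins the main (windowed, unbounded) rows against a hypothetical
  // slowly-changing lookup table. The provider passed in would be the
  // new piece of work: a TableProvider whose table is materialised as a
  // trigger-based PCollectionView inside the SQL rel nodes.
  static PCollection<Row> joinWithLookup(
      PCollection<Row> mainRows, TableProvider slowlyChangingProvider) {
    return PCollectionTuple.of(new TupleTag<>("orders"), mainRows)
        .apply(
            SqlTransform.query(
                    "SELECT o.id, c.name "
                        + "FROM orders o "
                        + "JOIN customer_lookup c ON o.customer_id = c.id")
                .withTableProvider("lookup", slowlyChangingProvider));
  }
}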

Created a JIRA to track this idea:
https://jira.apache.org/jira/browse/BEAM-7758


-Rui


On Tue, Jul 16, 2019 at 7:12 AM Reza Rokni <re...@google.com> wrote:

> Hi Rahul,
>
> FYI, that patterns is also available in the Beam docs  ( with updated code
> example )
> https://beam.apache.org/documentation/patterns/side-input-patterns/.
>
> Please note in the DoFn that feeds the View.asSingleton() you will need to
> manually call BigQuery using the BigQuery client.
>
> Regards
>
> Reza
>
> On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
> wrote:
>
>> Hi,
>>
>> we are following [*Pattern: Slowly-changing lookup cache*] from
>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>
>> We have a use case to read slowly changing bounded data as a PCollection
>> along with the main PCollection from Kafka(windowed) and use it in the
>> query of BeamSql.
>>
>> Is it possible to design such a use case with Beam Java SDK?
>>
>> Approaches followed but not Successful:
>>
>> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
>> Transform(which applies Beam I/O on the
>> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
>> to PCollection<Row> Apply BeamSQL
>> Comments: Beam I/O reads data only once even though a long value is
>> generated from GenerateSequece with periodicity. The expectation is that
>> whenever a long value is generated, Beam I/O will be used to read the
>> latest data. Is this because of optimizations in the DAG? Can the
>> optimizations be overridden?
>>
>> 2) The pipeline is the same as approach 1. But, instead of using a
>> composite transform, a DoFn is used where a for loop will emit each Row of
>> the PCollection.
>> comments: The output PCollection is unbounded. But, we need a bounded
>> PCollection as this PCollection is used to JOIN with PCollection of each
>> window from Kafka. How can we convert an Unbounded PCollection to Bounded
>> PCollection inside a DoFn?
>>
>> Are there any better Approaches?
>>
>> Regards,
>> Rahul
>>
>>
>>
>
>

Re: Slowly changing lookup cache as a Table in BeamSql

Posted by Reza Rokni <re...@google.com>.
Hi Rahul,

FYI, that pattern is also available in the Beam docs (with an updated
code example):
https://beam.apache.org/documentation/patterns/side-input-patterns/.

Please note that in the DoFn that feeds View.asSingleton() you will need
to call BigQuery manually using the BigQuery client.
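
For reference, a minimal sketch of that pattern, assuming a
Map<String, String> lookup refreshed every five minutes;
fetchLatestLookup() is a placeholder for the manual call to the
external source (e.g. the BigQuery client):

import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

class SlowlyChangingSideInput {
  // Emits a fresh snapshot of the lookup data on every tick and exposes
  // the latest snapshot as a singleton side input in the global window.
  static PCollectionView<Map<String, String>> build(Pipeline p) {
    return p
        .apply("Tick",
            GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
        .apply("Fetch", ParDo.of(
            new DoFn<Long, Map<String, String>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // Manual call to the external source on every tick.
                c.output(fetchLatestLookup());
              }
            }))
        .apply("LatestPane",
            Window.<Map<String, String>>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        .apply(View.asSingleton());
  }

  private static Map<String, String> fetchLatestLookup() {
    // Placeholder: query the external source (e.g. BigQuery) and build the map.
    return Collections.emptyMap();
  }
}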

Regards

Reza

On Tue, 16 Jul 2019 at 14:37, rahul patwari <ra...@gmail.com>
wrote:

> Hi,
>
> we are following [*Pattern: Slowly-changing lookup cache*] from
> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>
> We have a use case to read slowly changing bounded data as a PCollection
> along with the main PCollection from Kafka(windowed) and use it in the
> query of BeamSql.
>
> Is it possible to design such a use case with Beam Java SDK?
>
> Approaches followed but not Successful:
>
> 1) GenerateSequence => GlobalWindow with Data Trigger => Composite
> Transform(which applies Beam I/O on the
> pipeline[PCollection.getPipeline()]) => Convert the resulting PCollection
> to PCollection<Row> Apply BeamSQL
> Comments: Beam I/O reads data only once even though a long value is
> generated from GenerateSequece with periodicity. The expectation is that
> whenever a long value is generated, Beam I/O will be used to read the
> latest data. Is this because of optimizations in the DAG? Can the
> optimizations be overridden?
>
> 2) The pipeline is the same as approach 1. But, instead of using a
> composite transform, a DoFn is used where a for loop will emit each Row of
> the PCollection.
> comments: The output PCollection is unbounded. But, we need a bounded
> PCollection as this PCollection is used to JOIN with PCollection of each
> window from Kafka. How can we convert an Unbounded PCollection to Bounded
> PCollection inside a DoFn?
>
> Are there any better Approaches?
>
> Regards,
> Rahul
>
>
>

