You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Alan Krumholz <al...@betterup.co> on 2020/01/27 17:32:10 UTC

snowflake source/sink

Hi,
We are using beam and (dataflow) at my company and would like to use it to
read and write data from snowflake.

Does anybody know if there are any source/sink available for snowflake?

if not, what would be the easiest way to create those? (maybe there is
something for sqlalchemy that we could leverage for that?)


Thanks so much!

Alan

Re: snowflake source/sink

Posted by Alexey Romanenko <ar...@gmail.com>.
At first glance, I think it even could be implemented and incapsulated inside JdbcIO, like JdbcIO.readParallel(), and helps users to read in parallel (under some conditions and limitations of course) if it satisfies their requests. 
Wdyt? Eugene, Reza?

> On 30 Jan 2020, at 03:28, Reza Rokni <re...@google.com> wrote:
> 
> @Eugene Kirpichov <ma...@google.com> that pattern would be a nice one to add to the Apache Beam patterns page:
> 
> https://beam.apache.org/documentation/patterns/overview/ <https://beam.apache.org/documentation/patterns/overview/>
> 
> Raised a jira, would be nice first PR for anyone wanting to contribute to Beam :D
> 
> https://issues.apache.org/jira/browse/BEAM-9222 <https://issues.apache.org/jira/browse/BEAM-9222>
> On Thu, 30 Jan 2020 at 05:16, Eugene Kirpichov <jkff@google.com <ma...@google.com>> wrote:
> Note that https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/ <https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/> , while focused on the JdbcIO connector, still explains how to read from a database in parallel - that solution would equally apply to your code, because it is not specific to JDBC.
> You'd need to decide on a primary key that supports efficient range scans, and create a pipeline like: (create key ranges) | reshuffle | (ReadSnowflake each range).
> 
> On Wed, Jan 29, 2020 at 11:01 AM Alan Krumholz <alan.krumholz@betterup.co <ma...@betterup.co>> wrote:
> Hi Brice,
> 
> Thanks so much for your suggestion! I did the following and it seems to work:
> 
> class ReadSnowflake(beam.DoFn):
>     def process(self, _):
>         import snowflake.connector
>         ctx = snowflake.connector.connect(...)
>         cs = ctx.cursor()
>         cs.execute('SELECT a, b FROM t')
>         while True:
>             data =  cs.fetchmany(1000)
>             if len(data) == 0:
>                 break;
>             for d in data:
>                 yield {'a':d[0], 'b':d[1]}
> 
> pipeline = beam.Pipeline()
> p = (
>     pipeline
>     | 'Empty start' >> beam.Create([''])
>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
> 
> 
> I know this is not optimal as it is reading sequentially from snowflake (instead of doing it in parallel as I'm sure the BQ source does) but apart from that, do you see any other problems (or possible improvements) with this code?
> 
> 
> Thank you so much!
> 
> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <briceg@gmail.com <ma...@gmail.com>> wrote:
> I am using a pattern which I saw online (but can't seem to locate) which performs i/o in a DoFn using a standard python client. I use this to read and write to Google cloud storage in streaming mode. You could use this idea to perform almost any i/o. Depending on your use case and workflow, this may be an approach you could consider. Shout if you need some boilerplate.
> 
> It does look like native support is coming and you know it is true as I read it on the internet.    :)
> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/ <https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/> 
> 
> You could also setup an external service or endpoint to perform the query and read the results into your pipeline in a pipeline step similar to the enrichment idea here: https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1 <https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1>
> 
> And you could always write your own connector. Not a task to be taken too lightly but it can be done.
> 
> HTH
> 
> 
> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <alan.krumholz@betterup.co <ma...@betterup.co>> wrote:
> Thanks for sharing this Erik!
> 
> It would be really nice/convenient to have a python option to do something like that. Our ML team is mostly a python shop and we are also using kubeflow pipelines to orchestrate our ML pipelines (mostly using their python sdk to author these).
> 
> Please let me know if you can think of any way we could do this with python.
> 
> Thanks so much!
> 
> 
> 
> 
> 
> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <erik.willsey@panderasystems.com <ma...@panderasystems.com>> wrote:
> You can use the JDBC driver.  Here's a blog that describes JDBC usage in general:   https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/ <https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/>  
> 
> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <alan.krumholz@betterup.co <ma...@betterup.co>> wrote:
> Hi,
> We are using beam and (dataflow) at my company and would like to use it to read and write data from snowflake.
> 
> Does anybody know if there are any source/sink available for snowflake?
> 
> if not, what would be the easiest way to create those? (maybe there is something for sqlalchemy that we could leverage for that?)
> 
> 
> Thanks so much!
> 
> Alan


Re: snowflake source/sink

Posted by Reza Rokni <re...@google.com>.
@Eugene Kirpichov <ki...@google.com> that pattern would be a nice one
to add to the Apache Beam patterns page:

https://beam.apache.org/documentation/patterns/overview/

Raised a jira, would be nice first PR for anyone wanting to contribute to
Beam :D

https://issues.apache.org/jira/browse/BEAM-9222

On Thu, 30 Jan 2020 at 05:16, Eugene Kirpichov <jk...@google.com> wrote:

> Note that
> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
> , while focused on the JdbcIO connector, still explains how to read from a
> database in parallel - that solution would equally apply to your code,
> because it is not specific to JDBC.
> You'd need to decide on a primary key that supports efficient range scans,
> and create a pipeline like: (create key ranges) | reshuffle |
> (ReadSnowflake each range).
>
> On Wed, Jan 29, 2020 at 11:01 AM Alan Krumholz <al...@betterup.co>
> wrote:
>
>> Hi Brice,
>>
>> Thanks so much for your suggestion! I did the following and it seems to
>> work:
>>
>> class ReadSnowflake(beam.DoFn):
>>     def process(self, _):
>>         import snowflake.connector
>>         ctx = snowflake.connector.connect(...)
>>         cs = ctx.cursor()
>>         cs.execute('SELECT a, b FROM t')
>>         while True:
>>             data =  cs.fetchmany(1000)
>>             if len(data) == 0:
>>                 break;
>>             for d in data:
>>                 yield {'a':d[0], 'b':d[1]}
>>
>> pipeline = beam.Pipeline()
>> p = (
>>     pipeline
>>     | 'Empty start' >> beam.Create([''])
>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>>
>>
>> I know this is not optimal as it is reading sequentially from snowflake
>> (instead of doing it in parallel as I'm sure the BQ source does) but apart
>> from that, do you see any other problems (or possible improvements) with
>> this code?
>>
>>
>> Thank you so much!
>>
>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <br...@gmail.com>
>> wrote:
>>
>>> I am using a pattern which I saw online (but can't seem to locate) which
>>> performs i/o in a DoFn using a standard python client. I use this to read
>>> and write to Google cloud storage in streaming mode. You could use this
>>> idea to perform almost any i/o. Depending on your use case and workflow,
>>> this may be an approach you could consider. Shout if you need some
>>> boilerplate.
>>>
>>> It does look like native support is coming and you know it is true as I
>>> read it on the internet.    :)
>>>
>>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>>
>>>
>>> You could also setup an external service or endpoint to perform the
>>> query and read the results into your pipeline in a pipeline step similar to
>>> the enrichment idea here:
>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>
>>> And you could always write your own connector. Not a task to be taken
>>> too lightly but it can be done.
>>>
>>> HTH
>>>
>>>
>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <
>>> alan.krumholz@betterup.co> wrote:
>>>
>>>> Thanks for sharing this Erik!
>>>>
>>>> It would be really nice/convenient to have a python option to do
>>>> something like that. Our ML team is mostly a python shop and we are also
>>>> using kubeflow pipelines to orchestrate our ML pipelines (mostly using
>>>> their python sdk to author these).
>>>>
>>>> Please let me know if you can think of any way we could do this with
>>>> python.
>>>>
>>>> Thanks so much!
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
>>>> erik.willsey@panderasystems.com> wrote:
>>>>
>>>>> You can use the JDBC driver.  Here's a blog that describes JDBC usage
>>>>> in general:
>>>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>>>
>>>>>
>>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
>>>>> alan.krumholz@betterup.co> wrote:
>>>>>
>>>>>> Hi,
>>>>>> We are using beam and (dataflow) at my company and would like to use
>>>>>> it to read and write data from snowflake.
>>>>>>
>>>>>> Does anybody know if there are any source/sink available for
>>>>>> snowflake?
>>>>>>
>>>>>> if not, what would be the easiest way to create those? (maybe there
>>>>>> is something for sqlalchemy that we could leverage for that?)
>>>>>>
>>>>>>
>>>>>> Thanks so much!
>>>>>>
>>>>>> Alan
>>>>>>
>>>>>

Re: snowflake source/sink

Posted by Eugene Kirpichov <jk...@google.com>.
Note that
https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
, while focused on the JdbcIO connector, still explains how to read from a
database in parallel - that solution would equally apply to your code,
because it is not specific to JDBC.
You'd need to decide on a primary key that supports efficient range scans,
and create a pipeline like: (create key ranges) | reshuffle |
(ReadSnowflake each range).

On Wed, Jan 29, 2020 at 11:01 AM Alan Krumholz <al...@betterup.co>
wrote:

> Hi Brice,
>
> Thanks so much for your suggestion! I did the following and it seems to
> work:
>
> class ReadSnowflake(beam.DoFn):
>     def process(self, _):
>         import snowflake.connector
>         ctx = snowflake.connector.connect(...)
>         cs = ctx.cursor()
>         cs.execute('SELECT a, b FROM t')
>         while True:
>             data =  cs.fetchmany(1000)
>             if len(data) == 0:
>                 break;
>             for d in data:
>                 yield {'a':d[0], 'b':d[1]}
>
> pipeline = beam.Pipeline()
> p = (
>     pipeline
>     | 'Empty start' >> beam.Create([''])
>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>
>
> I know this is not optimal as it is reading sequentially from snowflake
> (instead of doing it in parallel as I'm sure the BQ source does) but apart
> from that, do you see any other problems (or possible improvements) with
> this code?
>
>
> Thank you so much!
>
> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <br...@gmail.com> wrote:
>
>> I am using a pattern which I saw online (but can't seem to locate) which
>> performs i/o in a DoFn using a standard python client. I use this to read
>> and write to Google cloud storage in streaming mode. You could use this
>> idea to perform almost any i/o. Depending on your use case and workflow,
>> this may be an approach you could consider. Shout if you need some
>> boilerplate.
>>
>> It does look like native support is coming and you know it is true as I
>> read it on the internet.    :)
>>
>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>
>>
>> You could also setup an external service or endpoint to perform the query
>> and read the results into your pipeline in a pipeline step similar to the
>> enrichment idea here:
>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>
>> And you could always write your own connector. Not a task to be taken too
>> lightly but it can be done.
>>
>> HTH
>>
>>
>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <al...@betterup.co>
>> wrote:
>>
>>> Thanks for sharing this Erik!
>>>
>>> It would be really nice/convenient to have a python option to do
>>> something like that. Our ML team is mostly a python shop and we are also
>>> using kubeflow pipelines to orchestrate our ML pipelines (mostly using
>>> their python sdk to author these).
>>>
>>> Please let me know if you can think of any way we could do this with
>>> python.
>>>
>>> Thanks so much!
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
>>> erik.willsey@panderasystems.com> wrote:
>>>
>>>> You can use the JDBC driver.  Here's a blog that describes JDBC usage
>>>> in general:
>>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>>
>>>>
>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
>>>> alan.krumholz@betterup.co> wrote:
>>>>
>>>>> Hi,
>>>>> We are using beam and (dataflow) at my company and would like to use
>>>>> it to read and write data from snowflake.
>>>>>
>>>>> Does anybody know if there are any source/sink available for snowflake?
>>>>>
>>>>> if not, what would be the easiest way to create those? (maybe there is
>>>>> something for sqlalchemy that we could leverage for that?)
>>>>>
>>>>>
>>>>> Thanks so much!
>>>>>
>>>>> Alan
>>>>>
>>>>

Re: snowflake source/sink

Posted by Alan Krumholz <al...@betterup.co>.
Hi Robert, beam.util.GroupIntoBatches(x) works perfectly. Thanks so much!

On Wed, Jan 29, 2020 at 12:58 PM Robert Bradshaw <ro...@google.com>
wrote:

> You could use the beam.util.BatchElements transform to batch rows into
> larger chunks.
>
> On Wed, Jan 29, 2020 at 12:01 PM Alan Krumholz
> <al...@betterup.co> wrote:
> >
> > Thanks Brice! I'll look into wrapping the connector.
> >
> > One more question.
> > I'm trying now to develop a sink too. This is what I have:
> >
> > def writeSnowflake(row):
> >     import snowflake.connector
> >     ctx = snowflake.connector.connect(...)
> >     cs = ctx.cursor()
> >     cs.execute(
> >         'INSERT INTO t(a, b) VALUES ({a}, {b})'.format(
> >             a = str(row['a']),
> >             b = str(row['b'])
> >         )
> >     )
> >     return row
> >
> > pipeline = beam.Pipeline(...)
> > p = (
> >     pipeline
> >     | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
> >     | 'Write to SnowFlake' >> beam.Map(writeSnowflake)
> > )
> >
> > This seems to (slowly) work but it feels extremely inefficient to send
> an INSERT query to the DW for each row in the dataset.
> > Is there an easy way to have the pipeline maybe stack all my data rows
> into chunks of 1,000 so I can insert these by chunks instead. I'm mostly
> curious about how to have the pipeline pass 1K rows at a time to
> "writeSnowflake()" instead of passing one by one.
> > Maybe by using a GroupByKey transformation and using randomly sampled
> keys to create the chunks of the desired size? (or can you think of a
> better way to achieve this?)
> >
> > Thank you so much for all your help!
> >
> > On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht <br...@gmail.com>
> wrote:
> >>
> >> Lovely! You absolutely have the right idea.
> >>
> >> Your comments are spot on but the suboptimal solution that works is
> usually preferable to the optimal one that doesn't (exist).
> >>
> >> I don't have any experience with the Snowflake connector but if it can
> be kept around and reused and is slow or expensive to create, you may want
> to consider a wrapper/class to manage the client/connector that you can use
> across your pipeline steps if needed.
> >>
> >> Happy processing.
> >> -Brice
> >>
> >> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz <
> alan.krumholz@betterup.co> wrote:
> >>>
> >>> Hi Brice,
> >>>
> >>> Thanks so much for your suggestion! I did the following and it seems
> to work:
> >>>
> >>> class ReadSnowflake(beam.DoFn):
> >>>     def process(self, _):
> >>>         import snowflake.connector
> >>>         ctx = snowflake.connector.connect(...)
> >>>         cs = ctx.cursor()
> >>>         cs.execute('SELECT a, b FROM t')
> >>>         while True:
> >>>             data =  cs.fetchmany(1000)
> >>>             if len(data) == 0:
> >>>                 break;
> >>>             for d in data:
> >>>                 yield {'a':d[0], 'b':d[1]}
> >>>
> >>> pipeline = beam.Pipeline()
> >>> p = (
> >>>     pipeline
> >>>     | 'Empty start' >> beam.Create([''])
> >>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
> >>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
> >>>
> >>>
> >>> I know this is not optimal as it is reading sequentially from
> snowflake (instead of doing it in parallel as I'm sure the BQ source does)
> but apart from that, do you see any other problems (or possible
> improvements) with this code?
> >>>
> >>>
> >>> Thank you so much!
> >>>
> >>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <br...@gmail.com>
> wrote:
> >>>>
> >>>> I am using a pattern which I saw online (but can't seem to locate)
> which performs i/o in a DoFn using a standard python client. I use this to
> read and write to Google cloud storage in streaming mode. You could use
> this idea to perform almost any i/o. Depending on your use case and
> workflow, this may be an approach you could consider. Shout if you need
> some boilerplate.
> >>>>
> >>>> It does look like native support is coming and you know it is true as
> I read it on the internet.    :)
> >>>>
> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
> >>>>
> >>>> You could also setup an external service or endpoint to perform the
> query and read the results into your pipeline in a pipeline step similar to
> the enrichment idea here:
> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
> >>>>
> >>>> And you could always write your own connector. Not a task to be taken
> too lightly but it can be done.
> >>>>
> >>>> HTH
> >>>>
> >>>>
> >>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <
> alan.krumholz@betterup.co> wrote:
> >>>>>
> >>>>> Thanks for sharing this Erik!
> >>>>>
> >>>>> It would be really nice/convenient to have a python option to do
> something like that. Our ML team is mostly a python shop and we are also
> using kubeflow pipelines to orchestrate our ML pipelines (mostly using
> their python sdk to author these).
> >>>>>
> >>>>> Please let me know if you can think of any way we could do this with
> python.
> >>>>>
> >>>>> Thanks so much!
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
> erik.willsey@panderasystems.com> wrote:
> >>>>>>
> >>>>>> You can use the JDBC driver.  Here's a blog that describes JDBC
> usage in general:
> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
> >>>>>>
> >>>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
> alan.krumholz@betterup.co> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>> We are using beam and (dataflow) at my company and would like to
> use it to read and write data from snowflake.
> >>>>>>>
> >>>>>>> Does anybody know if there are any source/sink available for
> snowflake?
> >>>>>>>
> >>>>>>> if not, what would be the easiest way to create those? (maybe
> there is something for sqlalchemy that we could leverage for that?)
> >>>>>>>
> >>>>>>>
> >>>>>>> Thanks so much!
> >>>>>>>
> >>>>>>> Alan
>

Re: snowflake source/sink

Posted by Robert Bradshaw <ro...@google.com>.
You could use the beam.util.BatchElements transform to batch rows into
larger chunks.

On Wed, Jan 29, 2020 at 12:01 PM Alan Krumholz
<al...@betterup.co> wrote:
>
> Thanks Brice! I'll look into wrapping the connector.
>
> One more question.
> I'm trying now to develop a sink too. This is what I have:
>
> def writeSnowflake(row):
>     import snowflake.connector
>     ctx = snowflake.connector.connect(...)
>     cs = ctx.cursor()
>     cs.execute(
>         'INSERT INTO t(a, b) VALUES ({a}, {b})'.format(
>             a = str(row['a']),
>             b = str(row['b'])
>         )
>     )
>     return row
>
> pipeline = beam.Pipeline(...)
> p = (
>     pipeline
>     | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
>     | 'Write to SnowFlake' >> beam.Map(writeSnowflake)
> )
>
> This seems to (slowly) work but it feels extremely inefficient to send an INSERT query to the DW for each row in the dataset.
> Is there an easy way to have the pipeline maybe stack all my data rows into chunks of 1,000 so I can insert these by chunks instead. I'm mostly curious about how to have the pipeline pass 1K rows at a time to "writeSnowflake()" instead of passing one by one.
> Maybe by using a GroupByKey transformation and using randomly sampled keys to create the chunks of the desired size? (or can you think of a better way to achieve this?)
>
> Thank you so much for all your help!
>
> On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht <br...@gmail.com> wrote:
>>
>> Lovely! You absolutely have the right idea.
>>
>> Your comments are spot on but the suboptimal solution that works is usually preferable to the optimal one that doesn't (exist).
>>
>> I don't have any experience with the Snowflake connector but if it can be kept around and reused and is slow or expensive to create, you may want to consider a wrapper/class to manage the client/connector that you can use across your pipeline steps if needed.
>>
>> Happy processing.
>> -Brice
>>
>> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz <al...@betterup.co> wrote:
>>>
>>> Hi Brice,
>>>
>>> Thanks so much for your suggestion! I did the following and it seems to work:
>>>
>>> class ReadSnowflake(beam.DoFn):
>>>     def process(self, _):
>>>         import snowflake.connector
>>>         ctx = snowflake.connector.connect(...)
>>>         cs = ctx.cursor()
>>>         cs.execute('SELECT a, b FROM t')
>>>         while True:
>>>             data =  cs.fetchmany(1000)
>>>             if len(data) == 0:
>>>                 break;
>>>             for d in data:
>>>                 yield {'a':d[0], 'b':d[1]}
>>>
>>> pipeline = beam.Pipeline()
>>> p = (
>>>     pipeline
>>>     | 'Empty start' >> beam.Create([''])
>>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>>>
>>>
>>> I know this is not optimal as it is reading sequentially from snowflake (instead of doing it in parallel as I'm sure the BQ source does) but apart from that, do you see any other problems (or possible improvements) with this code?
>>>
>>>
>>> Thank you so much!
>>>
>>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <br...@gmail.com> wrote:
>>>>
>>>> I am using a pattern which I saw online (but can't seem to locate) which performs i/o in a DoFn using a standard python client. I use this to read and write to Google cloud storage in streaming mode. You could use this idea to perform almost any i/o. Depending on your use case and workflow, this may be an approach you could consider. Shout if you need some boilerplate.
>>>>
>>>> It does look like native support is coming and you know it is true as I read it on the internet.    :)
>>>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>>>
>>>> You could also setup an external service or endpoint to perform the query and read the results into your pipeline in a pipeline step similar to the enrichment idea here: https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>>
>>>> And you could always write your own connector. Not a task to be taken too lightly but it can be done.
>>>>
>>>> HTH
>>>>
>>>>
>>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <al...@betterup.co> wrote:
>>>>>
>>>>> Thanks for sharing this Erik!
>>>>>
>>>>> It would be really nice/convenient to have a python option to do something like that. Our ML team is mostly a python shop and we are also using kubeflow pipelines to orchestrate our ML pipelines (mostly using their python sdk to author these).
>>>>>
>>>>> Please let me know if you can think of any way we could do this with python.
>>>>>
>>>>> Thanks so much!
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <er...@panderasystems.com> wrote:
>>>>>>
>>>>>> You can use the JDBC driver.  Here's a blog that describes JDBC usage in general:   https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>>>>
>>>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <al...@betterup.co> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>> We are using beam and (dataflow) at my company and would like to use it to read and write data from snowflake.
>>>>>>>
>>>>>>> Does anybody know if there are any source/sink available for snowflake?
>>>>>>>
>>>>>>> if not, what would be the easiest way to create those? (maybe there is something for sqlalchemy that we could leverage for that?)
>>>>>>>
>>>>>>>
>>>>>>> Thanks so much!
>>>>>>>
>>>>>>> Alan

Re: snowflake source/sink

Posted by Alan Krumholz <al...@betterup.co>.
Thanks Brice! I'll look into wrapping the connector.

One more question.
I'm trying now to develop a sink too. This is what I have:

def writeSnowflake(row):
    import snowflake.connector
    ctx = snowflake.connector.connect(...)
    cs = ctx.cursor()
    cs.execute(
        'INSERT INTO t(a, b) VALUES ({a}, {b})'.format(
            a = str(row['a']),
            b = str(row['b'])
        )
    )
    return row

pipeline = beam.Pipeline(...)
p = (
    pipeline
    | 'Read from BigQuery' >> beam.io.Read(beam.io.BigQuerySource(...))
    | 'Write to SnowFlake' >> beam.Map(writeSnowflake)
)

This seems to (slowly) work but it feels extremely inefficient to send an
INSERT query to the DW for each row in the dataset.
Is there an easy way to have the pipeline maybe stack all my data rows into
chunks of 1,000 so I can insert these by chunks instead. I'm mostly curious
about how to have the pipeline pass 1K rows at a time to "writeSnowflake()"
instead of passing one by one.
Maybe by using a GroupByKey transformation and using randomly sampled keys
to create the chunks of the desired size? (or can you think of a better way
to achieve this?)

Thank you so much for all your help!

On Wed, Jan 29, 2020 at 11:30 AM Brice Giesbrecht <br...@gmail.com> wrote:

> Lovely! You absolutely have the right idea.
>
> Your comments are spot on but the suboptimal solution that works is
> usually preferable to the optimal one that doesn't (exist).
>
> I don't have any experience with the Snowflake connector but if it can be
> kept around and reused and is slow or expensive to create, you may want to
> consider a wrapper/class to manage the client/connector that you can use
> across your pipeline steps if needed.
>
> Happy processing.
> -Brice
>
> On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz <al...@betterup.co>
> wrote:
>
>> Hi Brice,
>>
>> Thanks so much for your suggestion! I did the following and it seems to
>> work:
>>
>> class ReadSnowflake(beam.DoFn):
>>     def process(self, _):
>>         import snowflake.connector
>>         ctx = snowflake.connector.connect(...)
>>         cs = ctx.cursor()
>>         cs.execute('SELECT a, b FROM t')
>>         while True:
>>             data =  cs.fetchmany(1000)
>>             if len(data) == 0:
>>                 break;
>>             for d in data:
>>                 yield {'a':d[0], 'b':d[1]}
>>
>> pipeline = beam.Pipeline()
>> p = (
>>     pipeline
>>     | 'Empty start' >> beam.Create([''])
>>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>>
>>
>> I know this is not optimal as it is reading sequentially from snowflake
>> (instead of doing it in parallel as I'm sure the BQ source does) but apart
>> from that, do you see any other problems (or possible improvements) with
>> this code?
>>
>>
>> Thank you so much!
>>
>> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <br...@gmail.com>
>> wrote:
>>
>>> I am using a pattern which I saw online (but can't seem to locate) which
>>> performs i/o in a DoFn using a standard python client. I use this to read
>>> and write to Google cloud storage in streaming mode. You could use this
>>> idea to perform almost any i/o. Depending on your use case and workflow,
>>> this may be an approach you could consider. Shout if you need some
>>> boilerplate.
>>>
>>> It does look like native support is coming and you know it is true as I
>>> read it on the internet.    :)
>>>
>>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>>
>>>
>>> You could also setup an external service or endpoint to perform the
>>> query and read the results into your pipeline in a pipeline step similar to
>>> the enrichment idea here:
>>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>>
>>> And you could always write your own connector. Not a task to be taken
>>> too lightly but it can be done.
>>>
>>> HTH
>>>
>>>
>>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <
>>> alan.krumholz@betterup.co> wrote:
>>>
>>>> Thanks for sharing this Erik!
>>>>
>>>> It would be really nice/convenient to have a python option to do
>>>> something like that. Our ML team is mostly a python shop and we are also
>>>> using kubeflow pipelines to orchestrate our ML pipelines (mostly using
>>>> their python sdk to author these).
>>>>
>>>> Please let me know if you can think of any way we could do this with
>>>> python.
>>>>
>>>> Thanks so much!
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
>>>> erik.willsey@panderasystems.com> wrote:
>>>>
>>>>> You can use the JDBC driver.  Here's a blog that describes JDBC usage
>>>>> in general:
>>>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>>>
>>>>>
>>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
>>>>> alan.krumholz@betterup.co> wrote:
>>>>>
>>>>>> Hi,
>>>>>> We are using beam and (dataflow) at my company and would like to use
>>>>>> it to read and write data from snowflake.
>>>>>>
>>>>>> Does anybody know if there are any source/sink available for
>>>>>> snowflake?
>>>>>>
>>>>>> if not, what would be the easiest way to create those? (maybe there
>>>>>> is something for sqlalchemy that we could leverage for that?)
>>>>>>
>>>>>>
>>>>>> Thanks so much!
>>>>>>
>>>>>> Alan
>>>>>>
>>>>>

Re: snowflake source/sink

Posted by Brice Giesbrecht <br...@gmail.com>.
Lovely! You absolutely have the right idea.

Your comments are spot on but the suboptimal solution that works is usually
preferable to the optimal one that doesn't (exist).

I don't have any experience with the Snowflake connector but if it can be
kept around and reused and is slow or expensive to create, you may want to
consider a wrapper/class to manage the client/connector that you can use
across your pipeline steps if needed.

Happy processing.
-Brice

On Wed, Jan 29, 2020 at 2:01 PM Alan Krumholz <al...@betterup.co>
wrote:

> Hi Brice,
>
> Thanks so much for your suggestion! I did the following and it seems to
> work:
>
> class ReadSnowflake(beam.DoFn):
>     def process(self, _):
>         import snowflake.connector
>         ctx = snowflake.connector.connect(...)
>         cs = ctx.cursor()
>         cs.execute('SELECT a, b FROM t')
>         while True:
>             data =  cs.fetchmany(1000)
>             if len(data) == 0:
>                 break;
>             for d in data:
>                 yield {'a':d[0], 'b':d[1]}
>
> pipeline = beam.Pipeline()
> p = (
>     pipeline
>     | 'Empty start' >> beam.Create([''])
>     | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
>     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))
>
>
> I know this is not optimal as it is reading sequentially from snowflake
> (instead of doing it in parallel as I'm sure the BQ source does) but apart
> from that, do you see any other problems (or possible improvements) with
> this code?
>
>
> Thank you so much!
>
> On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <br...@gmail.com> wrote:
>
>> I am using a pattern which I saw online (but can't seem to locate) which
>> performs i/o in a DoFn using a standard python client. I use this to read
>> and write to Google cloud storage in streaming mode. You could use this
>> idea to perform almost any i/o. Depending on your use case and workflow,
>> this may be an approach you could consider. Shout if you need some
>> boilerplate.
>>
>> It does look like native support is coming and you know it is true as I
>> read it on the internet.    :)
>>
>> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>>
>>
>> You could also setup an external service or endpoint to perform the query
>> and read the results into your pipeline in a pipeline step similar to the
>> enrichment idea here:
>> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>
>> And you could always write your own connector. Not a task to be taken too
>> lightly but it can be done.
>>
>> HTH
>>
>>
>> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <al...@betterup.co>
>> wrote:
>>
>>> Thanks for sharing this Erik!
>>>
>>> It would be really nice/convenient to have a python option to do
>>> something like that. Our ML team is mostly a python shop and we are also
>>> using kubeflow pipelines to orchestrate our ML pipelines (mostly using
>>> their python sdk to author these).
>>>
>>> Please let me know if you can think of any way we could do this with
>>> python.
>>>
>>> Thanks so much!
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
>>> erik.willsey@panderasystems.com> wrote:
>>>
>>>> You can use the JDBC driver.  Here's a blog that describes JDBC usage
>>>> in general:
>>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>>
>>>>
>>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
>>>> alan.krumholz@betterup.co> wrote:
>>>>
>>>>> Hi,
>>>>> We are using beam and (dataflow) at my company and would like to use
>>>>> it to read and write data from snowflake.
>>>>>
>>>>> Does anybody know if there are any source/sink available for snowflake?
>>>>>
>>>>> if not, what would be the easiest way to create those? (maybe there is
>>>>> something for sqlalchemy that we could leverage for that?)
>>>>>
>>>>>
>>>>> Thanks so much!
>>>>>
>>>>> Alan
>>>>>
>>>>

Re: snowflake source/sink

Posted by Alan Krumholz <al...@betterup.co>.
Hi Brice,

Thanks so much for your suggestion! I did the following and it seems to
work:

class ReadSnowflake(beam.DoFn):
    def process(self, _):
        import snowflake.connector
        ctx = snowflake.connector.connect(...)
        cs = ctx.cursor()
        cs.execute('SELECT a, b FROM t')
        while True:
            data =  cs.fetchmany(1000)
            if len(data) == 0:
                break;
            for d in data:
                yield {'a':d[0], 'b':d[1]}

pipeline = beam.Pipeline()
p = (
    pipeline
    | 'Empty start' >> beam.Create([''])
    | 'Read from Snowflake' >> beam.ParDo(ReadSnowflake())
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(...))


I know this is not optimal as it is reading sequentially from snowflake
(instead of doing it in parallel as I'm sure the BQ source does) but apart
from that, do you see any other problems (or possible improvements) with
this code?


Thank you so much!

On Wed, Jan 29, 2020 at 8:45 AM Brice Giesbrecht <br...@gmail.com> wrote:

> I am using a pattern which I saw online (but can't seem to locate) which
> performs i/o in a DoFn using a standard python client. I use this to read
> and write to Google cloud storage in streaming mode. You could use this
> idea to perform almost any i/o. Depending on your use case and workflow,
> this may be an approach you could consider. Shout if you need some
> boilerplate.
>
> It does look like native support is coming and you know it is true as I
> read it on the internet.    :)
>
> https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/
>
>
> You could also setup an external service or endpoint to perform the query
> and read the results into your pipeline in a pipeline step similar to the
> enrichment idea here:
> https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>
> And you could always write your own connector. Not a task to be taken too
> lightly but it can be done.
>
> HTH
>
>
> On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <al...@betterup.co>
> wrote:
>
>> Thanks for sharing this Erik!
>>
>> It would be really nice/convenient to have a python option to do
>> something like that. Our ML team is mostly a python shop and we are also
>> using kubeflow pipelines to orchestrate our ML pipelines (mostly using
>> their python sdk to author these).
>>
>> Please let me know if you can think of any way we could do this with
>> python.
>>
>> Thanks so much!
>>
>>
>>
>>
>>
>> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
>> erik.willsey@panderasystems.com> wrote:
>>
>>> You can use the JDBC driver.  Here's a blog that describes JDBC usage in
>>> general:
>>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>>
>>>
>>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <
>>> alan.krumholz@betterup.co> wrote:
>>>
>>>> Hi,
>>>> We are using beam and (dataflow) at my company and would like to use it
>>>> to read and write data from snowflake.
>>>>
>>>> Does anybody know if there are any source/sink available for snowflake?
>>>>
>>>> if not, what would be the easiest way to create those? (maybe there is
>>>> something for sqlalchemy that we could leverage for that?)
>>>>
>>>>
>>>> Thanks so much!
>>>>
>>>> Alan
>>>>
>>>

Re: snowflake source/sink

Posted by Brice Giesbrecht <br...@gmail.com>.
I am using a pattern which I saw online (but can't seem to locate) which
performs i/o in a DoFn using a standard python client. I use this to read
and write to Google cloud storage in streaming mode. You could use this
idea to perform almost any i/o. Depending on your use case and workflow,
this may be an approach you could consider. Shout if you need some
boilerplate.

It does look like native support is coming and you know it is true as I
read it on the internet.    :)
https://www.snowflake.com/blog/snowflake-on-google-cloud-platform-now-in-preview/


You could also setup an external service or endpoint to perform the query
and read the results into your pipeline in a pipeline step similar to the
enrichment idea here:
https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1

And you could always write your own connector. Not a task to be taken too
lightly but it can be done.

HTH


On Wed, Jan 29, 2020 at 10:17 AM Alan Krumholz <al...@betterup.co>
wrote:

> Thanks for sharing this Erik!
>
> It would be really nice/convenient to have a python option to do something
> like that. Our ML team is mostly a python shop and we are also using
> kubeflow pipelines to orchestrate our ML pipelines (mostly using their
> python sdk to author these).
>
> Please let me know if you can think of any way we could do this with
> python.
>
> Thanks so much!
>
>
>
>
>
> On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
> erik.willsey@panderasystems.com> wrote:
>
>> You can use the JDBC driver.  Here's a blog that describes JDBC usage in
>> general:
>> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>>
>>
>> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <al...@betterup.co>
>> wrote:
>>
>>> Hi,
>>> We are using beam and (dataflow) at my company and would like to use it
>>> to read and write data from snowflake.
>>>
>>> Does anybody know if there are any source/sink available for snowflake?
>>>
>>> if not, what would be the easiest way to create those? (maybe there is
>>> something for sqlalchemy that we could leverage for that?)
>>>
>>>
>>> Thanks so much!
>>>
>>> Alan
>>>
>>

Re: snowflake source/sink

Posted by Alan Krumholz <al...@betterup.co>.
Thanks for sharing this Erik!

It would be really nice/convenient to have a python option to do something
like that. Our ML team is mostly a python shop and we are also using
kubeflow pipelines to orchestrate our ML pipelines (mostly using their
python sdk to author these).

Please let me know if you can think of any way we could do this with python.

Thanks so much!





On Mon, Jan 27, 2020 at 1:18 PM Erik Willsey <
erik.willsey@panderasystems.com> wrote:

> You can use the JDBC driver.  Here's a blog that describes JDBC usage in
> general:
> https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/
>
>
> On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <al...@betterup.co>
> wrote:
>
>> Hi,
>> We are using beam and (dataflow) at my company and would like to use it
>> to read and write data from snowflake.
>>
>> Does anybody know if there are any source/sink available for snowflake?
>>
>> if not, what would be the easiest way to create those? (maybe there is
>> something for sqlalchemy that we could leverage for that?)
>>
>>
>> Thanks so much!
>>
>> Alan
>>
>

Re: snowflake source/sink

Posted by Erik Willsey <er...@panderasystems.com>.
You can use the JDBC driver.  Here's a blog that describes JDBC usage in
general:
https://nl.devoteam.com/en/blog-post/querying-jdbc-database-parallel-google-dataflow-apache-beam/


On Mon, Jan 27, 2020 at 12:32 PM Alan Krumholz <al...@betterup.co>
wrote:

> Hi,
> We are using beam and (dataflow) at my company and would like to use it to
> read and write data from snowflake.
>
> Does anybody know if there are any source/sink available for snowflake?
>
> if not, what would be the easiest way to create those? (maybe there is
> something for sqlalchemy that we could leverage for that?)
>
>
> Thanks so much!
>
> Alan
>