Posted to user@beam.apache.org by Dmitry Demeshchuk <dm...@postmates.com> on 2017/06/12 21:22:12 UTC

Redshift source for Python

Hi, list,

I was hoping someone could give me a general code review of a Redshift
source I wrote:
https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1. It also
relies on the `s3` and `config` modules from our internal library. I can add
them too if needed; it was just more hassle to open up the entire
repository, since it contains some company-specific code at
the moment.

I'd also like to know whether you'd want me to file a pull request. We'd
be totally fine with open-sourcing this piece, as well as some other AWS
sources and sinks in the future.

Finally, I have a specific question about cleanup. My impression was that
https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L153
would help make sure that no data can be lost after we delete
the S3 files. However, in a personal conversation, Eugene Kirpichev pointed
out that this approach does not ensure that the PCollection is persisted,
and that Dataflow will just fuse multiple phases together.

Also, Eugene pointed out that this cleanup problem has been worked around
in the BigQuery source in the Java SDK. To my understanding, it's this one:
https://github.com/apache/beam/blob/70e53e7dc5d58e4d9f88c6d4f1cff036429429c1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java#L100.
However, I don't yet know enough about the parity between the Java and
Python SDKs to tell whether I can implement a Python source in a
similar fashion (from what I remember, implementing sources is generally
frowned upon, as opposed to writing a DoFn instead).

Any thoughts and suggestions would be highly appreciated.

Thank you.

-- 
Best regards,
Dmitry Demeshchuk.
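
For readers following the fusion question above: replies further down
this thread suggest putting a GroupByKey between the ParDo that lists the
S3 files and the one that loads them, so that the runner cannot fuse the
two into a single stage. Below is a minimal sketch of that wiring in the
Beam Python SDK. The names `with_random_key`, `build_read`,
`list_s3_files`, `load_file` and `num_shards` are hypothetical and are not
taken from the gist. Note that, per the same replies, this only helps
spread the reads across workers; it does not make it safe to delete the
exported files inside a ParDo.

```python
import random


def with_random_key(element, num_shards=64):
    """Pair an element with a random shard key so GroupByKey can spread it out."""
    return random.randrange(num_shards), element


def build_read(pipeline, list_s3_files, load_file, num_shards=64):
    """Wire list -> GroupByKey -> load.

    `list_s3_files` (no arguments, returns file paths) and `load_file`
    (path -> iterable of records) are hypothetical callables standing in
    for the 'GetS3Files' and 'LoadDataFromS3' steps.
    """
    import apache_beam as beam  # imported lazily; requires apache-beam

    return (
        pipeline
        | 'Start' >> beam.Create([None])
        | 'GetS3Files' >> beam.FlatMap(lambda _: list_s3_files())
        | 'AddKey' >> beam.Map(with_random_key, num_shards)
        # The runner cannot fuse across a GroupByKey, so the steps after it
        # may be scheduled on different workers than the listing step.
        | 'BreakFusion' >> beam.GroupByKey()
        | 'DropKey' >> beam.FlatMap(lambda kv: kv[1])
        | 'LoadDataFromS3' >> beam.FlatMap(load_file)
    )
```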

Re: Redshift source for Python

Posted by Chamikara Jayalath <ch...@apache.org>.
On Tue, Jun 13, 2017 at 10:35 AM Sourabh Bajaj <so...@google.com>
wrote:

> Thinking more about this, I think we need to consider a few more things:
>
> 1. Using an S3 path as the temporary directory won't be possible with the
> Dataflow runner, as the runner won't have the credentials to clean up the
> files in S3. A workaround could be to set a file expiration policy on the
> S3 bucket that deletes the files after 7 days, skip the cleanup in the
> pipeline, and use this bucket in your unload jobs.
>
> 2. The above also brings up another point: currently we don't have a
> notion of a finalize step in the pipeline. It is also hard to build
> this manually for some pipeline sinks that return a PDone instead of any
> metadata on what the sink did. This should be a separate thread on the dev
> list.
>

Agreed that there is currently no runner- or filesystem-independent way to
enforce cleanup. Solutions that enforce a certain structure on the pipeline
graph, or that clean up after a blocking pipeline execution, do not work in
this case, since we are talking about enforcing cleanup after a read
transform. I agree that the best solution for now might be to enforce some
expiration policy on temporary files.

Regarding your comment "but reading more code made me realize that I should
have used side inputs for that instead": I agree that using a side input is
better here, since otherwise you are creating a copy of the column schema
for every record of the manifest file.

Thanks,
Cham
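
A minimal sketch of the side-input approach mentioned above, assuming the
column names live in a single-element PCollection and each row is a list
of values. The names `to_record`, `attach_columns`, `rows` and
`columns_pcoll` are hypothetical and are not taken from the gist.

```python
def to_record(values, columns):
    """Combine one row of values with the shared list of column names."""
    return dict(zip(columns, values))


def attach_columns(rows, columns_pcoll):
    """Turn each row into a dict keyed by column name.

    `rows` is a PCollection of value lists. `columns_pcoll` is a
    PCollection holding exactly one element: the list of column names.
    """
    import apache_beam as beam  # imported lazily; requires apache-beam

    # AsSingleton hands the one schema value to every call of to_record,
    # so the schema is not copied into each element of `rows`.
    return rows | 'ToRecord' >> beam.Map(
        to_record, columns=beam.pvalue.AsSingleton(columns_pcoll))
```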


>
> 3. For the point about keys, one way to make them available to each
> worker would be to create a package similar to the julia-set example and
> then, in the setup.py of that package, pull data from KMS and
> set the environment variables that are needed for access to
> AWS. This is just one way to do this and others might have better ideas.
>
> -Sourabh
>
> On Mon, Jun 12, 2017 at 5:38 PM Dmitry Demeshchuk <dm...@postmates.com>
> wrote:
>
>> I really like the idea of a general S3 FileSystem first, sounds like it
>> may also make the dynamic rebalancing work way easier.
>>
>> As for a generic PostgresIO source – yes, that's also in the plans, because
>> we rely on regular Postgres as well. In Redshift, using UNLOAD on large
>> datasets is still the recommended approach, and we'd be using Redshift more
>> extensively, so I decided to start with this source. I think these two
>> sources will be complementary to each other.
>>
>> As for the AWS keys – that's actually another question I wanted to ask:
>> what's the recommended approach to storing and passing around the secrets
>> like access tokens, passwords and such? The plan I originally had was to
>> instead make the Dataflow nodes make the appropriate API calls to Google
>> Cloud Storage and Google KMS. It feels, however, that it's generally safer
>> and easier to reuse when secrets are defined at object configuration time
>> and then just passed in serialized fashion to Dataflow.
>>
>> On Mon, Jun 12, 2017 at 5:23 PM, Sourabh Bajaj <so...@google.com>
>> wrote:
>>
>>> A couple more things to add to what Cham mentioned in his email.
>>>
>>> 1. We currently don't have an implementation of the S3 FileSystem, so
>>> adding that might be a good starting point before you implement the
>>> unload/load patterns, as you won't be able to use the pipeline's temporary
>>> directory as an S3 path without that.
>>>
>>> 2. Another option, depending on the size of the Redshift tables, might be
>>> to use psycopg2 directly to read the data instead of doing the unload. This
>>> could be a generic PostgresIO in Python.
>>>
>>> On a side note: this is going to pass the user's AWS keys over the wire to
>>> whatever runner is used. It might be good to make that optional and see if
>>> the machine's IAM role can be used instead, and to add a note warning about
>>> this so that people can create keys with minimal access.
>>>
>>>
>>>
>>> On Mon, Jun 12, 2017 at 5:06 PM Eugene Kirpichov <ki...@google.com>
>>> wrote:
>>>
>>>> 1: Using BoundedSource is not an antipattern per se. It is
>>>> *recommended* in case you are able to use the capabilities that it provides
>>>> over a ParDo - otherwise, it's recommended to use ParDo: see
>>>> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
>>>> .
>>>>
>>>> 3: Assume that, when you apply two ParDos in a row to a collection,
>>>> they will typically be fused. Fusion (in Dataflow) is very aggressive and
>>>> will usually fuse things unless it is explicitly impossible (e.g. it's
>>>> impossible to fuse across a GroupByKey, and it also won't fuse across a
>>>> PCollection that gets passed to somebody as a side input).
>>>>
>>>> On Mon, Jun 12, 2017 at 5:02 PM Dmitry Demeshchuk <dm...@postmates.com>
>>>> wrote:
>>>>
>>>>> Hi Cham,
>>>>>
>>>>> Thanks a lot for the clarifications!
>>>>>
>>>>> (1) I wouldn't mind using BoundedSource; it's just that my impression
>>>>> was that it was considered to be an anti-pattern. Seems like most of the
>>>>> logic will be left intact though, so shouldn't really be a problem. Is
>>>>> BoundedSource API going to stay mostly the same after the introduction of
>>>>> SplittableDoFn?
>>>>>
>>>>> (2) Makes sense, thanks!
>>>>>
>>>>> (3) Thanks for the tip! This makes me wonder if some sort of test-time
>>>>> fusion detector would be useful, which would take a pipeline, analyze it
>>>>> and print out which transformations may be fused by the runner. But maybe
>>>>> it's just easier to remember that ParDo steps tend to fuse.
>>>>>
>>>>> On a side note, as you may see, I've been putting the columns data
>>>>> into the PCollection (for example, here:
>>>>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L72),
>>>>> but reading more code made me realize that I should have used side inputs
>>>>> for that instead?
>>>>>
>>>>> On Mon, Jun 12, 2017 at 4:44 PM, Chamikara Jayalath <
>>>>> chamikara@apache.org> wrote:
>>>>>
>>>>>> Hi Dmitry,
>>>>>>
>>>>>> Thanks for writing this. Some general comments.
>>>>>>
>>>>>> (1) Do you want to implement this using ParDos or using the
>>>>>> BoundedSource [1] API? Using the BoundedSource API has some benefits,
>>>>>> such as support for dynamic work rebalancing (see [2]), though using
>>>>>> ParDos will be more future-proof (dynamic work rebalancing will be
>>>>>> supported sometime in the future through the SplittableDoFn API [3][4]).
>>>>>>
>>>>>> (2) It seems that the Java BigQuery source deletes the temporary
>>>>>> table at the location you mentioned, and deletes temporary exported
>>>>>> files by mapping the directory path to the pipeline's temporary path
>>>>>> (which hopefully gets deleted by the runner). You should be able to use
>>>>>> a similar approach in the Python SDK. You should not delete exported
>>>>>> files in a ParDo, since a runner might rerun stages of a pipeline.
>>>>>>
>>>>>> (3) If you are using the ParDo-based approach, you should add a
>>>>>> GroupByKey between the ParDos 'GetS3Files' and 'LoadDataFromS3'. Otherwise
>>>>>> all of your ParDos might get fused into a single stage and you might end
>>>>>> up reading all the data from a single worker.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L73
>>>>>> [2]
>>>>>> https://beam.apache.org/documentation/io/authoring-overview/#read-transforms
>>>>>> [3] https://s.apache.org/splittable-do-fn
>>>>>> [4]
>>>>>> https://docs.google.com/document/d/1h_zprJrOilivK2xfvl4L42vaX4DMYGfH1YDmi-s_ozM/edit#
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 12, 2017 at 2:22 PM Dmitry Demeshchuk <
>>>>>> dmitry@postmates.com> wrote:
>>>>>>
>>>>>>> Hi, list,
>>>>>>>
>>>>>>> I was hoping someone could give me a general code review on a
>>>>>>> Redshift source I wrote:
>>>>>>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1.
>>>>>>> It also relies on modules `s3` and `config` from our internal library, I
>>>>>>> can add them too if needed, it just was more hassle to open up the entire
>>>>>>> repository with the code, since it contains some company-specific code at
>>>>>>> the moment.
>>>>>>>
>>>>>>> My hope was also to find out if you wanted me to file a pull
>>>>>>> request, we'd be totally fine to open source this piece, as well as some
>>>>>>> other AWS sources and sinks in the future.
>>>>>>>
>>>>>>> Finally, I have a specific question about cleanup. My impression was
>>>>>>> that
>>>>>>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L153
>>>>>>> would help making sure that there's no possible data loss after we delete
>>>>>>> the S3 files, however, in a personal conversation Eugene Kirpichev pointed
>>>>>>> out that this way does not ensure the PCollection persistence, and that
>>>>>>> Dataflow will just fuse multiple phases together.
>>>>>>>
>>>>>>> Also, Eugene pointed out that this cleanup problem has been worked
>>>>>>> around in the BigQuery source in Java SDK. To my understanding, it's this
>>>>>>> one:
>>>>>>> https://github.com/apache/beam/blob/70e53e7dc5d58e4d9f88c6d4f1cff036429429c1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java#L100,
>>>>>>> however I don't yet have enough knowledge about the parity between Java and
>>>>>>> Python SDKs to tell whether I can or cannot implement a Python source in a
>>>>>>> similar fashion (from what I remember, implementing sources is generally
>>>>>>> frowned upon, as opposed to writing a DoFn instead).
>>>>>>>
>>>>>>> Any thoughts and suggestions would be highly appreciated.
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Dmitry Demeshchuk.
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Dmitry Demeshchuk.
>>>>>
>>>>
>>
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>>
>

Re: Redshift source for Python

Posted by Sourabh Bajaj <so...@google.com>.
Thinking more about this, I think we need to consider a few more things:

1. Using an S3 path as the temporary directory won't be possible with the
Dataflow runner, as the runner won't have the credentials to clean up the
files in S3. A workaround could be to set a file expiration policy on the
S3 bucket that deletes the files after 7 days, skip the cleanup in the
pipeline, and use this bucket in your unload jobs.

2. The above also brings up another point: currently we don't have a
notion of a finalize step in the pipeline. It is also hard to build
this manually for some pipeline sinks that return a PDone instead of any
metadata on what the sink did. This should be a separate thread on the dev
list.

3. For the point about keys, one way to make them available to each
worker would be to create a package similar to the julia-set example and
then, in the setup.py of that package, pull data from KMS and
set the environment variables that are needed for access to
AWS. This is just one way to do this and others might have better ideas.

-Sourabh
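
A minimal sketch of the expiration-policy workaround from point 1,
assuming boto3 is used to configure the bucket. The function names, the
rule ID and the prefix are hypothetical. Note that boto3's
`put_bucket_lifecycle_configuration` replaces the bucket's whole lifecycle
configuration, so any existing rules would have to be merged in first.

```python
def expiration_rule(prefix, days=7):
    """Build an S3 lifecycle configuration that expires objects under `prefix`."""
    return {
        'Rules': [{
            'ID': 'expire-unload-temp-files',  # hypothetical rule name
            'Filter': {'Prefix': prefix},
            'Status': 'Enabled',
            'Expiration': {'Days': days},
        }]
    }


def apply_expiration(bucket, prefix, days=7):
    """Apply the rule to `bucket`. This overwrites existing lifecycle rules."""
    import boto3  # imported lazily; requires boto3 and AWS credentials

    s3 = boto3.client('s3')
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=expiration_rule(prefix, days))
```

With a rule like this in place, the pipeline would only write UNLOAD
output under the given prefix and never delete it itself.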


Re: Redshift source for Python

Posted by Dmitry Demeshchuk <dm...@postmates.com>.
I really like the idea of a general S3 FileSystem first; it sounds like it
may also make the dynamic rebalancing work much easier.

As for a generic PostgresIO source – yes, that's also in the plans, because
we rely on regular Postgres as well. In Redshift, using UNLOAD on large
datasets is still the recommended approach, and we'd be using Redshift more
extensively, so I decided to start with this source. I think these two
sources will be complementary to each other.

As for the AWS keys – that's actually another question I wanted to ask:
what's the recommended approach to storing and passing around secrets
such as access tokens and passwords? The plan I originally had was to
have the Dataflow nodes make the appropriate API calls to Google
Cloud Storage and Google KMS. It feels, however, that it's generally safer
and easier to reuse when secrets are defined at object configuration time
and then just passed in serialized form to Dataflow.
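
For context, a minimal sketch of the UNLOAD step discussed above: building
the statement and reading back the manifest that Redshift writes next to
the exported files. The function names are hypothetical and are not taken
from the gist, and authorizing with an IAM role rather than access keys is
an assumption made for the example.

```python
import json


def unload_statement(query, s3_prefix, iam_role):
    """Build a Redshift UNLOAD statement that also writes a manifest file."""
    # The query is passed as a quoted string, so single quotes inside it
    # are doubled.
    escaped = query.replace("'", "''")
    return "UNLOAD ('{}') TO '{}' IAM_ROLE '{}' MANIFEST".format(
        escaped, s3_prefix, iam_role)


def manifest_urls(manifest_text):
    """Return the S3 URLs listed in an UNLOAD manifest (a JSON document)."""
    return [entry['url'] for entry in json.loads(manifest_text)['entries']]
```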



-- 
Best regards,
Dmitry Demeshchuk.

Re: Redshift source for Python

Posted by Sourabh Bajaj <so...@google.com>.
A couple more things to add to what Cham mentioned in his email.

1. We currently don't have an implementation of the S3 FileSystem, so
adding that might be a good starting point before you implement the
unload/load patterns, as you won't be able to use the pipeline's temporary
directory as an S3 path without that.

2. Another option, depending on the size of the Redshift tables, might be
to use psycopg2 directly to read the data instead of doing the unload. This
could be a generic PostgresIO in Python.

On a side note: this is going to pass the user's AWS keys over the wire to
whatever runner is used. It might be good to make that optional and see if
the machine's IAM role can be used instead, and to add a note warning about
this so that people can create keys with minimal access.
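
A minimal sketch of the direct-read option from point 2, written against
the generic Python DB-API so that it is not tied to one driver. The names
`read_rows`, `connect` and `batch_size` are hypothetical. With psycopg2,
`connect` could be something like `lambda: psycopg2.connect(dsn)`; for
large tables a server-side (named) cursor would probably be needed as
well, which this sketch leaves out.

```python
def read_rows(connect, query, batch_size=1000):
    """Yield rows of `query` one by one, fetching them in batches.

    `connect` is a zero-argument callable returning a DB-API 2.0
    connection. The connection is opened lazily, when iteration starts,
    and closed when iteration ends or fails.
    """
    conn = connect()
    try:
        cur = conn.cursor()
        cur.execute(query)
        while True:
            batch = cur.fetchmany(batch_size)
            if not batch:
                break
            for row in batch:
                yield row
    finally:
        conn.close()
```

A generator like this could be called from the process method of a DoFn,
so that the connection is created on the worker rather than serialized
with the pipeline.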



On Mon, Jun 12, 2017 at 5:06 PM Eugene Kirpichov <ki...@google.com>
wrote:

> 1: Using BoundedSource is not an antipattern per se. It is *recommended*
> in case you are able to use the capabilities that it provides over a ParDo
> - otherwise, it's recommended to use ParDo: see
> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
> .
>
> 3: Assume that, when you apply two ParDos in a row to a collection, they
> will typically be fused. Fusion (in Dataflow) is very aggressive and will
> usually fuse things unless it is explicitly impossible (e.g. it's
> impossible to fuse across a GroupByKey, and it also won't fuse across a
> PCollection that gets passed to somebody as a side input).
>
> On Mon, Jun 12, 2017 at 5:02 PM Dmitry Demeshchuk <dm...@postmates.com>
> wrote:
>
>> Hi Cham,
>>
>> Thanks a lot for the clarifications!
>>
>> (1) I wouldn't mind using BoundedSource; it's just that my impression
>> was that it was considered to be an anti-pattern. Seems like most of the
>> logic will be left intact though, so shouldn't really be a problem. Is
>> BoundedSource API going to stay mostly the same after the introduction of
>> SplittableDoFn?
>>
>> (2) Makes sense, thanks!
>>
>> (3) Thanks for the tip! This makes me wonder if some sort of test-time
>> fusion detector would be useful, which would take a pipeline, analyze it
>> and print out which transformations may be fused by the runner. But maybe
>> it's just easier to remember that ParDo steps tend to fuse.
>>
>> On a side note, as you may see, I've been putting the columns data into
>> the PCollection (for example, here:
>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L72),
>> but reading more code made me realize that I should have used side inputs
>> for that instead?
>>
>> On Mon, Jun 12, 2017 at 4:44 PM, Chamikara Jayalath
>> <chamikara@apache.org> wrote:
>>
>>> Hi Dmitry,
>>>
>>> Thanks for writing this. Some general comments.
>>>
>>> (1) Do you want to implement this using ParDos or using the BoundedSource
>>> [1] API? Using the BoundedSource API has some benefits, such as support for
>>> dynamic work rebalancing (see [2]), though using ParDos will be more
>>> future-proof (dynamic work rebalancing will be supported sometime in the
>>> future through the SplittableDoFn API [3][4]).
>>>
>>> (2) It seems that the Java BigQuery source deletes the temporary
>>> table at the location you mentioned, and deletes temporary exported
>>> files by mapping the directory path to the pipeline's temporary path
>>> (which hopefully gets deleted by the runner). You should be able to use
>>> a similar approach in the Python SDK. You should not delete exported
>>> files in a ParDo, since a runner might rerun stages of a pipeline.
>>>
>>> (3) If you are using the ParDo-based approach, you should add a GroupByKey
>>> between the ParDos 'GetS3Files' and 'LoadDataFromS3'. Otherwise all of your
>>> ParDos might get fused into a single stage and you might end up reading all
>>> the data from a single worker.
>>>
>>> Thanks,
>>> Cham.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L73
>>> [2]
>>> https://beam.apache.org/documentation/io/authoring-overview/#read-transforms
>>> [3] https://s.apache.org/splittable-do-fn
>>> [4]
>>> https://docs.google.com/document/d/1h_zprJrOilivK2xfvl4L42vaX4DMYGfH1YDmi-s_ozM/edit#
>>>
>>>
>>> On Mon, Jun 12, 2017 at 2:22 PM Dmitry Demeshchuk <dm...@postmates.com>
>>> wrote:
>>>
>>>> Hi, list,
>>>>
>>>> I was hoping someone could give me a general code review on a Redshift
>>>> source I wrote:
>>>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1. It
>>>> also relies on modules `s3` and `config` from our internal library, I can
>>>> add them too if needed, it just was more hassle to open up the entire
>>>> repository with the code, since it contains some company-specific code at
>>>> the moment.
>>>>
>>>> My hope was also to find out if you wanted me to file a pull request,
>>>> we'd be totally fine to open source this piece, as well as some other AWS
>>>> sources and sinks in the future.
>>>>
>>>> Finally, I have a specific question about cleanup. My impression was
>>>> that
>>>> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L153
>>>> would help making sure that there's no possible data loss after we delete
>>>> the S3 files, however, in a personal conversation Eugene Kirpichev pointed
>>>> out that this way does not ensure the PCollection persistence, and that
>>>> Dataflow will just fuse multiple phases together.
>>>>
>>>> Also, Eugene pointed out that this cleanup problem has been worked
>>>> around in the BigQuery source in Java SDK. To my understanding, it's this
>>>> one:
>>>> https://github.com/apache/beam/blob/70e53e7dc5d58e4d9f88c6d4f1cff036429429c1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java#L100,
>>>> however I don't yet have enough knowledge about the parity between Java and
>>>> Python SDKs to tell whether I can or cannot implement a Python source in a
>>>> similar fashion (from what I remember, implementing sources is generally
>>>> frowned upon, as opposed to writing a DoFn instead).
>>>>
>>>> Any thoughts and suggestions would be highly appreciated.
>>>>
>>>> Thank you.
>>>>
>>>> --
>>>> Best regards,
>>>> Dmitry Demeshchuk.
>>>>
>>>
>>
>>
>> --
>> Best regards,
>> Dmitry Demeshchuk.
>>
>

Re: Redshift source for Python

Posted by Eugene Kirpichov <ki...@google.com>.
1: Using BoundedSource is not an antipattern per se. It is *recommended*
when you can use the capabilities that it provides over a ParDo;
otherwise, a ParDo is recommended. See
https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api

3: Assume that when you apply two ParDos in a row to a collection, they
will typically be fused. Fusion (in Dataflow) is very aggressive and will
usually fuse things unless it is explicitly impossible (e.g. it's
impossible to fuse across a GroupByKey, and it also won't fuse across a
PCollection that gets passed to somebody as a side input).

On Mon, Jun 12, 2017 at 5:02 PM Dmitry Demeshchuk <dm...@postmates.com>
wrote:

> Hi Cham,
>
> Thanks a lot for the clarifications!
>
> (1) I wouldn't mind using BoundedSource, it's just that my impression was
> that it was considered to be an anti-pattern. Seems like most of the logic
> will be left intact though, so it shouldn't really be a problem. Is the
> BoundedSource API going to stay mostly the same after the introduction of
> SplittableDoFn?
>
> (2) Makes sense, thanks!
>
> (3) Thanks for the tip! This makes me wonder if some sort of test-time
> fusion detector would be useful, which would take a pipeline, analyze it
> and print out which transformations may be fused by the runner. But maybe
> it's just easier to remember that ParDo steps tend to fuse.
>
> On a side note, as you may see, I've been putting the columns data into
> the PCollection (for example, here:
> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L72),
> but reading more code made me realize that I should have used side inputs
> for that instead?
>

Re: Redshift source for Python

Posted by Dmitry Demeshchuk <dm...@postmates.com>.
Hi Cham,

Thanks a lot for the clarifications!

(1) I wouldn't mind using BoundedSource, it's just that my impression was
that it was considered to be an anti-pattern. Seems like most of the logic
will be left intact though, so it shouldn't really be a problem. Is the
BoundedSource API going to stay mostly the same after the introduction of
SplittableDoFn?

(2) Makes sense, thanks!

(3) Thanks for the tip! This makes me wonder if some sort of test-time
fusion detector would be useful, which would take a pipeline, analyze it
and print out which transformations may be fused by the runner. But maybe
it's just easier to remember that ParDo steps tend to fuse.
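
A rough sketch of what such a detector could look like is below. It is
plain Python, not a Beam API: `Step`, `FUSION_BARRIERS` and `fused_stages`
are hypothetical names, the pipeline is modelled as a simple linear chain,
and the only rule encoded is the one mentioned in this thread (consecutive
steps fuse unless a GroupByKey separates them). A real runner applies more
rules than this.

```python
from collections import namedtuple

# Illustrative model of a pipeline step; 'kind' is just a label here,
# not a Beam class.
Step = namedtuple('Step', ['name', 'kind'])

# Assumption: GroupByKey is the only fusion barrier modelled in this sketch.
FUSION_BARRIERS = {'GroupByKey'}


def fused_stages(steps):
    """Groups a linear chain of steps into stages that may be fused."""
    stages = [[]]
    for step in steps:
        if step.kind in FUSION_BARRIERS:
            # A barrier closes the current stage; later steps start a new one.
            if stages[-1]:
                stages.append([])
            continue
        stages[-1].append(step.name)
    # Drop an empty trailing stage left behind by a barrier at the end.
    return [stage for stage in stages if stage]


without_gbk = [
    Step('GetS3Files', 'ParDo'),
    Step('LoadDataFromS3', 'ParDo'),
]
with_gbk = [
    Step('GetS3Files', 'ParDo'),
    Step('BreakFusion', 'GroupByKey'),
    Step('LoadDataFromS3', 'ParDo'),
]

print(fused_stages(without_gbk))  # one stage containing both ParDos
print(fused_stages(with_gbk))     # two stages, split at the GroupByKey
```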

On a side note, as you may see, I've been putting the columns data into the
PCollection (for example, here:
https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L72),
but reading more code made me realize that I should have used side inputs
for that instead?

On Mon, Jun 12, 2017 at 4:44 PM, Chamikara Jayalath <ch...@apache.org>
wrote:

> Hi Dmitry,
>
> Thanks for writing this. Some general comments.
>
> (1) Do you want to implement this using ParDos or using the BoundedSource
> API [1]? Using the BoundedSource API has some benefits, such as support for
> dynamic work rebalancing (see [2]), though using ParDos will be more
> future-proof (dynamic work rebalancing will be supported sometime in the
> future through the SplittableDoFn API [3][4]).
>
> (2) It seems the Java BigQuery source deletes the temporary table at the
> location you mentioned, and deletes temporary exported files by mapping the
> directory path to the pipeline's temporary path (which hopefully gets
> deleted by the runner). You should be able to use a similar approach in the
> Python SDK. You should not delete exported files in a ParDo, since a runner
> might rerun stages of a pipeline.
>
> (3) If you are using a ParDo-based approach, you should add a GroupByKey
> between the ParDos 'GetS3Files' and 'LoadDataFromS3'. Otherwise all of your
> ParDos might get fused into a single stage and you might end up reading all
> the data from a single worker.
>
> Thanks,
> Cham.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L73
> [2]
> https://beam.apache.org/documentation/io/authoring-overview/#read-transforms
> [3] https://s.apache.org/splittable-do-fn
> [4]
> https://docs.google.com/document/d/1h_zprJrOilivK2xfvl4L42vaX4DMYGfH1YDmi-s_ozM/edit#
>
>


-- 
Best regards,
Dmitry Demeshchuk.

Re: Redshift source for Python

Posted by Chamikara Jayalath <ch...@apache.org>.
Hi Dmitry,

Thanks for writing this. Some general comments.

(1) Do you want to implement this using ParDos or using the BoundedSource
API [1]? Using the BoundedSource API has some benefits, such as support for
dynamic work rebalancing (see [2]), though using ParDos will be more
future-proof (dynamic work rebalancing will be supported sometime in the
future through the SplittableDoFn API [3][4]).

(2) It seems the Java BigQuery source deletes the temporary table at the
location you mentioned, and deletes temporary exported files by mapping the
directory path to the pipeline's temporary path (which hopefully gets
deleted by the runner). You should be able to use a similar approach in the
Python SDK. You should not delete exported files in a ParDo, since a runner
might rerun stages of a pipeline.

(3) If you are using a ParDo-based approach, you should add a GroupByKey
between the ParDos 'GetS3Files' and 'LoadDataFromS3'. Otherwise all of your
ParDos might get fused into a single stage and you might end up reading all
the data from a single worker.

Thanks,
Cham.

[1]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L73
[2]
https://beam.apache.org/documentation/io/authoring-overview/#read-transforms
[3] https://s.apache.org/splittable-do-fn
[4]
https://docs.google.com/document/d/1h_zprJrOilivK2xfvl4L42vaX4DMYGfH1YDmi-s_ozM/edit#


On Mon, Jun 12, 2017 at 2:22 PM Dmitry Demeshchuk <dm...@postmates.com>
wrote:

> Hi, list,
>
> I was hoping someone could give me a general code review on a Redshift
> source I wrote:
> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1. It
> also relies on modules `s3` and `config` from our internal library, I can
> add them too if needed, it just was more hassle to open up the entire
> repository with the code, since it contains some company-specific code at
> the moment.
>
> My hope was also to find out if you wanted me to file a pull request, we'd
> be totally fine to open source this piece, as well as some other AWS
> sources and sinks in the future.
>
> Finally, I have a specific question about cleanup. My impression was that
> https://gist.github.com/doubleyou/d3236180691dc9b146e17bc046ec1fc1#file-redshift-py-L153
> would help making sure that there's no possible data loss after we delete
> the S3 files, however, in a personal conversation Eugene Kirpichev pointed
> out that this way does not ensure the PCollection persistence, and that
> Dataflow will just fuse multiple phases together.
>
> Also, Eugene pointed out that this cleanup problem has been worked around
> in the BigQuery source in Java SDK. To my understanding, it's this one:
> https://github.com/apache/beam/blob/70e53e7dc5d58e4d9f88c6d4f1cff036429429c1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java#L100,
> however I don't yet have enough knowledge about the parity between Java and
> Python SDKs to tell whether I can or cannot implement a Python source in a
> similar fashion (from what I remember, implementing sources is generally
> frowned upon, as opposed to writing a DoFn instead).
>
> Any thoughts and suggestions would be highly appreciated.
>
> Thank you.
>
> --
> Best regards,
> Dmitry Demeshchuk.
>