Posted to dev@beam.apache.org by OrielResearch Eila Arich-Landkof <ei...@orielresearch.org> on 2018/03/20 15:02:31 UTC

Help with Dynamic writing

Hello all,

It was nice to meet you last week!!!

I am writing a genomic PCollection that is created from BigQuery to a folder.
Below is the code with its output, so you can run it with any small BQ table
and let me know what your thoughts are:

rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index': u'GSM2316666',
u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14': 0},{u'index':
u'GSM2312372', u'SNRPCP14': 0}]

rows[1].keys()
# output:  [u'index', u'SNRPCP14']

# you can change `archs4.results_20180308_*` to any other table name with an
# index column
queries2 = rows | beam.Map(lambda x: (
    beam.io.Read(beam.io.BigQuerySource(
        project='orielresearch-188115',
        use_standard_sql=False,
        query=str('SELECT * FROM `archs4.results_20180308_*` '
                  'where index=\'%s\'' % (x["index"])))),
    str('gs://archs4/output/' + x["index"] + '/')))

queries2
# output: a list of PCollections and the paths to write the PCollection data to

[(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
  'gs://archs4/output/GSM2313641/'),
 (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
  'gs://archs4/output/GSM2316666/'),
 (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
  'gs://archs4/output/GSM2312355/'),
 (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
  'gs://archs4/output/GSM2312372/')]


*# this is my challenge*
queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND COLUMN")

Do you have any idea how to sink the data to a text file? I have tried a few
other options and got stuck at the write transform.
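
For reference, beam.io.WriteToText is applied to a PCollection inside a
pipeline, not to a list of (transform, path) tuples. A rough, untested sketch
of the plain (non-dynamic) pattern, reusing the project and table names above
with a single output prefix, would be:

import apache_beam as beam

p = beam.Pipeline()
rows_pc = p | beam.io.Read(beam.io.BigQuerySource(
    project='orielresearch-188115',
    use_standard_sql=False,
    query='SELECT * FROM `archs4.results_20180308_*`'))
# each BigQuery row arrives as a dict; turn it into one line of text
rows_pc | beam.Map(lambda row: str(row)) | beam.io.WriteToText('gs://archs4/output/all/')
p.run().wait_until_finish()

Writing each sample to its own folder is the part that needs something extra
(see the partition-based suggestion in the replies).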

Any advice is very appreciated.

Thanks,
Eila



-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/

Re: Help with Dynamic writing

Posted by OrielResearch Eila Arich-Landkof <ei...@orielresearch.org>.
Thanks. Everything is working!
Eila

On Thu, Mar 22, 2018 at 3:38 AM, Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Wed, Mar 21, 2018 at 1:57 PM OrielResearch Eila Arich-Landkof <
> eila@orielresearch.org> wrote:
>
>> yes. You were right, I had to put back the pc.
>> I am working on the partition function and try to debug it without
>> running a pipeline on dataflow (dataflow execution takes at list 8 minutes
>> for any size of data), based on the link: https://medium.com/google-
>> cloud/quickly-experiment-with-dataflow-3d5a0da8d8e9
>>
>> *my code & questions:*
>> # Is it possible to use string as partition name? if not, what would be
>> the simplest solution?
>> # generating another PCollection with the samples (index) and merge with
>> the data table? if yes,
>> # how would I extract that information when I am writing to text file
>> def partition_fn(element, num_partitions):
>>   return(element['sample'])
>>
>
> I believe partition function has to return an integer that is the
> partition number for a given element. Please see following code snippet for
> an example usage.
> https://github.com/apache/beam/blob/master/sdks/python/
> apache_beam/examples/snippets/snippets.py#L1155
>
> Also, that example shows how you can write resulting PCollections to text
> files.
>
>
>>
>> # debug input, no pipeline
>>
>> *d = [{u'sample': u'GSM2313641', u'SNRPCP14': 0},{u'sample':
>> u'GSM2316666', u'SNRPCP14': 0},{u'sample': u'GSM2312355', u'SNRPCP14': 0}]
>> | beam.Flatten()*
>>
>> *d*
>> # output:
>>
>> [(u'sample', u'GSM2312355'),
>>  (u'SNRPCP14', 0),
>>  (u'sample', u'GSM2313641'),
>>  (u'SNRPCP14', 0),
>>  (u'sample', u'GSM2316666'),
>>  (u'SNRPCP14', 0)]
>>
>>
>>
>> *d | beam.Partition(partition_fn,3)*
>> #output, error:
>>
>> TypeError: tuple indices must be integers, not str [while running 'Partition(CallableWrapperPartitionFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']
>>
>>
> Please see above.
>
>
>>
>> *#writing to dynamic output - how do i extract the partition name that
>> the element is assigned to?*
>> *for i, partition in enumerate(partitions):*
>> *  # The partition path is using the partition str name*
>> *  pathi= str('gs://bucket/output/'+x["sample"]+'/')*
>> *  partition | label >> beam.io.WriteToText(pathi)*
>>
>>
>> Please also let me know if there is a better way to debug the code before
>> running on dataflow runner.
>>
>
> You can test code using DirectRunner before using DataflowRunner. Both
> runners should produce the same result.
>
>
>>
>> Many thanks,
>> Eila
>>
>>
>>
>> On Wed, Mar 21, 2018 at 12:43 PM, Chamikara Jayalath <
>> chamikara@google.com> wrote:
>>
>>> On Wed, Mar 21, 2018 at 7:53 AM OrielResearch Eila Arich-Landkof <
>>> eila@orielresearch.org> wrote:
>>>
>>>> Hi Cham,
>>>>
>>>> *all_data = pcollections | beam.Flatten()*
>>>>
>>>> fires an error:
>>>>
>>>> TypeError: 'Read' object is not iterable
>>>>
>>>>
>>>> pcollections is the following list:
>>>>
>>>> [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
>>>>  <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
>>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
>>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]
>>>>
>>>>
>>>>
>>> Did you omit "p | " in "p | beam.io.Read" by any chance ? Not sure how
>>> you ended up with a list of Read PTransforms otherwise.
>>>
>>> Also, follow everything with a "p.run.wait_until_finish()" for pipeline
>>> to execute.
>>>
>>> Can you paste the code that you are running ?
>>>
>>>
>>>> Based on the following, i converted the list to tuples (tuple(*pcollections)) with the same error for tuple.*
>>>>
>>>>
>>>> # Flatten takes a tuple of PCollection objects.# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.merged = (
>>>>     (pcoll1, pcoll2, pcoll3)
>>>>     # A list of tuples can be "piped" directly into a Flatten transform.
>>>>     | beam.Flatten())
>>>>
>>>>
>>>> Any advice?
>>>>
>>>> Many thanks,
>>>> Eila
>>>>
>>>>
>>>> On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <
>>>> eila@orielresearch.org> wrote:
>>>>
>>>>> very helpful!!! i will keep you posted if I have any issue / question
>>>>> Best,
>>>>> Eila
>>>>>
>>>>>
>>>>> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <
>>>>> chamikara@google.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
>>>>>> eila@orielresearch.org> wrote:
>>>>>>
>>>>>>> Hi Cham,
>>>>>>>
>>>>>>> Please see inline. If possible, code / pseudo code will help a lot.
>>>>>>> Thanks,
>>>>>>> Eila
>>>>>>>
>>>>>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <
>>>>>>> chamikara@google.com> wrote:
>>>>>>>
>>>>>>>> Hi Eila,
>>>>>>>>
>>>>>>>> Please find my comments inline.
>>>>>>>>
>>>>>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>>>>>>> eila@orielresearch.org> wrote:
>>>>>>>>
>>>>>>>>> Hello all,
>>>>>>>>>
>>>>>>>>> It was nice to meet you last week!!!
>>>>>>>>>
>>>>>>>>>
>>>>>>>> It was nice to meet you as well :)
>>>>>>>>
>>>>>>>>
>>>>>>>>> I am writing genomic pCollection that is created from bigQuery to
>>>>>>>>> a folder. Following is the code with output so you can run it with any
>>>>>>>>> small BQ table and let me know what your thoughts are:
>>>>>>>>>
>>>>>>>>> This init is only for debugging. In production I will use the
>>>>>>> pipeline syntax
>>>>>>>
>>>>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>>>>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>>>>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>>>>>
>>>>>>>>> rows[1].keys()
>>>>>>>>> # output:  [u'index', u'SNRPCP14']
>>>>>>>>>
>>>>>>>>> # you can change `archs4.results_20180308_ to any other table
>>>>>>>>> name with index column
>>>>>>>>> queries2 = rows | beam.Map(lambda x: (beam.io.Read(beam.io.
>>>>>>>>> BigQuerySource(project='orielresearch-188115',
>>>>>>>>> use_standard_sql=False, query=str('SELECT * FROM
>>>>>>>>> `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>>>>>>                                str('gs://archs4/output/'+x["
>>>>>>>>> index"]+'/')))
>>>>>>>>>
>>>>>>>>
>>>>>>>> I don't think above code will work (not portable across runners at
>>>>>>>> least). BigQuerySource (along with Read transform) have to be applied to a
>>>>>>>> Pipeline object. So probably change this to a for loop that creates a set
>>>>>>>> of read transforms and use Flatten to create a single PCollection.
>>>>>>>>
>>>>>>> For debug, I am running on the local datalab runner. For the
>>>>>>> production, I will be running only dataflow runner. I think that I was able
>>>>>>> to query the tables that way, I will double check it. The indexes could go
>>>>>>> to millions - my concern is that I will not be able to leverage on Beam
>>>>>>> distribution capability when I use the the loop option. Any thoughts on
>>>>>>> that?
>>>>>>>
>>>>>>
>>>>>> You mean you'll have millions of queries. That will not be scalable.
>>>>>> My suggestion was to loop on queries. Can you reduce to one or a small
>>>>>> number of queries and perform further processing in Beam ?
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> queries2
>>>>>>>>> # output: a list of pCollection and the path to write the
>>>>>>>>> pCollection data to
>>>>>>>>>
>>>>>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>>>>>>   'gs://archs4/output/GSM2313641/'),
>>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>>>>>>   'gs://archs4/output/GSM2316666/'),
>>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>>>>>>   'gs://archs4/output/GSM2312355/'),
>>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>>>>>>   'gs://archs4/output/GSM2312372/')]
>>>>>>>>>
>>>>>>>>>
>>>>>>>> What you got here is a PCollection of PTransform objects which is
>>>>>>>> not useful.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>> *# this is my challenge*
>>>>>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>>>>>>> COLUMN")
>>>>>>>>>
>>>>>>>>>
>>>>>>>> Once you update above code you will get a proper PCollection of
>>>>>>>> elements read from BigQuery. You can transform and write this (to files,
>>>>>>>> BQ, or any other sink) as needed.
>>>>>>>>
>>>>>>>
>>>>>>> it is a list of tupples with PCollection and the path to write to.
>>>>>>> the path is not unique and I might have more than one PCollection written
>>>>>>> to the same destination. How do I pass the path from the tupple list as a
>>>>>>> parameter to the text file name? Could you please add the code that you
>>>>>>> were thinking about?
>>>>>>>
>>>>>>
>>>>>> Python SDK does not support writing to different files based on the
>>>>>> values of data (dynamic writes). So you'll have to either partition data
>>>>>> into separate PCollections  or write all data into the same location.
>>>>>>
>>>>>> Here's *pseudocode* (untested) for reading from few queries,
>>>>>> partitioning into several PCollections, and writing to different
>>>>>> destinations.
>>>>>>
>>>>>> *queries = ['select * from A', 'select * from B',....]*
>>>>>>
>>>>>> *p = Pipeline()*
>>>>>> *pcollections = []*
>>>>>> *for query in queries:*
>>>>>> *  pc = p | beam.io.Read(beam.io
>>>>>> <http://beam.io/>.BigQuerySource(query=query))*
>>>>>> * pcollections.append(pc)*
>>>>>>
>>>>>> *all_data = pcollections | beam.Flatten()*
>>>>>> *partitions = all_data | beam.Partition(my_partition_fn)*
>>>>>> *for i, partition in enumerate(partitions):*
>>>>>> *  partition | beam.io.WriteToText(<unique path for partition i>)*
>>>>>> Hope this helps.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>>
>>>>>>
>>>>>>> Please see programming guide on how to write to text files (section
>>>>>>>> 5.3 and click Python tab): https://beam.apache.org/
>>>>>>>> documentation/programming-guide/
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>>
>>>>>>>>> Do you have any idea how to sink the data to a text file? I have
>>>>>>>>> tried few other options and was stuck at the write transform
>>>>>>>>>
>>>>>>>>> Any advice is very appreciated.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Eila
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Eila
>>>>>>>>> www.orielresearch.org
>>>>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Eila
>>>>>>> www.orielresearch.org
>>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Eila
>>>>> www.orielresearch.org
>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Eila
>>>> www.orielresearch.org
>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>
>>>
>>
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>


-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/

Re: Help with Dynamic writing

Posted by Chamikara Jayalath <ch...@google.com>.
On Wed, Mar 21, 2018 at 1:57 PM OrielResearch Eila Arich-Landkof <
eila@orielresearch.org> wrote:

> yes. You were right, I had to put back the pc.
> I am working on the partition function and try to debug it without running
> a pipeline on dataflow (dataflow execution takes at list 8 minutes for any
> size of data), based on the link:
> https://medium.com/google-cloud/quickly-experiment-with-dataflow-3d5a0da8d8e9
>
> *my code & questions:*
> # Is it possible to use string as partition name? if not, what would be
> the simplest solution?
> # generating another PCollection with the samples (index) and merge with
> the data table? if yes,
> # how would I extract that information when I am writing to text file
> def partition_fn(element, num_partitions):
>   return(element['sample'])
>

I believe the partition function has to return an integer that is the
partition number for a given element. Please see the following code snippet
for an example usage.
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L1155

Also, that example shows how you can write resulting PCollections to text
files.
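
A minimal, untested sketch of such an integer-returning partition function,
assuming the set of sample names is known up front (the names below are just
the ones from this thread):

known_samples = [u'GSM2313641', u'GSM2316666', u'GSM2312355']

def partition_fn(element, num_partitions):
  # return the integer index of this element's sample as its partition number
  return known_samples.index(element['sample'])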


>
> # debug input, no pipeline
>
> *d = [{u'sample': u'GSM2313641', u'SNRPCP14': 0},{u'sample':
> u'GSM2316666', u'SNRPCP14': 0},{u'sample': u'GSM2312355', u'SNRPCP14': 0}]
> | beam.Flatten()*
>
> *d*
> # output:
>
> [(u'sample', u'GSM2312355'),
>  (u'SNRPCP14', 0),
>  (u'sample', u'GSM2313641'),
>  (u'SNRPCP14', 0),
>  (u'sample', u'GSM2316666'),
>  (u'SNRPCP14', 0)]
>
>
>
> *d | beam.Partition(partition_fn,3)*
> #output, error:
>
> TypeError: tuple indices must be integers, not str [while running 'Partition(CallableWrapperPartitionFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']
>
>
Please see above.


>
> *#writing to dynamic output - how do i extract the partition name that the
> element is assigned to?*
> *for i, partition in enumerate(partitions):*
> *  # The partition path is using the partition str name*
> *  pathi= str('gs://bucket/output/'+x["sample"]+'/')*
> *  partition | label >> beam.io.WriteToText(pathi)*
>
>
> Please also let me know if there is a better way to debug the code before
> running on dataflow runner.
>

You can test code using DirectRunner before using DataflowRunner. Both
runners should produce the same result.
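
For example, a quick local sanity check with the DirectRunner (a sketch with
made-up elements and a local output path) could look like:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

p = beam.Pipeline(options=PipelineOptions(['--runner=DirectRunner']))
(p
 | beam.Create([{u'sample': u'GSM2313641', u'SNRPCP14': 0}])
 | beam.Map(lambda row: str(row))
 | beam.io.WriteToText('/tmp/partition_debug'))
p.run().wait_until_finish()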


>
> Many thanks,
> Eila
>
>
>
> On Wed, Mar 21, 2018 at 12:43 PM, Chamikara Jayalath <chamikara@google.com
> > wrote:
>
>> On Wed, Mar 21, 2018 at 7:53 AM OrielResearch Eila Arich-Landkof <
>> eila@orielresearch.org> wrote:
>>
>>> Hi Cham,
>>>
>>> *all_data = pcollections | beam.Flatten()*
>>>
>>> fires an error:
>>>
>>> TypeError: 'Read' object is not iterable
>>>
>>>
>>> pcollections is the following list:
>>>
>>> [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
>>>  <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
>>>  <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]
>>>
>>>
>>>
>> Did you omit "p | " in "p | beam.io.Read" by any chance ? Not sure how
>> you ended up with a list of Read PTransforms otherwise.
>>
>> Also, follow everything with a "p.run.wait_until_finish()" for pipeline
>> to execute.
>>
>> Can you paste the code that you are running ?
>>
>>
>>> Based on the following, i converted the list to tuples (tuple(*pcollections)) with the same error for tuple.*
>>>
>>>
>>> # Flatten takes a tuple of PCollection objects.# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.merged = (
>>>     (pcoll1, pcoll2, pcoll3)
>>>     # A list of tuples can be "piped" directly into a Flatten transform.
>>>     | beam.Flatten())
>>>
>>>
>>> Any advice?
>>>
>>> Many thanks,
>>> Eila
>>>
>>>
>>> On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <
>>> eila@orielresearch.org> wrote:
>>>
>>>> very helpful!!! i will keep you posted if I have any issue / question
>>>> Best,
>>>> Eila
>>>>
>>>>
>>>> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
>>>>> eila@orielresearch.org> wrote:
>>>>>
>>>>>> Hi Cham,
>>>>>>
>>>>>> Please see inline. If possible, code / pseudo code will help a lot.
>>>>>> Thanks,
>>>>>> Eila
>>>>>>
>>>>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <
>>>>>> chamikara@google.com> wrote:
>>>>>>
>>>>>>> Hi Eila,
>>>>>>>
>>>>>>> Please find my comments inline.
>>>>>>>
>>>>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>>>>>> eila@orielresearch.org> wrote:
>>>>>>>
>>>>>>>> Hello all,
>>>>>>>>
>>>>>>>> It was nice to meet you last week!!!
>>>>>>>>
>>>>>>>>
>>>>>>> It was nice to meet you as well :)
>>>>>>>
>>>>>>>
>>>>>>>> I am writing genomic pCollection that is created from bigQuery to a
>>>>>>>> folder. Following is the code with output so you can run it with any small
>>>>>>>> BQ table and let me know what your thoughts are:
>>>>>>>>
>>>>>>>> This init is only for debugging. In production I will use the
>>>>>> pipeline syntax
>>>>>>
>>>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>>>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>>>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>>>>
>>>>>>>> rows[1].keys()
>>>>>>>> # output:  [u'index', u'SNRPCP14']
>>>>>>>>
>>>>>>>> # you can change `archs4.results_20180308_ to any other table name
>>>>>>>> with index column
>>>>>>>> queries2 = rows | beam.Map(lambda x: (beam.io.Read(beam.io.BigQuerySource(project='orielresearch-188115',
>>>>>>>> use_standard_sql=False, query=str('SELECT * FROM
>>>>>>>> `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>>>>>
>>>>>>>>  str('gs://archs4/output/'+x["index"]+'/')))
>>>>>>>>
>>>>>>>
>>>>>>> I don't think above code will work (not portable across runners at
>>>>>>> least). BigQuerySource (along with Read transform) have to be applied to a
>>>>>>> Pipeline object. So probably change this to a for loop that creates a set
>>>>>>> of read transforms and use Flatten to create a single PCollection.
>>>>>>>
>>>>>> For debug, I am running on the local datalab runner. For the
>>>>>> production, I will be running only dataflow runner. I think that I was able
>>>>>> to query the tables that way, I will double check it. The indexes could go
>>>>>> to millions - my concern is that I will not be able to leverage on Beam
>>>>>> distribution capability when I use the the loop option. Any thoughts on
>>>>>> that?
>>>>>>
>>>>>
>>>>> You mean you'll have millions of queries. That will not be scalable.
>>>>> My suggestion was to loop on queries. Can you reduce to one or a small
>>>>> number of queries and perform further processing in Beam ?
>>>>>
>>>>>
>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> queries2
>>>>>>>> # output: a list of pCollection and the path to write the
>>>>>>>> pCollection data to
>>>>>>>>
>>>>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>>>>>   'gs://archs4/output/GSM2313641/'),
>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>>>>>   'gs://archs4/output/GSM2316666/'),
>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>>>>>   'gs://archs4/output/GSM2312355/'),
>>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>>>>>   'gs://archs4/output/GSM2312372/')]
>>>>>>>>
>>>>>>>>
>>>>>>> What you got here is a PCollection of PTransform objects which is
>>>>>>> not useful.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> *# this is my challenge*
>>>>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>>>>>> COLUMN")
>>>>>>>>
>>>>>>>>
>>>>>>> Once you update above code you will get a proper PCollection of
>>>>>>> elements read from BigQuery. You can transform and write this (to files,
>>>>>>> BQ, or any other sink) as needed.
>>>>>>>
>>>>>>
>>>>>> it is a list of tupples with PCollection and the path to write to.
>>>>>> the path is not unique and I might have more than one PCollection written
>>>>>> to the same destination. How do I pass the path from the tupple list as a
>>>>>> parameter to the text file name? Could you please add the code that you
>>>>>> were thinking about?
>>>>>>
>>>>>
>>>>> Python SDK does not support writing to different files based on the
>>>>> values of data (dynamic writes). So you'll have to either partition data
>>>>> into separate PCollections  or write all data into the same location.
>>>>>
>>>>> Here's *pseudocode* (untested) for reading from few queries,
>>>>> partitioning into several PCollections, and writing to different
>>>>> destinations.
>>>>>
>>>>> *queries = ['select * from A', 'select * from B',....]*
>>>>>
>>>>> *p = Pipeline()*
>>>>> *pcollections = []*
>>>>> *for query in queries:*
>>>>> *  pc = p | beam.io.Read(beam.io
>>>>> <http://beam.io/>.BigQuerySource(query=query))*
>>>>> * pcollections.append(pc)*
>>>>>
>>>>> *all_data = pcollections | beam.Flatten()*
>>>>> *partitions = all_data | beam.Partition(my_partition_fn)*
>>>>> *for i, partition in enumerate(partitions):*
>>>>> *  partition | beam.io.WriteToText(<unique path for partition i>)*
>>>>> Hope this helps.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>>
>>>>>> Please see programming guide on how to write to text files (section
>>>>>>> 5.3 and click Python tab):
>>>>>>> https://beam.apache.org/documentation/programming-guide/
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>>
>>>>>>>> Do you have any idea how to sink the data to a text file? I have
>>>>>>>> tried few other options and was stuck at the write transform
>>>>>>>>
>>>>>>>> Any advice is very appreciated.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Eila
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Eila
>>>>>>>> www.orielresearch.org
>>>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Eila
>>>>>> www.orielresearch.org
>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Eila
>>>> www.orielresearch.org
>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>
>>>
>>>
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>
>>
>
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>

Re: Help with Dynamic writing

Posted by OrielResearch Eila Arich-Landkof <ei...@orielresearch.org>.
Yes, you were right; I had to put back the pc.
I am working on the partition function and trying to debug it without running
a pipeline on Dataflow (Dataflow execution takes at least 8 minutes for any
size of data), based on this link:
https://medium.com/google-cloud/quickly-experiment-with-dataflow-3d5a0da8d8e9

*my code & questions:*
# Is it possible to use a string as the partition name? If not, what would be
# the simplest solution?
# Generating another PCollection with the samples (index) and merging it with
# the data table? If yes,
# how would I extract that information when I am writing to a text file?
def partition_fn(element, num_partitions):
  return(element['sample'])

# debug input, no pipeline

*d = [{u'sample': u'GSM2313641', u'SNRPCP14': 0},{u'sample': u'GSM2316666',
u'SNRPCP14': 0},{u'sample': u'GSM2312355', u'SNRPCP14': 0}] |
beam.Flatten()*

*d*
# output:

[(u'sample', u'GSM2312355'),
 (u'SNRPCP14', 0),
 (u'sample', u'GSM2313641'),
 (u'SNRPCP14', 0),
 (u'sample', u'GSM2316666'),
 (u'SNRPCP14', 0)]



*d | beam.Partition(partition_fn,3)*
#output, error:

TypeError: tuple indices must be integers, not str [while running
'Partition(CallableWrapperPartitionFn)/ParDo(ApplyPartitionFnFn)/ParDo(ApplyPartitionFnFn)']


*#writing to dynamic output - how do i extract the partition name that the
element is assigned to?*
*for i, partition in enumerate(partitions):*
*  # The partition path is using the partition str name*
*  pathi= str('gs://bucket/output/'+x["sample"]+'/')*
*  partition | label >> beam.io.WriteToText(pathi)*


Please also let me know if there is a better way to debug the code before
running on dataflow runner.
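
One possible (untested) way to recover the name is to keep the ordered list of
sample names that the partition function indexes into, and reuse it when
building each output path; partitions, samples, and the bucket name below are
placeholders:

for i, partition in enumerate(partitions):
  # samples[i] is the "partition name" for partition number i
  path_i = 'gs://bucket/output/%s/' % samples[i]
  partition | ('write %s' % samples[i]) >> beam.io.WriteToText(path_i)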

Many thanks,
Eila



On Wed, Mar 21, 2018 at 12:43 PM, Chamikara Jayalath <ch...@google.com>
wrote:

> On Wed, Mar 21, 2018 at 7:53 AM OrielResearch Eila Arich-Landkof <
> eila@orielresearch.org> wrote:
>
>> Hi Cham,
>>
>> *all_data = pcollections | beam.Flatten()*
>>
>> fires an error:
>>
>> TypeError: 'Read' object is not iterable
>>
>>
>> pcollections is the following list:
>>
>> [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
>>  <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
>>  <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
>>  <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]
>>
>>
>>
> Did you omit "p | " in "p | beam.io.Read" by any chance ? Not sure how
> you ended up with a list of Read PTransforms otherwise.
>
> Also, follow everything with a "p.run.wait_until_finish()" for pipeline to
> execute.
>
> Can you paste the code that you are running ?
>
>
>> Based on the following, i converted the list to tuples (tuple(*pcollections)) with the same error for tuple.*
>>
>>
>> # Flatten takes a tuple of PCollection objects.# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.merged = (
>>     (pcoll1, pcoll2, pcoll3)
>>     # A list of tuples can be "piped" directly into a Flatten transform.
>>     | beam.Flatten())
>>
>>
>> Any advice?
>>
>> Many thanks,
>> Eila
>>
>>
>> On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <
>> eila@orielresearch.org> wrote:
>>
>>> very helpful!!! i will keep you posted if I have any issue / question
>>> Best,
>>> Eila
>>>
>>>
>>> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <
>>> chamikara@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
>>>> eila@orielresearch.org> wrote:
>>>>
>>>>> Hi Cham,
>>>>>
>>>>> Please see inline. If possible, code / pseudo code will help a lot.
>>>>> Thanks,
>>>>> Eila
>>>>>
>>>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <
>>>>> chamikara@google.com> wrote:
>>>>>
>>>>>> Hi Eila,
>>>>>>
>>>>>> Please find my comments inline.
>>>>>>
>>>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>>>>> eila@orielresearch.org> wrote:
>>>>>>
>>>>>>> Hello all,
>>>>>>>
>>>>>>> It was nice to meet you last week!!!
>>>>>>>
>>>>>>>
>>>>>> It was nice to meet you as well :)
>>>>>>
>>>>>>
>>>>>>> I am writing genomic pCollection that is created from bigQuery to a
>>>>>>> folder. Following is the code with output so you can run it with any small
>>>>>>> BQ table and let me know what your thoughts are:
>>>>>>>
>>>>>>> This init is only for debugging. In production I will use the
>>>>> pipeline syntax
>>>>>
>>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>>>
>>>>>>> rows[1].keys()
>>>>>>> # output:  [u'index', u'SNRPCP14']
>>>>>>>
>>>>>>> # you can change `archs4.results_20180308_ to any other table name
>>>>>>> with index column
>>>>>>> queries2 = rows | beam.Map(lambda x: (beam.io.Read(beam.io.
>>>>>>> BigQuerySource(project='orielresearch-188115',
>>>>>>> use_standard_sql=False, query=str('SELECT * FROM
>>>>>>> `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>>>>                                str('gs://archs4/output/'+x["
>>>>>>> index"]+'/')))
>>>>>>>
>>>>>>
>>>>>> I don't think above code will work (not portable across runners at
>>>>>> least). BigQuerySource (along with Read transform) have to be applied to a
>>>>>> Pipeline object. So probably change this to a for loop that creates a set
>>>>>> of read transforms and use Flatten to create a single PCollection.
>>>>>>
>>>>> For debug, I am running on the local datalab runner. For the
>>>>> production, I will be running only dataflow runner. I think that I was able
>>>>> to query the tables that way, I will double check it. The indexes could go
>>>>> to millions - my concern is that I will not be able to leverage on Beam
>>>>> distribution capability when I use the the loop option. Any thoughts on
>>>>> that?
>>>>>
>>>>
>>>> You mean you'll have millions of queries. That will not be scalable. My
>>>> suggestion was to loop on queries. Can you reduce to one or a small number
>>>> of queries and perform further processing in Beam ?
>>>>
>>>>
>>>>>
>>>>>>
>>>>>>>
>>>>>>> queries2
>>>>>>> # output: a list of pCollection and the path to write the
>>>>>>> pCollection data to
>>>>>>>
>>>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>>>>   'gs://archs4/output/GSM2313641/'),
>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>>>>   'gs://archs4/output/GSM2316666/'),
>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>>>>   'gs://archs4/output/GSM2312355/'),
>>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>>>>   'gs://archs4/output/GSM2312372/')]
>>>>>>>
>>>>>>>
>>>>>> What you got here is a PCollection of PTransform objects which is not
>>>>>> useful.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> *# this is my challenge*
>>>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>>>>> COLUMN")
>>>>>>>
>>>>>>>
>>>>>> Once you update above code you will get a proper PCollection of
>>>>>> elements read from BigQuery. You can transform and write this (to files,
>>>>>> BQ, or any other sink) as needed.
>>>>>>
>>>>>
>>>>> it is a list of tupples with PCollection and the path to write to. the
>>>>> path is not unique and I might have more than one PCollection written to
>>>>> the same destination. How do I pass the path from the tupple list as a
>>>>> parameter to the text file name? Could you please add the code that you
>>>>> were thinking about?
>>>>>
>>>>
>>>> Python SDK does not support writing to different files based on the
>>>> values of data (dynamic writes). So you'll have to either partition data
>>>> into separate PCollections  or write all data into the same location.
>>>>
>>>> Here's *pseudocode* (untested) for reading from few queries,
>>>> partitioning into several PCollections, and writing to different
>>>> destinations.
>>>>
>>>> *queries = ['select * from A', 'select * from B',....]*
>>>>
>>>> *p = Pipeline()*
>>>> *pcollections = []*
>>>> *for query in queries:*
>>>> *  pc = p | beam.io.Read(beam.io
>>>> <http://beam.io/>.BigQuerySource(query=query))*
>>>> * pcollections.append(pc)*
>>>>
>>>> *all_data = pcollections | beam.Flatten()*
>>>> *partitions = all_data | beam.Partition(my_partition_fn)*
>>>> *for i, partition in enumerate(partitions):*
>>>> *  partition | beam.io.WriteToText(<unique path for partition i>)*
>>>> Hope this helps.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>
>>>>
>>>>> Please see programming guide on how to write to text files (section
>>>>>> 5.3 and click Python tab): https://beam.apache.org/
>>>>>> documentation/programming-guide/
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>>
>>>>>>> Do you have any idea how to sink the data to a text file? I have
>>>>>>> tried few other options and was stuck at the write transform
>>>>>>>
>>>>>>> Any advice is very appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Eila
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Eila
>>>>>>> www.orielresearch.org
>>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Eila
>>>>> www.orielresearch.org
>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>
>>>>
>>>
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>
>>
>>
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>


-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/

Re: Help with Dynamic writing

Posted by Chamikara Jayalath <ch...@google.com>.
On Wed, Mar 21, 2018 at 7:53 AM OrielResearch Eila Arich-Landkof <
eila@orielresearch.org> wrote:

> Hi Cham,
>
> *all_data = pcollections | beam.Flatten()*
>
> fires an error:
>
> TypeError: 'Read' object is not iterable
>
>
> pcollections is the following list:
>
> [<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
>  <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
>  <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
>  <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]
>
>
>
Did you omit "p | " in "p | beam.io.Read" by any chance? Not sure how you
ended up with a list of Read PTransforms otherwise.

Also, follow everything with a "p.run().wait_until_finish()" for the pipeline
to execute.

Can you paste the code that you are running?
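
In other words, the loop would need to look roughly like this (a sketch; the
labels, queries, and output path are placeholders) so that each read is
applied to the pipeline and yields a real PCollection:

p = beam.Pipeline()
pcollections = []
for i, query in enumerate(queries):
  # "p |" applies the Read transform to the pipeline and returns a PCollection;
  # each application needs its own label
  pc = p | ('read_%d' % i) >> beam.io.Read(beam.io.BigQuerySource(query=query))
  pcollections.append(pc)

all_data = tuple(pcollections) | beam.Flatten()
all_data | beam.io.WriteToText('gs://bucket/output/all/')
p.run().wait_until_finish()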


> Based on the following, i converted the list to tuples (tuple(*pcollections)) with the same error for tuple.*
>
>
> # Flatten takes a tuple of PCollection objects.# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.merged = (
>     (pcoll1, pcoll2, pcoll3)
>     # A list of tuples can be "piped" directly into a Flatten transform.
>     | beam.Flatten())
>
>
> Any advice?
>
> Many thanks,
> Eila
>
>
> On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <
> eila@orielresearch.org> wrote:
>
>> very helpful!!! i will keep you posted if I have any issue / question
>> Best,
>> Eila
>>
>>
>> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <chamikara@google.com
>> > wrote:
>>
>>>
>>>
>>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
>>> eila@orielresearch.org> wrote:
>>>
>>>> Hi Cham,
>>>>
>>>> Please see inline. If possible, code / pseudo code will help a lot.
>>>> Thanks,
>>>> Eila
>>>>
>>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <
>>>> chamikara@google.com> wrote:
>>>>
>>>>> Hi Eila,
>>>>>
>>>>> Please find my comments inline.
>>>>>
>>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>>>> eila@orielresearch.org> wrote:
>>>>>
>>>>>> Hello all,
>>>>>>
>>>>>> It was nice to meet you last week!!!
>>>>>>
>>>>>>
>>>>> It was nice to meet you as well :)
>>>>>
>>>>>
>>>>>> I am writing genomic pCollection that is created from bigQuery to a
>>>>>> folder. Following is the code with output so you can run it with any small
>>>>>> BQ table and let me know what your thoughts are:
>>>>>>
>>>>>> This init is only for debugging. In production I will use the
>>>> pipeline syntax
>>>>
>>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>>
>>>>>> rows[1].keys()
>>>>>> # output:  [u'index', u'SNRPCP14']
>>>>>>
>>>>>> # you can change `archs4.results_20180308_ to any other table name
>>>>>> with index column
>>>>>> queries2 = rows | beam.Map(lambda x: (beam.io.Read(beam.io.BigQuerySource(project='orielresearch-188115',
>>>>>> use_standard_sql=False, query=str('SELECT * FROM
>>>>>> `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>>>>
>>>>>>  str('gs://archs4/output/'+x["index"]+'/')))
>>>>>>
>>>>>
>>>>> I don't think above code will work (not portable across runners at
>>>>> least). BigQuerySource (along with Read transform) have to be applied to a
>>>>> Pipeline object. So probably change this to a for loop that creates a set
>>>>> of read transforms and use Flatten to create a single PCollection.
>>>>>
>>>> For debug, I am running on the local datalab runner. For the
>>>> production, I will be running only dataflow runner. I think that I was able
>>>> to query the tables that way, I will double check it. The indexes could go
>>>> to millions - my concern is that I will not be able to leverage on Beam
>>>> distribution capability when I use the the loop option. Any thoughts on
>>>> that?
>>>>
>>>
>>> You mean you'll have millions of queries. That will not be scalable. My
>>> suggestion was to loop on queries. Can you reduce to one or a small number
>>> of queries and perform further processing in Beam ?
>>>
>>>
>>>>
>>>>>
>>>>>>
>>>>>> queries2
>>>>>> # output: a list of pCollection and the path to write the pCollection
>>>>>> data to
>>>>>>
>>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>>>   'gs://archs4/output/GSM2313641/'),
>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>>>   'gs://archs4/output/GSM2316666/'),
>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>>>   'gs://archs4/output/GSM2312355/'),
>>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>>>   'gs://archs4/output/GSM2312372/')]
>>>>>>
>>>>>>
>>>>> What you got here is a PCollection of PTransform objects which is not
>>>>> useful.
>>>>>
>>>>>
>>>>>>
>>>>>> *# this is my challenge*
>>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>>>> COLUMN")
>>>>>>
>>>>>>
>>>>> Once you update above code you will get a proper PCollection of
>>>>> elements read from BigQuery. You can transform and write this (to files,
>>>>> BQ, or any other sink) as needed.
>>>>>
>>>>
>>>> it is a list of tupples with PCollection and the path to write to. the
>>>> path is not unique and I might have more than one PCollection written to
>>>> the same destination. How do I pass the path from the tupple list as a
>>>> parameter to the text file name? Could you please add the code that you
>>>> were thinking about?
>>>>
>>>
>>> Python SDK does not support writing to different files based on the
>>> values of data (dynamic writes). So you'll have to either partition data
>>> into separate PCollections  or write all data into the same location.
>>>
>>> Here's *pseudocode* (untested) for reading from few queries,
>>> partitioning into several PCollections, and writing to different
>>> destinations.
>>>
>>> *queries = ['select * from A', 'select * from B',....]*
>>>
>>> *p = Pipeline()*
>>> *pcollections = []*
>>> *for query in queries:*
>>> *  pc = p | beam.io.Read(beam.io
>>> <http://beam.io/>.BigQuerySource(query=query))*
>>> * pcollections.append(pc)*
>>>
>>> *all_data = pcollections | beam.Flatten()*
>>> *partitions = all_data | beam.Partition(my_partition_fn)*
>>> *for i, partition in enumerate(partitions):*
>>> *  partition | beam.io.WriteToText(<unique path for partition i>)*
>>> Hope this helps.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>
>>>> Please see programming guide on how to write to text files (section 5.3
>>>>> and click Python tab):
>>>>> https://beam.apache.org/documentation/programming-guide/
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>>> Do you have any idea how to sink the data to a text file? I have
>>>>>> tried few other options and was stuck at the write transform
>>>>>>
>>>>>> Any advice is very appreciated.
>>>>>>
>>>>>> Thanks,
>>>>>> Eila
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Eila
>>>>>> www.orielresearch.org
>>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Eila
>>>> www.orielresearch.org
>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>
>>>
>>
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>
>
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>

Re: Help with Dynamic writing

Posted by OrielResearch Eila Arich-Landkof <ei...@orielresearch.org>.
Hi Cham,

*all_data = pcollections | beam.Flatten()*

fires an error:

TypeError: 'Read' object is not iterable


pcollections is the following list:

[<Read(PTransform) label=[Read] at 0x7f9fa93d7410>,
 <Read(PTransform) label=[Read] at 0x7f9fa988a350>,
 <Read(PTransform) label=[Read] at 0x7f9fa93d72d0>,
 <Read(PTransform) label=[Read] at 0x7f9fa93d70d0>]


Based on the following, I converted the list to a tuple
(tuple(pcollections)) with the same error for the tuple.


# Flatten takes a tuple of PCollection objects.
# Returns a single PCollection that contains all of the elements in the
# PCollection objects in that tuple.
merged = (
    (pcoll1, pcoll2, pcoll3)
    # A list of tuples can be "piped" directly into a Flatten transform.
    | beam.Flatten())
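
That snippet only works once pcoll1, pcoll2, and pcoll3 are real PCollections
already attached to a pipeline; a tiny self-contained sketch (made-up elements,
local output path) would be:

import apache_beam as beam

p = beam.Pipeline()
pcoll1 = p | 'c1' >> beam.Create([{u'sample': u'GSM2313641', u'SNRPCP14': 0}])
pcoll2 = p | 'c2' >> beam.Create([{u'sample': u'GSM2316666', u'SNRPCP14': 0}])
merged = (pcoll1, pcoll2) | beam.Flatten()
merged | beam.io.WriteToText('/tmp/merged')
p.run().wait_until_finish()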


Any advice?

Many thanks,
Eila


On Wed, Mar 21, 2018 at 9:16 AM, OrielResearch Eila Arich-Landkof <
eila@orielresearch.org> wrote:

> very helpful!!! i will keep you posted if I have any issue / question
> Best,
> Eila
>
>
> On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <ch...@google.com>
> wrote:
>
>>
>>
>> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
>> eila@orielresearch.org> wrote:
>>
>>> Hi Cham,
>>>
>>> Please see inline. If possible, code / pseudo code will help a lot.
>>> Thanks,
>>> Eila
>>>
>>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <
>>> chamikara@google.com> wrote:
>>>
>>>> Hi Eila,
>>>>
>>>> Please find my comments inline.
>>>>
>>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>>> eila@orielresearch.org> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> It was nice to meet you last week!!!
>>>>>
>>>>>
>>>> It was nice to meet you as well :)
>>>>
>>>>
>>>>> I am writing genomic pCollection that is created from bigQuery to a
>>>>> folder. Following is the code with output so you can run it with any small
>>>>> BQ table and let me know what your thoughts are:
>>>>>
>>>>> This init is only for debugging. In production I will use the pipeline
>>> syntax
>>>
>>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>>
>>>>> rows[1].keys()
>>>>> # output:  [u'index', u'SNRPCP14']
>>>>>
>>>>> # you can change `archs4.results_20180308_ to any other table name
>>>>> with index column
>>>>> queries2 = rows | beam.Map(lambda x: (beam.io.Read(beam.io.BigQuery
>>>>> Source(project='orielresearch-188115', use_standard_sql=False,
>>>>> query=str('SELECT * FROM `archs4.results_20180308_*` where index=\'%s\'' %
>>>>> (x["index"])))),
>>>>>                                str('gs://archs4/output/'+x["
>>>>> index"]+'/')))
>>>>>
>>>>
>>>> I don't think above code will work (not portable across runners at
>>>> least). BigQuerySource (along with Read transform) have to be applied to a
>>>> Pipeline object. So probably change this to a for loop that creates a set
>>>> of read transforms and use Flatten to create a single PCollection.
>>>>
>>> For debug, I am running on the local datalab runner. For the production,
>>> I will be running only dataflow runner. I think that I was able to query
>>> the tables that way, I will double check it. The indexes could go to
>>> millions - my concern is that I will not be able to leverage on Beam
>>> distribution capability when I use the the loop option. Any thoughts on
>>> that?
>>>
>>
>> You mean you'll have millions of queries. That will not be scalable. My
>> suggestion was to loop on queries. Can you reduce to one or a small number
>> of queries and perform further processing in Beam ?
>>
>>
>>>
>>>>
>>>>>
>>>>> queries2
>>>>> # output: a list of pCollection and the path to write the pCollection
>>>>> data to
>>>>>
>>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>>   'gs://archs4/output/GSM2313641/'),
>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>>   'gs://archs4/output/GSM2316666/'),
>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>>   'gs://archs4/output/GSM2312355/'),
>>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>>   'gs://archs4/output/GSM2312372/')]
>>>>>
>>>>>
>>>> What you got here is a PCollection of PTransform objects which is not
>>>> useful.
>>>>
>>>>
>>>>>
>>>>> *# this is my challenge*
>>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>>> COLUMN")
>>>>>
>>>>>
>>>> Once you update above code you will get a proper PCollection of
>>>> elements read from BigQuery. You can transform and write this (to files,
>>>> BQ, or any other sink) as needed.
>>>>
>>>
>>> it is a list of tupples with PCollection and the path to write to. the
>>> path is not unique and I might have more than one PCollection written to
>>> the same destination. How do I pass the path from the tupple list as a
>>> parameter to the text file name? Could you please add the code that you
>>> were thinking about?
>>>
>>
>> Python SDK does not support writing to different files based on the
>> values of data (dynamic writes). So you'll have to either partition data
>> into separate PCollections  or write all data into the same location.
>>
>> Here's *pseudocode* (untested) for reading from few queries,
>> partitioning into several PCollections, and writing to different
>> destinations.
>>
>> *queries = ['select * from A', 'select * from B',....]*
>>
>> *p = Pipeline()*
>> *pcollections = []*
>> *for query in queries:*
>> *  pc = p | beam.io.Read(beam.io
>> <http://beam.io/>.BigQuerySource(query=query))*
>> * pcollections.append(pc)*
>>
>> *all_data = pcollections | beam.Flatten()*
>> *partitions = all_data | beam.Partition(my_partition_fn)*
>> *for i, partition in enumerate(partitions):*
>> *  partition | beam.io.WriteToText(<unique path for partition i>)*
>> Hope this helps.
>>
>> Thanks,
>> Cham
>>
>>
>>
>>> Please see programming guide on how to write to text files (section 5.3
>>>> and click Python tab): https://beam.apache.org/
>>>> documentation/programming-guide/
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>
>>>>> Do you have any idea how to sink the data to a text file? I have tried
>>>>> few other options and was stuck at the write transform
>>>>>
>>>>> Any advice is very appreciated.
>>>>>
>>>>> Thanks,
>>>>> Eila
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Eila
>>>>> www.orielresearch.org
>>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>>
>>>>
>>>
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>
>>
>
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>



-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/

Re: Help with Dynamic writing

Posted by OrielResearch Eila Arich-Landkof <ei...@orielresearch.org>.
Very helpful!!! I will keep you posted if I have any issue / question.
Best,
Eila


On Tue, Mar 20, 2018 at 5:08 PM, Chamikara Jayalath <ch...@google.com>
wrote:

>
>
> On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
> eila@orielresearch.org> wrote:
>
>> Hi Cham,
>>
>> Please see inline. If possible, code / pseudo code will help a lot.
>> Thanks,
>> Eila
>>
>> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <chamikara@google.com
>> > wrote:
>>
>>> Hi Eila,
>>>
>>> Please find my comments inline.
>>>
>>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>>> eila@orielresearch.org> wrote:
>>>
>>>> Hello all,
>>>>
>>>> It was nice to meet you last week!!!
>>>>
>>>>
>>> It was nice to meet you as well :)
>>>
>>>
>>>> I am writing genomic pCollection that is created from bigQuery to a
>>>> folder. Following is the code with output so you can run it with any small
>>>> BQ table and let me know what your thoughts are:
>>>>
>>>> This init is only for debugging. In production I will use the pipeline
>> syntax
>>
>>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>>
>>>> rows[1].keys()
>>>> # output:  [u'index', u'SNRPCP14']
>>>>
>>>> # you can change `archs4.results_20180308_ to any other table name
>>>> with index column
>>>> queries2 = rows | beam.Map(lambda x: (beam.io.Read(beam.io.
>>>> BigQuerySource(project='orielresearch-188115', use_standard_sql=False,
>>>> query=str('SELECT * FROM `archs4.results_20180308_*` where index=\'%s\'' %
>>>> (x["index"])))),
>>>>                                str('gs://archs4/output/'+x["
>>>> index"]+'/')))
>>>>
>>>
>>> I don't think above code will work (not portable across runners at
>>> least). BigQuerySource (along with Read transform) have to be applied to a
>>> Pipeline object. So probably change this to a for loop that creates a set
>>> of read transforms and use Flatten to create a single PCollection.
>>>
>> For debug, I am running on the local datalab runner. For the production,
>> I will be running only dataflow runner. I think that I was able to query
>> the tables that way, I will double check it. The indexes could go to
>> millions - my concern is that I will not be able to leverage on Beam
>> distribution capability when I use the the loop option. Any thoughts on
>> that?
>>
>
> You mean you'll have millions of queries. That will not be scalable. My
> suggestion was to loop on queries. Can you reduce to one or a small number
> of queries and perform further processing in Beam ?
>
>
>>
>>>
>>>>
>>>> queries2
>>>> # output: a list of pCollection and the path to write the pCollection
>>>> data to
>>>>
>>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>>   'gs://archs4/output/GSM2313641/'),
>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>>   'gs://archs4/output/GSM2316666/'),
>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>>   'gs://archs4/output/GSM2312355/'),
>>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>>   'gs://archs4/output/GSM2312372/')]
>>>>
>>>>
>>> What you got here is a PCollection of PTransform objects which is not
>>> useful.
>>>
>>>
>>>>
>>>> *# this is my challenge*
>>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>>> COLUMN")
>>>>
>>>>
>>> Once you update above code you will get a proper PCollection of elements
>>> read from BigQuery. You can transform and write this (to files, BQ, or any
>>> other sink) as needed.
>>>
>>
>> it is a list of tupples with PCollection and the path to write to. the
>> path is not unique and I might have more than one PCollection written to
>> the same destination. How do I pass the path from the tupple list as a
>> parameter to the text file name? Could you please add the code that you
>> were thinking about?
>>
>
> Python SDK does not support writing to different files based on the values
> of data (dynamic writes). So you'll have to either partition data into
> separate PCollections  or write all data into the same location.
>
> Here's *pseudocode* (untested) for reading from few queries, partitioning
> into several PCollections, and writing to different destinations.
>
> *queries = ['select * from A', 'select * from B',....]*
>
> *p = Pipeline()*
> *pcollections = []*
> *for query in queries:*
> *  pc = p | beam.io.Read(beam.io
> <http://beam.io/>.BigQuerySource(query=query))*
> * pcollections.append(pc)*
>
> *all_data = pcollections | beam.Flatten()*
> *partitions = all_data | beam.Partition(my_partition_fn)*
> *for i, partition in enumerate(partitions):*
> *  partition | beam.io.WriteToText(<unique path for partition i>)*
> Hope this helps.
>
> Thanks,
> Cham
>
>
>
>> Please see programming guide on how to write to text files (section 5.3
>>> and click Python tab): https://beam.apache.org/
>>> documentation/programming-guide/
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>> Do you have any idea how to sink the data to a text file? I have tried
>>>> few other options and was stuck at the write transform
>>>>
>>>> Any advice is very appreciated.
>>>>
>>>> Thanks,
>>>> Eila
>>>>
>>>>
>>>>
>>>> --
>>>> Eila
>>>> www.orielresearch.org
>>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>>
>>>
>>
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>


-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/

Re: Help with Dynamic writing

Posted by Chamikara Jayalath <ch...@google.com>.
On Tue, Mar 20, 2018 at 12:54 PM OrielResearch Eila Arich-Landkof <
eila@orielresearch.org> wrote:

> Hi Cham,
>
> Please see inline. If possible, code / pseudo code will help a lot.
> Thanks,
> Eila
>
> On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> Hi Eila,
>>
>> Please find my comments inline.
>>
>> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
>> eila@orielresearch.org> wrote:
>>
>>> Hello all,
>>>
>>> It was nice to meet you last week!!!
>>>
>>>
>> It was nice to meet you as well :)
>>
>>
>>> I am writing genomic pCollection that is created from bigQuery to a
>>> folder. Following is the code with output so you can run it with any small
>>> BQ table and let me know what your thoughts are:
>>>
>>> This init is only for debugging. In production I will use the pipeline
> syntax
>
>> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>>
>>> rows[1].keys()
>>> # output:  [u'index', u'SNRPCP14']
>>>
>>> # you can change `archs4.results_20180308_ to any other table name with
>>> index column
>>> queries2 = rows | beam.Map(lambda x: (beam.io.Read(beam.io.BigQuerySource(project='orielresearch-188115',
>>> use_standard_sql=False, query=str('SELECT * FROM
>>> `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>>>
>>>  str('gs://archs4/output/'+x["index"]+'/')))
>>>
>>
>> I don't think above code will work (not portable across runners at
>> least). BigQuerySource (along with Read transform) have to be applied to a
>> Pipeline object. So probably change this to a for loop that creates a set
>> of read transforms and use Flatten to create a single PCollection.
>>
> For debug, I am running on the local datalab runner. For the production, I
> will be running only dataflow runner. I think that I was able to query the
> tables that way, I will double check it. The indexes could go to millions -
> my concern is that I will not be able to leverage on Beam distribution
> capability when I use the the loop option. Any thoughts on that?
>

You mean you'll have millions of queries. That will not be scalable. My
suggestion was to loop on queries. Can you reduce to one or a small number
of queries and perform further processing in Beam?


>
>>
>>>
>>> queries2
>>> # output: a list of pCollection and the path to write the pCollection
>>> data to
>>>
>>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>>   'gs://archs4/output/GSM2313641/'),
>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>>   'gs://archs4/output/GSM2316666/'),
>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>>   'gs://archs4/output/GSM2312355/'),
>>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>>   'gs://archs4/output/GSM2312372/')]
>>>
>>>
>> What you got here is a PCollection of PTransform objects which is not
>> useful.
>>
>>
>>>
>>> *# this is my challenge*
>>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>>> COLUMN")
>>>
>>>
>> Once you update above code you will get a proper PCollection of elements
>> read from BigQuery. You can transform and write this (to files, BQ, or any
>> other sink) as needed.
>>
>
> it is a list of tupples with PCollection and the path to write to. the
> path is not unique and I might have more than one PCollection written to
> the same destination. How do I pass the path from the tupple list as a
> parameter to the text file name? Could you please add the code that you
> were thinking about?
>

The Python SDK does not support writing to different files based on the values
of data (dynamic writes). So you'll have to either partition data into
separate PCollections or write all data into the same location.

Here's *pseudocode* (untested) for reading from a few queries, partitioning
into several PCollections, and writing to different destinations.

*queries = ['select * from A', 'select * from B',....]*

*p = Pipeline()*
*pcollections = []*
*for query in queries:*
*  pc = p | beam.io.Read(beam.io
<http://beam.io/>.BigQuerySource(query=query))*
* pcollections.append(pc)*

*all_data = pcollections | beam.Flatten()*
*partitions = all_data | beam.Partition(my_partition_fn)*
*for i, partition in enumerate(partitions):*
*  partition | beam.io.WriteToText(<unique path for partition i>)*
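
For the partition function itself: it just has to map each element to an
integer partition number. A minimal sketch (untested; it assumes you know the
full list of sample ids up front and that each element is a dict with an
'index' field, as in your rows above):

sample_ids = ['GSM2313641', 'GSM2316666', 'GSM2312355', 'GSM2312372']
num_partitions = len(sample_ids)

def my_partition_fn(element, num_partitions):
  # map the sample id to a stable partition number in [0, num_partitions)
  return sample_ids.index(element['index'])

With millions of samples a per-sample partition like this won't be practical,
which is another reason to keep the number of distinct output locations small.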
Hope this helps.

Thanks,
Cham



> Please see programming guide on how to write to text files (section 5.3
>> and click Python tab):
>> https://beam.apache.org/documentation/programming-guide/
>>
>> Thanks,
>> Cham
>>
>>
>>> Do you have any idea how to sink the data to a text file? I have tried
>>> few other options and was stuck at the write transform
>>>
>>> Any advice is very appreciated.
>>>
>>> Thanks,
>>> Eila
>>>
>>>
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>
>>
>
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>

Re: Help with Dynamic writing

Posted by OrielResearch Eila Arich-Landkof <ei...@orielresearch.org>.
Hi Cham,

Please see inline. If possible, code / pseudocode will help a lot.
Thanks,
Eila

On Tue, Mar 20, 2018 at 1:15 PM, Chamikara Jayalath <ch...@google.com>
wrote:

> Hi Eila,
>
> Please find my comments inline.
>
> On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
> eila@orielresearch.org> wrote:
>
>> Hello all,
>>
>> It was nice to meet you last week!!!
>>
>>
> It was nice to meet you as well :)
>
>
>> I am writing genomic pCollection that is created from bigQuery to a
>> folder. Following is the code with output so you can run it with any small
>> BQ table and let me know what your thoughts are:
>>
>> This init is only for debugging. In production I will use the pipeline
syntax

> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
>> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
>> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>>
>> rows[1].keys()
>> # output:  [u'index', u'SNRPCP14']
>>
>> # you can change `archs4.results_20180308_ to any other table name with
>> index column
>> queries2 = rows | beam.Map(lambda x: (beam.io.Read(beam.io.
>> BigQuerySource(project='orielresearch-188115', use_standard_sql=False,
>> query=str('SELECT * FROM `archs4.results_20180308_*` where index=\'%s\'' %
>> (x["index"])))),
>>                                str('gs://archs4/output/'+x["
>> index"]+'/')))
>>
>
> I don't think the above code will work (it is not portable across runners,
> at least). BigQuerySource (along with the Read transform) has to be applied
> to a Pipeline object. So probably change this to a for loop that creates a
> set of read transforms, and use Flatten to create a single PCollection.
>
For debugging, I am running on the local Datalab runner. For production, I
will be running only the Dataflow runner. I think I was able to query the
tables that way; I will double-check it. The indexes could go into the
millions - my concern is that I will not be able to leverage Beam's
distribution capability when I use the loop option. Any thoughts on that?

>
>
>>
>> queries2
>> # output: a list of pCollection and the path to write the pCollection
>> data to
>>
>> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>>   'gs://archs4/output/GSM2313641/'),
>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>>   'gs://archs4/output/GSM2316666/'),
>>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>>   'gs://archs4/output/GSM2312355/'),
>>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>>   'gs://archs4/output/GSM2312372/')]
>>
>>
> What you got here is a PCollection of PTransform objects which is not
> useful.
>
>
>>
>> *# this is my challenge*
>> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND
>> COLUMN")
>>
>>
> Once you update above code you will get a proper PCollection of elements
> read from BigQuery. You can transform and write this (to files, BQ, or any
> other sink) as needed.
>

It is a list of tuples with a PCollection and the path to write to. The path
is not unique, and I might have more than one PCollection written to the
same destination. How do I pass the path from the tuple list as a parameter
to the text file name? Could you please add the code that you were thinking
about?

> Please see programming guide on how to write to text files (section 5.3
> and click Python tab): https://beam.apache.org/documentation/programming-
> guide/
>
> Thanks,
> Cham
>
>
>> Do you have any idea how to sink the data to a text file? I have tried
>> few other options and was stuck at the write transform
>>
>> Any advice is very appreciated.
>>
>> Thanks,
>> Eila
>>
>>
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>


-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/

Re: Help with Dynamic writing

Posted by Chamikara Jayalath <ch...@google.com>.
Hi Eila,

Please find my comments inline.

On Tue, Mar 20, 2018 at 8:02 AM OrielResearch Eila Arich-Landkof <
eila@orielresearch.org> wrote:

> Hello all,
>
> It was nice to meet you last week!!!
>
>
It was nice to meet you as well :)


> I am writing genomic pCollection that is created from bigQuery to a
> folder. Following is the code with output so you can run it with any small
> BQ table and let me know what your thoughts are:
>
> rows = [{u'index': u'GSM2313641', u'SNRPCP14': 0},{u'index':
> u'GSM2316666', u'SNRPCP14': 0},{u'index': u'GSM2312355', u'SNRPCP14':
> 0},{u'index': u'GSM2312372', u'SNRPCP14': 0}]
>
> rows[1].keys()
> # output:  [u'index', u'SNRPCP14']
>
> # you can change `archs4.results_20180308_ to any other table name with
> index column
> queries2 = rows | beam.Map(lambda x:
> (beam.io.Read(beam.io.BigQuerySource(project='orielresearch-188115',
> use_standard_sql=False, query=str('SELECT * FROM
> `archs4.results_20180308_*` where index=\'%s\'' % (x["index"])))),
>                                str('gs://archs4/output/'+x["index"]+'/')))
>

I don't think the above code will work (it is not portable across runners, at
least). BigQuerySource (along with the Read transform) has to be applied to a
Pipeline object. So probably change this to a for loop that creates a set of
read transforms, and use Flatten to create a single PCollection.
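
Something along these lines (an untested sketch; it reuses the project and
query from your snippet, and assumes `rows` is the list of dicts you showed
and `pipeline_options` is already set up - note that each application of the
read transform needs its own label):

p = beam.Pipeline(options=pipeline_options)
pcollections = []
for row in rows:
  query = "SELECT * FROM `archs4.results_20180308_*` WHERE index='%s'" % row['index']
  pc = p | 'read %s' % row['index'] >> beam.io.Read(
      beam.io.BigQuerySource(project='orielresearch-188115',
                             use_standard_sql=False,
                             query=query))
  pcollections.append(pc)
all_data = pcollections | beam.Flatten()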


>
> queries2
> # output: a list of pCollection and the path to write the pCollection data
> to
>
> [(<Read(PTransform) label=[Read] at 0x7fa6990fb7d0>,
>   'gs://archs4/output/GSM2313641/'),
>  (<Read(PTransform) label=[Read] at 0x7fa6990fb950>,
>   'gs://archs4/output/GSM2316666/'),
>  (<Read(PTransform) label=[Read] at 0x7fa6990fb9d0>,
>   'gs://archs4/output/GSM2312355/'),
>  (<Read(PTransform) label=[Read] at 0x7fa6990fbb50>,
>   'gs://archs4/output/GSM2312372/')]
>
>
What you got here is a PCollection of PTransform objects which is not
useful.


>
> *# this is my challenge*
> queries2 | 'write to relevant path' >> beam.io.WriteToText("SECOND COLUMN")
>
>
Once you update above code you will get a proper PCollection of elements
read from BigQuery. You can transform and write this (to files, BQ, or any
other sink) as needed.
Please see programming guide on how to write to text files (section 5.3 and
click Python tab): https://beam.apache.org/documentation/programming-guide/
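
For example, a minimal write step could look something like this (untested;
'rows_pc' stands for the PCollection you get back from the BigQuery read, and
the output path is just an illustration):

(rows_pc
 | 'to text' >> beam.Map(str)
 | 'write' >> beam.io.WriteToText('gs://archs4/output/all_samples',
                                  file_name_suffix='.txt'))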

Thanks,
Cham


> Do you have any idea how to sink the data to a text file? I have tried few
> other options and was stuck at the write transform
>
> Any advice is very appreciated.
>
> Thanks,
> Eila
>
>
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>