Posted to user@beam.apache.org by Sayak Paul <sp...@gmail.com> on 2021/09/26 12:41:18 UTC

Using a pipeline for collecting records from arXiv

Hi folks,

I am currently working on a pipeline with which I want to gather a bunch of
paper titles, abstracts, and their term categories from arXiv
<https://arxiv.org/>. I am using a combination of CombineGlobally and a
custom CombineFn to maintain three different sets to accumulate these
records.

I may have written the accumulator incorrectly, but I am not sure where it
goes wrong: the pipeline is able to collect the entries using the arxiv API
but does not accumulate the results.

Here's my notebook
<https://gist.github.com/sayakpaul/5997eddc26f87cb3a3f53032b46d0536> for
reproducing the issue.

Sayak Paul | sayak.dev

Re: Using a pipeline for collecting records from arXiv

Posted by Sayak Paul <sp...@gmail.com>.
Thank you, Robert, for providing such nice insights. It's now clear to me
what I was doing wrong. Your code snippets were spot on and I ended up
using them.

As I had mentioned earlier, if we end up using it to collect the bigger
dataset, we will be sure to give you a shoutout from the code and
elsewhere.

Sayak Paul | sayak.dev




Re: Using a pipeline for collecting records from arXiv

Posted by Sayak Paul <sp...@gmail.com>.
Thanks much. Let me work with these and get back to you. A primary version
of the dataset is listed here:
https://www.kaggle.com/spsayakpaul/arxiv-paper-abstracts.

If we end up developing the Beam pipeline for this, the dataset will be
updated and we will of course cite your contribution.

Sayak Paul | sayak.dev




Re: Using a pipeline for collecting records from arXiv

Posted by Robert Bradshaw <ro...@google.com>.
So your records look something like

([['cs.CV', 'cs.AI'], ['cs.LG', 'cs.CV', 'cs.AI']],
 ['title1', 'title2'],
 ['abstract 1', 'abstract2'])

where the first item is a list of lists of tags (one list for each
paper/query result), the second a list of titles (in the same order), and
the third a list of abstracts.

What's unclear is what you want collected_entries to look like. The same
format (but combined over all keys)? Or a set of all encountered tags,
titles, and abstracts? If the latter, use all_terms.update(*new_tags[0])
in your combiner to flatten out your double nesting of lists.
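
One possible shape for such a combiner is sketched below. It assumes each
input element is a (terms, titles, abstracts) tuple shaped like the record
shown above, and that the desired output is three sets. The class name
GatherRecords comes from this thread, but the body is a guess and not the
code from the notebook. The try/except fallback is only there so the sketch
also runs where Beam is not installed.

```python
try:
    import apache_beam as beam
    _Base = beam.CombineFn
except ImportError:  # fallback so the sketch runs without Beam installed
    _Base = object


class GatherRecords(_Base):
    """Collects every tag, title, and abstract seen into three sets."""

    def create_accumulator(self):
        # (all_terms, all_titles, all_abstracts)
        return set(), set(), set()

    def add_input(self, accumulator, element):
        all_terms, all_titles, all_abstracts = accumulator
        terms_per_paper, titles, abstracts = element
        # terms_per_paper is a list of lists; unpacking it passes each inner
        # list to update(), which flattens the double nesting.
        all_terms.update(*terms_per_paper)
        all_titles.update(titles)
        all_abstracts.update(abstracts)
        return all_terms, all_titles, all_abstracts

    def merge_accumulators(self, accumulators):
        # Fold every partial accumulator into a fresh one.
        merged = self.create_accumulator()
        for terms, titles, abstracts in accumulators:
            merged[0].update(terms)
            merged[1].update(titles)
            merged[2].update(abstracts)
        return merged

    def extract_output(self, accumulator):
        return accumulator
```

In the pipeline this would be applied as
records | beam.CombineGlobally(GatherRecords()).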

It might simplify things to note that you can emit more than one output for
each input element. E.g. you could write

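import apache_beam as beam
import arxiv

# Assumption: the notebook's `client` is an arxiv.Client instance.
client = arxiv.Client()

def query_with_keywords(query):
    search = arxiv.Search(
        query=query,
        max_results=10,  # 20000
        sort_by=arxiv.SortCriterion.LastUpdatedDate,
    )
    # Yield one row per search result instead of returning a single list.
    for res in client.results(search):
        if res.primary_category in ["cs.CV", "stat.ML", "cs.LG"]:
            yield beam.Row(
                terms=res.categories, title=res.title, abstract=res.summary
            )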

and then you would use this as

    records =  keywords | beam.FlatMap(query_with_keywords)

which would result in records being a PCollection with one element per
search result (see the documentation for FlatMap). That would likely
simplify things, i.e. records would hold rows (beam.Row objects, shown
here as plain tuples) such as

(['cs.CV', 'cs.AI'], 'title1', 'abstract1')
(['cs.LG', 'cs.CV', 'cs.AI'], 'title2', 'abstract2')
...
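
The difference between Map and FlatMap for a generator function can be
illustrated without Beam. In the sketch below, fake_query is a made-up
stand-in for query_with_keywords (it does not call arXiv), and the two
helpers only mimic what beam.Map and beam.FlatMap do to each element; they
are not Beam APIs.

```python
from itertools import chain


def fake_query(keyword):
    # Hypothetical stand-in for query_with_keywords: yields one record
    # per "search result" instead of calling the arXiv API.
    for i in (1, 2):
        yield (["cs.CV"], f"{keyword} title{i}", f"{keyword} abstract{i}")


def map_like(fn, elements):
    # Mimics beam.Map: exactly one output per input, whatever fn returns.
    return [fn(e) for e in elements]


def flat_map_like(fn, elements):
    # Mimics beam.FlatMap: fn returns an iterable, and every item in it
    # becomes its own output element.
    return list(chain.from_iterable(fn(e) for e in elements))


keywords = ["segmentation", "stereo"]
mapped = map_like(fake_query, keywords)          # 2 elements, each a generator
flattened = flat_map_like(fake_query, keywords)  # 4 elements, one per result
```

With Map, each keyword produces a single element that wraps all of its
results; with FlatMap, each result becomes its own element.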

You could then do a global combine as before, but the combiner logic should
be simpler to write and reason about. (Note that after CombineGlobally, you
have a PCollection with a *single* element in it.) If you're just trying to
write this out as a CSV, you could instead do

    records = keywords | beam.FlatMap(query_with_keywords)
    to_dataframe(records).to_csv('path')

no combining necessary.
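
To make the target layout concrete: with one record per paper, each record
maps to exactly one CSV row. The sketch below uses only the standard csv
module rather than Beam's DataFrame API, and both the sample records and
the choice to join the terms with ';' are assumptions for illustration.

```python
import csv
import io

# Sample records, one per paper (placeholder values).
records = [
    {"terms": ["cs.CV", "cs.AI"], "title": "title1", "abstract": "abstract1"},
    {"terms": ["cs.LG", "cs.CV", "cs.AI"], "title": "title2", "abstract": "abstract2"},
]

buffer = io.StringIO()
writer = csv.DictWriter(
    buffer, fieldnames=["terms", "title", "abstract"], lineterminator="\n"
)
writer.writeheader()
for record in records:
    # A list does not fit in a single CSV cell, so join the tags into one string.
    writer.writerow({**record, "terms": ";".join(record["terms"])})

csv_text = buffer.getvalue()
```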



Re: Using a pipeline for collecting records from arXiv

Posted by Sayak Paul <sp...@gmail.com>.
Yeah sure. Please find it attached.

Each individual list represents terms, titles, and the underlying paper
abstracts. I was able to make it work with numpy.array(). So, given a query
keyword, I want to be able to find a couple of papers from arXiv, collect
their titles and abstracts and store them inside a CSV file. This is the
goal. My example is here:
https://gist.github.com/sayakpaul/5997eddc26f87cb3a3f53032b46d0536.

Now, while creating the dataframe (for CSV serialization) I am not able to
get the right format. Essentially the structure should follow something
like this (taken from the official guide
<https://beam.apache.org/documentation/dsls/dataframes/overview/>):

[image: image.png]
But mine is appearing like so:

[image: image.png]

It's not row-wise. Here's the gist of the pipeline:

with beam.Pipeline() as pipeline:
    keywords = pipeline | beam.Create(query_keywords[:2])
    records =  keywords | beam.Map(query_with_keywords)
    collected_entries = records | beam.CombineGlobally(GatherRecords())

    dataframe_row = collected_entries | beam.Map(
        lambda x: beam.Row(
            title=str(x[0]), abstract=str(x[1]), term=list(x[2])
        )
    )
    df = to_dataframe(dataframe_row)
    (
        to_pcollection(df)
        | "To dictionaries" >> beam.Map(lambda x: dict(x._asdict()))
        | "Print" >> beam.Map(print)
    )

The entire code can be found here:
https://gist.github.com/sayakpaul/5997eddc26f87cb3a3f53032b46d0536. Let me
know if anything is still unclear.

Sayak Paul | sayak.dev




Re: Using a pipeline for collecting records from arXiv

Posted by Robert Bradshaw <ro...@google.com>.
It would be helpful if you gave an example of two inputs and what you
want the final merged value for them to be.


Re: Using a pipeline for collecting records from arXiv

Posted by Sayak Paul <sp...@gmail.com>.
Hi, it's me again.

Under `merge_accumulators()` I have three lists under accumulators:

* all_terms: a list of lists, for example [['cs.CV', 'cs.AI'], ['cs.CV',
  'cs.LG'], ['cs.CV', 'cs.AI', 'cs.LG'], ['cs.CV', 'cs.AI']]
* all_titles: a list of titles, for example ['Towards to Robust and
  Generalized Medical Image Segmentation Framework', 'Survey on Semantic
  Stereo Matching / Semantic Depth Estimation', 'FUTURE-AI: Guiding
  Principles and Consensus Recommendations for Trustworthy Artificial
  Intelligence in Future Medical Imaging', 'Enforcing Mutual Consistency of
  Hard Regions for Semi-supervised Medical Image Segmentation']
* all_abstracts: a list of abstracts

I am unable to figure out how to collate them to produce the final
outputs for all_terms. For the other two, here's what I am doing:

merged_titles = set.union(set(all_titles[0][0]))
merged_abstracts = set.union(set(all_abstracts[0][0]))

The indexing is there to extract the core content and to have it respect
the type spec I mentioned above.

Any hints would be really appreciated.
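
A minimal sketch of one way the merge step could be written, assuming each
record has the layout (list of tag lists, list of titles, list of abstracts)
and that the goal is a set of every individual tag, title, and abstract. The
three-set accumulator and the sorted output are assumptions, not the
notebook's code. The class is plain Python that mirrors the four CombineFn
methods so it runs without Beam; in a pipeline it would subclass
beam.CombineFn.

```python
class GatherRecords:
    """Plain-Python mirror of the four CombineFn methods (assumed layout)."""

    def create_accumulator(self):
        # One set each for tags, titles, and abstracts.
        return set(), set(), set()

    def add_input(self, accumulator, record):
        terms, titles, abstracts = accumulator
        tag_lists, new_titles, new_abstracts = record
        # tag_lists is a list of lists; unpacking it flattens the nesting.
        terms.update(*tag_lists)
        titles.update(new_titles)
        abstracts.update(new_abstracts)
        return terms, titles, abstracts

    def merge_accumulators(self, accumulators):
        # Union every partial accumulator, not only the first one.
        merged_terms, merged_titles, merged_abstracts = self.create_accumulator()
        for terms, titles, abstracts in accumulators:
            merged_terms.update(terms)
            merged_titles.update(titles)
            merged_abstracts.update(abstracts)
        return merged_terms, merged_titles, merged_abstracts

    def extract_output(self, accumulator):
        terms, titles, abstracts = accumulator
        # Sorted lists give a deterministic output order.
        return sorted(terms), sorted(titles), sorted(abstracts)


# Simulate two workers that each saw one record, then merge their results.
fn = GatherRecords()
acc_a = fn.add_input(
    fn.create_accumulator(),
    ([["cs.CV", "cs.AI"], ["cs.LG", "cs.CV"]],
     ["title1", "title2"],
     ["abstract1", "abstract2"]),
)
acc_b = fn.add_input(
    fn.create_accumulator(),
    ([["cs.CV", "cs.AI"]], ["title3"], ["abstract3"]),
)
terms, titles, abstracts = fn.extract_output(
    fn.merge_accumulators([acc_a, acc_b])
)
print(terms)   # ['cs.AI', 'cs.CV', 'cs.LG']
print(titles)  # ['title1', 'title2', 'title3']
```

If each paper's tag list should instead be kept as one unit, the tags could
be stored as tuples, since lists cannot be members of a set.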

Sayak Paul | sayak.dev



On Tue, Sep 28, 2021 at 7:29 AM Sayak Paul <sp...@gmail.com> wrote:

> Very silly of me. Thank you :)
>
> > (Also, in your combiner, I'm not understanding why you have
> > all_terms.add(new_tags[0]). Did you want
> > all_terms.update(*new_tags[0]))?
>
> This is because each term entry can itself be a list of several terms, so
> the entries need not be unique. Titles and abstracts, on the other hand,
> should always be unique given the way query_with_keywords() is written.
>
> Sayak Paul | sayak.dev

Re: Using a pipeline for collecting records from arXiv

Posted by Sayak Paul <sp...@gmail.com>.
Very silly of me. Thank you :)

> (Also, in your combiner, I'm not understanding why you have
> all_terms.add(new_tags[0]). Did you want
> all_terms.update(*new_tags[0]))?

This is because each term entry can itself be a list of several terms, so
the entries need not be unique. Titles and abstracts, on the other hand,
should always be unique given the way query_with_keywords() is written.

Sayak Paul | sayak.dev



On Mon, Sep 27, 2021 at 10:42 PM Robert Bradshaw <ro...@google.com>
wrote:

> The problem is in your line
>
>     collected_entries = beam.CombineGlobally(GatherRecords())
>
> You're not applying the CombineGlobally transform to anything, just
> assigning it to the variable collected_entries. This should probably
> be
>
>     collected_entries = records | beam.CombineGlobally(GatherRecords())
>
> (Also, in your combiner, I'm not understanding why you have
> all_terms.add(new_tags[0]). Did you want
> all_terms.update(*new_tags[0]))?

Re: Using a pipeline for collecting records from arXiv

Posted by Robert Bradshaw <ro...@google.com>.
The problem is in your line

    collected_entries = beam.CombineGlobally(GatherRecords())

You're not applying the CombineGlobally transform to anything, just
assigning it to the variable collected_entries. This should probably
be

    collected_entries = records | beam.CombineGlobally(GatherRecords())

(Also, in your combiner, I'm not understanding why you have
all_terms.add(new_tags[0]). Did you want
all_terms.update(*new_tags[0]))?
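
To illustrate the difference between constructing a transform and applying
it, here is a toy sketch in plain Python. ToyCombineGlobally and union_all
are hypothetical stand-ins, not Beam classes; the sketch only assumes that a
transform object, like this toy, does nothing until it is applied to an
input with `|`.

```python
class ToyCombineGlobally:
    """Toy stand-in for a deferred transform; not Beam's real class."""

    def __init__(self, combine):
        # Nothing runs here; the combining function is only stored.
        self.combine = combine

    def __ror__(self, values):
        # Called for `values | transform` when `values` does not handle `|`.
        return self.combine(values)


def union_all(sets):
    # Hypothetical combiner: union of all input sets.
    return set().union(*sets)


records = [{"cs.CV"}, {"cs.AI", "cs.CV"}]

unapplied = ToyCombineGlobally(union_all)          # only a transform object
applied = records | ToyCombineGlobally(union_all)  # transform applied to input

print(type(unapplied).__name__)  # ToyCombineGlobally
print(sorted(applied))           # ['cs.AI', 'cs.CV']
```

The first assignment corresponds to the original line, which only builds the
transform; the second corresponds to the corrected line, which pipes records
into it.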
