You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Willi Schinmeyer (Jira)" <ji...@apache.org> on 2021/09/17 13:09:00 UTC

[jira] [Commented] (BEAM-12791) Allow Queries from PCollection in datastore.v1new.datastoreio.ReadFromDatastore

    [ https://issues.apache.org/jira/browse/BEAM-12791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416670#comment-17416670 ] 

Willi Schinmeyer commented on BEAM-12791:
-----------------------------------------

I've looked into it, and currently {{ReadFromDatastore}} works like this: 
{code:python}
 return (
    pcoll.pipeline
    | 'UserQuery' >> Create([self._query])
    | 'SplitQuery' >> ParDo(
        ReadFromDatastore._SplitQueryFn(self._num_splits))
    | Reshuffle()
    | 'Read' >> ParDo(ReadFromDatastore._QueryFn())){code}
so it already works on PCollections of queries internally, there's just currently no way to pass them in from outside.

I've created my own copy that uses the input pcoll instead of creating one, which seems to work just fine, with one drawback: There's no way to match the entities in the output to the input queries. So I've also created wrappers for {{_SplitQueryFn}} and {{_QueryFn}} that pass along generic userdata. Here's the result:

{code:python}
from typing import Generic, Iterable, Tuple, TypeVar

import apache_beam
from apache_beam.io.gcp.datastore.v1new import datastoreio, types


T = TypeVar("T")
U = TypeVar("U")


@apache_beam.typehints.with_input_types(Tuple[T, types.Query])
@apache_beam.typehints.with_output_types(Tuple[T, types.Entity])
class ReadFromDatastoreDynamically(apache_beam.PTransform, Generic[T]):
    """
    A variant of datastoreio.ReadFromDatastore that takes queries from the incoming PCollection instead of on construction,
    and also carries generic userdata so you can identify outputs.
    """

    def __init__(self, num_splits=0):
        super().__init__()
        self._num_splits = num_splits

    def expand(self, input_or_inputs):
        # This is a composite transform involving the following:
        #   1. Take the queries from the input and apply a ``ParDo``
        #   that splits them into `num_splits` queries each, if possible.
        #
        #   If the value of `num_splits` is 0, the number of splits will be
        #   computed dynamically based on the size of the data for the `query`.
        #
        #   2. The resulting ``PCollection`` is sharded across workers using a
        #   ``Reshuffle`` operation.
        #
        #   3. In the third step, a ``ParDo`` reads entities for each query and
        #   outputs a ``PCollection[Tuple[T, Entity]]``.

        return (
            input_or_inputs
            | "SplitQuery" >> apache_beam.ParDo(ReadFromDatastoreDynamically._SplitQueryFn(self._num_splits))
            | apache_beam.Reshuffle()
            | "Read" >> apache_beam.ParDo(ReadFromDatastoreDynamically._QueryFn())
        )

    @apache_beam.typehints.with_input_types(Tuple[U, types.Query])
    @apache_beam.typehints.with_output_types(Tuple[U, types.Query])
    class _SplitQueryFn(apache_beam.DoFn, Generic[U]):
        """
        Extends the original _SplitQueryFn with userdata.
        """

        def __init__(self, num_splits):
            super().__init__()
            # caution: we use a protected inner class, this could break on Beam update!
            self.__delegate = datastoreio.ReadFromDatastore._SplitQueryFn(num_splits)

        def process(self, element: Tuple[U, types.Query], *args, **kwargs) -> Iterable[Tuple[U, types.Query]]:
            key, query = element
            split_queries = self.__delegate.process(query=query, *args, **kwargs)
            return ((key, q) for q in split_queries)

    @apache_beam.typehints.with_input_types(Tuple[U, types.Query])
    @apache_beam.typehints.with_output_types(Tuple[U, types.Entity])
    class _QueryFn(apache_beam.DoFn, Generic[U]):
        """
        Extends the original _SplitQueryFn with userdata.
        """

        def __init__(self):
            super().__init__()
            # caution: we inherit from a protected inner class, this could break on Beam update!
            self.__delegate = datastoreio.ReadFromDatastore._QueryFn()

        def process(self, element: Tuple[U, types.Query], *args, **kwargs) -> Iterable[Tuple[U, types.Entity]]:
            key, query = element
            return ((key, e) for e in self.__delegate.process(query, *args, **kwargs))
{code}

This relies on using internal DoFns, whose API could change in future Beam versions, so I'd still prefer an official solution, but for now this will satisfy my needs.

> Allow Queries from PCollection in datastore.v1new.datastoreio.ReadFromDatastore
> -------------------------------------------------------------------------------
>
>                 Key: BEAM-12791
>                 URL: https://issues.apache.org/jira/browse/BEAM-12791
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-py-gcp
>            Reporter: Willi Schinmeyer
>            Priority: P2
>
> As far as I can tell, I currently have to provide the Datastore Query when I initially create my pipeline. However, I would like to only fetch Entities corresponding to the data currently being processed by my pipeline. In other words, I would like to pass a PCollection of Queries (or at least Query Filters) to ReadFromDatastore.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)