Posted to issues@beam.apache.org by "Paul Johnson (Jira)" <ji...@apache.org> on 2020/05/19 20:21:00 UTC

[jira] [Commented] (BEAM-9380) Python: ReadFromDatastore Embedded Entities

    [ https://issues.apache.org/jira/browse/BEAM-9380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111502#comment-17111502 ] 

Paul Johnson commented on BEAM-9380:
------------------------------------

It looks like apache_beam.io.gcp.datastore.v1new.types.Entity objects expect the key not to be None. I worked around this by subclassing Entity and ReadFromDatastore:

 
{code:python}
from apache_beam.io.gcp.datastore.v1new import helper
from apache_beam.io.gcp.datastore.v1new.types import Query, Key, Entity
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.transforms import Create
from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import Reshuffle
from google.cloud.datastore import key
from google.cloud.datastore import entity


class NestedEntity(Entity):
    @staticmethod
    def from_client_entity(client_entity):
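        # Build a NestedEntity (not a plain Entity) so that the
        # to_client_entity override below is used; the key may be None.
        res = NestedEntity(
            Key.from_client_key(
                client_entity.key) if client_entity.key else None,
            exclude_from_indexes=set(client_entity.exclude_from_indexes))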
        for name, value in client_entity.items():
            if isinstance(value, key.Key):
                value = Key.from_client_key(value)
            if isinstance(value, entity.Entity):
                value = NestedEntity.from_client_entity(value)
            res.properties[name] = value
        return res    
    
    def to_client_entity(self):
        """
        Returns a :class:`google.cloud.datastore.entity.Entity` instance that
        represents this entity.
        """
        res = entity.Entity(
            key=self.key.to_client_key() if self.key else None,
            exclude_from_indexes=tuple(self.exclude_from_indexes))
        for name, value in self.properties.items():
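            if isinstance(value, Key):
                # Inherit the parent's project only when the parent has a key.
                if not value.project and self.key:
                    value.project = self.key.project
                value = value.to_client_key()
            if isinstance(value, Entity):
                # Embedded entities may have no key, so check before
                # touching value.key.project.
                if value.key and self.key and not value.key.project:
                    value.key.project = self.key.project
                value = value.to_client_entity()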
            res[name] = value
        return res


class FixedReadFromDatastore(ReadFromDatastore):
    def expand(self, pcoll):
        return (
            pcoll.pipeline
            | 'UserQuery' >> Create([self._query])
            | 'SplitQuery' >> ParDo(
                ReadFromDatastore._SplitQueryFn(self._num_splits))
            | Reshuffle()
            | 'Read' >> ParDo(FixedReadFromDatastore._QueryFn()))    
    
    class _QueryFn(DoFn):
        def process(self, query, *unused_args, **unused_kwargs):
            _client = helper.get_client(query.project, query.namespace)
            client_query = query._to_client_query(_client)
            for client_entity in client_query.fetch(query.limit):
                yield NestedEntity.from_client_entity(client_entity)

{code}
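
For anyone who wants to see the failure mode without a Datastore project, here is a minimal sketch of the same idea. FakeClientKey, FakeClientEntity, unguarded_convert and guarded_convert are hypothetical stand-ins I made up for illustration; they are not part of Beam or google-cloud-datastore and only mimic the shape of the client objects. The unguarded version dereferences the key unconditionally, as in the stack trace, and the guarded version carries a missing key through as None:

```python
# Hypothetical stand-ins: they only mimic the shape of the Datastore client
# objects, so this runs without Beam or GCP installed.
class FakeClientKey:
    def __init__(self, flat_path, project=None):
        self.flat_path = flat_path
        self.project = project


class FakeClientEntity(dict):
    def __init__(self, key=None, **properties):
        super().__init__(**properties)
        self.key = key  # None for an embedded entity without a key


def unguarded_convert(client_entity):
    # Dereferences the key unconditionally, like the failing code path.
    out = {"key": client_entity.key.flat_path, "properties": {}}
    for name, value in client_entity.items():
        if isinstance(value, FakeClientEntity):
            value = unguarded_convert(value)
        out["properties"][name] = value
    return out


def guarded_convert(client_entity):
    # Same recursive walk, but a missing key is kept as None.
    key = client_entity.key
    out = {"key": key.flat_path if key else None, "properties": {}}
    for name, value in client_entity.items():
        if isinstance(value, FakeClientEntity):
            value = guarded_convert(value)
        out["properties"][name] = value
    return out


parent = FakeClientEntity(
    key=FakeClientKey(("Person", 1), project="demo-project"),
    address=FakeClientEntity(city="Paris"),  # embedded entity, no key
)

try:
    unguarded_convert(parent)
    unguarded_failed = False
except AttributeError:
    # 'NoneType' object has no attribute 'flat_path'
    unguarded_failed = True

converted = guarded_convert(parent)
```

The workaround above applies the same check in both directions (from_client_entity and to_client_entity), since a keyless embedded entity can show up on either side of the conversion.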
 

I don't think I'll have time to open a PR, sorry.

 

> Python: ReadFromDatastore Embedded Entities
> -------------------------------------------
>
>                 Key: BEAM-9380
>                 URL: https://issues.apache.org/jira/browse/BEAM-9380
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>    Affects Versions: 2.17.0, 2.18.0, 2.19.0
>            Reporter: Quentin
>            Priority: P2
>
> [Issue 8405|https://issues.apache.org/jira/browse/BEAM-8405] discussed the possibility of supporting embedded entities in the methods that convert to and from the client entity type.
> This feature was added in [PR 9805|https://github.com/apache/beam/pull/9805], which shipped in 2.17.
> However, there seems to be an issue when Datastore embedded entities do not have a key.
> Keys in embedded entities are optional.
> Because of this, when using the apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore transform, the pipeline fails with the following stacktrace:
> {code:java}
>   File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 440, in apache_beam.runners.common.SimpleInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/Users/quentin/Work/sensome/dev/senback/dataflow/senback-reporting-db-import-flow/venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/datastoreio.py", line 264, in process
>     yield types.Entity.from_client_entity(client_entity)
>   File "/Users/quentin/Work/sensome/dev/senback/dataflow/senback-reporting-db-import-flow/venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 225, in from_client_entity
>     value = Entity.from_client_entity(value)
>   File "/Users/quentin/Work/sensome/dev/senback/dataflow/senback-reporting-db-import-flow/venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 219, in from_client_entity
>     Key.from_client_key(client_entity.key),
>   File "/Users/quentin/Work/sensome/dev/senback/dataflow/senback-reporting-db-import-flow/venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 156, in from_client_key
>     return Key(client_key.flat_path, project=client_key.project,
> AttributeError: 'NoneType' object has no attribute 'flat_path' [while running 'Read from Datastore/Read']
> {code}
>  


