Posted to issues@beam.apache.org by "Paul Johnson (Jira)" <ji...@apache.org> on 2020/05/19 20:21:00 UTC
[jira] [Commented] (BEAM-9380) Python: ReadFromDatastore Embedded Entities
[ https://issues.apache.org/jira/browse/BEAM-9380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111502#comment-17111502 ]
Paul Johnson commented on BEAM-9380:
------------------------------------
It looks like apache_beam.io.gcp.datastore.v1new.types.Entity objects expect key not to be None. I worked around this by subclassing Entity and ReadFromDatastore:
{code:python}
from apache_beam.io.gcp.datastore.v1new import helper
from apache_beam.io.gcp.datastore.v1new.types import Query, Key, Entity
from apache_beam.io.gcp.datastore.v1new.datastoreio import ReadFromDatastore
from apache_beam.transforms import Create
from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
from apache_beam.transforms import Reshuffle
from google.cloud.datastore import key
from google.cloud.datastore import entity


class NestedEntity(Entity):
    """Entity whose key may be None, as allowed for embedded entities."""

    @staticmethod
    def from_client_entity(client_entity):
        # Build a NestedEntity (not a plain Entity) so the to_client_entity()
        # override below is also used if these entities are written back.
        res = NestedEntity(
            Key.from_client_key(
                client_entity.key) if client_entity.key else None,
            exclude_from_indexes=set(client_entity.exclude_from_indexes))
        for name, value in client_entity.items():
            if isinstance(value, key.Key):
                value = Key.from_client_key(value)
            if isinstance(value, entity.Entity):
                # Recurse so keyless embedded entities are handled as well.
                value = NestedEntity.from_client_entity(value)
            res.properties[name] = value
        return res

    def to_client_entity(self):
        """
        Returns a :class:`google.cloud.datastore.entity.Entity` instance that
        represents this entity.
        """
        res = entity.Entity(
            key=self.key.to_client_key() if self.key else None,
            exclude_from_indexes=tuple(self.exclude_from_indexes))
        for name, value in self.properties.items():
            if isinstance(value, Key):
                # Inherit the project from this entity's key, if it has one.
                if not value.project and self.key:
                    value.project = self.key.project
                value = value.to_client_key()
            if isinstance(value, Entity):
                # Embedded entities may have no key at all, so check first.
                if value.key and not value.key.project and self.key:
                    value.key.project = self.key.project
                value = value.to_client_entity()
            res[name] = value
        return res


class FixedReadFromDatastore(ReadFromDatastore):
    """ReadFromDatastore variant whose read step emits NestedEntity."""

    def expand(self, pcoll):
        # Same steps as the parent transform; only the final read step
        # is swapped for the _QueryFn defined below.
        return (
            pcoll.pipeline
            | 'UserQuery' >> Create([self._query])
            | 'SplitQuery' >> ParDo(
                ReadFromDatastore._SplitQueryFn(self._num_splits))
            | Reshuffle()
            | 'Read' >> ParDo(FixedReadFromDatastore._QueryFn()))

    class _QueryFn(DoFn):
        def process(self, query, *unused_args, **unused_kwargs):
            _client = helper.get_client(query.project, query.namespace)
            client_query = query._to_client_query(_client)
            for client_entity in client_query.fetch(query.limit):
                yield NestedEntity.from_client_entity(client_entity)
{code}
I don't think I'll have time to open a PR, sorry.
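The core idea of the workaround is to treat a missing key as None at every level of the recursion instead of dereferencing it. The sketch below models that idea with plain Python so it runs without Beam or google-cloud-datastore installed. ClientKey, ClientEntity, BeamKey and BeamEntity are hypothetical stand-ins for illustration only, not the real library classes; the example checks that a keyed entity holding a keyless embedded entity survives a from/to round trip.

```python
class ClientKey:
    """Hypothetical stand-in for google.cloud.datastore.key.Key."""

    def __init__(self, *flat_path, project=None):
        self.flat_path = tuple(flat_path)
        self.project = project


class ClientEntity(dict):
    """Hypothetical stand-in for google.cloud.datastore.entity.Entity."""

    def __init__(self, key=None):
        super().__init__()
        self.key = key  # embedded entities may have no key


class BeamKey:
    """Hypothetical stand-in for the Beam v1new Key wrapper."""

    def __init__(self, path_elements, project=None):
        self.path_elements = tuple(path_elements)
        self.project = project


class BeamEntity:
    """Hypothetical stand-in for the Beam v1new Entity wrapper."""

    def __init__(self, key=None):
        self.key = key
        self.properties = {}


def from_client_entity(client_entity):
    # Convert the key only when there is one; otherwise keep None.
    ck = client_entity.key
    res = BeamEntity(BeamKey(ck.flat_path, ck.project) if ck is not None else None)
    for name, value in client_entity.items():
        if isinstance(value, ClientEntity):
            value = from_client_entity(value)  # recurse into embedded entities
        res.properties[name] = value
    return res


def to_client_entity(beam_entity):
    # Mirror image of from_client_entity, with the same None guard.
    bk = beam_entity.key
    client_key = (
        ClientKey(*bk.path_elements, project=bk.project) if bk is not None else None
    )
    res = ClientEntity(client_key)
    for name, value in beam_entity.properties.items():
        if isinstance(value, BeamEntity):
            value = to_client_entity(value)
        res[name] = value
    return res


# A keyed top-level entity holding a keyless embedded entity.
address = ClientEntity()  # no key
address["city"] = "Paris"
person = ClientEntity(ClientKey("Person", 1, project="my-project"))
person["address"] = address

beam_person = from_client_entity(person)
print(beam_person.properties["address"].key)  # None
round_tripped = to_client_entity(beam_person)
print(round_tripped["address"]["city"])  # Paris
```

Passing None through, rather than inventing a placeholder key, keeps the embedded entity keyless on the way back out, which matches how Datastore treats keys in embedded entities as optional.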
> Python: ReadFromDatastore Embedded Entities
> -------------------------------------------
>
> Key: BEAM-9380
> URL: https://issues.apache.org/jira/browse/BEAM-9380
> Project: Beam
> Issue Type: Bug
> Components: io-py-gcp
> Affects Versions: 2.17.0, 2.18.0, 2.19.0
> Reporter: Quentin
> Priority: P2
>
> [Issue 8405|https://issues.apache.org/jira/browse/BEAM-8405] discussed the possibility of supporting embedded entities in the methods that convert to and from the client entity type.
> This feature was added in [PR 9805|https://github.com/apache/beam/pull/9805], which shipped in 2.17.
> However, there seems to be an issue when Datastore embedded entities do not have a key.
> Keys in embedded entities are optional.
> Because of this, when using the apache_beam.io.gcp.datastore.v1new.datastoreio.ReadFromDatastore transform on entities that contain keyless embedded entities, the pipeline fails with the following stack trace:
> {code:java}
> File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
> File "apache_beam/runners/common.py", line 440, in apache_beam.runners.common.SimpleInvoker.invoke_process
> File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
> File "/Users/quentin/Work/sensome/dev/senback/dataflow/senback-reporting-db-import-flow/venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/datastoreio.py", line 264, in process
> yield types.Entity.from_client_entity(client_entity)
> File "/Users/quentin/Work/sensome/dev/senback/dataflow/senback-reporting-db-import-flow/venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 225, in from_client_entity
> value = Entity.from_client_entity(value)
> File "/Users/quentin/Work/sensome/dev/senback/dataflow/senback-reporting-db-import-flow/venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 219, in from_client_entity
> Key.from_client_key(client_entity.key),
> File "/Users/quentin/Work/sensome/dev/senback/dataflow/senback-reporting-db-import-flow/venv/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/types.py", line 156, in from_client_key
> return Key(client_key.flat_path, project=client_key.project,
> AttributeError: 'NoneType' object has no attribute 'flat_path' [while running 'Read from Datastore/Read']
> {code}
>
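The failing frame in the traceback is Key.from_client_key, which reads client_key.flat_path without checking for None. The dependency-free sketch below reproduces that failure mode in isolation. ClientKey, from_client_key and from_client_key_or_none are hypothetical stand-ins modeled on the traceback, not the real Beam or google-cloud-datastore code; the guarded variant passes a missing key through as None, in the spirit of the "... if client_entity.key else None" check in the workaround.

```python
class ClientKey:
    """Hypothetical stand-in for google.cloud.datastore.key.Key."""

    def __init__(self, *flat_path, project=None):
        self.flat_path = flat_path
        self.project = project


def from_client_key(client_key):
    # Like the failing line in the traceback: no None check.
    return (client_key.flat_path, client_key.project)


def from_client_key_or_none(client_key):
    # Guarded variant: a keyless embedded entity yields None.
    return from_client_key(client_key) if client_key is not None else None


embedded_key = None  # embedded entities may legitimately have no key

try:
    from_client_key(embedded_key)
except AttributeError as exc:
    print(exc)  # 'NoneType' object has no attribute 'flat_path'

print(from_client_key_or_none(embedded_key))  # None
print(from_client_key_or_none(ClientKey("Kind", 1, project="p")))
```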
--
This message was sent by Atlassian Jira
(v8.3.4#803005)