Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/05 00:30:06 UTC
[GitHub] [beam] damccorm opened a new issue, #21600: Mismatch in event publish_time
damccorm opened a new issue, #21600:
URL: https://github.com/apache/beam/issues/21600
Below is example code in which we try to get the `publish_time` of a Pub/Sub message in our `DoFn`. The type seems to have changed starting with Apache Beam 2.36.0, but I could not find any release notes that mention this change. Any reference would be helpful.
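```python
import apache_beam as beam


class ProtoToDictDoFn(beam.DoFn):
    # Note: this parameter receives the element's Beam timestamp
    # (TimestampParam), which is not the same object as element.publish_time.
    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """element is a PubsubMessage."""
        print('-------------')
        print(type(element.publish_time))
        print(element.publish_time)
        print('-------------')
        # Pass the message through so downstream steps still receive it.
        yield element
```
Output with version 2.35.0:
```
<class 'google.protobuf.timestamp_pb2.Timestamp'>
seconds: 1652814206
nanos: 417000000
```
Output with version >= 2.36.0:
```
<class 'proto.datetime_helpers.DatetimeWithNanoseconds'>
2022-05-17 19:02:06.314000+00:00
```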
This looks like a bug: according to the [Google Pub/Sub documentation, `publish_time` should be of type `google.protobuf.timestamp_pb2.Timestamp`](https://github.com/googleapis/python-pubsub/blob/main/google/pubsub_v1/types/pubsub.py#L232).
Does anyone know what changed, or what is causing this?
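Until the type question is settled, one workaround is to normalize `publish_time` inside the `DoFn`, so that downstream code sees a single type regardless of Beam version. The following is a minimal sketch, not a Beam API: `to_utc_datetime` is a hypothetical helper. It assumes, based on the outputs above, that the 2.35.0 value exposes integer `seconds` and `nanos` fields, and it relies on `DatetimeWithNanoseconds` being a `datetime` subclass (proto-plus defines it that way).

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as an aware UTC datetime, used to rebuild protobuf-style timestamps.
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_utc_datetime(publish_time):
    """Return a Pub/Sub publish_time as a timezone-aware UTC datetime.

    Hypothetical helper (not part of Beam). Accepts either:
      * a datetime, including its subclass DatetimeWithNanoseconds
        (returned by Beam >= 2.36.0, per this issue), or
      * any object with integer `seconds` and `nanos` fields, such as the
        google.protobuf.timestamp_pb2.Timestamp returned by Beam 2.35.0.
    """
    if isinstance(publish_time, datetime):
        if publish_time.tzinfo is None:
            # Assumption: naive values are already expressed in UTC.
            return publish_time.replace(tzinfo=timezone.utc)
        return publish_time.astimezone(timezone.utc)
    if hasattr(publish_time, "seconds") and hasattr(publish_time, "nanos"):
        # datetime only stores microseconds, so sub-microsecond nanos are truncated.
        return _EPOCH + timedelta(
            seconds=publish_time.seconds, microseconds=publish_time.nanos // 1000
        )
    raise TypeError(f"Unsupported publish_time type: {type(publish_time)!r}")
```

Inside `ProtoToDictDoFn.process`, calling `to_utc_datetime(element.publish_time)` would then give an aware `datetime` on both 2.35.0 and 2.36.0+, at the cost of dropping nanosecond precision on the protobuf path.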
Imported from Jira [BEAM-14482](https://issues.apache.org/jira/browse/BEAM-14482). Original Jira may contain additional context.
Reported by: daljeet.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Re: [I] Mismatch in event publish_time [beam]
Posted by "lazarillo (via GitHub)" <gi...@apache.org>.
lazarillo commented on issue #21600:
URL: https://github.com/apache/beam/issues/21600#issuecomment-1873925030
I don't normally add "me too" comments, but it has been over 1.5 years, the issue is still present, and there seems to have been no follow-up.
In my case, I see the same type for Pub/Sub's publish time with this pipeline:
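```python
from typing import Any, Dict

from apache_beam import Map, ParDo
from apache_beam.io.gcp.pubsub import PubsubMessage, ReadFromPubSub
from apache_beam.pvalue import PCollection

# `pipeline`, `subscription_full_path`, `subscription` and `JsonLoad`
# are defined elsewhere in my code.
messages: PCollection[Dict[str, Any]] = (
    pipeline
    | "Read Subscript"
    >> ReadFromPubSub(
        subscription=subscription_full_path,
        id_label="message_id",
        with_attributes=True,
    ).with_output_types(PubsubMessage)
    | "Decode JSON" >> ParDo(JsonLoad()).with_output_types(dict)
    | "Include subscription name"
    >> Map(lambda x: {**x, "subscription_name": subscription}).with_output_types(
        dict
    )
)
```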
(`JsonLoad` is a `DoFn` that is _almost_ exactly a `Map` of `json.loads`, except that it also captures attributes and handles malformed incoming data.)
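For comparison, here is a rough sketch of what a `JsonLoad`-style step might look like. It is hypothetical: the actual `JsonLoad` is not shown in this thread, and the name `json_load`, the nested `attributes` key, and the log-and-drop policy for bad input are all assumptions. It is a plain generator function, so it can be passed to `beam.FlatMap` or called from a `DoFn`'s `process` method. It copies `publish_time` through unchanged, so whatever type `ReadFromPubSub` produces ends up in the output dict.

```python
import json
import logging
from typing import Any, Dict, Iterator


def json_load(message: Any) -> Iterator[Dict[str, Any]]:
    """Hypothetical stand-in for a JsonLoad DoFn.

    Expects an object with `data`, `attributes`, `message_id` and
    `publish_time` fields, like the messages emitted by
    ReadFromPubSub(with_attributes=True).
    """
    try:
        payload = json.loads(message.data)
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Malformed payloads are logged and dropped instead of failing the bundle.
        logging.warning("Dropping non-JSON message %s", message.message_id)
        return
    if not isinstance(payload, dict):
        # Only JSON objects can be merged into the output dict.
        logging.warning("Dropping non-object JSON message %s", message.message_id)
        return
    yield {
        **payload,
        "attributes": dict(message.attributes or {}),
        "message_id": message.message_id,
        # Passed through unchanged: a protobuf Timestamp on Beam 2.35.0 and a
        # DatetimeWithNanoseconds (a datetime subclass) on Beam >= 2.36.0.
        "publish_time": message.publish_time,
    }
```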
Each element of the `PCollection` built from `ReadFromPubSub` includes a `publish_time` of type `DatetimeWithNanoseconds`:
```python
{
    'message_id': '9961304079927039',
    ...,
    'publish_time': DatetimeWithNanoseconds(2024, 1, 2, 10, 16, 22, 762000, tzinfo=datetime.timezone.utc)
}
```