Posted to issues@beam.apache.org by "Jacob Ferriero (Jira)" <ji...@apache.org> on 2020/04/30 22:00:00 UTC

[jira] [Comment Edited] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization

    [ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096961#comment-17096961 ] 

Jacob Ferriero edited comment on BEAM-9856 at 4/30/20, 9:59 PM:
----------------------------------------------------------------

The existing GA [HL7v2 Messages.List API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list] allows us to specify a filter and order by sendTime. We should be able to use this as our restriction dimension to make this a splittable DoFn.

I will investigate the feasibility of this.
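
For illustration, here is roughly what the per-restriction filter and ordering could look like. The timestamp bounds are placeholders and the exact filter grammar should be checked against the Healthcare API docs; this is a sketch, not the transform's actual request-building code.

{code:java}
/**
 * Illustrative only: builds the sendTime filter and ordering a per-restriction
 * Messages.List call would use. Bounds are placeholders; consult the Healthcare
 * API documentation for the exact filter grammar.
 */
class SendTimeFilterExample {
  static String filterFor(String startInclusive, String endExclusive) {
    return String.format(
        "sendTime >= \"%s\" AND sendTime < \"%s\"", startInclusive, endExclusive);
  }

  public static void main(String[] args) {
    String filter = filterFor("2020-01-01T00:00:00Z", "2020-01-01T01:00:00Z");
    String orderBy = "sendTime"; // paginate in sendTime order, mimicking OffsetRangeTracker
    System.out.println(filter + "  orderBy=" + orderBy);
  }
}
{code}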


Basic Design Proposal:
Each Messages.List query will have a sendTime filter based on its restriction and orderBy sendTime, in order to closely mimic the [OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html] pattern.

* getInitialRestriction: run a query against the HL7v2 store to get the earliest sendTime
* RestrictionTracker: keep a "watermark" based on sendTime
* splitRestriction: split the timestamp range based on a fraction (this will require that an existing HL7v2 Messages.List call can be notified to "stop paginating at an arbitrary sendTime"). Note that each List query might not get to the end of its sendTime filter (due to splitting); instead it should just stop processing results when it encounters a record past the end of its restriction (see the sketch below).

This should allow us to more eagerly emit results as certain restrictions are completed.
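
To make the proposal concrete, here is a rough, untested sketch of how it could map onto Beam's splittable DoFn API, reusing OffsetRange / OffsetRangeTracker over sendTime epoch millis. This is not the actual HL7v2IO code: Hl7v2Client and Msg are hypothetical placeholders for the real healthcare client, and the sketch assumes strictly increasing sendTime values (ties would need extra handling in a real implementation).

{code:java}
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Instant;

/** Sketch only: restriction = [earliest sendTime, endSendTime) in epoch millis. */
class ListHL7v2MessagesFn extends DoFn<String /* hl7v2Store */, String /* message */> {

  /** Hypothetical stand-in for the real HL7v2 client; not an actual Beam/GCP class. */
  interface Hl7v2Client extends java.io.Serializable {
    long earliestSendTimeMillis(String hl7v2Store);
    Iterable<Msg> listOrderedBySendTime(String hl7v2Store, long fromMillis, long toMillis);
  }

  /** Hypothetical message holder. */
  static class Msg {
    long sendTimeMillis;
    String body;
  }

  private final Hl7v2Client client;

  ListHL7v2MessagesFn(Hl7v2Client client) {
    this.client = client;
  }

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String hl7v2Store) {
    // Earliest sendTime in the store; bound the upper end at "now" (or a user-supplied
    // endSendTime) so each element stays bounded, per the discussion above.
    return new OffsetRange(client.earliestSendTimeMillis(hl7v2Store), Instant.now().getMillis());
  }

  @SplitRestriction
  public void splitRestriction(
      @Element String hl7v2Store, @Restriction OffsetRange range, OutputReceiver<OffsetRange> out) {
    // Optional upfront split into roughly one-hour sendTime partitions.
    for (OffsetRange part : range.split(3_600_000L, 3_600_000L)) {
      out.output(part);
    }
  }

  @NewTracker
  public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
    return new OffsetRangeTracker(range);
  }

  @ProcessElement
  public void processElement(
      @Element String hl7v2Store,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    OffsetRange range = tracker.currentRestriction();
    // List messages ordered by sendTime and stop paginating as soon as a message falls
    // past the end of the (possibly already split) restriction.
    for (Msg msg : client.listOrderedBySendTime(hl7v2Store, range.getFrom(), range.getTo())) {
      if (!tracker.tryClaim(msg.sendTimeMillis)) {
        return; // past the restriction (or it was split away); stop here
      }
      out.output(msg.body);
    }
    // No more messages in range: attempt to claim the end so the restriction is marked done.
    tracker.tryClaim(range.getTo());
  }
}
{code}

Using epoch millis as the restriction lets us lean on the existing OffsetRange splitting and fraction logic rather than writing a custom timestamp RestrictionTracker, which is one possible reading of the "mimic OffsetRangeTracker" idea above.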


Open Questions:
* Is there a canonical way of specifying more than one initial restriction, based on the assumption that for most use cases you'll want to split at least n times upfront (e.g. partition by day/hour to start and dynamically split from there)? Or is this an anti-pattern?
* Should the initial restriction have an endTime? Should this be an optional user parameter for the transform? What should the default be (e.g. Instant.now() just before firing the first query)? Or should ListMessages just scroll until it is "caught up" and there are no newer messages?


Jake's two cents:
I consider this List Messages transform to primarily serve a batch / bounded backfill or replay use case. I believe real-time use cases should use the Pub/Sub notifications for event-driven / streaming updates from the HL7v2 store with HL7v2IO.readAll() (as this allows for much greater parallelization and is more likely to "keep up" during higher throughput). However, there is nothing stopping a user from using ListMessages against a store that is still being updated. If this becomes a splittable DoFn that fires many ListMessages calls throughout its lifetime and our initial restriction is [minSendTime, inf), this could become an unbounded source (or more specifically, unbounded per element). I feel we should force ListMessages to be bounded per element by always having an endSendTime.
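
For contrast, a minimal sketch of the streaming alternative mentioned above, assuming the HL7v2 store's notification channel publishes message IDs to Pub/Sub. The subscription name is a placeholder, and the Read.Result handling reflects my reading of the HL7v2IO API at the time of writing, so details may differ by Beam version.

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.healthcare.HL7v2IO;
import org.apache.beam.sdk.io.gcp.healthcare.HL7v2Message;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class Hl7v2StreamingReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // HL7v2 store notifications deliver message IDs to Pub/Sub as messages are ingested.
    PCollection<String> msgIds =
        p.apply("ReadNotifications",
            PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/hl7v2-notifications"));

    // Fetch each message by ID; failed reads are surfaced separately on the Result.
    HL7v2IO.Read.Result result = msgIds.apply("ReadHL7v2Messages", HL7v2IO.readAll());
    PCollection<HL7v2Message> messages = result.getMessages(); // downstream processing consumes this

    p.run();
  }
}
{code}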


was (Author: data-runner0):
The existing GA [HL7v2 Messages.List API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list] allows us to specify a filter and order by createTime. We should be able to use this as our restriction dimension to make this a splittable DoFn.

I will investigate the feasibility of this.


Basic Design Proposal:
Each Messages.List query will have a createTime filter based on its restriction and orderBy createTime, in order to closely mimic the [OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html] pattern.

* getInitialRestriction: run a query against the HL7v2 store to get the earliest createTime
* RestrictionTracker: keep a "watermark" based on createTime
* splitRestriction: split the timestamp range based on a fraction (this will require that an existing HL7v2 Messages.List call can be notified to "stop paginating at an arbitrary createTime"). Note that each List query might not get to the end of its createTime filter (due to splitting); instead it should just stop processing results when it encounters a record past the end of its restriction.

This should allow us to more eagerly emit results as certain restrictions are completed.


Open Questions:
* Is there a canonical way of specifying more than one initial restriction, based on the assumption that for most use cases you'll want to split at least n times upfront (e.g. partition by day/hour to start and dynamically split from there)? Or is this an anti-pattern?
* Should the initial restriction have an endTime? Should this be an optional user parameter for the transform? What should the default be (e.g. Instant.now() just before firing the first query)? Or should ListMessages just scroll until it is "caught up" and there are no newer messages?


Jake's two cents:
I consider this List Messages transform to primarily serve a batch / bounded backfill or replay use case. I believe real-time use cases should use the Pub/Sub notifications for event-driven / streaming updates from the HL7v2 store with HL7v2IO.readAll() (as this allows for much greater parallelization and is more likely to "keep up" during higher throughput). However, there is nothing stopping a user from using ListMessages against a store that is still being updated. If this becomes a splittable DoFn that fires many ListMessages calls throughout its lifetime and our initial restriction is [minCreateTime, inf), this could become an unbounded source (or more specifically, unbounded per element). I feel we should force ListMessages to be bounded per element by always having an endCreateTime.

> HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-9856
>                 URL: https://issues.apache.org/jira/browse/BEAM-9856
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Jacob Ferriero
>            Assignee: Jacob Ferriero
>            Priority: Minor
>
> Currently the List Messages API paginates through results in a single ProcessElement call.
> However, we could derive a restriction based on createTime using the Messages.List filter and orderBy parameters.
>  
> This is in line with the future roadmap: if an HL7v2 bulk export API becomes available, it should allow splitting on (e.g.) the createTime dimension. Leveraging that bulk export might be a future optimization to explore.
>  
> This could take one of two forms:
> 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: make optimization the runner's problem; potentially unnecessarily complex for this use case)
> 2. static splitting on some time partition, e.g. finding the earliest createTime, emitting a PCollection of 1-hour partitions, and paginating through each hour of data within the time frame that the store spans in a separate ProcessElement. (easy to implement but will likely have hot keys / stragglers based on "busy hours")
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)