You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Jacob Ferriero (Jira)" <ji...@apache.org> on 2020/04/30 20:38:00 UTC

[jira] [Commented] (BEAM-9856) HL7v2IO.ListMessages should be refactored once Bulk Export API is available

    [ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096961#comment-17096961 ] 

Jacob Ferriero commented on BEAM-9856:
--------------------------------------

The existing GA [HL7v2 Messages.List API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list] allows us to specify a filter and order by createTime. We should be able to use this as our restriction dimension to make this a splitable DoFn.

I will investigate feasibility of this.


Basic Design Proposal:
Each Messages.List query will have a createTime filter based on it's restriction and orderBy createTime in order closely mimic the [OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html] pattern.

* getInitialRestriction: run a query against HL7v2 store to get earliest createTime 
* RestrictionTracker: keep a "watermark" based on createTime
* splitRestriction: split timestamp range based on fraction (this will require an existing HL7v2 Message List call can be notified to "stop paginating through the at an arbitrary createTime". Note that each List query might not get to the end of it's createTime filter (due to splitting). Instead it should just spot processing results when it encounters are record past the end of it's restriction.

This should allow us to more eagerly emit results as certain restrictions are completed.


Open Questions:
* Is there a canonical way of specifying more than one initial restriction based on assumption that you know for most use cases you'll want to split at least n-times upfront (e.g. partition by day/hour to start and dynamically split from there) ? Is this an anti-pattern because
* Should the initial restriction have a endTime? Should this be an optional user parameter for the transform? What should the default be (e.g. Instant.now() just before firing the first query)? or Should the ListMessages just scroll until it is "caught up" there are no newer messages?


Jake's two cents:
I consider this List Messages transform to primarily serve a batch / bounded backfill or replay use case. I believe real-time use cases should use the Pub Sub notifications for event driven / streaming updates from the HL7v2 store with HL7v2IO.readAll() (as this allows for much greater parallelization and is more likely to "keep up" during higher throughput). However, there is nothing stopping a user from using ListMessages against a store that is still being updated. If this becomes a splittable DoFn that fires many ListMessages throughtout it's life time and our initial restriction is [minCreateTime,  inf), this could become an unbounded source (or more specifically unbounded per element). I feel we should force ListMessages to be bounded per element by always having a endCreateTime.

> HL7v2IO.ListMessages should be refactored once Bulk Export API is available
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-9856
>                 URL: https://issues.apache.org/jira/browse/BEAM-9856
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Jacob Ferriero
>            Assignee: Jacob Ferriero
>            Priority: Major
>
> Currently the List Messages API paginates through in a single ProcessElement Call.
>  
> In the future if a bulk export API becomes available that would allow splitting on some dimension (e.g. create time), this should be refactored as a splittable DoFn or at least run several sub queries so that we are not listing an entire store in a single thread.
>  
> This could look like paginating through each hour of data w/ in the time frame that the store spans, in a separate thread.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)