You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/08 02:04:38 UTC

[GitHub] [beam] jaketf opened a new pull request #11339: [BEAM-9468] Fhir io

jaketf opened a new pull request #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339
 
 
   adds FhirIO healthcare connector 
   Write use the executeBundle REST API method to execute transactions on the FHIR Store
   Read with read / search / from notification subscription
   
   Follow up to #1151
   
   R: @pabloem 
   CC: @lastomato, @brianlucier
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [x] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611110630
 
 
   Integration test data was bad due to mis-configuration of synthea exporting fhir groups.
   This data is now fixed and tests are passing for all fhir versions.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [ ] ValueProvider support
   - [ ] Unit test for FhirIO dead letter handling
   - [ ] Add example usage to javadoc
   - [ ] Add IT for FhirIO.Read
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-612264169
 
 
   @lastomato 
   Do you think we should add `FhirIO.Read` implementations that paginate through [fhir.search](https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/search) results?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [x] Unit test for FhirIO dead letter handling
   - [x] Migrate ITs to parameterized tests to DRY up ITs against different FHIR versions (improves maintainability)
   - [x] Add IT for FhirIO.Read
       - [x] implement scaffolding for test (currently always passes because initial PCollection is empty)
       - ~[ ] Needs convenience method to "read all resource IDs from a FHIR store to populate initial PCollection of resource IDs~
       - [x] implemented test using TestPubsubSignal to verify that resources were successfully read
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf removed a comment on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf removed a comment on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-612264169
 
 
   @lastomato 
   Do you think we should add `FhirIO.Read` implementations that paginate through [fhir.search](https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/search) results?
   
   I'm not sure what use case this really solves and would like to keep it for a later PR if it becomes necessary.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [ ] Unit test for FhirIO dead letter handling
   - [ ] Add IT for FhirIO.Read
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-614332334
 
 
   @lastomato I added [GroupIntoBatches](https://beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html) in the FhirIO.Import path. 
   The logic is:
   - buffer `HttpBody`'s to an iterable until we have 1000 of them (this threshold was chosen arbitrarily)
   - ImportFn updates the ndJson write channel with all 1000 resources
   - FinishBundle will flush the batch: write to file on GCS and trigger import job
   
   This is one way to mitigate the "import job per resource" concern but I'm open to other suggestions for achieving this.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [x] Unit test for FhirIO dead letter handling
   - [x] Migrate ITs to parameterized tests to DRY up ITs against different FHIR versions (improves maintainability)
   - [ ] Add IT for FhirIO.Read
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-614118743
 
 
   @chamikaramj FYI this is a WIP follow up to #11151 to add an IO for FHIR store.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [ ] Unit test for FhirIO dead letter handling
   - [x] Add example usage to javadoc
   - [ ] Add IT for FhirIO.Read
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [x] Unit test for FhirIO dead letter handling
   - [x] Migrate ITs to parameterized tests to DRY up ITs against different FHIR versions (improves maintainability)
   - [x] Add IT for FhirIO.Read
       - [x] implement scaffolding for test (currently always passes because initial PCollection is empty)
       - [ ] Needs convenience method to "read all resource IDs from a FHIR store to populate initial PCollection of resource IDs
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [x] Unit test for FhirIO dead letter handling
   - [x] Migrate ITs to parameterized tests to DRY up ITs against different FHIR versions (improves maintainability)
   - [ ] Add IT for FhirIO.Read
       - [x] implement scaffolding for test (currently always passes because initial PCollection is empty)
       - [ ] Needs convenience method to "read all resource IDs from a FHIR store to populate initial PCollection of resource IDs
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [ ] ValueProvider support
   - [ ] Unit test for FhirIO dead letter handling
   - [ ] Add example usage to javadoc
   - [ ] Benchmark / load test the FhirIO.Import

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-614713471
 
 
   FhirIOReadIT was flaky due to stale data (from previously run tests) sticking around in pubsub topic.
   This causes the read to try to read a resource from a FHIR store that no longer exists. 
   While the behavior of the IO was WAI writing these to the DLQ, I've improved the test setup to create / destroy the pubsub topic for each test.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [x] Unit test for FhirIO dead letter handling
   - [x] Migrate ITs to parameterized tests to DRY up ITs against different FHIR versions (improves maintainability)
   - [x] Add IT for FhirIO.Read
       - [x] implement scaffolding for test (currently always passes because initial PCollection is empty)
       - ~[ ] Needs convenience method to "read all resource IDs from a FHIR store to populate initial PCollection of resource IDs~
       - [x] implemented test using TestPubsubSignal to verify that resources were successfully read
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] chamikaramj commented on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-615300099
 
 
   Seems like one of the new tests is flaky.
   
   https://builds.apache.org/job/beam_PostCommit_Java/5947/
   https://builds.apache.org/job/beam_PostCommit_Java/5943/
   https://builds.apache.org/job/beam_PostCommit_Java/5942/

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [x] Unit test for FhirIO dead letter handling
   - [ ] Add IT for FhirIO.Read
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf removed a comment on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf removed a comment on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611110630
 
 
   Integration test data was bad due to mis-configuration of synthea exporting fhir groups.
   This data is now fixed and tests are passing for all fhir versions.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#discussion_r408943866
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
 ##########
 @@ -0,0 +1,662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.healthcare.v1beta1.model.HttpBody;
+import com.google.api.services.healthcare.v1beta1.model.Operation;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Write.Result;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.codehaus.jackson.JsonProcessingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FhirIO} provides an API for reading and writing resources to <a
+ * href="https://cloud.google.com/healthcare/docs/concepts/fhir">Google Cloud Healthcare Fhir API.
+ * </a>
+ *
+ * <p>Read
+ *
+ * <p>FHIR resources can be read with {@link FhirIO.Read} supports use cases where you have a
+ * ${@link PCollection} of message IDS. This is appropriate for reading the Fhir notifications from
+ * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually
+ * prepared list of messages that you need to process (e.g. in a text file read with {@link
+ * org.apache.beam.sdk.io.TextIO}) .
+ *
+ * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings
+ * {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieved a
+ * {@link PCollection} containing the successfully fetched {@link HttpBody}s and/or {@link
+ * FhirIO.Read.Result#getFailedReads()} to retrieve a {@link PCollection} of {@link
+ * HealthcareIOError} containing the resource ID that could not be fetched and the exception as a
+ * {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your
+ * choosing. This error handling is mainly to transparently surface errors where the upstream {@link
+ * PCollection} contains IDs that are not valid or are not reachable due to permissions issues.
+ *
+ * <p>Write Resources can be written to FHIR with two different methods: Import or Execute Bundle.
+ *
+ * <p>Execute Bundle This is best for use cases where you are writing to a non-empty FHIR store with
+ * other clients or otherwise need referential integrity (e.g. A Streaming HL7v2 to FHIR ETL
+ * pipeline).
+ *
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>
+ *     <p>Import This is best for use cases where you are populating an empty FHIR store with no
+ *     other clients. It is faster than the execute bundles method but does not respect referential
+ *     integrity and the resources are not written transactionally (e.g. a historicaly backfill on a
+ *     new FHIR store) This requires each resource to contain a client provided ID. It is important
+ *     that when using import you give the appropriate permissions to the Google Cloud Healthcare
+ *     Service Agent
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions></a>
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/import></a>
+ *     A {@link PCollection} of {@link HttpBody} can be ingested into an Fhir store using {@link
+ *     FhirIO.Write#fhirStoresImport(String, String, String, ContentStructure)} This will return a
+ *     {@link FhirIO.Write.Result} on which you can call {@link
+ *     FhirIO.Write.Result#getFailedInsertsWithErr()} to retrieve a {@link PCollection} of {@link
+ *     HealthcareIOError} containing the {@link HttpBody} that failed to be ingested and the
+ *     exception.
+ */
+public class FhirIO {
+
+  @Experimental
+  public static class Read extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+    private static final Logger LOG = LoggerFactory.getLogger(Read.class);
+
+    public Read() {
+      LOG.warn("FhirIO.Read is experimental and has not been tested.");
+    }
+
+    public static class Result implements POutput, PInput {
+      private PCollection<HttpBody> resources;
+
+      private PCollection<HealthcareIOError<String>> failedReads;
+      PCollectionTuple pct;
+
+      public static FhirIO.Read.Result of(PCollectionTuple pct) throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(OUT).and(DEAD_LETTER))) {
+          return new FhirIO.Read.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the FhirIO.Read.OUT "
+                  + "and FhirIO.Read.DEAD_LETTER tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.resources = pct.get(OUT).setCoder(new HttpBodyCoder());
+        this.failedReads =
+            pct.get(DEAD_LETTER).setCoder(new HealthcareIOErrorCoder<>(StringUtf8Coder.of()));
+      }
+
+      public PCollection<HealthcareIOError<String>> getFailedReads() {
+        return failedReads;
+      }
+
+      public PCollection<HttpBody> getResources() {
+        return resources;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(OUT, resources);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /** The tag for the main output of Fhir Messages. */
+    public static final TupleTag<HttpBody> OUT = new TupleTag<HttpBody>() {};
+    /** The tag for the deadletter output of Fhir Messages. */
+    public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+        new TupleTag<HealthcareIOError<String>>() {};
+
+    @Override
+    public FhirIO.Read.Result expand(PCollection<String> input) {
+      return input.apply("Fetch Fhir messages", new FhirIO.Read.FetchHttpBody());
+    }
+
+    /**
+     * DoFn to fetch a resource from an Google Cloud Healthcare FHIR store based on resourceID
+     *
+     * <p>This DoFn consumes a {@link PCollection} of notifications {@link String}s from the FHIR
+     * store, and fetches the actual {@link HttpBody} object based on the id in the notification and
+     * will output a {@link PCollectionTuple} which contains the output and dead-letter {@link
+     * PCollection}.
+     *
+     * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
+     *
+     * <ul>
+     *   <li>{@link FhirIO.Read#OUT} - Contains all {@link PCollection} records successfully read
+     *       from the Fhir store.
+     *   <li>{@link FhirIO.Read#DEAD_LETTER} - Contains all {@link PCollection} of {@link
+     *       HealthcareIOError} of message IDs which failed to be fetched from the Fhir store, with
+     *       error message and stacktrace.
+     * </ul>
+     */
+    public static class FetchHttpBody extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+
+      /** Instantiates a new Fetch Fhir message DoFn. */
+      public FetchHttpBody() {}
+
+      @Override
+      public FhirIO.Read.Result expand(PCollection<String> resourceIds) {
+        return new FhirIO.Read.Result(
+            resourceIds.apply(
+                ParDo.of(new FhirIO.Read.FetchHttpBody.HttpBodyGetFn())
+                    .withOutputTags(FhirIO.Read.OUT, TupleTagList.of(FhirIO.Read.DEAD_LETTER))));
+      }
+
+      /** DoFn for fetching messages from the Fhir store with error handling. */
+      public static class HttpBodyGetFn extends DoFn<String, HttpBody> {
+
+        private Counter failedMessageGets =
+            Metrics.counter(FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class, "failed-message-reads");
+        private static final Logger LOG =
+            LoggerFactory.getLogger(FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class);
+        private final Counter successfulHttpBodyGets =
+            Metrics.counter(
+                FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class, "successful-hl7v2-message-gets");
+        private HealthcareApiClient client;
+
+        /** Instantiates a new Hl 7 v 2 message get fn. */
+        HttpBodyGetFn() {}
+
+        /**
+         * Instantiate healthcare client.
+         *
+         * @throws IOException the io exception
+         */
+        @Setup
+        public void instantiateHealthcareClient() throws IOException {
+          this.client = new HttpHealthcareApiClient();
+        }
+
+        /**
+         * Process element.
+         *
+         * @param context the context
+         */
+        @ProcessElement
+        public void processElement(ProcessContext context) {
+          String resourceId = context.element();
+          try {
+            context.output(fetchResource(this.client, resourceId));
+          } catch (Exception e) {
+            failedMessageGets.inc();
+            LOG.warn(
+                String.format(
+                    "Error fetching Fhir message with ID %s writing to Dead Letter "
+                        + "Queue. Cause: %s Stack Trace: %s",
+                    resourceId, e.getMessage(), Throwables.getStackTraceAsString(e)));
+            context.output(FhirIO.Read.DEAD_LETTER, HealthcareIOError.of(resourceId, e));
+          }
+        }
+
+        private HttpBody fetchResource(HealthcareApiClient client, String resourceId)
+            throws IOException, IllegalArgumentException, InterruptedException {
+          long startTime = System.currentTimeMillis();
+          Sleeper sleeper = Sleeper.DEFAULT;
+
+          com.google.api.services.healthcare.v1beta1.model.HttpBody resource =
+              client.readFhirResource(resourceId);
+
+          if (resource == null) {
+            throw new IOException(String.format("GET request for %s returned null", resourceId));
+          }
+          this.successfulHttpBodyGets.inc();
+          return resource;
+        }
+      }
+    }
+  }
+
+  /** The type Write. */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<HttpBody>, Write.Result> {
+
+    /** The tag for the failed writes to FHIR store`. */
+    public static final TupleTag<HealthcareIOError<HttpBody>> FAILED_BODY =
+        new TupleTag<HealthcareIOError<HttpBody>>() {};
+
+    /** The enum Write method. */
+    public enum WriteMethod {
+      /**
+       * Execute Bundle Method executes a batch of requests as a single transaction @see <a
+       * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
+       */
+      EXECUTE_BUNDLE,
+      /**
+       * Import Method bulk imports resources from GCS. This is ideal for initial loads to empty
+       * FHIR stores. <a
+       * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/import></a>.
+       */
+      IMPORT
+    }
+
+    public static class Result implements POutput {
+      private final Pipeline pipeline;
+      private final PCollection<HealthcareIOError<HttpBody>> failedInsertsWithErr;
+
+      /** Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. */
+      static Result in(Pipeline pipeline, PCollection<HealthcareIOError<HttpBody>> failedInserts) {
+        return new Result(pipeline, failedInserts);
+      }
+
+      public PCollection<HealthcareIOError<HttpBody>> getFailedInsertsWithErr() {
+        return this.failedInsertsWithErr;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pipeline;
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        failedInsertsWithErr.setCoder(new HealthcareIOErrorCoder<HttpBody>(new HttpBodyCoder()));
+        return ImmutableMap.of(Write.FAILED_BODY, failedInsertsWithErr);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+
+      private Result(
+          Pipeline pipeline, PCollection<HealthcareIOError<HttpBody>> failedInsertsWithErr) {
+        this.pipeline = pipeline;
+        this.failedInsertsWithErr = failedInsertsWithErr;
+      }
+    }
+
+    /**
+     * Gets Fhir store.
+     *
+     * @return the Fhir store
+     */
+    abstract String getFhirStore();
+
+    /**
+     * Gets write method.
+     *
+     * @return the write method
+     */
+    abstract WriteMethod getWriteMethod();
+
+    abstract Optional<ContentStructure> getContentStructure();
+
+    abstract Optional<String> getImportGcsTempPath();
+
+    abstract Optional<String> getImportGcsDeadLetterPath();
+
+    /** The type Builder. */
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      /**
+       * Sets Fhir store.
+       *
+       * @param fhirStore the Fhir store
+       * @return the Fhir store
+       */
+      abstract Builder setFhirStore(String fhirStore);
+
+      /**
+       * Sets write method.
+       *
+       * @param writeMethod the write method
+       * @return the write method
+       */
+      abstract Builder setWriteMethod(WriteMethod writeMethod);
+
+      abstract Builder setContentStructure(ContentStructure contentStructure);
+
+      abstract Builder setImportGcsTempPath(String gcsTempPath);
+
+      abstract Builder setImportGcsDeadLetterPath(String gcsDeadLetterPath);
+
+      /**
+       * Build write.
+       *
+       * @return the write
+       */
+      abstract Write build();
+    }
+
+    private static Write.Builder write(String fhirStore) {
+      return new AutoValue_FhirIO_Write.Builder().setFhirStore(fhirStore);
+    }
+
+    /**
+     * Create Method creates a single FHIR resource. @see <a
+     * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create></a>
+     *
+     * @param fhirStore the hl 7 v 2 store
+     * @return the write
+     */
+    public static Write fhirStoresImport(
+        String fhirStore,
+        String gcsTempPath,
+        String gcsDeadLetterPath,
+        @Nullable ContentStructure contentStructure) {
+      return new AutoValue_FhirIO_Write.Builder()
+          .setFhirStore(fhirStore)
+          .setWriteMethod(Write.WriteMethod.IMPORT)
+          .setContentStructure(contentStructure)
+          .setImportGcsTempPath(gcsTempPath)
+          .setImportGcsDeadLetterPath(gcsDeadLetterPath)
+          .build();
+    }
+
+    /**
+     * Execute Bundle Method executes a batch of requests as a single transaction @see <a
+     * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
+     *
+     * @param fhirStore the hl 7 v 2 store
+     * @return the write
+     */
+    public static Write executeBundles(String fhirStore) {
+      return new AutoValue_FhirIO_Write.Builder()
+          .setFhirStore(fhirStore)
+          .setWriteMethod(WriteMethod.EXECUTE_BUNDLE)
+          .build();
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(Write.class);
+
+    @Override
+    public Result expand(PCollection<HttpBody> input) {
+      PCollection<HealthcareIOError<HttpBody>> failedBundles;
+      PCollection<HealthcareIOError<String>> failedImports;
+      switch (this.getWriteMethod()) {
+        case IMPORT:
+          LOG.warn(
+              "Make sure the Cloud Healthcare Service Agent has permissions when using import:"
+                  + " https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions");
+          String tempPath = getImportGcsTempPath().orElseThrow(IllegalArgumentException::new);
+          String deadPath = getImportGcsDeadLetterPath().orElseThrow(IllegalArgumentException::new);
+          ContentStructure contentStructure =
+              getContentStructure().orElseThrow(IllegalArgumentException::new);
+
+          failedBundles =
+              input
+                  .apply(new Import(getFhirStore(), tempPath, deadPath, contentStructure))
+                  .setCoder(new HealthcareIOErrorCoder<>(new HttpBodyCoder()));
+          // fall through
+        case EXECUTE_BUNDLE:
+        default:
+          failedBundles =
+              input.apply(
+                  "Execute FHIR Bundles",
+                  ParDo.of(new ExecuteBundles.ExecuteBundlesFn(this.getFhirStore())));
+      }
+      return Result.in(input.getPipeline(), failedBundles);
+    }
+  }
+
+  /**
+   * Writes each bundle of elements to a new-line delimited JSON file on GCS and issues a
+   * fhirStores.import Request for that file.
+   */
+  public static class Import
+      extends PTransform<PCollection<HttpBody>, PCollection<HealthcareIOError<HttpBody>>> {
+
+    private final String fhirStore;
+    private final String tempGcsPath;
+    private final String deadLetterGcsPath;
+    private final ContentStructure contentStructure;
+
+    Import(
+        String fhirStore,
+        String tempGcsPath,
+        String deadLetterGcsPath,
+        @Nullable ContentStructure contentStructure) {
+      this.fhirStore = fhirStore;
+      this.tempGcsPath = tempGcsPath;
+      this.deadLetterGcsPath = deadLetterGcsPath;
+      if (contentStructure == null) {
+        this.contentStructure = ContentStructure.CONTENT_STRUCTURE_UNSPECIFIED;
+      } else {
+        this.contentStructure = contentStructure;
+      }
+    }
+
+    @Override
+    public PCollection<HealthcareIOError<HttpBody>> expand(PCollection<HttpBody> input) {
+      return input.apply(
+          ParDo.of(new ImportFn(fhirStore, tempGcsPath, deadLetterGcsPath, contentStructure)));
+    }
+
+    public enum ContentStructure {
+      /** If the content structure is not specified, the default value BUNDLE will be used. */
+      CONTENT_STRUCTURE_UNSPECIFIED,
+      /**
+       * The source file contains one or more lines of newline-delimited JSON (ndjson). Each line is
+       * a bundle, which contains one or more resources. Set the bundle type to history to import
+       * resource versions.
+       */
+      BUNDLE,
+      /**
+       * The source file contains one or more lines of newline-delimited JSON (ndjson). Each line is
+       * a single resource.
+       */
+      RESOURCE,
+      /** The entire file is one JSON bundle. The JSON can span multiple lines. */
+      BUNDLE_PRETTY,
+      /** The entire file is one JSON resource. The JSON can span multiple lines. */
+      RESOURCE_PRETTY
+    }
+
+    static class ImportFn extends DoFn<HttpBody, HealthcareIOError<HttpBody>> {
+      private final String fhirStore;
+      private final String tempGcsPath;
+      private final String deadLetterGcsPath;
+      private final ContentStructure contentStructure;
+      private ObjectMapper mapper;
+      private ResourceId resourceId;
+      private ResourceId deadLetterResourceId;
+      private WritableByteChannel ndJsonChannel;
+
+      private transient HealthcareApiClient client;
+      private static final Logger LOG = LoggerFactory.getLogger(ImportFn.class);
+
+      ImportFn(
+          String fhirStore,
+          String tempGcsPath,
+          String deadLetterGcsPath,
+          @Nullable ContentStructure contentStructure) {
+        this.fhirStore = fhirStore;
+        this.tempGcsPath = tempGcsPath;
+        this.deadLetterGcsPath = deadLetterGcsPath;
+        if (contentStructure == null) {
+          this.contentStructure = ContentStructure.CONTENT_STRUCTURE_UNSPECIFIED;
+        } else {
+          this.contentStructure = contentStructure;
+        }
+      }
+
+      @Setup
+      public void initClient() throws IOException {
+        this.client = new HttpHealthcareApiClient();
+      }
+
+      @StartBundle
+      public void initBatch() throws IOException {
 
 Review comment:
   @pabloem is there a good way to signal to the runner to prefer large bundles for this DoFn? Are there any other common patterns for batching files for bulk import operations that are less complicated than the BQ File Loads (which seem to have a lot of complexity to handle dynamic destinations)?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lastomato commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
lastomato commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#discussion_r405761171
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
 ##########
 @@ -0,0 +1,662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.healthcare.v1beta1.model.HttpBody;
+import com.google.api.services.healthcare.v1beta1.model.Operation;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Write.Result;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.codehaus.jackson.JsonProcessingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FhirIO} provides an API for reading and writing resources to <a
+ * href="https://cloud.google.com/healthcare/docs/concepts/fhir">Google Cloud Healthcare Fhir API.
+ * </a>
+ *
+ * <p>Read
+ *
+ * <p>FHIR resources can be read with {@link FhirIO.Read} supports use cases where you have a
+ * ${@link PCollection} of message IDS. This is appropriate for reading the Fhir notifications from
+ * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually
+ * prepared list of messages that you need to process (e.g. in a text file read with {@link
+ * org.apache.beam.sdk.io.TextIO}) .
+ *
+ * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings
+ * {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieved a
+ * {@link PCollection} containing the successfully fetched {@link HttpBody}s and/or {@link
+ * FhirIO.Read.Result#getFailedReads()} to retrieve a {@link PCollection} of {@link
+ * HealthcareIOError} containing the resource ID that could not be fetched and the exception as a
+ * {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your
+ * choosing. This error handling is mainly to transparently surface errors where the upstream {@link
+ * PCollection} contains IDs that are not valid or are not reachable due to permissions issues.
+ *
+ * <p>Write Resources can be written to FHIR with two different methods: Import or Execute Bundle.
+ *
+ * <p>Execute Bundle This is best for use cases where you are writing to a non-empty FHIR store with
+ * other clients or otherwise need referential integrity (e.g. A Streaming HL7v2 to FHIR ETL
+ * pipeline).
+ *
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>
+ *     <p>Import This is best for use cases where you are populating an empty FHIR store with no
+ *     other clients. It is faster than the execute bundles method but does not respect referential
+ *     integrity and the resources are not written transactionally (e.g. a historicaly backfill on a
+ *     new FHIR store) This requires each resource to contain a client provided ID. It is important
+ *     that when using import you give the appropriate permissions to the Google Cloud Healthcare
+ *     Service Agent
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions></a>
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/import></a>
+ *     A {@link PCollection} of {@link HttpBody} can be ingested into an Fhir store using {@link
+ *     FhirIO.Write#fhirStoresImport(String, String, String, ContentStructure)} This will return a
+ *     {@link FhirIO.Write.Result} on which you can call {@link
+ *     FhirIO.Write.Result#getFailedInsertsWithErr()} to retrieve a {@link PCollection} of {@link
+ *     HealthcareIOError} containing the {@link HttpBody} that failed to be ingested and the
+ *     exception.
+ */
+public class FhirIO {
+
+  @Experimental
+  public static class Read extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+    private static final Logger LOG = LoggerFactory.getLogger(Read.class);
+
+    public Read() {
+      LOG.warn("FhirIO.Read is experimental and has not been tested.");
+    }
+
+    public static class Result implements POutput, PInput {
+      private PCollection<HttpBody> resources;
+
+      private PCollection<HealthcareIOError<String>> failedReads;
+      PCollectionTuple pct;
+
+      public static FhirIO.Read.Result of(PCollectionTuple pct) throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(OUT).and(DEAD_LETTER))) {
+          return new FhirIO.Read.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the FhirIO.Read.OUT "
+                  + "and FhirIO.Read.DEAD_LETTER tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.resources = pct.get(OUT).setCoder(new HttpBodyCoder());
+        this.failedReads =
+            pct.get(DEAD_LETTER).setCoder(new HealthcareIOErrorCoder<>(StringUtf8Coder.of()));
+      }
+
+      public PCollection<HealthcareIOError<String>> getFailedReads() {
+        return failedReads;
+      }
+
+      public PCollection<HttpBody> getResources() {
+        return resources;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(OUT, resources);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /** The tag for the main output of Fhir Messages. */
+    public static final TupleTag<HttpBody> OUT = new TupleTag<HttpBody>() {};
+    /** The tag for the deadletter output of Fhir Messages. */
+    public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+        new TupleTag<HealthcareIOError<String>>() {};
+
+    @Override
+    public FhirIO.Read.Result expand(PCollection<String> input) {
+      return input.apply("Fetch Fhir messages", new FhirIO.Read.FetchHttpBody());
+    }
+
+    /**
+     * DoFn to fetch a resource from an Google Cloud Healthcare FHIR store based on resourceID
+     *
+     * <p>This DoFn consumes a {@link PCollection} of notifications {@link String}s from the FHIR
+     * store, and fetches the actual {@link HttpBody} object based on the id in the notification and
+     * will output a {@link PCollectionTuple} which contains the output and dead-letter {@link
+     * PCollection}.
+     *
+     * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
+     *
+     * <ul>
+     *   <li>{@link FhirIO.Read#OUT} - Contains all {@link PCollection} records successfully read
+     *       from the Fhir store.
+     *   <li>{@link FhirIO.Read#DEAD_LETTER} - Contains all {@link PCollection} of {@link
+     *       HealthcareIOError} of message IDs which failed to be fetched from the Fhir store, with
+     *       error message and stacktrace.
+     * </ul>
+     */
+    public static class FetchHttpBody extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+
+      /** Instantiates a new Fetch Fhir message DoFn. */
+      public FetchHttpBody() {}
+
+      @Override
+      public FhirIO.Read.Result expand(PCollection<String> resourceIds) {
+        return new FhirIO.Read.Result(
+            resourceIds.apply(
+                ParDo.of(new FhirIO.Read.FetchHttpBody.HttpBodyGetFn())
+                    .withOutputTags(FhirIO.Read.OUT, TupleTagList.of(FhirIO.Read.DEAD_LETTER))));
+      }
+
+      /** DoFn for fetching messages from the Fhir store with error handling. */
+      public static class HttpBodyGetFn extends DoFn<String, HttpBody> {
+
+        private Counter failedMessageGets =
+            Metrics.counter(FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class, "failed-message-reads");
+        private static final Logger LOG =
+            LoggerFactory.getLogger(FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class);
+        private final Counter successfulHttpBodyGets =
+            Metrics.counter(
+                FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class, "successful-hl7v2-message-gets");
+        private HealthcareApiClient client;
+
+        /** Instantiates a new Hl 7 v 2 message get fn. */
+        HttpBodyGetFn() {}
+
+        /**
+         * Instantiate healthcare client.
+         *
+         * @throws IOException the io exception
+         */
+        @Setup
+        public void instantiateHealthcareClient() throws IOException {
+          this.client = new HttpHealthcareApiClient();
+        }
+
+        /**
+         * Process element.
+         *
+         * @param context the context
+         */
+        @ProcessElement
+        public void processElement(ProcessContext context) {
+          String resourceId = context.element();
+          try {
+            context.output(fetchResource(this.client, resourceId));
+          } catch (Exception e) {
+            failedMessageGets.inc();
+            LOG.warn(
+                String.format(
+                    "Error fetching Fhir message with ID %s writing to Dead Letter "
+                        + "Queue. Cause: %s Stack Trace: %s",
+                    resourceId, e.getMessage(), Throwables.getStackTraceAsString(e)));
+            context.output(FhirIO.Read.DEAD_LETTER, HealthcareIOError.of(resourceId, e));
+          }
+        }
+
+        private HttpBody fetchResource(HealthcareApiClient client, String resourceId)
+            throws IOException, IllegalArgumentException, InterruptedException {
+          long startTime = System.currentTimeMillis();
+          Sleeper sleeper = Sleeper.DEFAULT;
+
+          com.google.api.services.healthcare.v1beta1.model.HttpBody resource =
+              client.readFhirResource(resourceId);
+
+          if (resource == null) {
+            throw new IOException(String.format("GET request for %s returned null", resourceId));
+          }
+          this.successfulHttpBodyGets.inc();
+          return resource;
+        }
+      }
+    }
+  }
+
+  /** The type Write. */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<HttpBody>, Write.Result> {
+
+    /** The tag for the failed writes to FHIR store`. */
+    public static final TupleTag<HealthcareIOError<HttpBody>> FAILED_BODY =
+        new TupleTag<HealthcareIOError<HttpBody>>() {};
+
+    /** The enum Write method. */
+    public enum WriteMethod {
+      /**
+       * Execute Bundle Method executes a batch of requests as a single transaction @see <a
+       * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
+       */
+      EXECUTE_BUNDLE,
+      /**
+       * Import Method bulk imports resources from GCS. This is ideal for initial loads to empty
+       * FHIR stores. <a
+       * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/import></a>.
+       */
+      IMPORT
+    }
+
+    public static class Result implements POutput {
+      private final Pipeline pipeline;
+      private final PCollection<HealthcareIOError<HttpBody>> failedInsertsWithErr;
+
+      /** Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. */
+      static Result in(Pipeline pipeline, PCollection<HealthcareIOError<HttpBody>> failedInserts) {
+        return new Result(pipeline, failedInserts);
+      }
+
+      public PCollection<HealthcareIOError<HttpBody>> getFailedInsertsWithErr() {
+        return this.failedInsertsWithErr;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pipeline;
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        failedInsertsWithErr.setCoder(new HealthcareIOErrorCoder<HttpBody>(new HttpBodyCoder()));
+        return ImmutableMap.of(Write.FAILED_BODY, failedInsertsWithErr);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+
+      private Result(
+          Pipeline pipeline, PCollection<HealthcareIOError<HttpBody>> failedInsertsWithErr) {
+        this.pipeline = pipeline;
+        this.failedInsertsWithErr = failedInsertsWithErr;
+      }
+    }
+
+    /**
+     * Gets Fhir store.
+     *
+     * @return the Fhir store
+     */
+    abstract String getFhirStore();
+
+    /**
+     * Gets write method.
+     *
+     * @return the write method
+     */
+    abstract WriteMethod getWriteMethod();
+
+    abstract Optional<ContentStructure> getContentStructure();
+
+    abstract Optional<String> getImportGcsTempPath();
+
+    abstract Optional<String> getImportGcsDeadLetterPath();
+
+    /** The type Builder. */
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      /**
+       * Sets Fhir store.
+       *
+       * @param fhirStore the Fhir store
+       * @return the Fhir store
+       */
+      abstract Builder setFhirStore(String fhirStore);
+
+      /**
+       * Sets write method.
+       *
+       * @param writeMethod the write method
+       * @return the write method
+       */
+      abstract Builder setWriteMethod(WriteMethod writeMethod);
+
+      abstract Builder setContentStructure(ContentStructure contentStructure);
+
+      abstract Builder setImportGcsTempPath(String gcsTempPath);
+
+      abstract Builder setImportGcsDeadLetterPath(String gcsDeadLetterPath);
+
+      /**
+       * Build write.
+       *
+       * @return the write
+       */
+      abstract Write build();
+    }
+
+    private static Write.Builder write(String fhirStore) {
+      return new AutoValue_FhirIO_Write.Builder().setFhirStore(fhirStore);
+    }
+
+    /**
+     * Create Method creates a single FHIR resource. @see <a
+     * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create></a>
+     *
+     * @param fhirStore the hl 7 v 2 store
+     * @return the write
+     */
+    public static Write fhirStoresImport(
+        String fhirStore,
+        String gcsTempPath,
+        String gcsDeadLetterPath,
+        @Nullable ContentStructure contentStructure) {
+      return new AutoValue_FhirIO_Write.Builder()
+          .setFhirStore(fhirStore)
+          .setWriteMethod(Write.WriteMethod.IMPORT)
+          .setContentStructure(contentStructure)
+          .setImportGcsTempPath(gcsTempPath)
+          .setImportGcsDeadLetterPath(gcsDeadLetterPath)
+          .build();
+    }
+
+    /**
+     * Execute Bundle Method executes a batch of requests as a single transaction @see <a
+     * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
+     *
+     * @param fhirStore the hl 7 v 2 store
+     * @return the write
+     */
+    public static Write executeBundles(String fhirStore) {
+      return new AutoValue_FhirIO_Write.Builder()
+          .setFhirStore(fhirStore)
+          .setWriteMethod(WriteMethod.EXECUTE_BUNDLE)
+          .build();
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(Write.class);
+
+    @Override
+    public Result expand(PCollection<HttpBody> input) {
+      PCollection<HealthcareIOError<HttpBody>> failedBundles;
+      PCollection<HealthcareIOError<String>> failedImports;
+      switch (this.getWriteMethod()) {
+        case IMPORT:
+          LOG.warn(
+              "Make sure the Cloud Healthcare Service Agent has permissions when using import:"
+                  + " https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions");
+          String tempPath = getImportGcsTempPath().orElseThrow(IllegalArgumentException::new);
+          String deadPath = getImportGcsDeadLetterPath().orElseThrow(IllegalArgumentException::new);
+          ContentStructure contentStructure =
+              getContentStructure().orElseThrow(IllegalArgumentException::new);
+
+          failedBundles =
+              input
+                  .apply(new Import(getFhirStore(), tempPath, deadPath, contentStructure))
+                  .setCoder(new HealthcareIOErrorCoder<>(new HttpBodyCoder()));
+          // fall through
+        case EXECUTE_BUNDLE:
+        default:
+          failedBundles =
+              input.apply(
+                  "Execute FHIR Bundles",
+                  ParDo.of(new ExecuteBundles.ExecuteBundlesFn(this.getFhirStore())));
+      }
+      return Result.in(input.getPipeline(), failedBundles);
+    }
+  }
+
+  /**
+   * Writes each bundle of elements to a new-line delimited JSON file on GCS and issues a
+   * fhirStores.import Request for that file.
+   */
+  public static class Import
+      extends PTransform<PCollection<HttpBody>, PCollection<HealthcareIOError<HttpBody>>> {
+
+    private final String fhirStore;
+    private final String tempGcsPath;
+    private final String deadLetterGcsPath;
+    private final ContentStructure contentStructure;
+
+    Import(
+        String fhirStore,
+        String tempGcsPath,
+        String deadLetterGcsPath,
+        @Nullable ContentStructure contentStructure) {
+      this.fhirStore = fhirStore;
+      this.tempGcsPath = tempGcsPath;
+      this.deadLetterGcsPath = deadLetterGcsPath;
+      if (contentStructure == null) {
+        this.contentStructure = ContentStructure.CONTENT_STRUCTURE_UNSPECIFIED;
+      } else {
+        this.contentStructure = contentStructure;
+      }
+    }
+
+    @Override
+    public PCollection<HealthcareIOError<HttpBody>> expand(PCollection<HttpBody> input) {
+      return input.apply(
+          ParDo.of(new ImportFn(fhirStore, tempGcsPath, deadLetterGcsPath, contentStructure)));
+    }
+
+    public enum ContentStructure {
+      /** If the content structure is not specified, the default value BUNDLE will be used. */
+      CONTENT_STRUCTURE_UNSPECIFIED,
+      /**
+       * The source file contains one or more lines of newline-delimited JSON (ndjson). Each line is
+       * a bundle, which contains one or more resources. Set the bundle type to history to import
+       * resource versions.
+       */
+      BUNDLE,
+      /**
+       * The source file contains one or more lines of newline-delimited JSON (ndjson). Each line is
+       * a single resource.
+       */
+      RESOURCE,
+      /** The entire file is one JSON bundle. The JSON can span multiple lines. */
+      BUNDLE_PRETTY,
+      /** The entire file is one JSON resource. The JSON can span multiple lines. */
+      RESOURCE_PRETTY
+    }
+
+    static class ImportFn extends DoFn<HttpBody, HealthcareIOError<HttpBody>> {
+      private final String fhirStore;
+      private final String tempGcsPath;
+      private final String deadLetterGcsPath;
+      private final ContentStructure contentStructure;
+      private ObjectMapper mapper;
+      private ResourceId resourceId;
+      private ResourceId deadLetterResourceId;
+      private WritableByteChannel ndJsonChannel;
+
+      private transient HealthcareApiClient client;
+      private static final Logger LOG = LoggerFactory.getLogger(ImportFn.class);
+
+      ImportFn(
+          String fhirStore,
+          String tempGcsPath,
+          String deadLetterGcsPath,
+          @Nullable ContentStructure contentStructure) {
+        this.fhirStore = fhirStore;
+        this.tempGcsPath = tempGcsPath;
+        this.deadLetterGcsPath = deadLetterGcsPath;
+        if (contentStructure == null) {
+          this.contentStructure = ContentStructure.CONTENT_STRUCTURE_UNSPECIFIED;
+        } else {
+          this.contentStructure = contentStructure;
+        }
+      }
+
+      @Setup
+      public void initClient() throws IOException {
+        this.client = new HttpHealthcareApiClient();
+      }
+
+      @StartBundle
+      public void initBatch() throws IOException {
 
 Review comment:
   I am a little worried about the fact that the bundle size is unknown. In extreme (though unlikely) cases, we could be making one import API call per resource.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf removed a comment on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf removed a comment on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-610710580
 
 
   Currently The R4 Integration test is failing.
   Will look into this tomorrow.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [x] Unit test for FhirIO dead letter handling
   - [x] Migrate ITs to parameterized tests to DRY up ITs against different FHIR versions (improves maintainability)
   - [ ] Add IT for FhirIO.Read
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-614118743
 
 
   @chamikaramj FYI this is a WIP follow up to #11151 to add an IO for FHIR store. It is less straight forward and I have some questions about signalling to runner to process large bundles or other methods for batching files up for bulk imports.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] chamikaramj commented on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-615300580
 
 
   Sorry wrong PR. Should be https://github.com/apache/beam/pull/11151

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#discussion_r405722411
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
 ##########
 @@ -0,0 +1,662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.healthcare.v1beta1.model.HttpBody;
+import com.google.api.services.healthcare.v1beta1.model.Operation;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Write.Result;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.codehaus.jackson.JsonProcessingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FhirIO} provides an API for reading and writing resources to <a
+ * href="https://cloud.google.com/healthcare/docs/concepts/fhir">Google Cloud Healthcare Fhir API.
+ * </a>
+ *
+ * <p>Read
+ *
+ * <p>FHIR resources can be read with {@link FhirIO.Read} supports use cases where you have a
+ * ${@link PCollection} of message IDS. This is appropriate for reading the Fhir notifications from
+ * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually
+ * prepared list of messages that you need to process (e.g. in a text file read with {@link
+ * org.apache.beam.sdk.io.TextIO}) .
+ *
+ * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings
+ * {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieved a
+ * {@link PCollection} containing the successfully fetched {@link HttpBody}s and/or {@link
+ * FhirIO.Read.Result#getFailedReads()} to retrieve a {@link PCollection} of {@link
+ * HealthcareIOError} containing the resource ID that could not be fetched and the exception as a
+ * {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your
+ * choosing. This error handling is mainly to transparently surface errors where the upstream {@link
+ * PCollection} contains IDs that are not valid or are not reachable due to permissions issues.
+ *
+ * <p>Write Resources can be written to FHIR with two different methods: Import or Execute Bundle.
+ *
+ * <p>Execute Bundle This is best for use cases where you are writing to a non-empty FHIR store with
+ * other clients or otherwise need referential integrity (e.g. A Streaming HL7v2 to FHIR ETL
+ * pipeline).
+ *
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>
+ *     <p>Import This is best for use cases where you are populating an empty FHIR store with no
+ *     other clients. It is faster than the execute bundles method but does not respect referential
+ *     integrity and the resources are not written transactionally (e.g. a historicaly backfill on a
+ *     new FHIR store) This requires each resource to contain a client provided ID. It is important
+ *     that when using import you give the appropriate permissions to the Google Cloud Healthcare
+ *     Service Agent
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions></a>
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/import></a>
+ *     A {@link PCollection} of {@link HttpBody} can be ingested into an Fhir store using {@link
+ *     FhirIO.Write#fhirStoresImport(String, String, String, ContentStructure)} This will return a
+ *     {@link FhirIO.Write.Result} on which you can call {@link
+ *     FhirIO.Write.Result#getFailedInsertsWithErr()} to retrieve a {@link PCollection} of {@link
+ *     HealthcareIOError} containing the {@link HttpBody} that failed to be ingested and the
+ *     exception.
+ */
+public class FhirIO {
+
+  @Experimental
+  public static class Read extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+    private static final Logger LOG = LoggerFactory.getLogger(Read.class);
+
+    public Read() {
+      LOG.warn("FhirIO.Read is experimental and has not been tested.");
+    }
+
+    public static class Result implements POutput, PInput {
+      private PCollection<HttpBody> resources;
+
+      private PCollection<HealthcareIOError<String>> failedReads;
+      PCollectionTuple pct;
+
+      public static FhirIO.Read.Result of(PCollectionTuple pct) throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(OUT).and(DEAD_LETTER))) {
+          return new FhirIO.Read.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the FhirIO.Read.OUT "
+                  + "and FhirIO.Read.DEAD_LETTER tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.resources = pct.get(OUT).setCoder(new HttpBodyCoder());
+        this.failedReads =
+            pct.get(DEAD_LETTER).setCoder(new HealthcareIOErrorCoder<>(StringUtf8Coder.of()));
+      }
+
+      public PCollection<HealthcareIOError<String>> getFailedReads() {
+        return failedReads;
+      }
+
+      public PCollection<HttpBody> getResources() {
+        return resources;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(OUT, resources);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /** The tag for the main output of Fhir Messages. */
+    public static final TupleTag<HttpBody> OUT = new TupleTag<HttpBody>() {};
+    /** The tag for the deadletter output of Fhir Messages. */
+    public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+        new TupleTag<HealthcareIOError<String>>() {};
+
+    @Override
+    public FhirIO.Read.Result expand(PCollection<String> input) {
+      return input.apply("Fetch Fhir messages", new FhirIO.Read.FetchHttpBody());
+    }
+
+    /**
+     * DoFn to fetch a resource from an Google Cloud Healthcare FHIR store based on resourceID
+     *
+     * <p>This DoFn consumes a {@link PCollection} of notifications {@link String}s from the FHIR
+     * store, and fetches the actual {@link HttpBody} object based on the id in the notification and
+     * will output a {@link PCollectionTuple} which contains the output and dead-letter {@link
+     * PCollection}.
+     *
+     * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
+     *
+     * <ul>
+     *   <li>{@link FhirIO.Read#OUT} - Contains all {@link PCollection} records successfully read
+     *       from the Fhir store.
+     *   <li>{@link FhirIO.Read#DEAD_LETTER} - Contains all {@link PCollection} of {@link
+     *       HealthcareIOError} of message IDs which failed to be fetched from the Fhir store, with
+     *       error message and stacktrace.
+     * </ul>
+     */
+    public static class FetchHttpBody extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+
+      /** Instantiates a new Fetch Fhir message DoFn. */
+      public FetchHttpBody() {}
+
+      @Override
+      public FhirIO.Read.Result expand(PCollection<String> resourceIds) {
+        return new FhirIO.Read.Result(
+            resourceIds.apply(
+                ParDo.of(new FhirIO.Read.FetchHttpBody.HttpBodyGetFn())
+                    .withOutputTags(FhirIO.Read.OUT, TupleTagList.of(FhirIO.Read.DEAD_LETTER))));
+      }
+
+      /** DoFn for fetching messages from the Fhir store with error handling. */
+      public static class HttpBodyGetFn extends DoFn<String, HttpBody> {
+
+        private Counter failedMessageGets =
+            Metrics.counter(FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class, "failed-message-reads");
+        private static final Logger LOG =
+            LoggerFactory.getLogger(FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class);
+        private final Counter successfulHttpBodyGets =
+            Metrics.counter(
+                FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class, "successful-hl7v2-message-gets");
+        private HealthcareApiClient client;
+
+        /** Instantiates a new Hl 7 v 2 message get fn. */
+        HttpBodyGetFn() {}
+
+        /**
+         * Instantiate healthcare client.
+         *
+         * @throws IOException the io exception
+         */
+        @Setup
+        public void instantiateHealthcareClient() throws IOException {
+          this.client = new HttpHealthcareApiClient();
+        }
+
+        /**
+         * Process element.
+         *
+         * @param context the context
+         */
+        @ProcessElement
+        public void processElement(ProcessContext context) {
+          String resourceId = context.element();
+          try {
+            context.output(fetchResource(this.client, resourceId));
+          } catch (Exception e) {
+            failedMessageGets.inc();
+            LOG.warn(
+                String.format(
+                    "Error fetching Fhir message with ID %s writing to Dead Letter "
+                        + "Queue. Cause: %s Stack Trace: %s",
+                    resourceId, e.getMessage(), Throwables.getStackTraceAsString(e)));
+            context.output(FhirIO.Read.DEAD_LETTER, HealthcareIOError.of(resourceId, e));
+          }
+        }
+
+        private HttpBody fetchResource(HealthcareApiClient client, String resourceId)
+            throws IOException, IllegalArgumentException, InterruptedException {
+          long startTime = System.currentTimeMillis();
+          Sleeper sleeper = Sleeper.DEFAULT;
+
+          com.google.api.services.healthcare.v1beta1.model.HttpBody resource =
+              client.readFhirResource(resourceId);
+
+          if (resource == null) {
+            throw new IOException(String.format("GET request for %s returned null", resourceId));
+          }
+          this.successfulHttpBodyGets.inc();
+          return resource;
+        }
+      }
+    }
+  }
+
+  /** The type Write. */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<HttpBody>, Write.Result> {
+
+    /** The tag for the failed writes to FHIR store`. */
+    public static final TupleTag<HealthcareIOError<HttpBody>> FAILED_BODY =
+        new TupleTag<HealthcareIOError<HttpBody>>() {};
+
+    /** The enum Write method. */
+    public enum WriteMethod {
+      /**
+       * Execute Bundle Method executes a batch of requests as a single transaction @see <a
+       * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
+       */
+      EXECUTE_BUNDLE,
+      /**
+       * Import Method bulk imports resources from GCS. This is ideal for initial loads to empty
+       * FHIR stores. <a
+       * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/import></a>.
+       */
+      IMPORT
+    }
+
+    public static class Result implements POutput {
+      private final Pipeline pipeline;
+      private final PCollection<HealthcareIOError<HttpBody>> failedInsertsWithErr;
+
+      /** Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. */
+      static Result in(Pipeline pipeline, PCollection<HealthcareIOError<HttpBody>> failedInserts) {
+        return new Result(pipeline, failedInserts);
+      }
+
+      public PCollection<HealthcareIOError<HttpBody>> getFailedInsertsWithErr() {
+        return this.failedInsertsWithErr;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pipeline;
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        failedInsertsWithErr.setCoder(new HealthcareIOErrorCoder<HttpBody>(new HttpBodyCoder()));
+        return ImmutableMap.of(Write.FAILED_BODY, failedInsertsWithErr);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+
+      private Result(
+          Pipeline pipeline, PCollection<HealthcareIOError<HttpBody>> failedInsertsWithErr) {
+        this.pipeline = pipeline;
+        this.failedInsertsWithErr = failedInsertsWithErr;
+      }
+    }
+
+    /**
+     * Gets Fhir store.
+     *
+     * @return the Fhir store
+     */
+    abstract String getFhirStore();
+
+    /**
+     * Gets write method.
+     *
+     * @return the write method
+     */
+    abstract WriteMethod getWriteMethod();
+
+    abstract Optional<ContentStructure> getContentStructure();
+
+    abstract Optional<String> getImportGcsTempPath();
+
+    abstract Optional<String> getImportGcsDeadLetterPath();
+
+    /** The type Builder. */
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      /**
+       * Sets Fhir store.
+       *
+       * @param fhirStore the Fhir store
+       * @return the Fhir store
+       */
+      abstract Builder setFhirStore(String fhirStore);
+
+      /**
+       * Sets write method.
+       *
+       * @param writeMethod the write method
+       * @return the write method
+       */
+      abstract Builder setWriteMethod(WriteMethod writeMethod);
+
+      abstract Builder setContentStructure(ContentStructure contentStructure);
+
+      abstract Builder setImportGcsTempPath(String gcsTempPath);
+
+      abstract Builder setImportGcsDeadLetterPath(String gcsDeadLetterPath);
+
+      /**
+       * Build write.
+       *
+       * @return the write
+       */
+      abstract Write build();
+    }
+
+    private static Write.Builder write(String fhirStore) {
+      return new AutoValue_FhirIO_Write.Builder().setFhirStore(fhirStore);
+    }
+
+    /**
+     * Create Method creates a single FHIR resource. @see <a
+     * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create></a>
+     *
+     * @param fhirStore the hl 7 v 2 store
+     * @return the write
+     */
+    public static Write fhirStoresImport(
+        String fhirStore,
+        String gcsTempPath,
+        String gcsDeadLetterPath,
+        @Nullable ContentStructure contentStructure) {
+      return new AutoValue_FhirIO_Write.Builder()
+          .setFhirStore(fhirStore)
+          .setWriteMethod(Write.WriteMethod.IMPORT)
+          .setContentStructure(contentStructure)
+          .setImportGcsTempPath(gcsTempPath)
+          .setImportGcsDeadLetterPath(gcsDeadLetterPath)
+          .build();
+    }
+
+    /**
+     * Execute Bundle Method executes a batch of requests as a single transaction @see <a
+     * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
+     *
+     * @param fhirStore the hl 7 v 2 store
+     * @return the write
+     */
+    public static Write executeBundles(String fhirStore) {
+      return new AutoValue_FhirIO_Write.Builder()
+          .setFhirStore(fhirStore)
+          .setWriteMethod(WriteMethod.EXECUTE_BUNDLE)
+          .build();
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(Write.class);
+
+    @Override
+    public Result expand(PCollection<HttpBody> input) {
+      PCollection<HealthcareIOError<HttpBody>> failedBundles;
+      PCollection<HealthcareIOError<String>> failedImports;
+      switch (this.getWriteMethod()) {
+        case IMPORT:
+          LOG.warn(
+              "Make sure the Cloud Healthcare Service Agent has permissions when using import:"
+                  + " https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions");
+          String tempPath = getImportGcsTempPath().orElseThrow(IllegalArgumentException::new);
+          String deadPath = getImportGcsDeadLetterPath().orElseThrow(IllegalArgumentException::new);
+          ContentStructure contentStructure =
+              getContentStructure().orElseThrow(IllegalArgumentException::new);
+
+          failedBundles =
+              input
+                  .apply(new Import(getFhirStore(), tempPath, deadPath, contentStructure))
+                  .setCoder(new HealthcareIOErrorCoder<>(new HttpBodyCoder()));
+          // fall through
+        case EXECUTE_BUNDLE:
+        default:
+          failedBundles =
+              input.apply(
+                  "Execute FHIR Bundles",
+                  ParDo.of(new ExecuteBundles.ExecuteBundlesFn(this.getFhirStore())));
+      }
+      return Result.in(input.getPipeline(), failedBundles);
+    }
+  }
+
+  /**
+   * Writes each bundle of elements to a new-line delimited JSON file on GCS and issues a
+   * fhirStores.import Request for that file.
+   */
+  public static class Import
+      extends PTransform<PCollection<HttpBody>, PCollection<HealthcareIOError<HttpBody>>> {
+
+    private final String fhirStore;
+    private final String tempGcsPath;
+    private final String deadLetterGcsPath;
+    private final ContentStructure contentStructure;
+
+    Import(
+        String fhirStore,
+        String tempGcsPath,
+        String deadLetterGcsPath,
+        @Nullable ContentStructure contentStructure) {
+      this.fhirStore = fhirStore;
+      this.tempGcsPath = tempGcsPath;
+      this.deadLetterGcsPath = deadLetterGcsPath;
+      if (contentStructure == null) {
+        this.contentStructure = ContentStructure.CONTENT_STRUCTURE_UNSPECIFIED;
+      } else {
+        this.contentStructure = contentStructure;
+      }
+    }
+
+    @Override
+    public PCollection<HealthcareIOError<HttpBody>> expand(PCollection<HttpBody> input) {
+      return input.apply(
+          ParDo.of(new ImportFn(fhirStore, tempGcsPath, deadLetterGcsPath, contentStructure)));
+    }
+
+    public enum ContentStructure {
+      /** If the content structure is not specified, the default value BUNDLE will be used. */
+      CONTENT_STRUCTURE_UNSPECIFIED,
+      /**
+       * The source file contains one or more lines of newline-delimited JSON (ndjson). Each line is
+       * a bundle, which contains one or more resources. Set the bundle type to history to import
+       * resource versions.
+       */
+      BUNDLE,
+      /**
+       * The source file contains one or more lines of newline-delimited JSON (ndjson). Each line is
+       * a single resource.
+       */
+      RESOURCE,
+      /** The entire file is one JSON bundle. The JSON can span multiple lines. */
+      BUNDLE_PRETTY,
+      /** The entire file is one JSON resource. The JSON can span multiple lines. */
+      RESOURCE_PRETTY
+    }
+
+    static class ImportFn extends DoFn<HttpBody, HealthcareIOError<HttpBody>> {
+      private final String fhirStore;
+      private final String tempGcsPath;
+      private final String deadLetterGcsPath;
+      private final ContentStructure contentStructure;
+      private ObjectMapper mapper;
+      private ResourceId resourceId;
+      private ResourceId deadLetterResourceId;
+      private WritableByteChannel ndJsonChannel;
+
+      private transient HealthcareApiClient client;
+      private static final Logger LOG = LoggerFactory.getLogger(ImportFn.class);
+
+      ImportFn(
+          String fhirStore,
+          String tempGcsPath,
+          String deadLetterGcsPath,
+          @Nullable ContentStructure contentStructure) {
+        this.fhirStore = fhirStore;
+        this.tempGcsPath = tempGcsPath;
+        this.deadLetterGcsPath = deadLetterGcsPath;
+        if (contentStructure == null) {
+          this.contentStructure = ContentStructure.CONTENT_STRUCTURE_UNSPECIFIED;
+        } else {
+          this.contentStructure = contentStructure;
+        }
+      }
+
+      @Setup
+      public void initClient() throws IOException {
+        this.client = new HttpHealthcareApiClient();
+      }
+
+      @StartBundle
+      public void initBatch() throws IOException {
 
 Review comment:
   @lastomato @yeweidaniel @ygupta89 @pabloem 
   Please review the logic here and LMK if you feel this is spamming the import API.
   As written this will create a separate new line delimited JSON file per bundle (group of elements in a `PCollection` of arbitrary size decided by runner) and import it w/ a separate call to import API.
   
   This was the "easiest" way to implement but may not be the most appropriate usage of the import API.
   
   PTAL and LMK if you'd like this to refactored into something that:
   1) asserts bounded PCollection
   2) waits til all temp files are written 
   3) blocks on import by polling the operation til complete.
   
   It's not immediately obvious to me how to achieve 1 & 2 in the beam framework but I can do some research.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [x] Unit test for FhirIO dead letter handling
   - [ ] Add IT for FhirIO.Read
   - [ ] Migrate ITs to parameterized tests to DRY up ITs against different FHIR versions
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [ ] Unit test for FhirIO dead letter handling
   - [ ] Add example usage to javadoc
   - [ ] Add IT for FhirIO.Read
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] lastomato commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
lastomato commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#discussion_r405762395
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
 ##########
 @@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.gson.GsonFactory;
+import com.google.api.services.healthcare.v1beta1.CloudHealthcare;
+import com.google.api.services.healthcare.v1beta1.CloudHealthcare.Projects.Locations.Datasets.Hl7V2Stores.Messages;
+import com.google.api.services.healthcare.v1beta1.CloudHealthcareScopes;
+import com.google.api.services.healthcare.v1beta1.model.CreateMessageRequest;
+import com.google.api.services.healthcare.v1beta1.model.Empty;
+import com.google.api.services.healthcare.v1beta1.model.FhirStore;
+import com.google.api.services.healthcare.v1beta1.model.GoogleCloudHealthcareV1beta1FhirRestGcsSource;
+import com.google.api.services.healthcare.v1beta1.model.Hl7V2Store;
+import com.google.api.services.healthcare.v1beta1.model.HttpBody;
+import com.google.api.services.healthcare.v1beta1.model.ImportResourcesRequest;
+import com.google.api.services.healthcare.v1beta1.model.IngestMessageRequest;
+import com.google.api.services.healthcare.v1beta1.model.IngestMessageResponse;
+import com.google.api.services.healthcare.v1beta1.model.ListMessagesResponse;
+import com.google.api.services.healthcare.v1beta1.model.Message;
+import com.google.api.services.healthcare.v1beta1.model.Operation;
+import com.google.api.services.healthcare.v1beta1.model.SearchResourcesRequest;
+import com.google.api.services.storage.StorageScopes;
+import com.google.auth.oauth2.GoogleCredentials;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A client that talks to the Cloud Healthcare API through HTTP requests. This client is created
+ * mainly to encapsulate the unserializable dependencies, since most generated classes are not
+ * serializable in the HTTP client.
+ */
+public class HttpHealthcareApiClient implements HealthcareApiClient, Serializable {
+  private static final String FHIRSTORE_HEADER_CONTENT_TYPE = "application/fhir+json";
+  private static final String FHIRSTORE_HEADER_ACCEPT = "application/fhir+json; charset=utf-8";
+  private static final String FHIRSTORE_HEADER_ACCEPT_CHARSET = "utf-8";
+  private static final Logger LOG = LoggerFactory.getLogger(HttpHealthcareApiClient.class);
+  private transient CloudHealthcare client;
+  private transient HttpClient httpClient;
+  private transient GoogleCredentials credentials;
+
+  /**
+   * Instantiates a new Http healthcare api client.
+   *
+   * @throws IOException the io exception
+   */
+  public HttpHealthcareApiClient() throws IOException {
+    initClient();
+  }
+
+  /**
+   * Instantiates a new Http healthcare api client.
+   *
+   * @param client the client
+   * @throws IOException the io exception
+   */
+  public HttpHealthcareApiClient(CloudHealthcare client) throws IOException {
+    this.client = client;
+    this.httpClient = HttpClients.createDefault();
+    initClient();
+  }
+
+  @VisibleForTesting
+  static <T, X extends Collection<T>> Stream<T> flattenIteratorCollectionsToStream(
+      Iterator<X> iterator) {
+    Spliterator<Collection<T>> spliterator = Spliterators.spliteratorUnknownSize(iterator, 0);
+    return StreamSupport.stream(spliterator, false).flatMap(Collection::stream);
+  }
+
+  public JsonFactory getJsonFactory() {
+    return this.client.getJsonFactory();
+  }
+
+  @Override
+  public Hl7V2Store createHL7v2Store(String dataset, String name) throws IOException {
+    Hl7V2Store store = new Hl7V2Store();
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .hl7V2Stores()
+        .create(dataset, store)
+        .setHl7V2StoreId(name)
+        .execute();
+  }
+
+  @Override
+  public FhirStore createFhirStore(String dataset, String name, String version) throws IOException {
+    FhirStore store = new FhirStore();
+    // TODO add separate integration tests for each FHIR version: https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores#Version
+    // Our integration test data is STU3.
+    store.setVersion(version);
+    store.setDisableReferentialIntegrity(true);
+    store.setEnableUpdateCreate(true);
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .create(dataset, store)
+        .setFhirStoreId(name)
+        .execute();
+  }
+
+  @Override
+  public Empty deleteHL7v2Store(String name) throws IOException {
+    return client.projects().locations().datasets().hl7V2Stores().delete(name).execute();
+  }
+
+  @Override
+  public Empty deleteFhirStore(String name) throws IOException {
+    return client.projects().locations().datasets().fhirStores().delete(name).execute();
+  }
+
+  @Override
+  public ListMessagesResponse makeHL7v2ListRequest(
+      String hl7v2Store, @Nullable String filter, @Nullable String pageToken) throws IOException {
+
+    Messages.List baseRequest =
+        client
+            .projects()
+            .locations()
+            .datasets()
+            .hl7V2Stores()
+            .messages()
+            .list(hl7v2Store)
+            .set("view", "full")
+            .setPageToken(pageToken);
+
+    if (Strings.isNullOrEmpty(filter)) {
+      return baseRequest.execute();
+    } else {
+      return baseRequest.setFilter(filter).execute();
+    }
+  }
+
+  /**
+   * Gets message id page iterator.
+   *
+   * @param hl7v2Store the HL7v2 store
+   * @return the message id page iterator
+   * @throws IOException the io exception
+   */
+  @Override
+  public Stream<HL7v2Message> getHL7v2MessageStream(String hl7v2Store) throws IOException {
+    return getHL7v2MessageStream(hl7v2Store, null);
+  }
+
+  /**
+   * Get a {@link Stream} of message IDs from flattening the pages of a new {@link
+   * HL7v2MessagePages}.
+   *
+   * @param hl7v2Store the HL7v2 store
+   * @param filter the filter
+   * @return the message id Stream
+   * @throws IOException the io exception
+   */
+  @Override
+  public Stream<HL7v2Message> getHL7v2MessageStream(String hl7v2Store, @Nullable String filter)
+      throws IOException {
+    Iterator<List<HL7v2Message>> iterator =
+        new HL7v2MessagePages(this, hl7v2Store, filter).iterator();
+    return flattenIteratorCollectionsToStream(iterator);
+  }
+
+  /**
+   * Gets HL7v2 message.
+   *
+   * @param msgName the msg name
+   * @return the message
+   * @throws IOException the io exception
+   * @throws ParseException the parse exception
+   */
+  @Override
+  public Message getHL7v2Message(String msgName) throws IOException {
+    Message msg =
+        client.projects().locations().datasets().hl7V2Stores().messages().get(msgName).execute();
+    if (msg == null) {
+      throw new IOException(String.format("Couldn't find message: %s.", msgName));
+    }
+    return msg;
+  }
+
+  @Override
+  public Empty deleteHL7v2Message(String msgName) throws IOException {
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .hl7V2Stores()
+        .messages()
+        .delete(msgName)
+        .execute();
+  }
+
+  /**
+   * Gets HL7v2 store.
+   *
+   * @param storeName the store name
+   * @return the HL7v2 store
+   * @throws IOException the io exception
+   */
+  @Override
+  public Hl7V2Store getHL7v2Store(String storeName) throws IOException {
+    return client.projects().locations().datasets().hl7V2Stores().get(storeName).execute();
+  }
+
+  @Override
+  public IngestMessageResponse ingestHL7v2Message(String hl7v2Store, Message msg)
+      throws IOException {
+    IngestMessageRequest ingestMessageRequest = new IngestMessageRequest();
+    ingestMessageRequest.setMessage(msg);
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .hl7V2Stores()
+        .messages()
+        .ingest(hl7v2Store, ingestMessageRequest)
+        .execute();
+  }
+
+  @Override
+  public HttpBody fhirSearch(String fhirStore, SearchResourcesRequest query) throws IOException {
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .fhir()
+        .search(fhirStore, query)
+        .execute();
+  }
+
+  @Override
+  public Message createHL7v2Message(String hl7v2Store, Message msg) throws IOException {
+    CreateMessageRequest createMessageRequest = new CreateMessageRequest();
+    createMessageRequest.setMessage(msg);
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .hl7V2Stores()
+        .messages()
+        .create(hl7v2Store, createMessageRequest)
+        .execute();
+  }
+
+  @Override
+  public HttpBody createFhirResource(String fhirStore, String type, HttpBody body)
+      throws IOException {
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .fhir()
+        .create(fhirStore, type, body)
+        .execute();
+  }
+
+  @Override
+  public Operation importFhirResource(
+      String fhirStore, String gcsSourcePath, @Nullable String contentStructure)
+      throws IOException {
+    GoogleCloudHealthcareV1beta1FhirRestGcsSource gcsSrc =
+        new GoogleCloudHealthcareV1beta1FhirRestGcsSource();
+
+    gcsSrc.setUri(gcsSourcePath);
+    ImportResourcesRequest importRequest = new ImportResourcesRequest();
+    importRequest.setGcsSource(gcsSrc).setContentStructure(contentStructure);
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .healthcareImport(fhirStore, importRequest)
+        .execute();
+  }
+
+  @Override
+  public Operation pollOperation(Operation operation, Long sleepMs)
+      throws InterruptedException, IOException {
+    LOG.debug(String.format("started opertation %s. polling until complete.", operation.getName()));
+    while (operation.getDone() == null || !operation.getDone()) {
+      // Update the status of the operation with another request.
+      Thread.sleep(sleepMs); // Pause between requests.
+      operation =
+          client.projects().locations().datasets().operations().get(operation.getName()).execute();
+    }
+    return operation;
+  }
+
+  @Override
+  public HttpBody executeFhirBundle(String fhirStore, HttpBody bundle) throws IOException {
 
 Review comment:
   There is no ETA for a fix, and it is out of our control. It is acceptable to me :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#discussion_r405824473
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
 ##########
 @@ -0,0 +1,662 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.healthcare.v1beta1.model.HttpBody;
+import com.google.api.services.healthcare.v1beta1.model.Operation;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Import.ContentStructure;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.Write.Result;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.codehaus.jackson.JsonProcessingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FhirIO} provides an API for reading and writing resources to <a
+ * href="https://cloud.google.com/healthcare/docs/concepts/fhir">Google Cloud Healthcare Fhir API.
+ * </a>
+ *
+ * <p>Read
+ *
+ * <p>FHIR resources can be read with {@link FhirIO.Read} supports use cases where you have a
+ * ${@link PCollection} of message IDS. This is appropriate for reading the Fhir notifications from
+ * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually
+ * prepared list of messages that you need to process (e.g. in a text file read with {@link
+ * org.apache.beam.sdk.io.TextIO}) .
+ *
+ * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings
+ * {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieved a
+ * {@link PCollection} containing the successfully fetched {@link HttpBody}s and/or {@link
+ * FhirIO.Read.Result#getFailedReads()} to retrieve a {@link PCollection} of {@link
+ * HealthcareIOError} containing the resource ID that could not be fetched and the exception as a
+ * {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your
+ * choosing. This error handling is mainly to transparently surface errors where the upstream {@link
+ * PCollection} contains IDs that are not valid or are not reachable due to permissions issues.
+ *
+ * <p>Write Resources can be written to FHIR with two different methods: Import or Execute Bundle.
+ *
+ * <p>Execute Bundle This is best for use cases where you are writing to a non-empty FHIR store with
+ * other clients or otherwise need referential integrity (e.g. A Streaming HL7v2 to FHIR ETL
+ * pipeline).
+ *
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>
+ *     <p>Import This is best for use cases where you are populating an empty FHIR store with no
+ *     other clients. It is faster than the execute bundles method but does not respect referential
+ *     integrity and the resources are not written transactionally (e.g. a historicaly backfill on a
+ *     new FHIR store) This requires each resource to contain a client provided ID. It is important
+ *     that when using import you give the appropriate permissions to the Google Cloud Healthcare
+ *     Service Agent
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions></a>
+ * @see <a
+ *     href=>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/import></a>
+ *     A {@link PCollection} of {@link HttpBody} can be ingested into an Fhir store using {@link
+ *     FhirIO.Write#fhirStoresImport(String, String, String, ContentStructure)} This will return a
+ *     {@link FhirIO.Write.Result} on which you can call {@link
+ *     FhirIO.Write.Result#getFailedInsertsWithErr()} to retrieve a {@link PCollection} of {@link
+ *     HealthcareIOError} containing the {@link HttpBody} that failed to be ingested and the
+ *     exception.
+ */
+public class FhirIO {
+
+  @Experimental
+  public static class Read extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+    private static final Logger LOG = LoggerFactory.getLogger(Read.class);
+
+    public Read() {
+      LOG.warn("FhirIO.Read is experimental and has not been tested.");
+    }
+
+    public static class Result implements POutput, PInput {
+      private PCollection<HttpBody> resources;
+
+      private PCollection<HealthcareIOError<String>> failedReads;
+      PCollectionTuple pct;
+
+      public static FhirIO.Read.Result of(PCollectionTuple pct) throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(OUT).and(DEAD_LETTER))) {
+          return new FhirIO.Read.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the FhirIO.Read.OUT "
+                  + "and FhirIO.Read.DEAD_LETTER tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.resources = pct.get(OUT).setCoder(new HttpBodyCoder());
+        this.failedReads =
+            pct.get(DEAD_LETTER).setCoder(new HealthcareIOErrorCoder<>(StringUtf8Coder.of()));
+      }
+
+      public PCollection<HealthcareIOError<String>> getFailedReads() {
+        return failedReads;
+      }
+
+      public PCollection<HttpBody> getResources() {
+        return resources;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(OUT, resources);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /** The tag for the main output of Fhir Messages. */
+    public static final TupleTag<HttpBody> OUT = new TupleTag<HttpBody>() {};
+    /** The tag for the deadletter output of Fhir Messages. */
+    public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+        new TupleTag<HealthcareIOError<String>>() {};
+
+    @Override
+    public FhirIO.Read.Result expand(PCollection<String> input) {
+      return input.apply("Fetch Fhir messages", new FhirIO.Read.FetchHttpBody());
+    }
+
+    /**
+     * DoFn to fetch a resource from an Google Cloud Healthcare FHIR store based on resourceID
+     *
+     * <p>This DoFn consumes a {@link PCollection} of notifications {@link String}s from the FHIR
+     * store, and fetches the actual {@link HttpBody} object based on the id in the notification and
+     * will output a {@link PCollectionTuple} which contains the output and dead-letter {@link
+     * PCollection}.
+     *
+     * <p>The {@link PCollectionTuple} output will contain the following {@link PCollection}:
+     *
+     * <ul>
+     *   <li>{@link FhirIO.Read#OUT} - Contains all {@link PCollection} records successfully read
+     *       from the Fhir store.
+     *   <li>{@link FhirIO.Read#DEAD_LETTER} - Contains all {@link PCollection} of {@link
+     *       HealthcareIOError} of message IDs which failed to be fetched from the Fhir store, with
+     *       error message and stacktrace.
+     * </ul>
+     */
+    public static class FetchHttpBody extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+
+      /** Instantiates a new Fetch Fhir message DoFn. */
+      public FetchHttpBody() {}
+
+      @Override
+      public FhirIO.Read.Result expand(PCollection<String> resourceIds) {
+        return new FhirIO.Read.Result(
+            resourceIds.apply(
+                ParDo.of(new FhirIO.Read.FetchHttpBody.HttpBodyGetFn())
+                    .withOutputTags(FhirIO.Read.OUT, TupleTagList.of(FhirIO.Read.DEAD_LETTER))));
+      }
+
+      /** DoFn for fetching messages from the Fhir store with error handling. */
+      public static class HttpBodyGetFn extends DoFn<String, HttpBody> {
+
+        private Counter failedMessageGets =
+            Metrics.counter(FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class, "failed-message-reads");
+        private static final Logger LOG =
+            LoggerFactory.getLogger(FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class);
+        private final Counter successfulHttpBodyGets =
+            Metrics.counter(
+                FhirIO.Read.FetchHttpBody.HttpBodyGetFn.class, "successful-hl7v2-message-gets");
+        private HealthcareApiClient client;
+
+        /** Instantiates a new Hl 7 v 2 message get fn. */
+        HttpBodyGetFn() {}
+
+        /**
+         * Instantiate healthcare client.
+         *
+         * @throws IOException the io exception
+         */
+        @Setup
+        public void instantiateHealthcareClient() throws IOException {
+          this.client = new HttpHealthcareApiClient();
+        }
+
+        /**
+         * Process element.
+         *
+         * @param context the context
+         */
+        @ProcessElement
+        public void processElement(ProcessContext context) {
+          String resourceId = context.element();
+          try {
+            context.output(fetchResource(this.client, resourceId));
+          } catch (Exception e) {
+            failedMessageGets.inc();
+            LOG.warn(
+                String.format(
+                    "Error fetching Fhir message with ID %s writing to Dead Letter "
+                        + "Queue. Cause: %s Stack Trace: %s",
+                    resourceId, e.getMessage(), Throwables.getStackTraceAsString(e)));
+            context.output(FhirIO.Read.DEAD_LETTER, HealthcareIOError.of(resourceId, e));
+          }
+        }
+
+        private HttpBody fetchResource(HealthcareApiClient client, String resourceId)
+            throws IOException, IllegalArgumentException, InterruptedException {
+          long startTime = System.currentTimeMillis();
+          Sleeper sleeper = Sleeper.DEFAULT;
+
+          com.google.api.services.healthcare.v1beta1.model.HttpBody resource =
+              client.readFhirResource(resourceId);
+
+          if (resource == null) {
+            throw new IOException(String.format("GET request for %s returned null", resourceId));
+          }
+          this.successfulHttpBodyGets.inc();
+          return resource;
+        }
+      }
+    }
+  }
+
+  /** The type Write. */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<HttpBody>, Write.Result> {
+
+    /** The tag for the failed writes to FHIR store`. */
+    public static final TupleTag<HealthcareIOError<HttpBody>> FAILED_BODY =
+        new TupleTag<HealthcareIOError<HttpBody>>() {};
+
+    /** The enum Write method. */
+    public enum WriteMethod {
+      /**
+       * Execute Bundle Method executes a batch of requests as a single transaction @see <a
+       * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
+       */
+      EXECUTE_BUNDLE,
+      /**
+       * Import Method bulk imports resources from GCS. This is ideal for initial loads to empty
+       * FHIR stores. <a
+       * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/import></a>.
+       */
+      IMPORT
+    }
+
+    public static class Result implements POutput {
+      private final Pipeline pipeline;
+      private final PCollection<HealthcareIOError<HttpBody>> failedInsertsWithErr;
+
+      /** Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. */
+      static Result in(Pipeline pipeline, PCollection<HealthcareIOError<HttpBody>> failedInserts) {
+        return new Result(pipeline, failedInserts);
+      }
+
+      public PCollection<HealthcareIOError<HttpBody>> getFailedInsertsWithErr() {
+        return this.failedInsertsWithErr;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pipeline;
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        failedInsertsWithErr.setCoder(new HealthcareIOErrorCoder<HttpBody>(new HttpBodyCoder()));
+        return ImmutableMap.of(Write.FAILED_BODY, failedInsertsWithErr);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+
+      private Result(
+          Pipeline pipeline, PCollection<HealthcareIOError<HttpBody>> failedInsertsWithErr) {
+        this.pipeline = pipeline;
+        this.failedInsertsWithErr = failedInsertsWithErr;
+      }
+    }
+
+    /**
+     * Gets Fhir store.
+     *
+     * @return the Fhir store
+     */
+    abstract String getFhirStore();
+
+    /**
+     * Gets write method.
+     *
+     * @return the write method
+     */
+    abstract WriteMethod getWriteMethod();
+
+    abstract Optional<ContentStructure> getContentStructure();
+
+    abstract Optional<String> getImportGcsTempPath();
+
+    abstract Optional<String> getImportGcsDeadLetterPath();
+
+    /** The type Builder. */
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      /**
+       * Sets Fhir store.
+       *
+       * @param fhirStore the Fhir store
+       * @return the Fhir store
+       */
+      abstract Builder setFhirStore(String fhirStore);
+
+      /**
+       * Sets write method.
+       *
+       * @param writeMethod the write method
+       * @return the write method
+       */
+      abstract Builder setWriteMethod(WriteMethod writeMethod);
+
+      abstract Builder setContentStructure(ContentStructure contentStructure);
+
+      abstract Builder setImportGcsTempPath(String gcsTempPath);
+
+      abstract Builder setImportGcsDeadLetterPath(String gcsDeadLetterPath);
+
+      /**
+       * Build write.
+       *
+       * @return the write
+       */
+      abstract Write build();
+    }
+
+    private static Write.Builder write(String fhirStore) {
+      return new AutoValue_FhirIO_Write.Builder().setFhirStore(fhirStore);
+    }
+
+    /**
+     * Create Method creates a single FHIR resource. @see <a
+     * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create></a>
+     *
+     * @param fhirStore the hl 7 v 2 store
+     * @return the write
+     */
+    public static Write fhirStoresImport(
+        String fhirStore,
+        String gcsTempPath,
+        String gcsDeadLetterPath,
+        @Nullable ContentStructure contentStructure) {
+      return new AutoValue_FhirIO_Write.Builder()
+          .setFhirStore(fhirStore)
+          .setWriteMethod(Write.WriteMethod.IMPORT)
+          .setContentStructure(contentStructure)
+          .setImportGcsTempPath(gcsTempPath)
+          .setImportGcsDeadLetterPath(gcsDeadLetterPath)
+          .build();
+    }
+
+    /**
+     * Execute Bundle Method executes a batch of requests as a single transaction @see <a
+     * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
+     *
+     * @param fhirStore the hl 7 v 2 store
+     * @return the write
+     */
+    public static Write executeBundles(String fhirStore) {
+      return new AutoValue_FhirIO_Write.Builder()
+          .setFhirStore(fhirStore)
+          .setWriteMethod(WriteMethod.EXECUTE_BUNDLE)
+          .build();
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(Write.class);
+
+    @Override
+    public Result expand(PCollection<HttpBody> input) {
+      PCollection<HealthcareIOError<HttpBody>> failedBundles;
+      PCollection<HealthcareIOError<String>> failedImports;
+      switch (this.getWriteMethod()) {
+        case IMPORT:
+          LOG.warn(
+              "Make sure the Cloud Healthcare Service Agent has permissions when using import:"
+                  + " https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions");
+          String tempPath = getImportGcsTempPath().orElseThrow(IllegalArgumentException::new);
+          String deadPath = getImportGcsDeadLetterPath().orElseThrow(IllegalArgumentException::new);
+          ContentStructure contentStructure =
+              getContentStructure().orElseThrow(IllegalArgumentException::new);
+
+          failedBundles =
+              input
+                  .apply(new Import(getFhirStore(), tempPath, deadPath, contentStructure))
+                  .setCoder(new HealthcareIOErrorCoder<>(new HttpBodyCoder()));
+          // fall through
+        case EXECUTE_BUNDLE:
+        default:
+          failedBundles =
+              input.apply(
+                  "Execute FHIR Bundles",
+                  ParDo.of(new ExecuteBundles.ExecuteBundlesFn(this.getFhirStore())));
+      }
+      return Result.in(input.getPipeline(), failedBundles);
+    }
+  }
+
+  /**
+   * Writes each bundle of elements to a new-line delimited JSON file on GCS and issues a
+   * fhirStores.import Request for that file.
+   */
+  public static class Import
+      extends PTransform<PCollection<HttpBody>, PCollection<HealthcareIOError<HttpBody>>> {
+
+    private final String fhirStore;
+    private final String tempGcsPath;
+    private final String deadLetterGcsPath;
+    private final ContentStructure contentStructure;
+
+    Import(
+        String fhirStore,
+        String tempGcsPath,
+        String deadLetterGcsPath,
+        @Nullable ContentStructure contentStructure) {
+      this.fhirStore = fhirStore;
+      this.tempGcsPath = tempGcsPath;
+      this.deadLetterGcsPath = deadLetterGcsPath;
+      if (contentStructure == null) {
+        this.contentStructure = ContentStructure.CONTENT_STRUCTURE_UNSPECIFIED;
+      } else {
+        this.contentStructure = contentStructure;
+      }
+    }
+
+    @Override
+    public PCollection<HealthcareIOError<HttpBody>> expand(PCollection<HttpBody> input) {
+      return input.apply(
+          ParDo.of(new ImportFn(fhirStore, tempGcsPath, deadLetterGcsPath, contentStructure)));
+    }
+
+    public enum ContentStructure {
+      /** If the content structure is not specified, the default value BUNDLE will be used. */
+      CONTENT_STRUCTURE_UNSPECIFIED,
+      /**
+       * The source file contains one or more lines of newline-delimited JSON (ndjson). Each line is
+       * a bundle, which contains one or more resources. Set the bundle type to history to import
+       * resource versions.
+       */
+      BUNDLE,
+      /**
+       * The source file contains one or more lines of newline-delimited JSON (ndjson). Each line is
+       * a single resource.
+       */
+      RESOURCE,
+      /** The entire file is one JSON bundle. The JSON can span multiple lines. */
+      BUNDLE_PRETTY,
+      /** The entire file is one JSON resource. The JSON can span multiple lines. */
+      RESOURCE_PRETTY
+    }
+
+    static class ImportFn extends DoFn<HttpBody, HealthcareIOError<HttpBody>> {
+      private final String fhirStore;
+      private final String tempGcsPath;
+      private final String deadLetterGcsPath;
+      private final ContentStructure contentStructure;
+      private ObjectMapper mapper;
+      private ResourceId resourceId;
+      private ResourceId deadLetterResourceId;
+      private WritableByteChannel ndJsonChannel;
+
+      private transient HealthcareApiClient client;
+      private static final Logger LOG = LoggerFactory.getLogger(ImportFn.class);
+
+      ImportFn(
+          String fhirStore,
+          String tempGcsPath,
+          String deadLetterGcsPath,
+          @Nullable ContentStructure contentStructure) {
+        this.fhirStore = fhirStore;
+        this.tempGcsPath = tempGcsPath;
+        this.deadLetterGcsPath = deadLetterGcsPath;
+        if (contentStructure == null) {
+          this.contentStructure = ContentStructure.CONTENT_STRUCTURE_UNSPECIFIED;
+        } else {
+          this.contentStructure = contentStructure;
+        }
+      }
+
+      @Setup
+      public void initClient() throws IOException {
+        this.client = new HttpHealthcareApiClient();
+      }
+
+      @StartBundle
+      public void initBatch() throws IOException {
 
 Review comment:
   Research:
   
   - Based on [this SO post](https://stackoverflow.com/questions/44255924/error-message-too-many-sources-provided-15285-limit-is-10000) 
   We might be able to `GroupByKey` single dummy key to encourage runner to do less bundle splitting, though I'm not sure that will really do the trick. 
   - From Beam docs:
   > Specifically, it makes it impossible to batch any operations, such as writing elements to a sink or checkpointing progress during processing.
   >The division of the collection into bundles is arbitrary and selected by the runner. This allows the runner to choose an appropriate middle-ground between persisting results after every element, and having to retry everything if there is a failure. For example, a streaming runner may prefer to process and commit small bundles, and a batch runner may prefer to process larger bundles.
   ([source](https://beam.apache.org/documentation/runtime/model/#bundling-and-persistence))
   
   It seems to me that our predominant use case for Import is a batch pipeline and this sentence implies that bundles are larger in batch (not sure if we can find internal details on Dataflow Runner). 
   
   - BigQueryIO has a [batch loads](https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java) pattern which seems similar to what we are aiming to achieve here. This seems quite complex and I'm wondering if there are simpler approaches elsewhere in the beam code base.
   
   Do we have any guidance on how many parallel import operations would be acceptable? Perhaps we can run some benchmarks with 500, 1000, 10000 resources with this approach / adding group by single key and see if it's acceptable. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#discussion_r405725031
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
 ##########
 @@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import com.google.api.client.http.HttpHeaders;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.gson.GsonFactory;
+import com.google.api.services.healthcare.v1beta1.CloudHealthcare;
+import com.google.api.services.healthcare.v1beta1.CloudHealthcare.Projects.Locations.Datasets.Hl7V2Stores.Messages;
+import com.google.api.services.healthcare.v1beta1.CloudHealthcareScopes;
+import com.google.api.services.healthcare.v1beta1.model.CreateMessageRequest;
+import com.google.api.services.healthcare.v1beta1.model.Empty;
+import com.google.api.services.healthcare.v1beta1.model.FhirStore;
+import com.google.api.services.healthcare.v1beta1.model.GoogleCloudHealthcareV1beta1FhirRestGcsSource;
+import com.google.api.services.healthcare.v1beta1.model.Hl7V2Store;
+import com.google.api.services.healthcare.v1beta1.model.HttpBody;
+import com.google.api.services.healthcare.v1beta1.model.ImportResourcesRequest;
+import com.google.api.services.healthcare.v1beta1.model.IngestMessageRequest;
+import com.google.api.services.healthcare.v1beta1.model.IngestMessageResponse;
+import com.google.api.services.healthcare.v1beta1.model.ListMessagesResponse;
+import com.google.api.services.healthcare.v1beta1.model.Message;
+import com.google.api.services.healthcare.v1beta1.model.Operation;
+import com.google.api.services.healthcare.v1beta1.model.SearchResourcesRequest;
+import com.google.api.services.storage.StorageScopes;
+import com.google.auth.oauth2.GoogleCredentials;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.methods.RequestBuilder;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A client that talks to the Cloud Healthcare API through HTTP requests. This client is created
+ * mainly to encapsulate the unserializable dependencies, since most generated classes are not
+ * serializable in the HTTP client.
+ */
+public class HttpHealthcareApiClient implements HealthcareApiClient, Serializable {
+  private static final String FHIRSTORE_HEADER_CONTENT_TYPE = "application/fhir+json";
+  private static final String FHIRSTORE_HEADER_ACCEPT = "application/fhir+json; charset=utf-8";
+  private static final String FHIRSTORE_HEADER_ACCEPT_CHARSET = "utf-8";
+  private static final Logger LOG = LoggerFactory.getLogger(HttpHealthcareApiClient.class);
+  private transient CloudHealthcare client;
+  private transient HttpClient httpClient;
+  private transient GoogleCredentials credentials;
+
+  /**
+   * Instantiates a new Http healthcare api client.
+   *
+   * @throws IOException the io exception
+   */
+  public HttpHealthcareApiClient() throws IOException {
+    initClient();
+  }
+
+  /**
+   * Instantiates a new Http healthcare api client.
+   *
+   * @param client the client
+   * @throws IOException the io exception
+   */
+  public HttpHealthcareApiClient(CloudHealthcare client) throws IOException {
+    this.client = client;
+    this.httpClient = HttpClients.createDefault();
+    initClient();
+  }
+
+  @VisibleForTesting
+  static <T, X extends Collection<T>> Stream<T> flattenIteratorCollectionsToStream(
+      Iterator<X> iterator) {
+    Spliterator<Collection<T>> spliterator = Spliterators.spliteratorUnknownSize(iterator, 0);
+    return StreamSupport.stream(spliterator, false).flatMap(Collection::stream);
+  }
+
+  public JsonFactory getJsonFactory() {
+    return this.client.getJsonFactory();
+  }
+
+  @Override
+  public Hl7V2Store createHL7v2Store(String dataset, String name) throws IOException {
+    Hl7V2Store store = new Hl7V2Store();
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .hl7V2Stores()
+        .create(dataset, store)
+        .setHl7V2StoreId(name)
+        .execute();
+  }
+
+  @Override
+  public FhirStore createFhirStore(String dataset, String name, String version) throws IOException {
+    FhirStore store = new FhirStore();
+    // TODO add separate integration tests for each FHIR version: https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores#Version
+    // Our integration test data is STU3.
+    store.setVersion(version);
+    store.setDisableReferentialIntegrity(true);
+    store.setEnableUpdateCreate(true);
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .create(dataset, store)
+        .setFhirStoreId(name)
+        .execute();
+  }
+
+  @Override
+  public Empty deleteHL7v2Store(String name) throws IOException {
+    return client.projects().locations().datasets().hl7V2Stores().delete(name).execute();
+  }
+
+  @Override
+  public Empty deleteFhirStore(String name) throws IOException {
+    return client.projects().locations().datasets().fhirStores().delete(name).execute();
+  }
+
+  @Override
+  public ListMessagesResponse makeHL7v2ListRequest(
+      String hl7v2Store, @Nullable String filter, @Nullable String pageToken) throws IOException {
+
+    Messages.List baseRequest =
+        client
+            .projects()
+            .locations()
+            .datasets()
+            .hl7V2Stores()
+            .messages()
+            .list(hl7v2Store)
+            .set("view", "full")
+            .setPageToken(pageToken);
+
+    if (Strings.isNullOrEmpty(filter)) {
+      return baseRequest.execute();
+    } else {
+      return baseRequest.setFilter(filter).execute();
+    }
+  }
+
+  /**
+   * Gets message id page iterator.
+   *
+   * @param hl7v2Store the HL7v2 store
+   * @return the message id page iterator
+   * @throws IOException the io exception
+   */
+  @Override
+  public Stream<HL7v2Message> getHL7v2MessageStream(String hl7v2Store) throws IOException {
+    return getHL7v2MessageStream(hl7v2Store, null);
+  }
+
+  /**
+   * Get a {@link Stream} of message IDs from flattening the pages of a new {@link
+   * HL7v2MessagePages}.
+   *
+   * @param hl7v2Store the HL7v2 store
+   * @param filter the filter
+   * @return the message id Stream
+   * @throws IOException the io exception
+   */
+  @Override
+  public Stream<HL7v2Message> getHL7v2MessageStream(String hl7v2Store, @Nullable String filter)
+      throws IOException {
+    Iterator<List<HL7v2Message>> iterator =
+        new HL7v2MessagePages(this, hl7v2Store, filter).iterator();
+    return flattenIteratorCollectionsToStream(iterator);
+  }
+
+  /**
+   * Gets HL7v2 message.
+   *
+   * @param msgName the msg name
+   * @return the message
+   * @throws IOException the io exception
+   * @throws ParseException the parse exception
+   */
+  @Override
+  public Message getHL7v2Message(String msgName) throws IOException {
+    Message msg =
+        client.projects().locations().datasets().hl7V2Stores().messages().get(msgName).execute();
+    if (msg == null) {
+      throw new IOException(String.format("Couldn't find message: %s.", msgName));
+    }
+    return msg;
+  }
+
+  @Override
+  public Empty deleteHL7v2Message(String msgName) throws IOException {
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .hl7V2Stores()
+        .messages()
+        .delete(msgName)
+        .execute();
+  }
+
+  /**
+   * Gets HL7v2 store.
+   *
+   * @param storeName the store name
+   * @return the HL7v2 store
+   * @throws IOException the io exception
+   */
+  @Override
+  public Hl7V2Store getHL7v2Store(String storeName) throws IOException {
+    return client.projects().locations().datasets().hl7V2Stores().get(storeName).execute();
+  }
+
+  @Override
+  public IngestMessageResponse ingestHL7v2Message(String hl7v2Store, Message msg)
+      throws IOException {
+    IngestMessageRequest ingestMessageRequest = new IngestMessageRequest();
+    ingestMessageRequest.setMessage(msg);
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .hl7V2Stores()
+        .messages()
+        .ingest(hl7v2Store, ingestMessageRequest)
+        .execute();
+  }
+
+  @Override
+  public HttpBody fhirSearch(String fhirStore, SearchResourcesRequest query) throws IOException {
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .fhir()
+        .search(fhirStore, query)
+        .execute();
+  }
+
+  @Override
+  public Message createHL7v2Message(String hl7v2Store, Message msg) throws IOException {
+    CreateMessageRequest createMessageRequest = new CreateMessageRequest();
+    createMessageRequest.setMessage(msg);
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .hl7V2Stores()
+        .messages()
+        .create(hl7v2Store, createMessageRequest)
+        .execute();
+  }
+
+  @Override
+  public HttpBody createFhirResource(String fhirStore, String type, HttpBody body)
+      throws IOException {
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .fhir()
+        .create(fhirStore, type, body)
+        .execute();
+  }
+
+  @Override
+  public Operation importFhirResource(
+      String fhirStore, String gcsSourcePath, @Nullable String contentStructure)
+      throws IOException {
+    GoogleCloudHealthcareV1beta1FhirRestGcsSource gcsSrc =
+        new GoogleCloudHealthcareV1beta1FhirRestGcsSource();
+
+    gcsSrc.setUri(gcsSourcePath);
+    ImportResourcesRequest importRequest = new ImportResourcesRequest();
+    importRequest.setGcsSource(gcsSrc).setContentStructure(contentStructure);
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .healthcareImport(fhirStore, importRequest)
+        .execute();
+  }
+
+  @Override
+  public Operation pollOperation(Operation operation, Long sleepMs)
+      throws InterruptedException, IOException {
+    LOG.debug(String.format("started opertation %s. polling until complete.", operation.getName()));
+    while (operation.getDone() == null || !operation.getDone()) {
+      // Update the status of the operation with another request.
+      Thread.sleep(sleepMs); // Pause between requests.
+      operation =
+          client.projects().locations().datasets().operations().get(operation.getName()).execute();
+    }
+    return operation;
+  }
+
+  @Override
+  public HttpBody executeFhirBundle(String fhirStore, HttpBody bundle) throws IOException {
 
 Review comment:
   @lastomato @yeweidaniel @ygupta89
   This is super awkward but the `executeBundle` method of client lib seems broken always giving errors about unparsable resource based on my testing (CDAP plugins also written similar to this).
   
   Any ETA on fix for that?
   Does this workaround seem acceptable til then?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [x] Unit test for FhirIO dead letter handling
   - [ ] Migrate ITs to parameterized tests to DRY up ITs against different FHIR versions (improves maintainability)
   - [ ] Add IT for FhirIO.Read
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-614332334
 
 
   @lastomato I added [GroupIntoBatches](https://beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html) in the FhirIO.Import path. 
   The logic is:
   - buffer `HttpBody`'s to an iterable until we have 1000 of them (this threshold was chosen arbitrarily)
   - ImportFn updates the ndJson write channel with all 1000 resources
   - FinishBundle will flush the batch: write to file on GCS and trigger import job
   
   This is one way to mitigate the "import job per resource" concern but I'm open to other suggestions for achieving this.
   
   Though the language in the docs is:
   >Elements are buffered until there are batchSize elements buffered, at which point they are output to the output PCollection.
   
   Which sounds like if a batch never reaches batchSize it might not be output.
   GroupIntoBatches behaves as one would expect and if there are extra elements left over they are output as a smaller batch. Verified in unit test added in 8c4d636

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [x] Unit test for FhirIO dead letter handling
   - [x] Migrate ITs to parameterized tests to DRY up ITs against different FHIR versions (improves maintainability)
   - [x] Add IT for FhirIO.Read
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-610710580
 
 
   Currently The R4 Integration test is failing.
   Will look into this tomorrow.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-614332334
 
 
   @lastomato I added [GroupIntoBatches](https://beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html) in the FhirIO.Import path. 
   The logic is:
   - buffer `HttpBody`'s to an iterable until we have 1000 of them (this threshold was chosen arbitrarily)
   - ImportFn updates the ndJson write channel with all 1000 resources
   - FinishBundle will flush the batch: write to file on GCS and trigger import job
   
   This is one way to mitigate the "import job per resource" concern but I'm open to other suggestions for achieving this.
   
   I need to verify if this will miss the last batch if it isn't full. The language in the docs is 
   >Elements are buffered until there are batchSize elements buffered, at which point they are output to the output PCollection.
   
   Which sounds like if a batch never reaches batchSize it will not be output.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [ ] ValueProvider support
   - [ ] Unit test for FhirIO dead letter handling
   - [ ] Add example usage to javadoc
   - [ ] Benchmark / load test the FhirIO.Import
   - [ ] Add IT for FhirIO.Read

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [x] ValueProvider support
   - [x] Add example usage to javadoc
   - [x] Unit test for FhirIO dead letter handling
   - [ ] Add IT for FhirIO.Read
   - [ ] Migrate ITs to parameterized tests to DRY up ITs against different FHIR versions (improves maintainability)
   - [ ] Benchmark / load test the FhirIO.Import
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-612264169
 
 
   @lastomato 
   Do you think we should add `FhirIO.Read` implementations that paginate through [fhir.search](https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/search) results?
   
   I'm not sure what use case this really solves and would like to keep it for a later PR if it becomes necessary.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on issue #11339: [BEAM-9468] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on issue #11339: [BEAM-9468] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-614200971
 
 
   R: @pabloem 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] jaketf commented on issue #11339: [BEAM-9468] [WIP] Fhir io

Posted by GitBox <gi...@apache.org>.
jaketf commented on issue #11339: [BEAM-9468] [WIP] Fhir io
URL: https://github.com/apache/beam/pull/11339#issuecomment-611125298
 
 
   TODOs:
   - [ ] ValueProvider support
   - [ ] Unit test for FhirIO dead letter handling
   - [ ] Add example usage to javadoc

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services