Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/28 21:16:53 UTC

[GitHub] [beam] pabloem commented on a change in pull request #11538: [BEAM-9831] Improve performance and UX for HL7v2IO

pabloem commented on a change in pull request #11538:
URL: https://github.com/apache/beam/pull/11538#discussion_r416060393



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -475,7 +497,14 @@ public void initClient() throws IOException {
     public void listMessages(ProcessContext context) throws IOException {
       String hl7v2Store = context.element();
       // Output all elements of all pages.
-      this.client.getHL7v2MessageStream(hl7v2Store, this.filter).forEach(context::output);
+      HttpHealthcareApiClient.HL7v2MessagePages pages =
+          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+      long reqestTime = Instant.now().getMillis();

Review comment:
       Typo: should `reqestTime` be `requestTime`?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -437,6 +444,20 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
           .apply(Create.of(this.hl7v2Stores))
           .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter)))
           .setCoder(new HL7v2MessageCoder())
+          // Listing takes a long time for each input element (HL7v2 store) because it has to
+          // paginate through results in a single thread / ProcessElement call in order to keep
+          // track of page token.
+          // Eagerly emit data on 1 second intervals so downstream processing can get started before
+          // all of the list results have been paginated through.

Review comment:
       Unfortunately, this is not possible. If you paginate from inside a single DoFn `@ProcessElement` call, the data it outputs only goes downstream after the element has finished processing, so adding this windowing does not change that at execution time.
   This is because bundle execution is committed atomically: the whole bundle finishes executing before its output can go downstream. You do touch on an interesting case, which is one of the reasons we came up with SplittableDoFn.
   
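   Something you could try is:
   ```
   // Paginate within a single DoFn, outputting each page as a separate element.
   PCollection<HL7v2Message> pages = hl7v2Stores.apply(ParDo.of(new RetrieveAndOutputPagesFn()));
   
   // Redistribute the pages across workers, then fetch each page in parallel.
   pages.apply(Reshuffle.viaRandomKey()).apply(ParDo.of(new FetchEachPageFn()));
   ```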
   Though I don't know if you can actually do that : )
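   The two-stage shape of that suggestion can be illustrated without Beam. Below is a minimal plain-Java analogue under stated assumptions: `ListApi`, `Page`, and the `fetched:` placeholder are hypothetical stand-ins, not the Healthcare API client or `HttpHealthcareApiClient.HL7v2MessagePages`, and `parallelStream()` only stands in for the `Reshuffle.viaRandomKey()` + `ParDo` fan-out. It shows why only the pagination step has to stay sequential: each list request needs the previous response's page token, while fetching the messages on one page is independent of every other page.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

final class PaginateThenFanOut {

  /** One page of results from a hypothetical list API (not the real Healthcare API types). */
  static final class Page {
    final List<String> messageIds;
    final String nextPageToken; // null when there are no more pages

    Page(List<String> messageIds, String nextPageToken) {
      this.messageIds = messageIds;
      this.nextPageToken = nextPageToken;
    }
  }

  /** Hypothetical list API: a null token requests the first page. */
  interface ListApi {
    Page list(String pageToken);
  }

  /** Stage 1 (sequential): follow page tokens, emitting each page as one element. */
  static void paginate(ListApi api, Consumer<List<String>> emitPage) {
    String token = null;
    do {
      Page page = api.list(token);
      emitPage.accept(page.messageIds);
      token = page.nextPageToken;
    } while (token != null);
  }

  /**
   * Stage 2 (parallel): pages are independent, so their messages can be fetched concurrently.
   * parallelStream() is only a stand-in for Reshuffle.viaRandomKey() + ParDo in a pipeline.
   */
  static List<String> fetchAll(List<List<String>> pages) {
    return pages.parallelStream()
        .flatMap(List::stream)
        .map(id -> "fetched:" + id) // placeholder for a per-message fetch request
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // In-memory stand-in for a store whose listing spans three pages.
    Map<String, Page> store = new HashMap<>();
    store.put(null, new Page(Arrays.asList("m1", "m2"), "t1"));
    store.put("t1", new Page(Arrays.asList("m3"), "t2"));
    store.put("t2", new Page(Arrays.asList("m4", "m5"), null));

    List<List<String>> pages = new ArrayList<>();
    paginate(store::get, pages::add);
    System.out.println(pages.size() + " pages -> " + fetchAll(pages));
  }
}
```

   In an actual pipeline, the gain from this shape would be parallelism in the fetch step rather than early emission: given the atomic bundle commit described earlier, the fetch stage presumably still starts only after the paginating bundle commits.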




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org