Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/02 02:22:32 UTC

[GitHub] [beam] jaketf opened a new pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

jaketf opened a new pull request #11596:
URL: https://github.com/apache/beam/pull/11596


   This PR is an experiment to illustrate a potential strategy for implementing HL7v2IO.ListMessages as a splittable DoFn that carves the sendTime dimension up into time ranges and uses the Messages.List API.
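
   A minimal, self-contained sketch of the core idea (restriction math only; the instants and split durations are illustrative, and the real transform lives in HL7v2IO.java, quoted in the review comments below):
   ```
   import java.util.List;
   import org.apache.beam.sdk.io.range.OffsetRange;
   import org.joda.time.Duration;
   import org.joda.time.Instant;

   public class SendTimeSplitSketch {
     public static void main(String[] args) {
       // [minSendTime, now) of the store, expressed as epoch millis.
       Instant from = Instant.parse("2019-03-09T13:24:44.000Z");
       Instant to = Instant.parse("2020-05-02T02:18:51.850Z");
       OffsetRange sendTimeRange = new OffsetRange(from.getMillis(), to.getMillis());

       // Carve the sendTime range into (roughly daily) sub-ranges, each of which becomes
       // an independent Messages.List call that runners can process in parallel.
       List<OffsetRange> splits =
           sendTimeRange.split(
               Duration.standardDays(1).getMillis(), Duration.standardHours(1).getMillis());
       System.out.println("splits: " + splits.size());
     }
   }
   ```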
   
   ## DISCLAIMER
   This PR is currently a discussion piece to help drive the decision on the future of HL7v2IO.ListMessages.
   
   It has excessive logging for demonstrating behavior / debugging and is currently broken.
   
   CC: @pabloem
   
   ## Questions
   - What are the prototypical tests for validating the stability of a splittable DoFn's various methods (e.g. SplitRestriction)?
   
   ## Current Status: Broken
   Currently I get a confusing NPE from OffsetRangeTracker when running:
   
   `./gradlew :sdks:java:io:google-cloud-platform:integrationTest --tests "org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOReadIT.testHL7v2IO_ListHL7v2Messages_filtered"`
   
   I seem to get the expected initial daily splits:
   ```
   WARNING: splitting initial sendTime restriction of [minSendTime, now): [2019-03-09T13:24:44.000Z,2020-05-02T02:18:51.850Z), or [1552137884000, 1588385931850). 
   total days: 419 
   into 420 splits. 
   Last split: [1588339484000, 1588385931850)
   ```
   
   NPE stack trace:
   ```
   java.lang.NullPointerException
   org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException
   	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
   	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
   	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
   	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
   	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
   	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
   	at org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOReadIT.testHL7v2IO_ListHL7v2Messages_filtered(HL7v2IOReadIT.java:135)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
   	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
   	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
   	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
   	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
   	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
   	at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
   	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
   	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
   	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
   	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
   	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
   	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
   	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
   	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
   	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
   	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
   	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
   	at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
   	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
   	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
   	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
   	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
   	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
   	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
   	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
   	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
   	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
   	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
   	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
   	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
   	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
   	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
   	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
   	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
   	at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.NullPointerException
   	at org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:98)
   	at org.apache.beam.repackaged.direct_java.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
   	at org.apache.beam.repackaged.direct_java.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:234)
   	at org.apache.beam.repackaged.direct_java.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)
   ```
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [x] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427643387



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of

Review comment:
       ```suggestion
      * <p>This transform is optimized for splitting of message.list calls for large batches of
   ```







[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427545617



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +547,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter, "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {

Review comment:
   tl;dr No, that is not possible, based on 629-640.
   `tryClaim` on 624 claims `lastClaimedMilliSecond`.
   
   `tryClaim` on 645 claims the millisecond of the cursor (which is advanced by the while loop in L629-633).
   
   If the cursor is not advanced to a new millisecond (this happens when many messages at the end of a page came in the same millisecond), the if block in L636-640 contains `continue`, which eagerly exits this iteration of the while loop before reaching the `tryClaim` on L645 again. I called this scenario "hangingClaim" because we cannot know whether the first message(s) of the next page will also be from this millisecond.
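
   Restated as a sketch of the invariant (my paraphrase of the loop above inside `@ProcessElement`, not the exact code under review): each distinct sendTime millisecond is claimed exactly once, and every message sharing that millisecond is emitted under that claim, even when the run of equal sendTimes spills across a page boundary (the "hangingClaim" case).
   ```
   long lastClaimedMillis = Long.MIN_VALUE;
   for (List<HL7v2Message> page : pages) {      // pages ordered by sendTime
     for (HL7v2Message msg : page) {
       long millis = Instant.parse(msg.getSendTime()).getMillis();
       if (millis != lastClaimedMillis) {
         // New millisecond: claim it exactly once.
         if (!tracker.tryClaim(millis)) {
           return; // restriction was checkpointed/split; stop emitting here
         }
         lastClaimedMillis = millis;
       }
       // Same millisecond as the last successful claim (possibly carried over
       // from the previous page): already claimed, just emit.
       outputReceiver.output(msg);
     }
   }
   ```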
   







[GitHub] [beam] jaketf commented on pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-623793025


   @chamikaramj @lukecwik 
   Apologies, I believe the NPE was a user error on my part.
   I've been able to revert my changes to OffsetRangeTracker without reintroducing the NPE.
   
   To help future users as foolish as me get a less confusing NPE, I suggest we do something like what I put in 8891292. But this is definitely not a core issue in OffsetRangeTracker.





[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631224035


   Run Java PostCommit





[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r421847633



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +548,120 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OrderedTimeRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OrderedTimeRange(from, to);
+    }
+
+    @NewTracker
+    public OrderedTimeRangeTracker newTracker(@Restriction OrderedTimeRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(
+        @Restriction OrderedTimeRange timeRange, OutputReceiver<OrderedTimeRange> out) {
+      // TODO(jaketf) How to pick optimal values for desiredNumOffsetsPerSplit ?

Review comment:
   My intuition says hourly splits seem like a more reasonable starting point than daily, but I'm unsure about this without testing on a few datasets.
   
   However, for super large time ranges, 10 years = 87,600 hourly splits; naturally this explodes if we consider even smaller split durations.
   
   For example, I assume the initial collection of splits returned in SplitRestriction has to fit in memory? Even just the loop to define all of these might take a significant amount of time.
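
   Back-of-the-envelope for that split count (a sketch, using the same Joda types as the PR):
   ```
   import org.joda.time.Duration;

   public class SplitCountSketch {
     public static void main(String[] args) {
       // Rough split counts for a ~10-year sendTime range at different granularities.
       Duration tenYears = Duration.standardDays(3650);
       System.out.println("daily splits:  " + tenYears.getStandardDays());  // 3650
       System.out.println("hourly splits: " + tenYears.getStandardHours()); // 87600
     }
   }
   ```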







[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r421756445



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OrderedTimeRange.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** A restriction represented by a range of Instants [from, to). */
+public class OrderedTimeRange

Review comment:
       why not use the offset range tracker and convert time to `long`?
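
    For illustration, "convert time to `long`" would mean treating the restriction as epoch millis and reusing the stock tracker (a sketch; the instants are made up):
    ```
    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
    import org.joda.time.Instant;

    public class MillisTrackerSketch {
      public static void main(String[] args) {
        Instant from = Instant.parse("2020-05-01T00:00:00.000Z");
        Instant to = Instant.parse("2020-05-02T00:00:00.000Z");
        // The existing OffsetRange/OffsetRangeTracker work unchanged over epoch millis.
        OffsetRangeTracker tracker =
            new OffsetRange(from.getMillis(), to.getMillis()).newTracker();
        // Claim the millisecond of an observed sendTime.
        Instant sendTime = Instant.parse("2020-05-01T12:34:56.789Z");
        System.out.println(tracker.tryClaim(sendTime.getMillis())); // true: inside [from, to)
      }
    }
    ```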

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
##########
@@ -94,6 +94,9 @@ public void checkDone() throws IllegalStateException {
     if (range.getFrom() == range.getTo()) {
       return;
     }
+    if (lastAttemptedOffset == null) {
+      throw new IllegalStateException("lastAttemptedOffset should not be null");
+    }
     checkState(
         lastAttemptedOffset >= range.getTo() - 1,
         "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",

Review comment:
    Please make this a separate PR and add a test that covers this case to the OffsetRangeTrackerTest
   
   ```suggestion
       checkState(
           lastAttemptedOffset != null && lastAttemptedOffset >= range.getTo() - 1,
           "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
   ```

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +548,120 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OrderedTimeRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OrderedTimeRange(from, to);
+    }
+
+    @NewTracker
+    public OrderedTimeRangeTracker newTracker(@Restriction OrderedTimeRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(
+        @Restriction OrderedTimeRange timeRange, OutputReceiver<OrderedTimeRange> out) {
+      // TODO(jaketf) How to pick optimal values for desiredNumOffsetsPerSplit ?

Review comment:
    Typically, doing one split for every 64 MB of output has been our guidance here in the past.
    
    Dynamic splitting is meant to fill in the gap if there is too little initial splitting or if a specific restriction has a lot more data than other restrictions.
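
    Illustrative arithmetic only (the 64 MB figure is the rule of thumb above; the output-size estimate is made up):
    ```
    public class SplitSizingSketch {
      public static void main(String[] args) {
        // Aim for roughly one initial split per 64 MB of expected output.
        long estimatedOutputBytes = 10L * 1024 * 1024 * 1024; // assume ~10 GB of listed messages
        long targetBytesPerSplit = 64L * 1024 * 1024;         // 64 MB
        long desiredSplits = Math.max(1, estimatedOutputBytes / targetBytesPerSplit);
        System.out.println("desired initial splits: " + desiredSplits); // 160
      }
    }
    ```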







[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427618071



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of

Review comment:
       Consider using `{@code ...}` when referring to code and `{@link ...}` for things you can directly link against.
   
   ```suggestion
      * <p>This transform is optimized for dynamic splitting of {@code message.list} calls for large batches of
   ```







[GitHub] [beam] pabloem merged pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #11596:
URL: https://github.com/apache/beam/pull/11596


   





[GitHub] [beam] chamikaramj commented on pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-623741333


   Is using the SplittableDoFn API here intentional?
   
   I think this API is being updated. cc: @lukecwik 
   
   If you need dynamic work rebalancing, consider using the BoundedSource interface. Otherwise, we can just implement the source as a regular DoFn and wait for the SplittableDoFn API to stabilize before adding support for dynamic work rebalancing.





[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631015851


   Run Java PreCommit





[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r421809510



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OrderedTimeRange.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** A restriction represented by a range of Instants [from, to). */
+public class OrderedTimeRange

Review comment:
   TBH, the main value add here is more readable restriction bounds / values in the error messages for the time range use case.
   
   If there is a cleaner way of borrowing all of the implementation from OffsetRangeTracker but changing the format of these error messages, I'd be all ears.
   
   I suppose I could add a `private final boolean offsetsAreInstants` member to OffsetRangeTracker and add conditional formatting to each error message?
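
   Rough sketch of what that conditional formatting could look like (the `offsetsAreInstants` flag and helper are hypothetical):
   ```
   import org.joda.time.Instant;

   // Hypothetical formatting helper for OffsetRangeTracker's error messages: render an
   // offset as an Instant when the tracker is told its offsets are epoch millis.
   public class OffsetFormatSketch {
     static String formatOffset(long offset, boolean offsetsAreInstants) {
       return offsetsAreInstants ? Instant.ofEpochMilli(offset).toString() : Long.toString(offset);
     }

     public static void main(String[] args) {
       System.out.println(formatOffset(1588385931850L, false)); // 1588385931850
       System.out.println(formatOffset(1588385931850L, true));  // 2020-05-02T02:18:51.850Z
     }
   }
   ```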







[GitHub] [beam] jaketf commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631277688


   @pabloem please retest this





[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427616856



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste resources if there are large
+   * durations (days) of the sendTime dimension without data.

Review comment:
    Wouldn't this just be a small amount of waste, since we would effectively get an empty response?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of

Review comment:
       consider using `<ol>` and `<li>` tags in the javadoc for your ordered list
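
    For example (a sketch of how the quoted javadoc could be marked up):
    ```
    /**
     * <p>Implementation includes overhead for:
     *
     * <ol>
     *   <li>two API calls to determine the min/max sendTime of the HL7v2 store at invocation time.
     *   <li>initial splitting into non-overlapping time ranges (default daily) to achieve
     *       parallelization in separate messages.list calls.
     * </ol>
     */
    ```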

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list calls.
+   *
+   * <p>This will make more queries than necessary when used with very small data sets. (or very
+   * sparse data sets in the sendTime dimension).
+   *
+   * <p>If you have large but sparse data (e.g. hours between consecutive message sendTimes) and
+   * know something about the time ranges where you have no data, consider using multiple instances
+   * of this transform specifying sendTime filters to omit the ranges where there is no data.
+   */
   public static class ListHL7v2Messages extends PTransform<PBegin, PCollection<HL7v2Message>> {
-    private final List<String> hl7v2Stores;
-    private final String filter;
+    private final ValueProvider<List<String>> hl7v2Stores;
+    private ValueProvider<String> filter;
+    private Duration initialSplitDuration;

Review comment:
    Even if a member variable is left null, it should still be final, since it doesn't look like we mutate it locally. Same reason for the other places where I suggest this change.
   ```suggestion
       private final ValueProvider<String> filter;
       private final Duration initialSplitDuration;
   ```

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
      * @param filter the filter
      */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) {
-      this.hl7v2Stores = hl7v2Stores.get();
-      this.filter = filter.get();
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+    }
+
+    /**
+     * Instantiates a new List hl 7 v 2 messages.
+     *
+     * @param hl7v2Stores the hl 7 v 2 stores
+     * @param filter the filter
+     * @param initialSplitDuration the initial split duration for sendTime dimension splits
+     */
+    ListHL7v2Messages(
+        ValueProvider<List<String>> hl7v2Stores,
+        ValueProvider<String> filter,
+        Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+      this.initialSplitDuration = initialSplitDuration;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) {
-      this.hl7v2Stores = hl7v2Stores.get();
+      this.hl7v2Stores = hl7v2Stores;
       this.filter = null;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     * @param initialSplitDuration the initial split duration
+     */
+    ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.initialSplitDuration = initialSplitDuration;
+    }
+
     @Override
     public PCollection<HL7v2Message> expand(PBegin input) {
       return input
-          .apply(Create.of(this.hl7v2Stores))
-          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter)))
+          .apply(Create.ofProvider(this.hl7v2Stores, ListCoder.of(StringUtf8Coder.of())))
+          .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x))
+          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, initialSplitDuration)))
           .setCoder(new HL7v2MessageCoder())
           // Break fusion to encourage parallelization of downstream processing.
           .apply(Reshuffle.viaRandomKey());
     }
   }
 
+  /**
+   * Implemented as Splitable DoFn that claims millisecond resolutions of offset restrictions in the
+   * Message.sendTime dimension.
+   */
+  @BoundedPerElement
   static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {
 
-    private final String filter;
+    private static final Logger LOG = LoggerFactory.getLogger(ListHL7v2MessagesFn.class);
+    private ValueProvider<String> filter;
+    // These control the initial restriction split which means that the list of integer pairs
+    // must comfortably fit in memory.
+    private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = Duration.standardDays(1);
+    private static final Duration DEFAULT_MIN_SPLIT_DURATION = Duration.standardHours(1);
+    private Duration initialSplitDuration;
+    private Instant from;
+    private Instant to;

Review comment:
       Can any of these be final?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +547,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter, "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond we need to make sure that we
+            // process all messages for this millisecond because sendTime is allegedly nano second
+            // resolution.
+            // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);
+              msg = page.get(i++);

Review comment:
    I agree with @pabloem that this logic is not simple to follow, and I think you could really simplify your code if you used https://guava.dev/releases/21.0/api/docs/com/google/common/collect/FluentIterable.html#concat-java.lang.Iterable- since it would convert `Iterable<List<HL7v2Message>>` into `Iterable<HL7v2Message>` and only access the elements lazily, so it wouldn't prefetch everything.
    
    This would allow you to not worry about whether `messages` are in a `page` or whether you're processing multiple `pages`.
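
    Something like this (a sketch; the vendored Guava path matches what sdks/java/core already imports elsewhere in this PR):
    ```
    import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;

    // Flatten Iterable<List<HL7v2Message>> into one lazy Iterable so the claim loop can
    // iterate messages directly and never has to special-case page boundaries.
    Iterable<HL7v2Message> messages = FluentIterable.concat(pages);
    ```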

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
      * @param filter the filter
      */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) {
-      this.hl7v2Stores = hl7v2Stores.get();
-      this.filter = filter.get();
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+    }
+
+    /**
+     * Instantiates a new List hl 7 v 2 messages.
+     *
+     * @param hl7v2Stores the hl 7 v 2 stores
+     * @param filter the filter
+     * @param initialSplitDuration the initial split duration for sendTime dimension splits
+     */
+    ListHL7v2Messages(
+        ValueProvider<List<String>> hl7v2Stores,
+        ValueProvider<String> filter,
+        Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+      this.initialSplitDuration = initialSplitDuration;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) {
-      this.hl7v2Stores = hl7v2Stores.get();
+      this.hl7v2Stores = hl7v2Stores;
       this.filter = null;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     * @param initialSplitDuration the initial split duration
+     */
+    ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.initialSplitDuration = initialSplitDuration;
+    }
+
     @Override
     public PCollection<HL7v2Message> expand(PBegin input) {
       return input
-          .apply(Create.of(this.hl7v2Stores))
-          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter)))
+          .apply(Create.ofProvider(this.hl7v2Stores, ListCoder.of(StringUtf8Coder.of())))
+          .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x))
+          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, initialSplitDuration)))
           .setCoder(new HL7v2MessageCoder())
           // Break fusion to encourage parallelization of downstream processing.
           .apply(Reshuffle.viaRandomKey());
     }
   }
 
+  /**
+   * Implemented as Splitable DoFn that claims millisecond resolutions of offset restrictions in the
+   * Message.sendTime dimension.
+   */
+  @BoundedPerElement
   static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {
 
-    private final String filter;
+    private static final Logger LOG = LoggerFactory.getLogger(ListHL7v2MessagesFn.class);
+    private ValueProvider<String> filter;
+    // These control the initial restriction split which means that the list of integer pairs
+    // must comfortably fit in memory.
+    private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = Duration.standardDays(1);
+    private static final Duration DEFAULT_MIN_SPLIT_DURATION = Duration.standardHours(1);

Review comment:
       nit: Might want to group your statics at the top together separate from the member variables.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter.get(), "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms span spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond. We need to make sure that we
+            // process all messages for this millisecond, because sendTime is allegedly nanosecond
+            // resolution.
+            // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);

Review comment:
       Should we be outputting elements with the timestamp of the message and should we be reporting a watermark?
   
   Even though you have a bounded SDF, it could be useful to report the watermark in case it is used in a streaming pipeline or users want to assign windows and perform grouping per window.
   
   The current logic will assign the input's timestamp to all outputs, which won't let users window the output elements effectively without assigning timestamps themselves. If we do want to go down this path it is simple right now, because this transform always starts with PBegin, but what would you want to do if the timestamp of the record is before the timestamp of the input element to the SDF (since it is illegal to output messages with timestamps before the input element's timestamp)?
   
   To add watermark tracking based on timestamp of elements output, you would need to add the implementation for `@GetInitialWatermarkEstimatorState` and `@NewWatermarkEstimator` as seen in https://github.com/apache/beam/blob/27656d74fa9fb7085e2532275c821c3601c3f4b7/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L763
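
   For concreteness, a minimal compilable sketch (hypothetical DoFn and names, not the PR's code) of the two watermark methods from Watch.java combined with per-record output timestamps; the restriction bounds and timestamp arithmetic are placeholders:

   ```java
   import org.apache.beam.sdk.io.range.OffsetRange;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
   import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
   import org.joda.time.Instant;

   // Hypothetical sketch: a bounded SDF that emits records with their own timestamps
   // and reports a monotonically increasing watermark derived from those timestamps.
   class TimestampedEmitFn extends DoFn<String, String> {

     @GetInitialRestriction
     public OffsetRange initialRestriction(@Element String element) {
       return new OffsetRange(0, 10); // placeholder bounds
     }

     @GetInitialWatermarkEstimatorState
     public Instant initialWatermarkState(@Timestamp Instant elementTimestamp) {
       // Start the watermark at the input element's timestamp.
       return elementTimestamp;
     }

     @NewWatermarkEstimator
     public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(
         @WatermarkEstimatorState Instant state) {
       // Tracks the maximum timestamp observed on the outputs.
       return new WatermarkEstimators.MonotonicallyIncreasing(state);
     }

     @ProcessElement
     public void process(
         @Element String element,
         @Timestamp Instant elementTimestamp,
         RestrictionTracker<OffsetRange, Long> tracker,
         OutputReceiver<String> out) {
       for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
         // Emit each record at its own event time, never earlier than the input's timestamp.
         out.outputWithTimestamp(element + "-" + i, elementTimestamp.plus(i));
       }
     }
   }
   ```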
   
   
   
   

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+

Review comment:
      This method is not necessary since `OffsetRange` supports `HasDefaultTracker`, which you're effectively invoking yourself.
   ```suggestion
   ```
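
   For illustration, a minimal sketch (hypothetical DoFn, not the PR's code) of an SDF that relies on the default tracker; because `OffsetRange` implements `HasDefaultTracker`, no `@NewTracker` method is declared:

   ```java
   import org.apache.beam.sdk.io.range.OffsetRange;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

   // Hypothetical sketch: the framework derives the OffsetRangeTracker from the
   // restriction type itself, so an explicit @NewTracker method is redundant.
   class DefaultTrackerFn extends DoFn<String, Long> {

     @GetInitialRestriction
     public OffsetRange initialRestriction(@Element String element) {
       return new OffsetRange(0, 5); // placeholder bounds
     }

     @ProcessElement
     public void process(
         RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Long> out) {
       for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
         out.output(i);
       }
     }
   }
   ```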

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of

Review comment:
      Consider using `{@code ...}` when referring to code and `{@link ...}` for things you can directly link against.
   
   ```suggestion
      * <p>This transform is optimized for dynamic splitting of {@code message.list} calls for large batches of
   ```

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list calls.

Review comment:
       I'm not sure if the users need to know the exact implementation details as this may lock future maintainers into meeting these goals even when they can produce a more efficient solution in the future.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list calls.
+   *
+   * <p>This will make more queries than necessary when used with very small data sets. (or very
+   * sparse data sets in the sendTime dimension).

Review comment:
       ```suggestion
      * sparse data sets in the {@code sendTime} dimension).
   ```

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list calls.
+   *
+   * <p>This will make more queries than necessary when used with very small data sets. (or very
+   * sparse data sets in the sendTime dimension).
+   *
+   * <p>If you have large but sparse data (e.g. hours between consecutive message sendTimes) and
+   * know something about the time ranges where you have no data, consider using multiple instances
+   * of this transform specifying sendTime filters to omit the ranges where there is no data.

Review comment:
       I don't think we want people to do this since empty splits are not that expensive and will quickly clear out a block of work.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
      * @param filter the filter
      */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) {
-      this.hl7v2Stores = hl7v2Stores.get();
-      this.filter = filter.get();
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+    }
+
+    /**
+     * Instantiates a new ListHL7v2Messages.
+     *
+     * @param hl7v2Stores the HL7v2 stores
+     * @param filter the filter
+     * @param initialSplitDuration the initial split duration for sendTime dimension splits
+     */
+    ListHL7v2Messages(
+        ValueProvider<List<String>> hl7v2Stores,
+        ValueProvider<String> filter,
+        Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+      this.initialSplitDuration = initialSplitDuration;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) {
-      this.hl7v2Stores = hl7v2Stores.get();
+      this.hl7v2Stores = hl7v2Stores;
       this.filter = null;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     * @param initialSplitDuration the initial split duration
+     */
+    ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.initialSplitDuration = initialSplitDuration;
+    }
+
     @Override
     public PCollection<HL7v2Message> expand(PBegin input) {
       return input
-          .apply(Create.of(this.hl7v2Stores))
-          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter)))
+          .apply(Create.ofProvider(this.hl7v2Stores, ListCoder.of(StringUtf8Coder.of())))
+          .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x))
+          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, initialSplitDuration)))
           .setCoder(new HL7v2MessageCoder())
           // Break fusion to encourage parallelization of downstream processing.
           .apply(Reshuffle.viaRandomKey());
     }
   }
 
+  /**
+   * Implemented as a Splittable DoFn that claims millisecond resolutions of offset restrictions
+   * in the Message.sendTime dimension.
+   */
+  @BoundedPerElement
   static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {

Review comment:
       ```suggestion
     @VisibleForTesting
     static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {
   ```

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -459,7 +532,13 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
      * @param filter the filter
      */
     ListHL7v2MessagesFn(String filter) {
+      new ListHL7v2MessagesFn(StaticValueProvider.of(filter), null);

Review comment:
       https://stackoverflow.com/questions/285177/how-do-i-call-one-constructor-from-another-in-java
   ```suggestion
         this(StaticValueProvider.of(filter), null);
   ```
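
   A minimal standalone illustration (hypothetical class, not Beam code) of why the suggestion matters: `new` inside a constructor builds a separate, discarded object, while `this(...)` delegates and initializes the object actually being constructed:

   ```java
   // Hypothetical demo class mirroring the constructor-chaining fix suggested above.
   public class ConstructorChainingDemo {
     private String filter;
     private Object initialSplitDuration;

     ConstructorChainingDemo(String filter) {
       // Delegates to the two-arg constructor so this object's fields get set.
       // Writing `new ConstructorChainingDemo(filter, null);` here instead would
       // construct a throwaway object and leave this object's fields null.
       this(filter, null);
     }

     ConstructorChainingDemo(String filter, Object initialSplitDuration) {
       this.filter = filter;
       this.initialSplitDuration = initialSplitDuration;
     }

     public static void main(String[] args) {
       // Prints the filter, confirming the one-arg constructor delegated properly.
       System.out.println(new ConstructorChainingDemo("sendFacility=\"ABC\"").filter);
     }
   }
   ```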

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,

Review comment:
       ```suggestion
           RestrictionTracker<OffsetRange, Long> tracker,
   ```

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter.get(), "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms span spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond. We need to make sure that we
+            // process all messages for this millisecond, because sendTime is allegedly nanosecond
+            // resolution.
+            // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);
+              msg = page.get(i++);
+              cursor = Instant.parse(msg.getSendTime());
+            }
+
+            if (i == page.size() && cursor.getMillis() == lastClaimedMilliSecond) {
+              // reached the end of the page and timestamp still in the claimed ms.
+              hangingClaim = true;
+              continue;
+            }
+
+            // If reached this point, msg.sendTime is outside the current claim.
+            // Need to claim time range up to (and including) the cursor to properly advance the
+            // tracker.
+            tracker.tryClaim(cursor.getMillis());
+            lastClaimedMilliSecond = cursor.getMillis();
+            LOG.info(
+                String.format(
+                    "After claiming between messages lastClaimedMilliSecond = %s",
+                    lastClaimedMilliSecond));
+          }
+        }
         messageListingLatencyMs.update(Instant.now().getMillis() - reqestTime);

Review comment:
      It would make sense to move this into wherever we do the list call.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r421827964



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +548,120 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OrderedTimeRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OrderedTimeRange(from, to);
+    }
+
+    @NewTracker
+    public OrderedTimeRangeTracker newTracker(@Restriction OrderedTimeRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(
+        @Restriction OrderedTimeRange timeRange, OutputReceiver<OrderedTimeRange> out) {
+      // TODO(jaketf) How to pick optimal values for desiredNumOffsetsPerSplit ?

Review comment:
      Yeah, I think the "spiky backfill" (many messages in a small sendTime range) is a corner case of a hot split that would just be slow; users would have to accept that or take it up with their upstream system.
   
   messageType / sendFacility are probably more popular logical filters to split on, and this feels like a hack for a corner case that might mess with performance under the "typical" distribution of data in sendTime.




----------------------------------------------------------------



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631598650


   Run Java PostCommit


----------------------------------------------------------------



[GitHub] [beam] jaketf edited a comment on pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-623766796


   @chamikaramj thanks for the suggestion. I will look into using the BoundedSource API.
   
   Unfortunately, regular DoFns don't cut it because a single element's outputs are committed atomically (see this [conversation](https://github.com/apache/beam/pull/11538#discussion_r416927740)).
   Basically we have one input element (HL7v2 store) exploding to many, many output elements (all the messages in that store) in a single ProcessElement call. I'm trying to explore strategies for splitting up this listing.
   
   I originally chose splittable DoFn over BoundedSource based off the sentiment of this statement:
   > **Coding against the Source API involves a lot of boilerplate and is error-prone**, and it does not compose well with the rest of the Beam model because a Source can appear only at the root of a pipeline. - https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
   
   The blog also mentions 
   - A Source can not emit an additional output (for example, records that failed to parse).
       - Healthcare customers driving requirements for this plugin want a DLQ on all sinks and sources. To be consistent with the streaming API provided in `HL7v2IO.Read`, I wanted to provide a DLQ in `HL7v2IO.ListMessages`. However, I believe this is more of a nice-to-have for batch use cases (because there's no way to pass ListMessages bad message IDs like there is in HL7v2IO.Read).


----------------------------------------------------------------



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427640111



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste resources if there are large
+   * durations (days) of the sendTime dimension without data.

Review comment:
       correct.




----------------------------------------------------------------



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r421824462



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
##########
@@ -94,6 +94,9 @@ public void checkDone() throws IllegalStateException {
     if (range.getFrom() == range.getTo()) {
       return;
     }
+    if (lastAttemptedOffset == null) {
+      throw new IllegalStateException("lastAttemptedOffset should not be null");
+    }
     checkState(
         lastAttemptedOffset >= range.getTo() - 1,
         "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",

Review comment:
      Sure, I can move this to a separate PR.
   
   Kindly disagree on the suggestion, as it will just cause a different NPE on [L102](https://github.com/apache/beam/pull/11596/files/d2a094d5a133132d015fc7ed335e5b430a19f183#diff-b07e9d49c549836ef7e2d7006fa93f3cL102) when we call `lastAttemptedOffset + 1` to try to print this error message.
   
   I think the lastAttemptedOffset null check should be separate and should throw a more specific error message before we get to this checkState.
   
   This failure mode is almost definitely the result of misusing OffsetRangeTracker, and it would be difficult to say what work was or wasn't attempted.




----------------------------------------------------------------



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r421825288



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OrderedTimeRange.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** A restriction represented by a range of Instants [from, to). */
+public class OrderedTimeRange

Review comment:
      That makes a lot of sense, thanks for the suggestion.




----------------------------------------------------------------



[GitHub] [beam] jaketf edited a comment on pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-623766796


   @chamikaramj thanks for the suggestion. I will look into using the BoundedSource API in a separate PR and we can compare.
   
   Unfortunately, regular DoFns don't cut it because a single element's outputs are committed atomically (see this [conversation](https://github.com/apache/beam/pull/11538#discussion_r416927740)).
   Basically we have one input element (HL7v2 store) exploding to many, many output elements (all the messages in that store) in a single ProcessElement call. I'm trying to explore strategies for splitting up this listing.
   
   I originally chose splittable DoFn over BoundedSource based off the sentiment of this statement:
   > **Coding against the Source API involves a lot of boilerplate and is error-prone**, and it does not compose well with the rest of the Beam model because a Source can appear only at the root of a pipeline. - https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
   
   Using a splittable DoFn also made reading from multiple HL7v2 stores come for free (e.g. if you had several regional HL7v2 stores and your use case was to read from them all and write to a single multi-regional store). This is admittedly a rather contrived use case.
   
   The blog also mentions 
   - A Source can not emit an additional output (for example, records that failed to parse).
       - Healthcare customers driving requirements for this plugin want a DLQ on all sinks and sources. To be consistent with the streaming API provided in `HL7v2IO.Read`, I wanted to provide a DLQ in `HL7v2IO.ListMessages`. However, I believe this is more of a nice-to-have for batch use cases (because there's no way to pass ListMessages bad message IDs like there is in HL7v2IO.Read).


----------------------------------------------------------------



[GitHub] [beam] jaketf commented on pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-623718338


   The NPE is caused by `OffsetRangeTracker.lastAttemptedOffset` being unboxed in `OffsetRangeTracker::checkDone` [here](https://github.com/apache/beam/blob/0291976d6029b4cf4d72caa03bd3836900fb6328/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L98). 
   
   [all](https://github.com/apache/beam/blob/0291976d6029b4cf4d72caa03bd3836900fb6328/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L53) [other](https://github.com/apache/beam/blob/0291976d6029b4cf4d72caa03bd3836900fb6328/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L77) [uses](https://github.com/apache/beam/blob/0291976d6029b4cf4d72caa03bd3836900fb6328/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L119) check that `lastAttemptedOffset`  is not null.
   
   I'm not sure if this was intentional in the implementation of OffsetRangeTracker. 
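
   A minimal standalone illustration (not Beam code) of the failure mode described above, where auto-unboxing a null `Long` inside a comparison throws the NPE before any error message can be formatted:

   ```java
   // Hypothetical demo: mirrors checkDone() comparing a never-set lastAttemptedOffset.
   public class UnboxingNpeDemo {
     public static void main(String[] args) {
       Long lastAttemptedOffset = null; // tryClaim was never called
       long rangeTo = 10L;
       // The comparison auto-unboxes lastAttemptedOffset and throws a
       // NullPointerException here, before any checkState message is built.
       boolean done = lastAttemptedOffset >= rangeTo - 1;
       System.out.println(done);
     }
   }
   ```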


----------------------------------------------------------------



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427641350



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list calls.
+   *
+   * <p>This will make more queries than necessary when used with very small data sets. (or very
+   * sparse data sets in the sendTime dimension).
+   *
+   * <p>If you have large but sparse data (e.g. hours between consecutive message sendTimes) and
+   * know something about the time ranges where you have no data, consider using multiple instances
+   * of this transform specifying sendTime filters to omit the ranges where there is no data.

Review comment:
      That's great to know! Will remove this guidance, as it will lead to unnecessary complexity.




----------------------------------------------------------------



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631522462


   Run Java postcommit


----------------------------------------------------------------



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r418843954



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java
##########
@@ -59,7 +59,7 @@
               + "AL1|2|allergy|Z91.013^Personal history of allergy to sea food^ZAL|SEVERE|Swollen face|\r"
               + "AL1|3|allergy|Z91.040^Latex allergy^ZAL|MODERATE|Raised, itchy, red rash|",
           // Another ADT Message
-          "MSH|^~\\&|hl7Integration|hl7Integration|||||ADT^A08|||2.5|\r"
+          "MSH|^~\\&|hl7Integration|hl7Integration|||20190309132544||ADT^A08|||2.5|\r"

Review comment:
      This timestamp will always be present in valid HL7v2 messages.




----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r428095390



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +523,77 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker<OffsetRange, Long> tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
-      long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
-        messageListingLatencyMs.update(Instant.now().getMillis() - reqestTime);
-        page.forEach(context::output);
-        reqestTime = Instant.now().getMillis();
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter.get(), "sendTime");
+      Instant cursor;
+      long lastClaimedMilliSecond = startRestriction.getMillis() - 1;
+      for (HL7v2Message msg : FluentIterable.concat(pages)) {
+        cursor = Instant.parse(msg.getSendTime());
+        if (cursor.getMillis() > lastClaimedMilliSecond && tracker.tryClaim(cursor.getMillis())) {
+          lastClaimedMilliSecond = cursor.getMillis();
+        }
+
+        if (cursor.getMillis() == lastClaimedMilliSecond) { // loop over messages in millisecond.
+          outputReceiver.output(msg);
+        }

Review comment:
       ```suggestion
           if (cursor.getMillis() > lastClaimedMilliSecond) {
             // Return early after the first claim failure preventing us from iterating
             // through the remaining messages.
             if (!tracker.tryClaim(cursor.getMillis())) {
               return;
             }
             lastClaimedMilliSecond = cursor.getMillis();
           }
   
           outputReceiver.output(msg);
   ```

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter.get(), "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms span spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond. We need to make sure that we
+            // process all messages for this millisecond, because sendTime is allegedly nanosecond
+            // resolution.
+            // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);

Review comment:
      Yes, we can defer, since this would change what people are getting from the existing implementation and would likely require an opt-in to not break any existing users.
   
   If nobody depends on this transform yet then it would be wise to address it before adoption, since a lot of users typically expect the output timestamp to match the record's source timestamp. This may not apply to this specific source and is dependent on what users expect, so it is your judgment call.
   
   If you do go with changing the output timestamp, the watermark tracking would help for streaming pipelines since it would allow them to perform better. The current implementation would still produce correct results with or without it, and adding it later would be very safe (it may expose problems in pipelines that were already broken for other reasons).
   




----------------------------------------------------------------



[GitHub] [beam] jaketf commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-628307591


   @pabloem PTAL / retest this if you have the chance.


----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r428107636



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -427,39 +458,59 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
      * @param filter the filter
      */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) {
-      this.hl7v2Stores = hl7v2Stores.get();
-      this.filter = filter.get();
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+      this.initialSplitDuration = null;
     }
 
-    ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) {
-      this.hl7v2Stores = hl7v2Stores.get();
-      this.filter = null;
+    public ListHL7v2Messages withInitialSplitDuration(Duration initialSplitDuration) {
+      this.initialSplitDuration = initialSplitDuration;
+      return this;
     }
 
     @Override
     public PCollection<HL7v2Message> expand(PBegin input) {
       return input
-          .apply(Create.of(this.hl7v2Stores))
-          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter)))
-          .setCoder(new HL7v2MessageCoder())
+          .apply(Create.ofProvider(this.hl7v2Stores, ListCoder.of(StringUtf8Coder.of())))
+          .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x))
+          .apply(ParDo.of(new ListHL7v2MessagesFn(filter, initialSplitDuration)))
+          .setCoder(HL7v2MessageCoder.of())
           // Break fusion to encourage parallelization of downstream processing.
           .apply(Reshuffle.viaRandomKey());
     }
   }
 
+  /**
+   * Implemented as a Splittable DoFn that claims millisecond resolutions of offset restrictions
+   * in the Message.sendTime dimension.
+   */
+  @BoundedPerElement
+  @VisibleForTesting
   static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {
-
-    private final String filter;
+    // These control the initial restriction split which means that the list of integer pairs
+    // must comfortably fit in memory.
+    private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = Duration.standardDays(1);
+    private static final Duration DEFAULT_MIN_SPLIT_DURATION = Duration.standardHours(1);
+
+    private static final Logger LOG = LoggerFactory.getLogger(ListHL7v2MessagesFn.class);
+    private ValueProvider<String> filter;
+    private Duration initialSplitDuration;
+    private Instant from;
+    private Instant to;

Review comment:
       `from` and `to` seem to only be used within `@GetInitialRestriction`, can we make them local variables there?
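
   A sketch of the suggested change, based on the diff above (same client calls, only the fields become locals):

   ```java
   @GetInitialRestriction
   public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
       throws IOException {
     Instant from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get());
     // The range is half-open [from, to), so add an extra ms to include the latest message.
     Instant to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1);
     return new OffsetRange(from.getMillis(), to.getMillis());
   }
   ```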

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +424,32 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for splitting of message.list calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes.
+   *
+   * <p>Note on Benchmarking By default, this will make more queries than necessary when used with

Review comment:
       `Benchmarking By`?
   
   Awkward sentence and capitalization.




----------------------------------------------------------------



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-623733179


   @chamikaramj can you take a look at this?


----------------------------------------------------------------



[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r422233303



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +548,120 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OrderedTimeRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OrderedTimeRange(from, to);
+    }
+
+    @NewTracker
+    public OrderedTimeRangeTracker newTracker(@Restriction OrderedTimeRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(
+        @Restriction OrderedTimeRange timeRange, OutputReceiver<OrderedTimeRange> out) {
+      // TODO(jaketf) How to pick optimal values for desiredNumOffsetsPerSplit ?

Review comment:
       That seems like a lot.
   
   Dataflow has an API limit of 20 MB for split descriptions when they are returned, which usually tops out at around 10k splits for sources, but even 10k is too much. Typically 20-50 splits are enough, since dynamic splitting will ramp that up to thousands if necessary.
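
   To make the 20-50 target concrete, a small standalone helper (hypothetical, not part of the PR) that derives an initial split duration from a desired number of splits; dynamic splitting can still subdivide further at runtime:

   ```java
   import org.joda.time.Duration;
   import org.joda.time.Instant;

   // Hypothetical helper: choose an initial split duration that yields roughly
   // `targetSplits` splits over the half-open range [from, to).
   public class SplitDurationHelper {
     static Duration splitDurationFor(Instant from, Instant to, int targetSplits) {
       long totalMillis = new Duration(from, to).getMillis();
       return Duration.millis(Math.max(1L, totalMillis / targetSplits));
     }

     public static void main(String[] args) {
       Instant from = Instant.parse("2020-01-01T00:00:00.000Z");
       Instant to = Instant.parse("2020-12-31T00:00:00.000Z");
       System.out.println(splitDurationFor(from, to, 50)); // roughly a week per split
     }
   }
   ```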




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427542617



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +547,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter, "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond we need to make sure that we
+            // process all messages for this millisecond because sendTime is allegedly nano second
+            // resolution.
+            // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);
+              msg = page.get(i++);

Review comment:
       But won't we go right back to line 631 and emit the message that we got from line 632 after calling `get(0)` for the second time?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631055752


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r418843954



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java
##########
@@ -59,7 +59,7 @@
               + "AL1|2|allergy|Z91.013^Personal history of allergy to sea food^ZAL|SEVERE|Swollen face|\r"
               + "AL1|3|allergy|Z91.040^Latex allergy^ZAL|MODERATE|Raised, itchy, red rash|",
           // Another ADT Message
-          "MSH|^~\\&|hl7Integration|hl7Integration|||||ADT^A08|||2.5|\r"
+          "MSH|^~\\&|hl7Integration|hl7Integration|||20190309132544||ADT^A08|||2.5|\r"

Review comment:
       This timestamp will always be present in valid HL7v2 messages.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631522261


   Retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r421807059



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +548,120 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OrderedTimeRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OrderedTimeRange(from, to);
+    }
+
+    @NewTracker
+    public OrderedTimeRangeTracker newTracker(@Restriction OrderedTimeRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(
+        @Restriction OrderedTimeRange timeRange, OutputReceiver<OrderedTimeRange> out) {
+      // TODO(jaketf) How to pick optimal values for desiredNumOffsetsPerSplit ?

Review comment:
       Unfortunately, in this use case dynamic splitting would be crucial because we can't know the distribution of data in the restriction dimension (sendTime).
   
   Imagine that a hospital might be much busier during the daytime / on weekdays than at night or on weekends (though never dormant, due to ICU and emergency services). "Daytime" might change based on hospital location, weekdays are subject to holidays, etc.
   
   Data distribution in sendTime may also be subject to significant spikes if one of the upstream systems populating sendTime has to backfill after a maintenance period and doesn't responsibly set this field to event time, but instead sets all of the sendTimes to a short range of backfill processing time (this is suboptimal behavior of that system, but sometimes a reality).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] chamikaramj edited a comment on pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
chamikaramj edited a comment on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-623741333


   Is using the SplittableDoFn API here intentional?
   
   I think this API is being updated. cc: @lukecwik 
   
   If you need dynamic work rebalancing, consider using the BoundedSource interface. Otherwise, we can just implement the source using regular DoFns and wait for the SplittableDoFn API to stabilize before adding support for dynamic work rebalancing.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631584876


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-623737955


   > The NPE is caused by `OffsetRangeTracker.lastAttemptedOffset` being unboxed in `OffsetRangeTracker::checkDone` [here](https://github.com/apache/beam/blob/0291976d6029b4cf4d72caa03bd3836900fb6328/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L98).
   > 
   > [all](https://github.com/apache/beam/blob/0291976d6029b4cf4d72caa03bd3836900fb6328/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L53) [other](https://github.com/apache/beam/blob/0291976d6029b4cf4d72caa03bd3836900fb6328/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L77) [uses](https://github.com/apache/beam/blob/0291976d6029b4cf4d72caa03bd3836900fb6328/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L119) check that `lastAttemptedOffset` is not null.
   > 
   > I'm not sure if this was intentional in the implementation of OffsetRangeTracker.
   
   @chamikaramj Pablo said you might know about this.
   
   Should `checkDone` have some conditional on `lastAttemptedOffset != null`?
   E.g.:
   ```java
     @Override
     public void checkDone() throws IllegalStateException {
       if (range.getFrom() == range.getTo()) {
         return;
       }
   
       if (lastAttemptedOffset != null) {
         checkState(
             lastAttemptedOffset >= range.getTo() - 1,
             "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
             lastAttemptedOffset,
             range,
             lastAttemptedOffset + 1,
             range.getTo());
       }
     }
   ```
   
   I'm not really familiar with what `checkDone` should do in the case where `lastAttemptedOffset` is null.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r421826446



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
##########
@@ -94,6 +94,9 @@ public void checkDone() throws IllegalStateException {
     if (range.getFrom() == range.getTo()) {
       return;
     }
+    if (lastAttemptedOffset == null) {
+      throw new IllegalStateException("lastAttemptedOffset should not be null");
+    }
     checkState(
         lastAttemptedOffset >= range.getTo() - 1,
         "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",

Review comment:
       Good point, but we should structure the message to say that we haven't claimed anything in the range, which is non-empty.
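   One possible phrasing sketch along those lines (wording illustrative, not the committed text):
   ```java
   if (lastAttemptedOffset == null) {
     throw new IllegalStateException(
         String.format(
             "Last attempted offset should not be null. No work was claimed in non-empty range %s.",
             range));
   }
   ```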




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631064445


   Run Java PostCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631189422


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631058701


   Run Java PostCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631010013


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427640873



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste resources if there are large
+   * durations (days) of the sendTime dimension without data.
+   *
+   * <p>Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of
+   * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges
+   * (default daily) to achieve parallelization in separate messages.list calls.

Review comment:
       I originally included this for users who may try to benchmark this against a tiny / sparse result set and be surprised that it is slow / making so many API calls.
   
   I see your point; will remove.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r421813915



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +548,120 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OrderedTimeRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OrderedTimeRange(from, to);
+    }
+
+    @NewTracker
+    public OrderedTimeRangeTracker newTracker(@Restriction OrderedTimeRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(
+        @Restriction OrderedTimeRange timeRange, OutputReceiver<OrderedTimeRange> out) {
+      // TODO(jaketf) How to pick optimal values for desiredNumOffsetsPerSplit ?

Review comment:
       Yeah, you'll have to choose a reasonable value based upon your knowledge within this domain, and I understand that this may not work in practice.
   
   In the case where people send a lot of data for a narrow time range, you would have to support filtering based upon other properties such as messageType or sendFacility, and could break up the space based upon prefixes such as `~a`, `~b`, `NOT ~a AND NOT ~b`; but this depends on such filters being efficient in the service as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631583181


   Retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427654986



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +551,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get());
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter.get(), "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond we need to make sure that we
+            // process all messages for this millisecond because sendTime is allegedly nano second
+            // resolution.
+            // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);

Review comment:
       Thanks for the suggestion.
   In the interest of time, can I punt this to a future PR?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631595939


   Run Java Postcommit
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-623766796


   @chamikaramj thanks for the suggestion. I will look into using BoundedSource API.
   
   Unfortunately, regular DoFns don't cut it because a single element's outputs are committed atomically (see this [conversation](https://github.com/apache/beam/pull/11538#discussion_r416927740)).
   Basically we have one input element (HL7v2 store) exploding to many, many output elements (all the messages in that store) in a single ProcessElement call. I'm trying to explore strategies for splitting up this listing.
   
   I originally chose splittable DoFn over BoundedSource based off the sentiment of this statement:
   > **Coding against the Source API involves a lot of boilerplate and is error-prone**, and it does not compose well with the rest of the Beam model because a Source can appear only at the root of a pipeline. - https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
   
   The blog also mentions 
   - A Source can not emit an additional output (for example, records that failed to parse).
       - Healthcare customers feeding requirements for this plugin want DLQ on all sinks and sources. To be consistent with the streaming API provided in `HL7v2IO.Read`, I wanted to provide DLQ in `HL7v2IO.ListMessages`. However, I believe this is more of a nice-to-have for batch use cases.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631046540


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r422426383



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +548,120 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OrderedTimeRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OrderedTimeRange(from, to);
+    }
+
+    @NewTracker
+    public OrderedTimeRangeTracker newTracker(@Restriction OrderedTimeRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(
+        @Restriction OrderedTimeRange timeRange, OutputReceiver<OrderedTimeRange> out) {
+      // TODO(jaketf) How to pick optimal values for desiredNumOffsetsPerSplit ?

Review comment:
       Interesting, thanks for those details! 
   
   But I thought Dataflow doesn't really do dynamic splitting yet, so the initial split is all that matters? Maybe I'm conflating dynamic rebalancing with dynamic splitting?
     
   Perhaps daily is the appropriate happy medium for now?
   1000s, not 10,000s, of initial splits.
   
   In the future, when dynamic splitting is supported, bump the initial split default to weeks?
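   As a back-of-the-envelope check (hypothetical one-year range, Joda-Time `Duration` as in the diff):
   ```java
   long rangeMillis = Duration.standardDays(365).getMillis();
   long dailySplits = rangeMillis / Duration.standardDays(1).getMillis();  // 365 initial splits
   long weeklySplits = rangeMillis / Duration.standardDays(7).getMillis(); // ~52 initial splits
   ```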




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427540468



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +547,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter, "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond we need to make sure that we
+            // process all messages for this millisecond because sendTime is allegedly nano second
+            // resolution.
+            // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);
+              msg = page.get(i++);

Review comment:
       tl;dr: No, we don't output the element twice. This is intended behavior.
    
   `page` is a `List<HL7v2Message>`.
   When we call `page.get`, we just call [List::get](https://docs.oracle.com/javase/8/docs/api/java/util/List.html#get-int-).
   This is safe to call twice and will "get" the first element of the list both times.
   
   We only emit results at L631, where we call [OutputReceiver::output](https://beam.apache.org/releases/javadoc/2.20.0/index.html?org/apache/beam/sdk/transforms/DoFn.OutputReceiver.html).
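   A tiny self-contained illustration of that point (hypothetical messages, plain `java.util.List`):
   ```java
   import java.util.Arrays;
   import java.util.List;

   public class ListGetDemo {
     public static void main(String[] args) {
       List<String> page = Arrays.asList("msg0", "msg1");
       // List::get is a pure read: calling it twice returns the same element
       // and does not remove, advance, or emit anything.
       String first = page.get(0);
       String firstAgain = page.get(0);
       System.out.println(first.equals(firstAgain)); // true
     }
   }
   ```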
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf edited a comment on pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-623766796


   @chamikaramj thanks for the suggestion. I will look into using BoundedSource API in a separate PR and we can compare.
   
   Unfortunately, regular DoFns don't cut it because a single element's outputs are committed atomically (see this [conversation](https://github.com/apache/beam/pull/11538#discussion_r416927740)).
   Basically we have one input element (HL7v2 store) exploding to many, many output elements (all the messages in that store) in a single ProcessElement call. I'm trying to explore strategies for splitting up this listing.
   
   I originally chose splittable DoFn over BoundedSource based off the sentiment of this statement:
   > **Coding against the Source API involves a lot of boilerplate and is error-prone**, and it does not compose well with the rest of the Beam model because a Source can appear only at the root of a pipeline. - https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
   
   The blog also mentions 
   - A Source can not emit an additional output (for example, records that failed to parse).
       - Healthcare customers feeding requirements for this plugin want DLQ on all sinks and sources. To be consistent with the streaming API provided in `HL7v2IO.Read`, I wanted to provide DLQ in `HL7v2IO.ListMessages`. However, I believe this is more of a nice-to-have for batch use cases (because there's no room for passing bad message IDs to ListMessages like there is in HL7v2IO.Read).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631585052


   Run Java Postcommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427643888



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
     }
   }
 
-  /** List HL7v2 messages in HL7v2 Stores with optional filter. */
+  /**
+   * List HL7v2 messages in HL7v2 Stores with optional filter.
+   *
+   * <p>This transform is optimized for dynamic splitting of message.list calls for large batches of
+   * historical data and assumes rather continuous stream of sendTimes. It will dynamically
+   * rebalance resources to handle "peak traffic times" but will waste resources if there are large
+   * durations (days) of the sendTime dimension without data.

Review comment:
       Note to self: remove the reference to "dynamically rebalance", as this is not yet supported by the Dataflow runner.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r421819001



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OrderedTimeRange.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** A restriction represented by a range of Instants [from, to). */
+public class OrderedTimeRange

Review comment:
       Any bug in this space is an error in the implementation of the IO connector, so these error messages will mostly come up during development; if any bugs are discovered during usage, the user won't be able to do much and this will go to the Beam community / IO author to fix.
   
   If you really want to, adding a `format` method and creating a subclass would make more sense than having effectively a copy.
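   A rough sketch of that "format hook + subclass" shape, using placeholder class names (BaseRange / TimeRange are illustrative, not existing Beam classes):
   ```java
   class BaseRange {
     final long from;
     final long to;

     BaseRange(long from, long to) {
       this.from = from;
       this.to = to;
     }

     // Subclasses override only how offsets are rendered in error messages.
     protected String format(long offset) {
       return Long.toString(offset);
     }

     void checkClaim(long offset) {
       if (offset < from || offset >= to) {
         throw new IllegalStateException(
             String.format(
                 "Offset %s outside of range [%s, %s)", format(offset), format(from), format(to)));
       }
     }
   }

   class TimeRange extends BaseRange {
     TimeRange(long fromMillis, long toMillis) {
       super(fromMillis, toMillis);
     }

     @Override
     protected String format(long offsetMillis) {
       return Instant.ofEpochMilli(offsetMillis).toString(); // org.joda.time.Instant, as in the diff
     }
   }
   ```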




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf edited a comment on pull request #11596: [BEAM-9856] [*WIP DO NOT MERGE*] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf edited a comment on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-623766796


   @chamikaramj thanks for the suggestion. I will look into using BoundedSource API in a separate PR and we can compare.
   
   Unfortunately, regular DoFns don't cut it because a single element's outputs are committed atomically (see this [conversation](https://github.com/apache/beam/pull/11538#discussion_r416927740)).
   Basically we have one input element (HL7v2 store) exploding to many, many output elements (all the messages in that store) in a single ProcessElement call. I'm trying to explore strategies for splitting up this listing (because, due to the paginated nature of messages.list, it is single-threaded).
   
   I originally chose splittable DoFn over BoundedSource based off the sentiment of this statement:
   > **Coding against the Source API involves a lot of boilerplate and is error-prone**, and it does not compose well with the rest of the Beam model because a Source can appear only at the root of a pipeline. - https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
   
   Using splittable DoFn also made reading from multiple HL7v2 stores come for free (e.g. if you had several regional HL7v2 stores and your use case was to read from them all and write to a single multi-regional store). This is admittedly a rather contrived use case.
   
   The blog also mentions 
   - A Source can not emit an additional output (for example, records that failed to parse).
       - Healthcare customers feeding requirements for this plugin want DLQ on all sinks and sources. To be consistent with the streaming API provided in `HL7v2IO.Read`, I wanted to provide DLQ in `HL7v2IO.ListMessages`. However, I believe this is more of a nice-to-have for batch use cases (because there's no room for passing bad message IDs to ListMessages like there is in HL7v2IO.Read).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427686974



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
      * @param filter the filter
      */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) {
-      this.hl7v2Stores = hl7v2Stores.get();
-      this.filter = filter.get();
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+    }
+
+    /**
+     * Instantiates a new List hl 7 v 2 messages.
+     *
+     * @param hl7v2Stores the hl 7 v 2 stores
+     * @param filter the filter
+     * @param initialSplitDuration the initial split duration for sendTime dimension splits
+     */
+    ListHL7v2Messages(
+        ValueProvider<List<String>> hl7v2Stores,
+        ValueProvider<String> filter,
+        Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.filter = filter;
+      this.initialSplitDuration = initialSplitDuration;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     */
     ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) {
-      this.hl7v2Stores = hl7v2Stores.get();
+      this.hl7v2Stores = hl7v2Stores;
       this.filter = null;
     }
 
+    /**
+     * Instantiates a new List hl7v2 messages.
+     *
+     * @param hl7v2Stores the hl7v2 stores
+     * @param initialSplitDuration the initial split duration
+     */
+    ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores;
+      this.initialSplitDuration = initialSplitDuration;
+    }
+
     @Override
     public PCollection<HL7v2Message> expand(PBegin input) {
       return input
-          .apply(Create.of(this.hl7v2Stores))
-          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter)))
+          .apply(Create.ofProvider(this.hl7v2Stores, ListCoder.of(StringUtf8Coder.of())))
+          .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x))
+          .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, initialSplitDuration)))
           .setCoder(new HL7v2MessageCoder())
           // Break fusion to encourage parallelization of downstream processing.
           .apply(Reshuffle.viaRandomKey());
     }
   }
 
+  /**
+   * Implemented as Splitable DoFn that claims millisecond resolutions of offset restrictions in the
+   * Message.sendTime dimension.
+   */
+  @BoundedPerElement
   static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> {
 
-    private final String filter;
+    private static final Logger LOG = LoggerFactory.getLogger(ListHL7v2MessagesFn.class);
+    private ValueProvider<String> filter;
+    // These control the initial restriction split which means that the list of integer pairs
+    // must comfortably fit in memory.
+    private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = Duration.standardDays(1);
+    private static final Duration DEFAULT_MIN_SPLIT_DURATION = Duration.standardHours(1);
+    private Duration initialSplitDuration;
+    private Instant from;
+    private Instant to;

Review comment:
       I don't think so; they don't get set until we make the earliest / latest sendTime query in @GetInitialRestriction.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-631189470


   Run Java PostCommit


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427546641



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +547,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be
+      // included in results set to add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter, "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {

Review comment:
       sgtm




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427525429



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +547,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // Filters are [from, to) to match the logic of OffsetRangeTracker, but the latest element
+      // needs to be included in the result set, so add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter, "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {
+            // This means we have claimed an entire millisecond, so we need to make sure that we
+            // process all messages for this millisecond, because sendTime allegedly has nanosecond
+            // resolution.
+            // https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message
+            while (cursor.getMillis() == lastClaimedMilliSecond
+                && i < page.size()) { // loop over messages in millisecond.
+              outputReceiver.output(msg);
+              msg = page.get(i++);

Review comment:
       Would we output the first message twice?
   In the first iteration, `msg = page.get(0)` (from line 616).
   In the second iteration, `msg = page.get(i++)`, which is still index `0`, and only then is `i` incremented.
   You could use `++i`, but then you would have to adjust the condition in the while loop, right?
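   A minimal sketch of the indexing concern, using a plain `List<String>` as a stand-in for the page; the names here are illustrative only, not taken from the PR:
   ```java
   import java.util.Arrays;
   import java.util.List;

   public class PostIncrementDemo {
     public static void main(String[] args) {
       List<String> page = Arrays.asList("msg0", "msg1", "msg2");
       int i = 0;
       String msg = page.get(i); // pre-loop read of index 0
       while (i < page.size()) {
         System.out.println(msg); // emit the current message
         msg = page.get(i++);     // reads index 0 again on the first pass, *then* increments i
       }
       // Prints: msg0, msg0, msg1 -- the first element is emitted twice and the last never is.
     }
   }
   ```
   Reading `page.get(i)` at the top of each iteration of an indexed `for` loop would avoid both the duplicate read and the skipped last element.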

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -472,24 +547,118 @@ public void initClient() throws IOException {
       this.client = new HttpHealthcareApiClient();
     }
 
+    @GetInitialRestriction
+    public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store)
+        throws IOException {
+      from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter);
+      // Filters are [from, to) to match the logic of OffsetRangeTracker, but the latest element
+      // needs to be included in the result set, so add an extra ms to the upper bound.
+      to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter).plus(1);
+      return new OffsetRange(from.getMillis(), to.getMillis());
+    }
+
+    @NewTracker
+    public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) {
+      return timeRange.newTracker();
+    }
+
+    @SplitRestriction
+    public void split(@Restriction OffsetRange timeRange, OutputReceiver<OffsetRange> out) {
+      List<OffsetRange> splits =
+          timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis());
+      Instant from = Instant.ofEpochMilli(timeRange.getFrom());
+      Instant to = Instant.ofEpochMilli(timeRange.getTo());
+      Duration totalDuration = new Duration(from, to);
+      LOG.info(
+          String.format(
+              "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), "
+                  + "or [%s, %s). \n"
+                  + "total days: %s \n"
+                  + "into %s splits. \n"
+                  + "Last split: %s",
+              from,
+              to,
+              timeRange.getFrom(),
+              timeRange.getTo(),
+              totalDuration.getStandardDays(),
+              splits.size(),
+              splits.get(splits.size() - 1).toString()));
+
+      for (OffsetRange s : splits) {
+        out.output(s);
+      }
+    }
+
     /**
      * List messages.
      *
-     * @param context the context
+     * @param hl7v2Store the HL7v2 store to list messages from
      * @throws IOException the io exception
      */
     @ProcessElement
-    public void listMessages(ProcessContext context) throws IOException {
-      String hl7v2Store = context.element();
-      // Output all elements of all pages.
+    public void listMessages(
+        @Element String hl7v2Store,
+        RestrictionTracker tracker,
+        OutputReceiver<HL7v2Message> outputReceiver)
+        throws IOException {
+      OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction();
+      Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom());
+      Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo());
       HttpHealthcareApiClient.HL7v2MessagePages pages =
-          new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter);
+          new HttpHealthcareApiClient.HL7v2MessagePages(
+              client, hl7v2Store, startRestriction, endRestriction, filter, "sendTime");
       long reqestTime = Instant.now().getMillis();
-      for (Stream<HL7v2Message> page : pages) {
+      long lastClaimedMilliSecond;
+      Instant cursor;
+      boolean hangingClaim = false; // flag if the claimed ms spills over to the next page.
+      for (List<HL7v2Message> page : pages) { // loop over pages.
+        int i = 0;
+        HL7v2Message msg = page.get(i);
+        while (i < page.size()) { // loop over messages in page
+          cursor = Instant.parse(msg.getSendTime());
+          lastClaimedMilliSecond = cursor.getMillis();
+          LOG.info(
+              String.format(
+                  "initial claim for page %s lastClaimedMilliSecond = %s",
+                  i, lastClaimedMilliSecond));
+          if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) {

Review comment:
       I don't know the API requirements, but wouldn't you be calling `tryClaim` twice on the same millisecond? Once on line 645, and once more here?
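   For reference, a minimal sketch of a claim-once-per-millisecond loop, under the assumption that the messages are already fetched and sorted by sendTime; the class and method names are invented for illustration, and this is not necessarily what the PR converged on:
   ```java
   import java.util.List;
   import org.apache.beam.sdk.io.gcp.healthcare.HL7v2Message;
   import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
   import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
   import org.joda.time.Instant;

   class ClaimOncePerMillisSketch {
     // Claim each millisecond at most once, then emit every message that falls into it.
     // Assumes `messages` is sorted by sendTime.
     static void emit(
         List<HL7v2Message> messages, OffsetRangeTracker tracker, OutputReceiver<HL7v2Message> out) {
       long lastClaimedMillis = Long.MIN_VALUE;
       for (HL7v2Message msg : messages) {
         long sendMillis = Instant.parse(msg.getSendTime()).getMillis();
         if (sendMillis != lastClaimedMillis) {
           if (!tracker.tryClaim(sendMillis)) {
             return; // the runner split off the rest of the range; stop processing
           }
           lastClaimedMillis = sendMillis;
         }
         out.output(msg);
       }
     }
   }
   ```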




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jaketf commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
jaketf commented on pull request #11596:
URL: https://github.com/apache/beam/pull/11596#issuecomment-629406986


   @pabloem bump ^. Please retest this.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11596:
URL: https://github.com/apache/beam/pull/11596#discussion_r427506366



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -431,25 +455,70 @@ private Message fetchMessage(HealthcareApiClient client, String msgId)
       this.filter = filter.get();
     }
 
+    /**
+     * Instantiates a new ListHL7v2Messages transform.
+     *
+     * @param hl7v2Stores the HL7v2 stores to list messages from
+     * @param filter the filter
+     * @param initialSplitDuration the initial split duration for sendTime dimension splits
+     */
+    ListHL7v2Messages(
+        ValueProvider<List<String>> hl7v2Stores,
+        ValueProvider<String> filter,
+        Duration initialSplitDuration) {
+      this.hl7v2Stores = hl7v2Stores.get();
+      this.filter = filter.get();
+      this.initialSplitDuration = initialSplitDuration;

Review comment:
       `ValueProvider` arguments are usually not known at pipeline construction time, so it is not useful to call `get` on them when you create the PCollection. The usual approach is to keep `ValueProvider` attributes on the class (wrapping plain String arguments with `StaticValueProvider`), and to call `get` on them only in execution-time methods such as `@ProcessElement` or `@StartBundle`/`@FinishBundle`. I seem to have missed this point earlier; sorry about that.
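   A rough sketch of that pattern; the `FilterFn` class and its field names below are invented for illustration and are not taken from this PR:
   ```java
   import org.apache.beam.sdk.options.ValueProvider;
   import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
   import org.apache.beam.sdk.transforms.DoFn;

   // Keep the ValueProvider itself as the field; only call get() at execution time.
   class FilterFn extends DoFn<String, String> {
     private final ValueProvider<String> filter;

     FilterFn(ValueProvider<String> filter) {
       this.filter = filter; // no .get() at construction time
     }

     // Convenience constructor for values already known at construction time.
     FilterFn(String filter) {
       this(StaticValueProvider.of(filter));
     }

     @ProcessElement
     public void processElement(@Element String store, OutputReceiver<String> out) {
       // get() is safe here because @ProcessElement runs at execution time.
       out.output(store + "?filter=" + filter.get());
     }
   }
   ```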




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org