Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/06 00:12:13 UTC

[GitHub] [beam] pabloem commented on a change in pull request #11702: [BEAM-9990] Add Conditional Update and Conditional Create to FhirIO

pabloem commented on a change in pull request #11702:
URL: https://github.com/apache/beam/pull/11702#discussion_r436210077



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1173,4 +1276,339 @@ public void executeBundles(ProcessContext context) {
       }
     }
   }
+
+  /**
+   * Create resources fhir io . create resources.
+   *
+   * @param <T> the type parameter
+   * @param fhirStore the fhir store
+   * @return the fhir io . create resources
+   */
+  public static <T> FhirIO.CreateResources<T> createResources(ValueProvider<String> fhirStore) {
+    return new CreateResources(fhirStore);
+  }
+
+  /**
+   * Create resources fhir io . create resources.
+   *
+   * @param <T> the type parameter
+   * @param fhirStore the fhir store
+   * @return the fhir io . create resources
+   */
+  public static <T> FhirIO.CreateResources<T> createResources(String fhirStore) {
+    return new CreateResources(fhirStore);
+  }
+  /**
+   * {@link PTransform} for Creating FHIR resources.
+   *
+   * <p>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+   */
+  public static class CreateResources<T> extends PTransform<PCollection<T>, Write.Result> {
+    private final String fhirStore;
+    private SerializableFunction<T, String> ifNoneExistFunction;
+    private SerializableFunction<T, String> formatBodyFunction;
+    private SerializableFunction<T, String> typeFunction;
+    private static final Logger LOG = LoggerFactory.getLogger(CreateResources.class);
+
+    /**
+     * Instantiates a new Create resources transform.
+     *
+     * @param fhirStore the fhir store
+     */
+    CreateResources(ValueProvider<String> fhirStore) {
+      this.fhirStore = fhirStore.get();
+    }
+
+    /**
+     * Instantiates a new Create resources.
+     *
+     * @param fhirStore the fhir store
+     */
+    CreateResources(String fhirStore) {
+      this.fhirStore = fhirStore;
+    }
+
+    /**
+     * This adds a {@link SerializableFunction} that reads an resource string and extracts an
+     * If-None-Exists query for conditional create. Typically this will just be extracting an ID to
+     * look for.
+     *
+     * <p>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+     *
+     * @param ifNoneExistFunction the if none exist function
+     * @return the create resources
+     */
+    public CreateResources withIfNotExistFunction(
+        SerializableFunction<T, String> ifNoneExistFunction) {
+      this.ifNoneExistFunction = ifNoneExistFunction;
+      return this;
+    }
+
+    /**
+     * This adds a {@link SerializableFunction} that reads an resource string and extracts an
+     * resource type.
+     *
+     * <p>https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
+     *
+     * @param typeFunction for extracting type from a resource.
+     * @return the create resources
+     */
+    public CreateResources withTypeFunction(SerializableFunction<T, String> typeFunction) {
+      this.typeFunction = typeFunction;
+      return this;
+    }
+    /**
+     * With format body function create resources.
+     *
+     * @param formatBodyFunction the format body function
+     * @return the create resources
+     */
+    public CreateResources withFormatBodyFunction(

Review comment:
       I don't think I understand this function very well. Is it meant to reformat a resource whose formatting is incorrect? Could you expand the documentation for it?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -155,17 +168,53 @@
  * FhirIO.Write.Result writeResult =
  *     output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore()));
  *
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ *     output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
  * PCollection<HealthcareIOError<String>> failedBundles = writeResult.getFailedInsertsWithErr();
  *
+ * // [Begin Writing to Dead Letter Queue]
  * failedBundles.apply("Write failed bundles to BigQuery",
  *     BigQueryIO
  *         .write()
  *         .to(option.getBQFhirExecuteBundlesDeadLetterTable())
  *         .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ *     .apply("Reconcile with Conditional Update",
+ *         FhirIO.ConditionalUpdate(fhirStore)
+ *             .withFormatBodyFunction(HealthcareIOError<String>::getDataResource)
+ *             .withTypeFunction((HealthcareIOError<String> err) -> {
+ *               String body = err.getDataResource();
+ *               // TODO(user) insert logic to exctract type.
+ *               return params;
+ *             })
+ *             .withSearchParametersFunction((HealthcareIOError<String> err) -> {
+ *               String body = err.getDataResource();
+ *               Map<String, String> params = new HashMap();
+ *               // TODO(user) insert logic to exctract search query parameters.
+ *               return params;
+ *             });
+ * // [End Reconciliation with Conditional Update]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional create
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ *     .apply("Reconcile with Conditional Create",
+ *         FhirIO.CreateResources(fhirStore)
+ *             .withFormatBodyFunction(HealthcareIOError<String>::getDataResource)
+ *             .withIfNotExistsFunction((HealthcareIOError<String> err) -> {
+ *               String body = err.getDataResource();
+ *               // TODO(user) insert logic to exctract a query to be used in If-Not-Exists header.
+ *               return params;

Review comment:
       It is not clear to me whether this function should return an ID to check for existence or a fully formatted query. Perhaps detail this in the documentation / snippet?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -130,10 +132,18 @@
  *     FhirIO.Write.Result#getFailedBodies()} to retrieve a {@link PCollection} of {@link
  *     HealthcareIOError} containing the {@link String} that failed to be ingested and the
  *     exception.
+ *     <h3>Conditional Creating / Updating Resources</h3>
+ *     {@link FhirIO} supports interfaces for conditional update. These can be useful to handle
+ *     scenarios where an executeBundle failed. For example if you tried to create a resource that
+ *     already exists you can grab the faield bodies of your {@link FhirIO.ExecuteBundles} transform
+ *     with {@link FhirIO.Write.Result#getFailedBodies()} perform some logic on the reason for
+ *     failures and if appropriate route this to {@link FhirIO.ConditionalUpdate} or {@link
+ *     FhirIO.CreateResources} to take the appropriate action on your FHIR store.

Review comment:
       Perhaps link the method documentation here? (https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create, etc)

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -155,17 +168,53 @@
  * FhirIO.Write.Result writeResult =
  *     output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore()));
  *
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ *     output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
  * PCollection<HealthcareIOError<String>> failedBundles = writeResult.getFailedInsertsWithErr();
  *
+ * // [Begin Writing to Dead Letter Queue]
  * failedBundles.apply("Write failed bundles to BigQuery",
  *     BigQueryIO
  *         .write()
  *         .to(option.getBQFhirExecuteBundlesDeadLetterTable())
  *         .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ *     .apply("Reconcile with Conditional Update",
+ *         FhirIO.ConditionalUpdate(fhirStore)
+ *             .withFormatBodyFunction(HealthcareIOError<String>::getDataResource)
+ *             .withTypeFunction((HealthcareIOError<String> err) -> {
+ *               String body = err.getDataResource();
+ *               // TODO(user) insert logic to exctract type.
+ *               return params;
+ *             })
+ *             .withSearchParametersFunction((HealthcareIOError<String> err) -> {

Review comment:
       It seems like the search parameters follow [a particular syntax](https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/search#google.cloud.healthcare.v1beta1.fhir.rest.FhirService.SearchResources) - does it make sense to provide a builder class for search parameters? This may help prevent mistakes when building the query.
   
   e.g:
   ```
   FhirSearchParameter.builder()
     .withBase(...)
     .withType(FhirIO.ResourceType.Patient)
     .addParameter(...)
     .withSearchModifier(...)
     .toString(); // (or .build())
   ```
   
   The con is that it may add the burden of having to add support for every new thing added to the search parameter query. It seems a bit complex, so IDK. Thoughts?
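
To make the builder idea concrete, here is a minimal sketch. `FhirSearchQuery` and its methods are hypothetical names, not part of FhirIO or the Healthcare API client. It only URL-encodes values and joins the terms with `&`; it does not validate parameter names or modifiers against the FHIR search syntax.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical builder for a FHIR search query string; not part of FhirIO. */
final class FhirSearchQuery {
  // Terms are kept in insertion order so the output is deterministic.
  private final List<String> terms = new ArrayList<>();

  private FhirSearchQuery() {}

  static FhirSearchQuery builder() {
    return new FhirSearchQuery();
  }

  /** Adds a plain name=value term. */
  FhirSearchQuery addParameter(String name, String value) {
    return addParameter(name, null, value);
  }

  /** Adds a name:modifier=value term, e.g. family:exact=Smith. */
  FhirSearchQuery addParameter(String name, String modifier, String value) {
    if (name == null || name.isEmpty()) {
      throw new IllegalArgumentException("parameter name must be non-empty");
    }
    // Only the value is encoded; the name and modifier are passed through as-is.
    String key = (modifier == null) ? name : name + ":" + modifier;
    terms.add(key + "=" + encode(value));
    return this;
  }

  /** Joins all terms with '&'. */
  String build() {
    return String.join("&", terms);
  }

  private static String encode(String value) {
    try {
      return URLEncoder.encode(value, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 is always supported", e);
    }
  }
}
```

With something like this, the search-parameters function could return `FhirSearchQuery.builder().addParameter("family", "exact", "Smith").build()` instead of a hand-assembled string or map.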

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -155,17 +168,53 @@
  * FhirIO.Write.Result writeResult =
  *     output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore()));
  *
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ *     output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
  * PCollection<HealthcareIOError<String>> failedBundles = writeResult.getFailedInsertsWithErr();
  *
+ * // [Begin Writing to Dead Letter Queue]
  * failedBundles.apply("Write failed bundles to BigQuery",
  *     BigQueryIO
  *         .write()
  *         .to(option.getBQFhirExecuteBundlesDeadLetterTable())
  *         .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ *     .apply("Reconcile with Conditional Update",
+ *         FhirIO.ConditionalUpdate(fhirStore)
+ *             .withFormatBodyFunction(HealthcareIOError<String>::getDataResource)
+ *             .withTypeFunction((HealthcareIOError<String> err) -> {
+ *               String body = err.getDataResource();
+ *               // TODO(user) insert logic to exctract type.
+ *               return params;

Review comment:
       I see that [type can be Patient or Observation?](https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create) - is this an exhaustive list? Does it make sense to have an enum for the types that we expect? It would prevent issues with arbitrary strings being returned.
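
A rough sketch of what I have in mind. `FhirResourceType` is a hypothetical name and the three constants are only examples; FHIR defines many more resource types, so a real enum would need the full list for the supported FHIR version.

```java
/** Hypothetical enum of FHIR resource types; deliberately incomplete. */
enum FhirResourceType {
  PATIENT("Patient"),
  OBSERVATION("Observation"),
  ENCOUNTER("Encounter");

  private final String fhirName;

  FhirResourceType(String fhirName) {
    this.fhirName = fhirName;
  }

  /** Returns the name as it appears in the resource's "resourceType" field. */
  String fhirName() {
    return fhirName;
  }

  /** Looks up a type by its exact, case-sensitive FHIR name. */
  static FhirResourceType fromFhirName(String name) {
    for (FhirResourceType type : values()) {
      if (type.fhirName.equals(name)) {
        return type;
      }
    }
    throw new IllegalArgumentException("Unknown FHIR resource type: " + name);
  }
}
```

The type function could then return `FhirResourceType` rather than `String`, so an unexpected value fails at lookup time instead of producing a bad request path.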

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -155,17 +168,53 @@
  * FhirIO.Write.Result writeResult =
  *     output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getExistingFhirStore()));
  *
+ * // Alternatively you could use import for high throughput to a new store.
+ * FhirIO.Write.Result writeResult =
+ *     output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore()));
+ * // [End Writing ]
+ *
  * PCollection<HealthcareIOError<String>> failedBundles = writeResult.getFailedInsertsWithErr();
  *
+ * // [Begin Writing to Dead Letter Queue]
  * failedBundles.apply("Write failed bundles to BigQuery",
  *     BigQueryIO
  *         .write()
  *         .to(option.getBQFhirExecuteBundlesDeadLetterTable())
  *         .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * // [End Writing to Dead Letter Queue]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional update
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ *     .apply("Reconcile with Conditional Update",
+ *         FhirIO.ConditionalUpdate(fhirStore)
+ *             .withFormatBodyFunction(HealthcareIOError<String>::getDataResource)
+ *             .withTypeFunction((HealthcareIOError<String> err) -> {
+ *               String body = err.getDataResource();
+ *               // TODO(user) insert logic to exctract type.
+ *               return params;
+ *             })
+ *             .withSearchParametersFunction((HealthcareIOError<String> err) -> {
+ *               String body = err.getDataResource();
+ *               Map<String, String> params = new HashMap();
+ *               // TODO(user) insert logic to exctract search query parameters.
+ *               return params;
+ *             });
+ * // [End Reconciliation with Conditional Update]
+ *
+ * // Alternatively you may want to handle DeadLetter with conditional create
+ * // [Begin Reconciliation with Conditional Update]
+ * failedBundles
+ *     .apply("Reconcile with Conditional Create",
+ *         FhirIO.CreateResources(fhirStore)
+ *             .withFormatBodyFunction(HealthcareIOError<String>::getDataResource)

Review comment:
       I wonder if this should be called `withFormatFunction` or `withFormatResourceFunction` - wdyt? (You know this better than I do; just brainstorming.)

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -130,10 +132,18 @@
  *     FhirIO.Write.Result#getFailedBodies()} to retrieve a {@link PCollection} of {@link
  *     HealthcareIOError} containing the {@link String} that failed to be ingested and the
  *     exception.
+ *     <h3>Conditional Creating / Updating Resources</h3>
+ *     {@link FhirIO} supports interfaces for conditional update. These can be useful to handle
+ *     scenarios where an executeBundle failed. For example if you tried to create a resource that
+ *     already exists you can grab the faield bodies of your {@link FhirIO.ExecuteBundles} transform
+ *     with {@link FhirIO.Write.Result#getFailedBodies()} perform some logic on the reason for
+ *     failures and if appropriate route this to {@link FhirIO.ConditionalUpdate} or {@link
+ *     FhirIO.CreateResources} to take the appropriate action on your FHIR store.

Review comment:
       Also, perhaps link the search method documentation for the `withSearchParametersFunction` method so users can consult the syntax.



