Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/02 19:52:10 UTC

[GitHub] [beam] lastomato commented on a change in pull request #12721: Add deidentify for FhirIO connector

lastomato commented on a change in pull request #12721:
URL: https://github.com/apache/beam/pull/12721#discussion_r482357797



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -122,12 +124,24 @@
  * store) This requires each resource to contain a client provided ID. It is important that when
  * using import you give the appropriate permissions to the Google Cloud Healthcare Service Agent.
  *
+ * <p>ExportGcs This is to export FHIR resources from a FHIR store to Google Cloud Storage. The

Review comment:
       I think we should rename this to `Export`, since GCS is an implementation detail here. Feel free to leave that for a follow-up PR.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -170,6 +184,21 @@
  * // Alternatively you could use import for high throughput to a new store.
  * FhirIO.Write.Result writeResult =
  *     output.apply("Import FHIR Resources", FhirIO.executeBundles(options.getNewFhirStore()));
+ *
+ * // Export FHIR resources to Google Cloud Storage.
+ * String fhirStoreName = ...;
+ * String exportGcsUriPrefix = ...;
+ * FhirIO.ExportGcs.Result exportResult =

Review comment:
       Should we return export errors in the `Result`?
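The question is whether the export result should carry failures alongside successes, rather than only the exported output. In Beam this is usually done with an additional tagged output, similar to the existing `OUT` `TupleTag` on `ExportGcs`. The sketch below is plain Java and does not use Beam. It shows the shape of a result that records each failure next to the input that caused it. `ExportResultSketch`, `ExportError`, and `exportAll` are hypothetical names, not part of `FhirIO`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch: an export result holding both successes and failures. */
final class ExportResultSketch {

  /** One failed export: the input that failed and the reason. */
  static final class ExportError {
    final String input;
    final String message;

    ExportError(String input, String message) {
      this.input = input;
      this.message = message;
    }
  }

  /** Successful outputs and failures, side by side. */
  static final class Result {
    final List<String> exported;
    final List<ExportError> failed;

    Result(List<String> exported, List<ExportError> failed) {
      this.exported = Collections.unmodifiableList(exported);
      this.failed = Collections.unmodifiableList(failed);
    }
  }

  /** Applies exportOne to each store; an exception is recorded instead of aborting the batch. */
  static Result exportAll(List<String> fhirStores, Function<String, String> exportOne) {
    List<String> exported = new ArrayList<>();
    List<ExportError> failed = new ArrayList<>();
    for (String store : fhirStores) {
      try {
        exported.add(exportOne.apply(store));
      } catch (RuntimeException e) {
        failed.add(new ExportError(store, e.getMessage()));
      }
    }
    return new Result(exported, failed);
  }

  private ExportResultSketch() {}
}
```

With this design, callers can inspect `failed` explicitly, and one bad store does not hide the stores that exported successfully.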

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1212,10 +1280,7 @@ public void executeBundles(ProcessContext context) {
   public static class ExportGcs extends PTransform<PBegin, ExportGcs.Result> {
     public static final TupleTag<String> OUT = new TupleTag<String>() {};
 
-    /**
-     * Represents the result of an export, including both the successful parsed messages, and
-     * invalid ones.
-     */
+    /** Represents the result of an export, a collection of FHIR resources in ndjson. */

Review comment:
       nit: ndjson -> newline delimited JSON (ndjson)
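In newline delimited JSON (ndjson), each line holds one complete JSON value. Here that value is one FHIR resource, so record boundaries can be found without parsing any JSON. This works because a compact JSON value cannot contain a raw newline. A minimal sketch, assuming well-formed ndjson input. `NdjsonSketch` and `splitRecords` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal sketch: split ndjson text into one string per JSON record. */
final class NdjsonSketch {

  /** Returns one element per non-blank line; trim() also drops a trailing '\r' from CRLF input. */
  static List<String> splitRecords(String ndjson) {
    List<String> records = new ArrayList<>();
    for (String line : ndjson.split("\n")) {
      String trimmed = line.trim();
      if (!trimmed.isEmpty()) {
        records.add(trimmed);
      }
    }
    return records;
  }

  private NdjsonSketch() {}
}
```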

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1297,4 +1362,67 @@ public void exportResourcesToGcs(ProcessContext context)
       }
     }
   }
+
+  /** Deidentify FHIR resources from a FHIR store to a destination FHIR store */
+  public static class Deidentify extends PTransform<PBegin, PCollection<String>> {
+    private final ValueProvider<String> sourceFhirStore;
+    private final ValueProvider<String> destinationFhirStore;
+    private final ValueProvider<DeidentifyConfig> deidConfig;
+
+    public Deidentify(
+        ValueProvider<String> sourceFhirStore,
+        ValueProvider<String> destinationFhirStore,
+        ValueProvider<DeidentifyConfig> deidConfig) {
+      this.sourceFhirStore = sourceFhirStore;
+      this.destinationFhirStore = destinationFhirStore;
+      this.deidConfig = deidConfig;
+    }
+
+    @Override
+    public PCollection<String> expand(PBegin input) {
+      return input
+          .getPipeline()
+          .apply(Create.ofProvider(sourceFhirStore, StringUtf8Coder.of()))
+          .apply(
+              "ScheduleDeidentifyFhirStoreOperations",
+              ParDo.of(new DeidentifyFn(destinationFhirStore, deidConfig)));
+    }
+
+    /** A function that schedules a deidentify operation and monitors the status. */
+    public static class DeidentifyFn extends DoFn<String, String> {
+
+      private HealthcareApiClient client;
+      private final ValueProvider<String> destinationFhirStore;
+      private final String deidConfigJson;
+
+      public DeidentifyFn(
+          ValueProvider<String> destinationFhirStore, ValueProvider<DeidentifyConfig> deidConfig) {
+        this.destinationFhirStore = destinationFhirStore;
+        Gson g = new Gson();
+        this.deidConfigJson = g.toJson(deidConfig.get());
+      }
+
+      @Setup
+      public void initClient() throws IOException {
+        this.client = new HttpHealthcareApiClient();
+      }
+
+      @ProcessElement
+      public void deidentify(ProcessContext context)
+          throws IOException, InterruptedException, HealthcareHttpException {
+        String sourceFhirStore = context.element();
+        String destinationFhirStore = this.destinationFhirStore.get();
+        Gson g = new Gson();

Review comment:
       Please cache this object; otherwise we create a new `Gson` object for every element.
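The suggested fix follows the pattern the DoFn already uses for `client`: build the helper once per DoFn instance in `@Setup` and reuse it in `@ProcessElement`. Gson documents its instances as thread-safe, so reusing one is safe. The plain-Java sketch below mirrors that lifecycle without Beam. `ExpensiveSerializer` is a hypothetical stand-in for `Gson` that counts how many times it is constructed. `setup()` and `process()` stand in for the `@Setup` and `@ProcessElement` methods.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: build the serializer once per DoFn-like instance, not once per element. */
final class CachedSerializerSketch {

  /** Hypothetical stand-in for Gson; counts constructions. */
  static final class ExpensiveSerializer {
    static final AtomicInteger CONSTRUCTED = new AtomicInteger();

    ExpensiveSerializer() {
      CONSTRUCTED.incrementAndGet();
    }

    // Naive JSON building with no escaping, for illustration only.
    String toJson(String sourceStore, String destinationStore) {
      return "{\"source\":\"" + sourceStore + "\",\"destination\":\"" + destinationStore + "\"}";
    }
  }

  /** Mirrors a DoFn: setup() plays @Setup, process() plays @ProcessElement. */
  static final class DeidentifyLikeFn {
    // transient: a Beam DoFn is serialized, so helpers like this are rebuilt in @Setup.
    private transient ExpensiveSerializer serializer;

    void setup() {
      serializer = new ExpensiveSerializer(); // created once per instance
    }

    String process(String sourceStore, String destinationStore) {
      return serializer.toJson(sourceStore, destinationStore); // reused for every element
    }
  }

  private CachedSerializerSketch() {}
}
```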

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1297,4 +1362,67 @@ public void exportResourcesToGcs(ProcessContext context)
+      public DeidentifyFn(
+          ValueProvider<String> destinationFhirStore, ValueProvider<DeidentifyConfig> deidConfig) {
+        this.destinationFhirStore = destinationFhirStore;
+        Gson g = new Gson();
+        this.deidConfigJson = g.toJson(deidConfig.get());

Review comment:
       Is `DeidentifyConfig` not serializable?
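The question matters because Beam serializes DoFn instances with Java serialization before sending them to workers. Every non-transient field therefore has to be serializable. The PR avoids the issue by storing the config as a JSON string. The sketch below shows the underlying check with hypothetical stand-ins (`PlainConfig`, `ConfigHolder`) rather than the real `DeidentifyConfig`, whose serializability is not established here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Sketch: a DoFn-like holder serializes only if all of its fields do. */
final class SerializabilitySketch {

  /** Hypothetical config type that does not implement Serializable. */
  static final class PlainConfig {
    final String mode = "redact";
  }

  /** Hypothetical DoFn-like holder; Serializable itself, but only as good as its fields. */
  static final class ConfigHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    final Object config;

    ConfigHolder(Object config) {
      this.config = config;
    }
  }

  /** Returns true if the object graph can be written with Java serialization. */
  static boolean isJavaSerializable(Object value) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(value);
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new IllegalStateException("unexpected I/O failure", e);
    }
  }

  private SerializabilitySketch() {}
}
```

If `DeidentifyConfig` turns out to be serializable, the DoFn could hold it directly and skip the JSON round trip. If it is not, the JSON string field is a reasonable workaround.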




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org