Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/01 21:23:27 UTC

[GitHub] [beam] trucleduc commented on a change in pull request #12721: Add deidentify for FhirIO connector

trucleduc commented on a change in pull request #12721:
URL: https://github.com/apache/beam/pull/12721#discussion_r481439815



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1297,4 +1362,67 @@ public void exportResourcesToGcs(ProcessContext context)
       }
     }
   }
+
+  /** Deidentify FHIR resources from a FHIR store to a destination FHIR store */
+  public static class Deidentify extends PTransform<PBegin, PCollection<String>> {
+    private final ValueProvider<String> sourceFhirStore;
+    private final ValueProvider<String> destinationFhirStore;
+    private final ValueProvider<DeidentifyConfig> deidConfig;
+
+    public Deidentify(
+        ValueProvider<String> sourceFhirStore,
+        ValueProvider<String> destinationFhirStore,
+        ValueProvider<DeidentifyConfig> deidConfig) {
+      this.sourceFhirStore = sourceFhirStore;
+      this.destinationFhirStore = destinationFhirStore;
+      this.deidConfig = deidConfig;
+    }
+
+    @Override
+    public PCollection<String> expand(PBegin input) {
+      return input
+          .getPipeline()
+          .apply(Create.ofProvider(sourceFhirStore, StringUtf8Coder.of()))
+          .apply(
+              "ScheduleDeidentifyFhirStoreOperations",
+              ParDo.of(new DeidentifyFn(destinationFhirStore, deidConfig)));
+    }
+
+    /** A function that schedules a deidentify operation and monitors the status. */
+    public static class DeidentifyFn extends DoFn<String, String> {
+
+      private HealthcareApiClient client;
+      private final ValueProvider<String> destinationFhirStore;
+      private final String deidConfigJson;
+
+      public DeidentifyFn(
+          ValueProvider<String> destinationFhirStore, ValueProvider<DeidentifyConfig> deidConfig) {
+        this.destinationFhirStore = destinationFhirStore;
+        Gson g = new Gson();
+        this.deidConfigJson = g.toJson(deidConfig.get());
+      }
+
+      @Setup
+      public void initClient() throws IOException {
+        this.client = new HttpHealthcareApiClient();
+      }
+
+      @ProcessElement
+      public void deidentify(ProcessContext context)
+          throws IOException, InterruptedException, HealthcareHttpException {
+        String sourceFhirStore = context.element();
+        String destinationFhirStore = this.destinationFhirStore.get();
+        Gson g = new Gson();
+        DeidentifyConfig deidConfig = g.fromJson(this.deidConfigJson, DeidentifyConfig.class);
+        Operation operation =
+            client.deidentifyFhirStore(sourceFhirStore, destinationFhirStore, deidConfig);
+        operation = client.pollOperation(operation, 1000L);
+        if (operation.getError() != null) {

Review comment:
       Yep, exactly 👍. We're just sending the operation and waiting for it to complete.
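
The "send the operation, then wait" pattern could be sketched roughly as below. This is only an illustration under assumptions: `PollSketch`, `Op`, and `pollUntilDone` are hypothetical names made up for this sketch, and it does not claim to reflect how `client.pollOperation` is actually implemented.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of polling a long-running operation; not the real client code. */
class PollSketch {

  /** Minimal stand-in for an operation: a done flag plus an optional error message. */
  static final class Op {
    final boolean done;
    final String error; // null while running or when the operation succeeded

    Op(boolean done, String error) {
      this.done = done;
      this.error = error;
    }
  }

  /**
   * Re-fetches the operation every sleepMs milliseconds until it reports done,
   * then returns the final state so the caller can inspect the error field.
   */
  static Op pollUntilDone(Supplier<Op> fetch, long sleepMs) {
    Op op = fetch.get();
    while (!op.done) {
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop waiting.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while polling", e);
      }
      op = fetch.get();
    }
    return op;
  }
}
```

The caller would then check the returned operation for an error, in the same way the diff above checks `operation.getError()` after polling.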




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org