Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/13 20:17:59 UTC

[GitHub] [beam] pabloem commented on a change in pull request #11339: [BEAM-9468] Fhir io

pabloem commented on a change in pull request #11339:
URL: https://github.com/apache/beam/pull/11339#discussion_r424706218



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -0,0 +1,1107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.healthcare.v1beta1.model.HttpBody;
+import com.google.api.services.healthcare.v1beta1.model.Operation;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Wait;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.codehaus.jackson.JsonProcessingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link FhirIO} provides an API for reading and writing resources to the <a
+ * href="https://cloud.google.com/healthcare/docs/concepts/fhir">Google Cloud Healthcare FHIR API</a>.
+ *
+ * <p>Read
+ *
+ * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use cases where you have
+ * a {@link PCollection} of resource IDs. This is appropriate for reading FHIR notifications from a
+ * Pub/Sub subscription with {@link PubsubIO#readStrings()}, or for cases where you have a manually
+ * prepared list of resources that you need to process (e.g. in a text file read with {@link
+ * org.apache.beam.sdk.io.TextIO}).
+ *
+ * <p>{@link FhirIO.Read} fetches the resource contents from the FHIR store for each ID in the
+ * input {@link PCollection} and returns a {@link FhirIO.Read.Result}. Call {@link
+ * FhirIO.Read.Result#getResources()} to retrieve a {@link PCollection} containing the successfully
+ * fetched {@link String}s, and {@link FhirIO.Read.Result#getFailedReads()} to retrieve a {@link
+ * PCollection} of {@link HealthcareIOError}s, each containing a resource ID that could not be
+ * fetched and the exception. These can be written to the dead-letter storage system of your
+ * choosing. This error handling mainly serves to transparently surface errors where the upstream
+ * {@link PCollection} contains IDs that are invalid or unreachable due to permission issues.
+ *
+ * <p>Write
+ *
+ * <p>Resources can be written to FHIR with two different methods: Import or Execute Bundle.
+ *
+ * <p>Execute Bundle: this is best for use cases where you are writing to a non-empty FHIR store
+ * with other clients, or otherwise need referential integrity (e.g. a streaming HL7v2-to-FHIR ETL
+ * pipeline).
+ *
+ * <p>See the <a
+ * href="https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle">executeBundle
+ * API reference</a>.
+ *
+ * <p>Import: this is best for use cases where you are populating an empty FHIR store with no
+ * other clients (e.g. a historical backfill on a new FHIR store). It is faster than the Execute
+ * Bundle method, but it does not respect referential integrity and the resources are not written
+ * transactionally. This method requires each resource to contain a client-provided ID. When
+ * using import, it is important to give the Google Cloud Healthcare Service Agent the <a
+ * href="https://cloud.google.com/healthcare/docs/how-tos/permissions-healthcare-api-gcp-products#fhir_store_cloud_storage_permissions">appropriate
+ * permissions</a>. See the <a
+ * href="https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores/import">import
+ * API reference</a>.
+ *
+ * <p>A {@link PCollection} of {@link String} can be ingested into a FHIR store using {@link
+ * FhirIO.Write#fhirStoresImport(String, String, String, FhirIO.Import.ContentStructure)}. This
+ * will return a {@link FhirIO.Write.Result} on which you can call {@link
+ * FhirIO.Write.Result#getFailedBodies()} to retrieve a {@link PCollection} of {@link
+ * HealthcareIOError} containing the {@link String} that failed to be ingested and the exception.
+ *
+ * <p>Example
+ *
+ * <pre>{@code
+ * Pipeline pipeline = ...
+ *
+ * // Tail the FHIR store by retrieving resources based on Pub/Sub notifications.
+ * FhirIO.Read.Result readResult = pipeline
+ *   .apply("Read FHIR notifications",
+ *     PubsubIO.readStrings().fromSubscription(options.getNotificationSubscription()))
+ *   .apply(FhirIO.readResources());
+ *
+ * // Successfully retrieved resources.
+ * PCollection<String> resources = readResult.getResources();
+ * // Resource IDs that couldn't be retrieved, with error context.
+ * PCollection<HealthcareIOError<String>> failedReads = readResult.getFailedReads();
+ *
+ * failedReads.apply("Write Resource IDs / Stacktrace for Failed Reads to BigQuery",
+ *     BigQueryIO
+ *         .<HealthcareIOError<String>>write()
+ *         .to(options.getBQFhirExecuteBundlesDeadLetterTable())
+ *         .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ *
+ * PCollection<String> output = resources.apply("Happy path transformations", ...);
+ * FhirIO.Write.Result writeResult =
+ *     output.apply("Execute FHIR Bundles", FhirIO.executeBundles(options.getFhirStore()));
+ *
+ * PCollection<HealthcareIOError<String>> failedBundles = writeResult.getFailedBodies();
+ *
+ * failedBundles.apply("Write failed bundles to BigQuery",
+ *     BigQueryIO
+ *         .<HealthcareIOError<String>>write()
+ *         .to(options.getBQFhirExecuteBundlesDeadLetterTable())
+ *         .withFormatFunction(new HealthcareIOErrorToTableRow()));
+ * }</pre>
+ */
+public class FhirIO {
+
+  /**
+   * Read resources from a PCollection of resource IDs (e.g. when subscribing to Pub/Sub
+   * notifications).
+   *
+   * @return the read
+   * @see Read
+   */
+  public static Read readResources() {
+    return new Read();
+  }
+
+  /**
+   * Import resources. Intended for use on empty FHIR stores.
+   *
+   * @param fhirStore the FHIR store
+   * @param tempDir the temp dir
+   * @param deadLetterDir the dead letter dir
+   * @param contentStructure the content structure
+   * @return the import
+   * @see Import
+   */
+  public static Import importResources(
+      String fhirStore,
+      String tempDir,
+      String deadLetterDir,
+      @Nullable FhirIO.Import.ContentStructure contentStructure) {
+    return new Import(fhirStore, tempDir, deadLetterDir, contentStructure);
+  }
+
+  /**
+   * Import resources. Intended for use on empty FHIR stores.
+   *
+   * @param fhirStore the FHIR store
+   * @param tempDir the temp dir
+   * @param deadLetterDir the dead letter dir
+   * @param contentStructure the content structure
+   * @return the import
+   * @see Import
+   */
+  public static Import importResources(
+      ValueProvider<String> fhirStore,
+      ValueProvider<String> tempDir,
+      ValueProvider<String> deadLetterDir,
+      @Nullable FhirIO.Import.ContentStructure contentStructure) {
+    return new Import(fhirStore, tempDir, deadLetterDir, contentStructure);
+  }
+
+  /** A {@link PTransform} that fetches FHIR resources for a {@link PCollection} of IDs. */
+  public static class Read extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+    private static final Logger LOG = LoggerFactory.getLogger(Read.class);
+
+    /** Instantiates a new Read. */
+    public Read() {}
+
+    /** The result of {@link Read}: the fetched resources and the failed reads. */
+    public static class Result implements POutput, PInput {
+      private PCollection<String> resources;
+
+      private PCollection<HealthcareIOError<String>> failedReads;
+      /** The tuple holding the OUT and DEAD_LETTER outputs. */
+      PCollectionTuple pct;
+
+      /**
+       * Create a FhirIO.Read.Result from a PCollectionTuple with OUT and DEAD_LETTER tags.
+       *
+       * @param pct the PCollectionTuple
+       * @return the read result
+       * @throws IllegalArgumentException if the tuple lacks the OUT or DEAD_LETTER tag
+       */
+      public static FhirIO.Read.Result of(PCollectionTuple pct) throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll(TupleTagList.of(OUT).and(DEAD_LETTER).getAll())) {
+          return new FhirIO.Read.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the FhirIO.Read.OUT "
+                  + "and FhirIO.Read.DEAD_LETTER tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.resources = pct.get(OUT);
+        this.failedReads =
+            pct.get(DEAD_LETTER).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+      }
+
+      /**
+       * Gets failed reads.
+       *
+       * @return the failed reads
+       */
+      public PCollection<HealthcareIOError<String>> getFailedReads() {
+        return failedReads;
+      }
+
+      /**
+       * Gets resources.
+       *
+       * @return the resources
+       */
+      public PCollection<String> getResources() {
+        return resources;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(OUT, resources);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /** The tag for the main output of FHIR resources. */
+    public static final TupleTag<String> OUT = new TupleTag<String>() {};
+    /** The tag for the dead-letter output of failed FHIR reads. */
+    public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+        new TupleTag<HealthcareIOError<String>>() {};
+
+    @Override
+    public FhirIO.Read.Result expand(PCollection<String> input) {
+      return input.apply("Fetch Fhir messages", new FetchResourceJsonString());
+    }
+
+    /**
+     * PTransform to fetch resources from a Google Cloud Healthcare FHIR store based on resource ID.
+     *
+     * <p>This transform consumes a {@link PCollection} of notification {@link String}s from the
+     * FHIR store, fetches the actual resource based on the ID in each notification, and outputs a
+     * {@link FhirIO.Read.Result} wrapping a {@link PCollectionTuple} of the output and dead-letter
+     * {@link PCollection}s.
+     *
+     * <p>The {@link PCollectionTuple} will contain the following {@link PCollection}s:
+     *
+     * <ul>
+     *   <li>{@link FhirIO.Read#OUT} - Contains all resources successfully read from the FHIR
+     *       store.
+     *   <li>{@link FhirIO.Read#DEAD_LETTER} - Contains a {@link HealthcareIOError} for each
+     *       resource ID which failed to be fetched from the FHIR store, with the error message and
+     *       stack trace.
+     * </ul>
+     */
+    public static class FetchResourceJsonString
+        extends PTransform<PCollection<String>, FhirIO.Read.Result> {
+
+      /** Instantiates a new FetchResourceJsonString transform. */
+      public FetchResourceJsonString() {}
+
+      @Override
+      public FhirIO.Read.Result expand(PCollection<String> resourceIds) {
+        return new FhirIO.Read.Result(
+            resourceIds.apply(
+                ParDo.of(new FetchResourceJsonString.StringGetFn())
+                    .withOutputTags(FhirIO.Read.OUT, TupleTagList.of(FhirIO.Read.DEAD_LETTER))));
+      }
+
+      /** DoFn for fetching resources from the FHIR store with error handling. */
+      public static class StringGetFn extends DoFn<String, String> {
+
+        private final Counter failedMessageGets =
+            Metrics.counter(FetchResourceJsonString.StringGetFn.class, "failed-message-reads");
+        private static final Logger LOG =
+            LoggerFactory.getLogger(FetchResourceJsonString.StringGetFn.class);
+        private final Counter successfulStringGets =
+            Metrics.counter(
+                FetchResourceJsonString.StringGetFn.class, "successful-fhir-resource-gets");
+        private HealthcareApiClient client;
+        private ObjectMapper mapper;
+
+        /** Instantiates a new StringGetFn. */
+        StringGetFn() {}
+
+        /**
+         * Instantiate healthcare client.
+         *
+         * @throws IOException the io exception
+         */
+        @Setup
+        public void instantiateHealthcareClient() throws IOException {
+          this.client = new HttpHealthcareApiClient();
+          this.mapper = new ObjectMapper();
+        }
+
+        /**
+         * Process element.
+         *
+         * @param context the context
+         */
+        @ProcessElement
+        public void processElement(ProcessContext context) {
+          String resourceId = context.element();
+          try {
+            context.output(fetchResource(this.client, resourceId));

Review comment:
       You could perform multiple `Fhir.read` requests from separate threads? This may be fine for now, though; if you find that the extra optimization helps, you could add it later on.
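A minimal, Beam-free sketch of the threading suggestion, assuming the resource IDs have already been collected into a batch (for example after `GroupIntoBatches`, which the file already imports). Each read is submitted to a thread pool so the blocking HTTP calls overlap, and the outcomes are split into successes and failures, mirroring the `OUT` and `DEAD_LETTER` outputs. `ConcurrentFetchSketch`, `BatchResult`, `fetchAll`, and the fake fetch function are all hypothetical; in the real `DoFn` the fetch would be the `fetchResource(client, resourceId)` call from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical sketch: fetch a batch of resource IDs concurrently. */
public class ConcurrentFetchSketch {

  /** Successes (like FhirIO.Read.OUT) and failures keyed by ID (like DEAD_LETTER). */
  static final class BatchResult {
    final List<String> resources = new ArrayList<>();
    final Map<String, Throwable> failures = new LinkedHashMap<>();
  }

  /**
   * Submits one fetch per ID so the blocking calls overlap, then collects the outcomes on the
   * calling thread, in input order.
   */
  static BatchResult fetchAll(
      List<String> ids, Function<String, String> fetch, ExecutorService executor)
      throws InterruptedException {
    List<Future<String>> futures = new ArrayList<>();
    for (String id : ids) {
      futures.add(executor.submit(() -> fetch.apply(id)));
    }
    BatchResult result = new BatchResult();
    for (int i = 0; i < ids.size(); i++) {
      try {
        result.resources.add(futures.get(i).get());
      } catch (ExecutionException e) {
        // Keep the original exception thrown by fetch, next to the ID that failed.
        result.failures.put(ids.get(i), e.getCause());
      }
    }
    return result;
  }

  public static void main(String[] args) throws InterruptedException {
    // Hypothetical stand-in for the FHIR read: IDs starting with "bad" fail.
    Function<String, String> fakeFetch =
        id -> {
          if (id.startsWith("bad")) {
            throw new IllegalArgumentException("not found: " + id);
          }
          return "{\"resourceType\":\"Patient\",\"id\":\"" + id + "\"}";
        };
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      BatchResult r = fetchAll(Arrays.asList("p1", "bad-1", "p2"), fakeFetch, executor);
      System.out.println(r.resources.size() + " fetched, " + r.failures.size() + " failed");
    } finally {
      executor.shutdown();
    }
  }
}
```

The worker threads only perform the blocking reads; the results are gathered back on the caller's thread. In a `DoFn` that would be the thread invoking `@ProcessElement` or `@FinishBundle`, which is where the `output(...)` calls should stay, since Beam's output receivers are not meant to be called from other threads.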




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org