You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/18 20:35:59 UTC

[GitHub] [beam] pabloem commented on a change in pull request #15936: FhirIO GetPatientEverything connector

pabloem commented on a change in pull request #15936:
URL: https://github.com/apache/beam/pull/15936#discussion_r752594621



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -94,20 +96,36 @@
  *
  * <h3>Reading</h3>
  *
- * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use cases where you have
- * a ${@link PCollection} of message IDs. This is appropriate for reading the Fhir notifications
- * from a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a
- * manually prepared list of messages that you need to process (e.g. in a text file read with {@link
+ * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use cases where you have a
+ * ${@link PCollection} of FHIR resource names in the format of projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
+ * This is appropriate for reading the Fhir notifications from
+ * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually
+ * prepared list of resources that you need to process (e.g. in a text file read with {@link
  * org.apache.beam.sdk.io.TextIO}*) .
  *
- * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID
- * strings {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to
- * retrieve a {@link PCollection} containing the successfully fetched {@link String}s and/or {@link
- * FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of {@link
- * HealthcareIOError}* containing the resource ID that could not be fetched and the exception as a
+ * <<<<<<< HEAD

Review comment:
       oops there seem to be some issues with the merged result of the file

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverything.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.sdk.io.gcp.healthcare.FhirIO.BASE_METRIC_PREFIX;
+
+import com.google.gson.JsonArray;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIOPatientEverything.PatientEverythingParameter;
+import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.FhirResourcePagesIterator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The type FhirIOPatientEverything for querying a FHIR Patient resource's compartment. * */
+public class FhirIOPatientEverything
+    extends PTransform<PCollection<PatientEverythingParameter>, FhirIOPatientEverything.Result> {
+
+  /** The tag for the main output of FHIR Resources from a GetPatientEverything request. */
+  public static final TupleTag<JsonArray> OUT = new TupleTag<JsonArray>() {};
+  /** The tag for the deadletter output of FHIR Resources from a GetPatientEverything request. */
+  public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+      new TupleTag<HealthcareIOError<String>>() {};
+
+  /**
+   * PatientEverythingParameter defines required attributes for a FHIR GetPatientEverything request
+   * in {@link FhirIOPatientEverything}. *
+   */
+  @DefaultCoder(PatientEverythingParameterCoder.class)

Review comment:
       you may be able to do `@DefaultSchema(JavaBeanSchema.class)` for this - the advantage is that you would not have to add your own coder (though you may need to add setters and getters or constructor - or you can make it an autovalue class and use `@DefaultSchema(AutoValueSchema.class)`). The other advantage is that if it has a schema, it can be used easily as a cross-language transform, and users can use it with other transforms that operate on PCollections with schema.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverything.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.sdk.io.gcp.healthcare.FhirIO.BASE_METRIC_PREFIX;
+
+import com.google.gson.JsonArray;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIOPatientEverything.PatientEverythingParameter;
+import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.FhirResourcePagesIterator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The type FhirIOPatientEverything for querying a FHIR Patient resource's compartment. * */
+public class FhirIOPatientEverything
+    extends PTransform<PCollection<PatientEverythingParameter>, FhirIOPatientEverything.Result> {
+
+  /** The tag for the main output of FHIR Resources from a GetPatientEverything request. */
+  public static final TupleTag<JsonArray> OUT = new TupleTag<JsonArray>() {};
+  /** The tag for the deadletter output of FHIR Resources from a GetPatientEverything request. */
+  public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+      new TupleTag<HealthcareIOError<String>>() {};
+
+  /**
+   * PatientEverythingParameter defines required attributes for a FHIR GetPatientEverything request
+   * in {@link FhirIOPatientEverything}. *
+   */
+  @DefaultCoder(PatientEverythingParameterCoder.class)
+  public static class PatientEverythingParameter implements Serializable {
+
+    /**
+     * FHIR Patient resource name in the format of
+     * projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
+     */
+    private final String resourceName;
+    /** Optional filters for the request, eg. start, end, _type, _since, _count */
+    private final @Nullable Map<String, String> filters;
+
+    PatientEverythingParameter(String resourceName, @Nullable Map<String, String> filters) {
+      this.resourceName = resourceName;
+      this.filters = filters;
+    }
+
+    /**
+     * Creates a PatientEverythingParameter.
+     *
+     * @param resourceName The FHIR Patient resource name in the format of
+     *     projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
+     * @return the PatientEverythingParameter
+     */
+    public static PatientEverythingParameter of(String resourceName) {
+      return new PatientEverythingParameter(resourceName, null);
+    }
+
+    /**
+     * Creates a PatientEverythingParameter.
+     *
+     * @param resourceName The FHIR Patient resource name in the format of
+     *     projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
+     * @param filters Optional filters for the request.
+     * @return the PatientEverythingParameter
+     */
+    public static PatientEverythingParameter of(
+        String resourceName, @Nullable Map<String, String> filters) {
+      return new PatientEverythingParameter(resourceName, filters);
+    }
+
+    public String getResourceName() {
+      return resourceName;
+    }
+
+    public @Nullable Map<String, String> getFilters() {
+      return filters;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      PatientEverythingParameter that = (PatientEverythingParameter) o;
+      return Objects.equals(resourceName, that.getResourceName())
+          && Objects.equals(filters, that.getFilters());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(resourceName, filters);
+    }
+
+    @Override
+    public String toString() {
+      return String.format(
+          "FhirIOPatientEverything.Parameter{resourcePath='%s', filters='%s'}'",
+          resourceName, filters);
+    }
+  }
+
+  /** PatientEverythingParameterCoder is a coder for {@link PatientEverythingParameter}. */
+  public static class PatientEverythingParameterCoder
+      extends CustomCoder<PatientEverythingParameter> {
+
+    private static final NullableCoder<String> STRING_CODER =
+        NullableCoder.of(StringUtf8Coder.of());
+    private static final NullableCoder<Map<String, String>> MAP_CODER =
+        NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+    public static PatientEverythingParameterCoder of() {
+      return new PatientEverythingParameterCoder();
+    }
+
+    @Override
+    public void encode(PatientEverythingParameter value, OutputStream outStream)
+        throws IOException {
+      STRING_CODER.encode(value.getResourceName(), outStream);
+      MAP_CODER.encode(value.getFilters(), outStream);
+    }
+
+    @Override
+    public PatientEverythingParameter decode(InputStream inStream) throws IOException {
+      String resourceName = STRING_CODER.decode(inStream);
+      Map<String, String> queries = MAP_CODER.decode(inStream);
+      return PatientEverythingParameter.of(resourceName, queries);
+    }
+  }
+
+  /** The Result for a {@link FhirIOPatientEverything} request. */
+  public static class Result implements POutput, PInput {
+
+    private final PCollection<JsonArray> patientCompartments;
+    private final PCollection<HealthcareIOError<String>> failedReads;
+
+    PCollectionTuple pct;
+
+    /**
+     * Create FhirIOPatientEverything.Result form PCollectionTuple with OUT and DEAD_LETTER tags.
+     *
+     * @param pct the pct
+     * @return the patient everything result
+     * @throws IllegalArgumentException the illegal argument exception
+     */
+    static Result of(PCollectionTuple pct) throws IllegalArgumentException {
+      if (pct.has(OUT) && pct.has(DEAD_LETTER)) {
+        return new Result(pct);
+      } else {
+        throw new IllegalArgumentException(
+            "The PCollection tuple must have the FhirIOPatientEverything.OUT "
+                + "and FhirIOPatientEverything.DEAD_LETTER tuple tags");
+      }
+    }
+
+    private Result(PCollectionTuple pct) {
+      this.pct = pct;
+      this.patientCompartments = pct.get(OUT).setCoder(JsonArrayCoder.of());
+      this.failedReads =
+          pct.get(DEAD_LETTER).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+    }
+
+    /**
+     * Gets failed reads.
+     *
+     * @return the failed reads
+     */
+    public PCollection<HealthcareIOError<String>> getFailedReads() {
+      return failedReads;
+    }
+
+    /**
+     * Gets the patient compartment responses for GetPatientEverything requests.
+     *
+     * @return the read patient compartments
+     */
+    public PCollection<JsonArray> getPatientCompartments() {
+      return patientCompartments;
+    }
+
+    @Override
+    public Pipeline getPipeline() {
+      return this.pct.getPipeline();
+    }
+
+    @Override
+    public Map<TupleTag<?>, PValue> expand() {
+      return ImmutableMap.of(OUT, patientCompartments, DEAD_LETTER, failedReads);
+    }
+
+    @Override
+    public void finishSpecifyingOutput(
+        String transformName, PInput input, PTransform<?, ?> transform) {}
+  }
+
+  @Override
+  public Result expand(PCollection<PatientEverythingParameter> input) {
+    PCollectionTuple results =
+        input.apply(
+            "GetPatientEverything",
+            ParDo.of(new GetPatientEverythingFn())
+                .withOutputTags(OUT, TupleTagList.of(DEAD_LETTER)));
+    return new Result(results);
+  }
+
+  /** GetPatientEverythingFn executes a GetPatientEverything request. */
+  static class GetPatientEverythingFn extends DoFn<PatientEverythingParameter, JsonArray> {
+
+    private static final Counter GET_PATIENT_EVERYTHING_ERROR_COUNT =
+        Metrics.counter(
+            GetPatientEverythingFn.class,
+            BASE_METRIC_PREFIX + "get_patient_everything_error_count");
+    private static final Counter GET_PATIENT_EVERYTHING_SUCCESS_COUNT =
+        Metrics.counter(
+            GetPatientEverythingFn.class,
+            BASE_METRIC_PREFIX + "get_patient_everything_success_count");
+    private static final Distribution GET_PATIENT_EVERYTHING_LATENCY_MS =
+        Metrics.distribution(
+            GetPatientEverythingFn.class, BASE_METRIC_PREFIX + "get_patient_everything_latency_ms");
+    private static final Logger LOG = LoggerFactory.getLogger(GetPatientEverythingFn.class);
+
+    @SuppressWarnings("initialization.fields.uninitialized")
+    private HealthcareApiClient client;
+
+    /**
+     * Instantiate healthcare client.
+     *
+     * @throws IOException the io exception
+     */
+    @Setup
+    public void instantiateHealthcareClient() throws IOException {
+      this.client = new HttpHealthcareApiClient();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      PatientEverythingParameter patientEverythingParameter = context.element();
+      try {
+        context.output(

Review comment:
       if you want to parallelize the records of a single bundle, you could run something like that:
   
   ```
   @StartBundle
   tpe = new ThreadPoolExecutor();
   
   @ProcessBundle
   future = tpe.submit(param -> getPatientEverything(....))
   futures.add(future)
   
   @FinishBundle
   for (f : futures) {
     // collect outputs or errors
   }
   ```
   
   for other healthcare IO we decided to not parallelize things - and this may be a fair choice in this case - but I'm just curious if you think this may be useful

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatientEverything.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.sdk.io.gcp.healthcare.FhirIO.BASE_METRIC_PREFIX;
+
+import com.google.gson.JsonArray;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CustomCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.MapCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIOPatientEverything.PatientEverythingParameter;
+import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.FhirResourcePagesIterator;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The type FhirIOPatientEverything for querying a FHIR Patient resource's compartment. * */
+public class FhirIOPatientEverything
+    extends PTransform<PCollection<PatientEverythingParameter>, FhirIOPatientEverything.Result> {
+
+  /** The tag for the main output of FHIR Resources from a GetPatientEverything request. */
+  public static final TupleTag<JsonArray> OUT = new TupleTag<JsonArray>() {};
+  /** The tag for the deadletter output of FHIR Resources from a GetPatientEverything request. */
+  public static final TupleTag<HealthcareIOError<String>> DEAD_LETTER =
+      new TupleTag<HealthcareIOError<String>>() {};
+
+  /**
+   * PatientEverythingParameter defines required attributes for a FHIR GetPatientEverything request
+   * in {@link FhirIOPatientEverything}. *
+   */
+  @DefaultCoder(PatientEverythingParameterCoder.class)
+  public static class PatientEverythingParameter implements Serializable {
+
+    /**
+     * FHIR Patient resource name in the format of
+     * projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
+     */
+    private final String resourceName;
+    /** Optional filters for the request, eg. start, end, _type, _since, _count */
+    private final @Nullable Map<String, String> filters;
+
+    PatientEverythingParameter(String resourceName, @Nullable Map<String, String> filters) {
+      this.resourceName = resourceName;
+      this.filters = filters;
+    }
+
+    /**
+     * Creates a PatientEverythingParameter.
+     *
+     * @param resourceName The FHIR Patient resource name in the format of
+     *     projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
+     * @return the PatientEverythingParameter
+     */
+    public static PatientEverythingParameter of(String resourceName) {
+      return new PatientEverythingParameter(resourceName, null);
+    }
+
+    /**
+     * Creates a PatientEverythingParameter.
+     *
+     * @param resourceName The FHIR Patient resource name in the format of
+     *     projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}.
+     * @param filters Optional filters for the request.
+     * @return the PatientEverythingParameter
+     */
+    public static PatientEverythingParameter of(
+        String resourceName, @Nullable Map<String, String> filters) {
+      return new PatientEverythingParameter(resourceName, filters);
+    }
+
+    public String getResourceName() {
+      return resourceName;
+    }
+
+    public @Nullable Map<String, String> getFilters() {
+      return filters;
+    }
+
+    @Override
+    public boolean equals(@Nullable Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      PatientEverythingParameter that = (PatientEverythingParameter) o;
+      return Objects.equals(resourceName, that.getResourceName())
+          && Objects.equals(filters, that.getFilters());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(resourceName, filters);
+    }
+
+    @Override
+    public String toString() {
+      return String.format(
+          "FhirIOPatientEverything.Parameter{resourcePath='%s', filters='%s'}'",
+          resourceName, filters);
+    }
+  }
+
+  /** PatientEverythingParameterCoder is a coder for {@link PatientEverythingParameter}. */
+  public static class PatientEverythingParameterCoder
+      extends CustomCoder<PatientEverythingParameter> {
+
+    private static final NullableCoder<String> STRING_CODER =
+        NullableCoder.of(StringUtf8Coder.of());
+    private static final NullableCoder<Map<String, String>> MAP_CODER =
+        NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+
+    public static PatientEverythingParameterCoder of() {
+      return new PatientEverythingParameterCoder();
+    }
+
+    @Override
+    public void encode(PatientEverythingParameter value, OutputStream outStream)
+        throws IOException {
+      STRING_CODER.encode(value.getResourceName(), outStream);
+      MAP_CODER.encode(value.getFilters(), outStream);
+    }
+
+    @Override
+    public PatientEverythingParameter decode(InputStream inStream) throws IOException {
+      String resourceName = STRING_CODER.decode(inStream);
+      Map<String, String> queries = MAP_CODER.decode(inStream);
+      return PatientEverythingParameter.of(resourceName, queries);
+    }
+  }
+
+  /** The Result for a {@link FhirIOPatientEverything} request. */
+  public static class Result implements POutput, PInput {
+
+    private final PCollection<JsonArray> patientCompartments;
+    private final PCollection<HealthcareIOError<String>> failedReads;
+
+    PCollectionTuple pct;
+
+    /**
+     * Create FhirIOPatientEverything.Result form PCollectionTuple with OUT and DEAD_LETTER tags.
+     *
+     * @param pct the pct
+     * @return the patient everything result
+     * @throws IllegalArgumentException the illegal argument exception
+     */
+    static Result of(PCollectionTuple pct) throws IllegalArgumentException {
+      if (pct.has(OUT) && pct.has(DEAD_LETTER)) {
+        return new Result(pct);
+      } else {
+        throw new IllegalArgumentException(
+            "The PCollection tuple must have the FhirIOPatientEverything.OUT "
+                + "and FhirIOPatientEverything.DEAD_LETTER tuple tags");
+      }
+    }
+
+    private Result(PCollectionTuple pct) {
+      this.pct = pct;
+      this.patientCompartments = pct.get(OUT).setCoder(JsonArrayCoder.of());
+      this.failedReads =
+          pct.get(DEAD_LETTER).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+    }
+
+    /**
+     * Gets failed reads.
+     *
+     * @return the failed reads
+     */
+    public PCollection<HealthcareIOError<String>> getFailedReads() {
+      return failedReads;
+    }
+
+    /**
+     * Gets the patient compartment responses for GetPatientEverything requests.
+     *
+     * @return the read patient compartments
+     */
+    public PCollection<JsonArray> getPatientCompartments() {
+      return patientCompartments;
+    }
+
+    @Override
+    public Pipeline getPipeline() {
+      return this.pct.getPipeline();
+    }
+
+    @Override
+    public Map<TupleTag<?>, PValue> expand() {
+      return ImmutableMap.of(OUT, patientCompartments, DEAD_LETTER, failedReads);
+    }
+
+    @Override
+    public void finishSpecifyingOutput(
+        String transformName, PInput input, PTransform<?, ?> transform) {}
+  }
+
+  @Override
+  public Result expand(PCollection<PatientEverythingParameter> input) {
+    PCollectionTuple results =
+        input.apply(
+            "GetPatientEverything",
+            ParDo.of(new GetPatientEverythingFn())
+                .withOutputTags(OUT, TupleTagList.of(DEAD_LETTER)));
+    return new Result(results);
+  }
+
+  /** GetPatientEverythingFn executes a GetPatientEverything request. */
+  static class GetPatientEverythingFn extends DoFn<PatientEverythingParameter, JsonArray> {
+
+    private static final Counter GET_PATIENT_EVERYTHING_ERROR_COUNT =
+        Metrics.counter(
+            GetPatientEverythingFn.class,
+            BASE_METRIC_PREFIX + "get_patient_everything_error_count");
+    private static final Counter GET_PATIENT_EVERYTHING_SUCCESS_COUNT =
+        Metrics.counter(
+            GetPatientEverythingFn.class,
+            BASE_METRIC_PREFIX + "get_patient_everything_success_count");
+    private static final Distribution GET_PATIENT_EVERYTHING_LATENCY_MS =
+        Metrics.distribution(
+            GetPatientEverythingFn.class, BASE_METRIC_PREFIX + "get_patient_everything_latency_ms");
+    private static final Logger LOG = LoggerFactory.getLogger(GetPatientEverythingFn.class);
+
+    @SuppressWarnings("initialization.fields.uninitialized")
+    private HealthcareApiClient client;
+
+    /**
+     * Instantiate healthcare client.
+     *
+     * @throws IOException the io exception
+     */
+    @Setup
+    public void instantiateHealthcareClient() throws IOException {
+      this.client = new HttpHealthcareApiClient();
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      PatientEverythingParameter patientEverythingParameter = context.element();
+      try {
+        context.output(
+            getPatientEverything(
+                patientEverythingParameter.getResourceName(),
+                patientEverythingParameter.getFilters()));
+      } catch (IllegalArgumentException | NoSuchElementException e) {

Review comment:
       IIUC - if there are any other exceptions, they will be thrown out right? And retried by the runner however it chooses to - correct?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org