You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/04 17:19:55 UTC

[beam] branch master updated: Merge pull request #17440 from [BEAM-14329] Enable exponential backoff retries in FhirIO Execute bundle requests.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bde7ae8a0e Merge pull request #17440 from [BEAM-14329] Enable exponential backoff retries in FhirIO Execute bundle requests.
0bde7ae8a0e is described below

commit 0bde7ae8a0e16a9923a22c3031ec73fe16d7ce80
Author: fbeevikm <91...@users.noreply.github.com>
AuthorDate: Wed May 4 13:19:48 2022 -0400

    Merge pull request #17440 from [BEAM-14329] Enable exponential backoff retries in FhirIO Execute bundle requests.
---
 sdks/java/io/google-cloud-platform/build.gradle    |  1 +
 .../apache/beam/sdk/io/gcp/healthcare/FhirIO.java  |  2 +-
 .../io/gcp/healthcare/HttpHealthcareApiClient.java | 56 +++++-----------------
 .../beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java |  2 +-
 4 files changed, 16 insertions(+), 45 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 8da891d3aba..0ff6749246c 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -146,6 +146,7 @@ dependencies {
   implementation library.java.arrow_memory_core
   implementation library.java.arrow_vector
 
+  implementation 'com.google.http-client:google-http-client-gson:1.41.2'
   implementation "org.threeten:threetenbp:1.4.4"
 
   testImplementation library.java.arrow_memory_netty
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
index f08ce8da166..d5facd34cef 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
@@ -1460,7 +1460,7 @@ public class FhirIO {
 
       private void parseResponse(ProcessContext context, String inputBody, HttpBody resp)
           throws JsonProcessingException {
-        JsonObject bundle = JsonParser.parseString(resp.getData()).getAsJsonObject();
+        JsonObject bundle = JsonParser.parseString(resp.toString()).getAsJsonObject();
         String bundleType = bundle.getAsJsonPrimitive(BUNDLE_TYPE_FIELD).getAsString();
         JsonArray entries = bundle.getAsJsonArray(BUNDLE_ENTRY_FIELD).getAsJsonArray();
         if (entries == null) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
index cb70069bfaf..2bf497ed526 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
@@ -23,6 +23,7 @@ import com.google.api.client.http.HttpRequest;
 import com.google.api.client.http.HttpRequestInitializer;
 import com.google.api.client.http.javanet.NetHttpTransport;
 import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.gson.GsonFactory;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.services.healthcare.v1.CloudHealthcare;
 import com.google.api.services.healthcare.v1.CloudHealthcare.Projects.Locations.Datasets.FhirStores.Fhir.PatientEverything;
@@ -75,18 +76,14 @@ import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
-import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.client.methods.RequestBuilder;
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
 import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -105,9 +102,7 @@ public class HttpHealthcareApiClient implements HealthcareApiClient, Serializabl
       String.format(
           "apache-beam-io-google-cloud-platform-healthcare/%s",
           ReleaseInfo.getReleaseInfo().getSdkVersion());
-  private static final String FHIRSTORE_HEADER_CONTENT_TYPE = "application/fhir+json";
-  private static final String FHIRSTORE_HEADER_ACCEPT = "application/fhir+json; charset=utf-8";
-  private static final String FHIRSTORE_HEADER_ACCEPT_CHARSET = "utf-8";
+  private static final JsonFactory PARSER = new GsonFactory();
   private static final Logger LOG = LoggerFactory.getLogger(HttpHealthcareApiClient.class);
   private transient CloudHealthcare client;
   private transient HttpClient httpClient;
@@ -553,45 +548,20 @@ public class HttpHealthcareApiClient implements HealthcareApiClient, Serializabl
   }
 
   @Override
-  public HttpBody executeFhirBundle(String fhirStore, String bundle)
-      throws IOException, HealthcareHttpException {
-    if (httpClient == null || client == null) {
+  public HttpBody executeFhirBundle(String fhirStore, String bundle) throws IOException {
+    if (client == null) {
       initClient();
     }
+    HttpBody httpBody = PARSER.fromString(bundle, HttpBody.class);
 
-    credentials.refreshIfExpired();
-    StringEntity requestEntity = new StringEntity(bundle, ContentType.APPLICATION_JSON);
-    URI uri;
-    try {
-      uri = new URIBuilder(client.getRootUrl() + "v1/" + fhirStore + "/fhir").build();
-    } catch (URISyntaxException e) {
-      LOG.error("URL error when making executeBundle request to FHIR API. " + e.getMessage());
-      throw new IllegalArgumentException(e);
-    }
-
-    HttpUriRequest request =
-        RequestBuilder.post()
-            .setUri(uri)
-            .setEntity(requestEntity)
-            .addHeader("Authorization", "Bearer " + credentials.getAccessToken().getTokenValue())
-            .addHeader("User-Agent", USER_AGENT)
-            .addHeader("Content-Type", FHIRSTORE_HEADER_CONTENT_TYPE)
-            .addHeader("Accept-Charset", FHIRSTORE_HEADER_ACCEPT_CHARSET)
-            .addHeader("Accept", FHIRSTORE_HEADER_ACCEPT)
-            .build();
-
-    HttpResponse response = httpClient.execute(request);
-    HttpEntity responseEntity = response.getEntity();
-    String content = EntityUtils.toString(responseEntity);
-
-    // Check 2XX code.
-    int statusCode = response.getStatusLine().getStatusCode();
-    if (!(statusCode / 100 == 2)) {
-      throw HealthcareHttpException.of(statusCode, content);
-    }
-    HttpBody responseModel = new HttpBody();
-    responseModel.setData(content);
-    return responseModel;
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .fhir()
+        .executeBundle(fhirStore, httpBody)
+        .execute();
   }
 
   /**
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
index 754e5f6a292..6bc0fbbbd7c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
@@ -95,7 +95,7 @@ class FhirIOTestUtil {
     for (String bundle : bundles) {
       HttpBody resp = client.executeFhirBundle(fhirStore, bundle);
 
-      JsonObject jsonResponse = JsonParser.parseString(resp.getData()).getAsJsonObject();
+      JsonObject jsonResponse = JsonParser.parseString(resp.toString()).getAsJsonObject();
       for (JsonElement entry : jsonResponse.getAsJsonArray("entry")) {
         String location =
             entry