You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/04 17:19:55 UTC
[beam] branch master updated: Merge pull request #17440 from [BEAM-14329] Enable exponential backoff retries in FhirIO Execute bundle requests.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0bde7ae8a0e Merge pull request #17440 from [BEAM-14329] Enable exponential backoff retries in FhirIO Execute bundle requests.
0bde7ae8a0e is described below
commit 0bde7ae8a0e16a9923a22c3031ec73fe16d7ce80
Author: fbeevikm <91...@users.noreply.github.com>
AuthorDate: Wed May 4 13:19:48 2022 -0400
Merge pull request #17440 from [BEAM-14329] Enable exponential backoff retries in FhirIO Execute bundle requests.
---
sdks/java/io/google-cloud-platform/build.gradle | 1 +
.../apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 2 +-
.../io/gcp/healthcare/HttpHealthcareApiClient.java | 56 +++++-----------------
.../beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java | 2 +-
4 files changed, 16 insertions(+), 45 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/build.gradle b/sdks/java/io/google-cloud-platform/build.gradle
index 8da891d3aba..0ff6749246c 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -146,6 +146,7 @@ dependencies {
implementation library.java.arrow_memory_core
implementation library.java.arrow_vector
+ implementation 'com.google.http-client:google-http-client-gson:1.41.2'
implementation "org.threeten:threetenbp:1.4.4"
testImplementation library.java.arrow_memory_netty
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
index f08ce8da166..d5facd34cef 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
@@ -1460,7 +1460,7 @@ public class FhirIO {
private void parseResponse(ProcessContext context, String inputBody, HttpBody resp)
throws JsonProcessingException {
- JsonObject bundle = JsonParser.parseString(resp.getData()).getAsJsonObject();
+ JsonObject bundle = JsonParser.parseString(resp.toString()).getAsJsonObject();
String bundleType = bundle.getAsJsonPrimitive(BUNDLE_TYPE_FIELD).getAsString();
JsonArray entries = bundle.getAsJsonArray(BUNDLE_ENTRY_FIELD).getAsJsonArray();
if (entries == null) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
index cb70069bfaf..2bf497ed526 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
@@ -23,6 +23,7 @@ import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.gson.GsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.healthcare.v1.CloudHealthcare;
import com.google.api.services.healthcare.v1.CloudHealthcare.Projects.Locations.Datasets.FhirStores.Fhir.PatientEverything;
@@ -75,18 +76,14 @@ import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
-import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.methods.RequestBuilder;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -105,9 +102,7 @@ public class HttpHealthcareApiClient implements HealthcareApiClient, Serializabl
String.format(
"apache-beam-io-google-cloud-platform-healthcare/%s",
ReleaseInfo.getReleaseInfo().getSdkVersion());
- private static final String FHIRSTORE_HEADER_CONTENT_TYPE = "application/fhir+json";
- private static final String FHIRSTORE_HEADER_ACCEPT = "application/fhir+json; charset=utf-8";
- private static final String FHIRSTORE_HEADER_ACCEPT_CHARSET = "utf-8";
+ private static final JsonFactory PARSER = new GsonFactory();
private static final Logger LOG = LoggerFactory.getLogger(HttpHealthcareApiClient.class);
private transient CloudHealthcare client;
private transient HttpClient httpClient;
@@ -553,45 +548,20 @@ public class HttpHealthcareApiClient implements HealthcareApiClient, Serializabl
}
@Override
- public HttpBody executeFhirBundle(String fhirStore, String bundle)
- throws IOException, HealthcareHttpException {
- if (httpClient == null || client == null) {
+ public HttpBody executeFhirBundle(String fhirStore, String bundle) throws IOException {
+ if (client == null) {
initClient();
}
+ HttpBody httpBody = PARSER.fromString(bundle, HttpBody.class);
- credentials.refreshIfExpired();
- StringEntity requestEntity = new StringEntity(bundle, ContentType.APPLICATION_JSON);
- URI uri;
- try {
- uri = new URIBuilder(client.getRootUrl() + "v1/" + fhirStore + "/fhir").build();
- } catch (URISyntaxException e) {
- LOG.error("URL error when making executeBundle request to FHIR API. " + e.getMessage());
- throw new IllegalArgumentException(e);
- }
-
- HttpUriRequest request =
- RequestBuilder.post()
- .setUri(uri)
- .setEntity(requestEntity)
- .addHeader("Authorization", "Bearer " + credentials.getAccessToken().getTokenValue())
- .addHeader("User-Agent", USER_AGENT)
- .addHeader("Content-Type", FHIRSTORE_HEADER_CONTENT_TYPE)
- .addHeader("Accept-Charset", FHIRSTORE_HEADER_ACCEPT_CHARSET)
- .addHeader("Accept", FHIRSTORE_HEADER_ACCEPT)
- .build();
-
- HttpResponse response = httpClient.execute(request);
- HttpEntity responseEntity = response.getEntity();
- String content = EntityUtils.toString(responseEntity);
-
- // Check 2XX code.
- int statusCode = response.getStatusLine().getStatusCode();
- if (!(statusCode / 100 == 2)) {
- throw HealthcareHttpException.of(statusCode, content);
- }
- HttpBody responseModel = new HttpBody();
- responseModel.setData(content);
- return responseModel;
+ return client
+ .projects()
+ .locations()
+ .datasets()
+ .fhirStores()
+ .fhir()
+ .executeBundle(fhirStore, httpBody)
+ .execute();
}
/**
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
index 754e5f6a292..6bc0fbbbd7c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTestUtil.java
@@ -95,7 +95,7 @@ class FhirIOTestUtil {
for (String bundle : bundles) {
HttpBody resp = client.executeFhirBundle(fhirStore, bundle);
- JsonObject jsonResponse = JsonParser.parseString(resp.getData()).getAsJsonObject();
+ JsonObject jsonResponse = JsonParser.parseString(resp.toString()).getAsJsonObject();
for (JsonElement entry : jsonResponse.getAsJsonArray("entry")) {
String location =
entry