You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/06/01 17:21:02 UTC
[beam] branch master updated: [BEAM-14513] Add read transform and initial healthcare client (#17748)
This is an automated email from the ASF dual-hosted git repository.
danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4e2565a2848 [BEAM-14513] Add read transform and initial healthcare client (#17748)
4e2565a2848 is described below
commit 4e2565a2848f84a28746ace0287800af3e93a2e4
Author: Lucas Nogueira <ln...@uwaterloo.ca>
AuthorDate: Wed Jun 1 17:20:56 2022 +0000
[BEAM-14513] Add read transform and initial healthcare client (#17748)
* initial fhirio go sdk commit - client + read transform
* add license comment
* improve error message
* rename import to follow convention
* add metrics
* follow import convention
* renames and simplify folder structure
* fix import to follow standard
* add comments
* improve variable naming
* adjust comment formatting
* remove unnecessary named import
* adjust naming for consistency
* use forward declarations for metrics
* improve error messages to follow go style
* improve metrics assertions
* simplify function signature
* use optimized register function
* rollback settings changes commit mistakenly
* use time.Since for staticcheck
---
sdks/go/pkg/beam/io/fhirio/common.go | 53 +++++++++++++++++
sdks/go/pkg/beam/io/fhirio/fakes_test.go | 39 +++++++++++++
sdks/go/pkg/beam/io/fhirio/read.go | 98 ++++++++++++++++++++++++++++++++
sdks/go/pkg/beam/io/fhirio/read_test.go | 90 +++++++++++++++++++++++++++++
4 files changed, 280 insertions(+)
diff --git a/sdks/go/pkg/beam/io/fhirio/common.go b/sdks/go/pkg/beam/io/fhirio/common.go
new file mode 100644
index 00000000000..8156b42ab35
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/common.go
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare FHIR stores.
+// Experimental.
+package fhirio
+
+import (
+ "context"
+ "net/http"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core"
+ "google.golang.org/api/healthcare/v1"
+ "google.golang.org/api/option"
+)
+
+const (
+ // baseMetricPrefix is prepended to the name of every metric declared by
+ // this package.
+ baseMetricPrefix = "fhirio/"
+ // userAgent is the user agent string passed to the Healthcare service
+ // client; it ends with core.SdkVersion.
+ userAgent = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
+)
+
+// fhirStoreClient is the set of FHIR store operations this package relies on.
+// Keeping it as an interface lets callers inside the package supply their own
+// implementation instead of the real Healthcare-backed client.
+type fhirStoreClient interface {
+ // readResource fetches the resource identified by resourcePath and returns
+ // the raw HTTP response, or an error if the request could not be made.
+ readResource(resourcePath string) (*http.Response, error)
+}
+
+// fhirStoreClientImpl is the fhirStoreClient implementation that talks to the
+// Google Cloud Healthcare API.
+type fhirStoreClientImpl struct {
+ // fhirService is the Healthcare API handle used to issue FHIR calls.
+ fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
+}
+
+// newFhirStoreClient builds a fhirStoreClientImpl on top of a Google Cloud
+// Healthcare service created with the package user agent. It panics when the
+// service cannot be initialized.
+func newFhirStoreClient() *fhirStoreClientImpl {
+	svc, err := healthcare.NewService(context.Background(), option.WithUserAgent(userAgent))
+	if err != nil {
+		panic("Failed to initialize Google Cloud Healthcare Service. Reason: " + err.Error())
+	}
+
+	fhirSvc := healthcare.NewProjectsLocationsDatasetsFhirStoresFhirService(svc)
+	return &fhirStoreClientImpl{fhirService: fhirSvc}
+}
+
+// readResource builds a FHIR read call for resourcePath and executes it,
+// handing back the HTTP response and error exactly as the call returns them.
+func (c *fhirStoreClientImpl) readResource(resourcePath string) (*http.Response, error) {
+	readCall := c.fhirService.Read(resourcePath)
+	return readCall.Do()
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/fakes_test.go b/sdks/go/pkg/beam/io/fhirio/fakes_test.go
new file mode 100644
index 00000000000..4e1a51aeb23
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/fakes_test.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import "net/http"
+
+// fakeFhirStoreClient is a test double whose behavior is supplied by the test
+// through a function field.
+type fakeFhirStoreClient struct {
+ // fakeReadResources is invoked by readResource with the requested path.
+ fakeReadResources func(string) (*http.Response, error)
+}
+
+// readResource delegates to the fakeReadResources function configured on the
+// fake and returns its result unchanged.
+func (c *fakeFhirStoreClient) readResource(resourcePath string) (*http.Response, error) {
+ return c.fakeReadResources(resourcePath)
+}
+
+// fakeReaderCloser is useful to fake the Body of an http.Response. Its Read
+// behavior is supplied by the test, and Close always succeeds.
+type fakeReaderCloser struct {
+ // fakeRead is invoked by Read with the caller's buffer.
+ fakeRead func([]byte) (int, error)
+}
+
+// Close does nothing and always reports success.
+func (*fakeReaderCloser) Close() error {
+ return nil
+}
+
+// Read delegates to the configured fakeRead function and returns its result
+// unchanged.
+func (m *fakeReaderCloser) Read(b []byte) (int, error) {
+ return m.fakeRead(b)
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/read.go b/sdks/go/pkg/beam/io/fhirio/read.go
new file mode 100644
index 00000000000..a710c92a869
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/read.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare FHIR stores.
+// Experimental.
+package fhirio
+
+import (
+ "context"
+ "io"
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+// init registers readResourceFn, together with the string emitter type its
+// ProcessElement uses, through the Beam register package.
+func init() {
+ register.DoFn4x0[context.Context, string, func(string), func(string)]((*readResourceFn)(nil))
+ register.Emitter1[string]()
+}
+
+// readResourceFn is the DoFn used by Read: for each resource path it fetches
+// the resource through client and emits either the body or an error message.
+type readResourceFn struct {
+ // client performs the reads; Setup creates a default one when it is nil.
+ client fhirStoreClient
+ // readResourceErrors is incremented for every element sent to the
+ // dead-letter output.
+ readResourceErrors beam.Counter
+ // readResourceSuccess is incremented for every resource emitted.
+ readResourceSuccess beam.Counter
+ // readResourceLatencyMs records the duration of each read request in
+ // milliseconds.
+ readResourceLatencyMs beam.Distribution
+}
+
+// String returns the fixed name of this DoFn. Setup passes it as the first
+// argument when declaring the DoFn's metrics.
+func (fn readResourceFn) String() string {
+ return "readResourceFn"
+}
+
+// Setup creates the default FHIR store client when none was injected and
+// declares the counters and latency distribution updated by ProcessElement.
+func (fn *readResourceFn) Setup() {
+	if fn.client == nil {
+		fn.client = newFhirStoreClient()
+	}
+
+	// All metrics of this DoFn are declared under the same first argument.
+	namespace := fn.String()
+	fn.readResourceErrors = beam.NewCounter(namespace, baseMetricPrefix+"read_resource_error_count")
+	fn.readResourceSuccess = beam.NewCounter(namespace, baseMetricPrefix+"read_resource_success_count")
+	fn.readResourceLatencyMs = beam.NewDistribution(namespace, baseMetricPrefix+"read_resource_latency_ms")
+}
+
+// ProcessElement reads the resource at resourcePath through the configured
+// client. On success the response body is emitted as a string through
+// emitResource and the success counter is incremented. If the request fails,
+// the status code is not 200, or the body cannot be read, an error message is
+// emitted through emitDeadLetter and the error counter is incremented instead.
+// The request latency is recorded for every element, failed or not.
+func (fn *readResourceFn) ProcessElement(ctx context.Context, resourcePath string, emitResource, emitDeadLetter func(string)) {
+	timeBeforeReadRequest := time.Now()
+	response, err := fn.client.readResource(resourcePath)
+	fn.readResourceLatencyMs.Update(ctx, time.Since(timeBeforeReadRequest).Milliseconds())
+
+	if err != nil {
+		fn.readResourceErrors.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "failed fetching resource [%s]", resourcePath).Error())
+		return
+	}
+
+	// The response body must be closed on every path once the request
+	// succeeded, otherwise the underlying connection is leaked. Body can be
+	// nil for hand-built responses (such as fakes), hence the guard.
+	if response.Body != nil {
+		defer response.Body.Close()
+	}
+
+	if response.StatusCode != 200 {
+		fn.readResourceErrors.Inc(ctx, 1)
+		emitDeadLetter(errors.Errorf("fetched resource [%s] returned bad status [%d]", resourcePath, response.StatusCode).Error())
+		return
+	}
+
+	bytes, err := io.ReadAll(response.Body)
+	if err != nil {
+		fn.readResourceErrors.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error reading response body of resource [%s]", resourcePath).Error())
+		return
+	}
+
+	fn.readResourceSuccess.Inc(ctx, 1)
+	emitResource(string(bytes))
+}
+
+// Read fetches resources from Google Cloud Healthcare FHIR stores. Its input
+// is a PCollection<string> of resource paths taken from FHIR store
+// notifications; for every path the resource object it points to is fetched.
+// Two PCollection<string> are returned: the first holds each fetched object as
+// a JSON-encoded string, the second is a dead-letter collection with an error
+// message for every object that failed to be fetched.
+// See: https://cloud.google.com/healthcare-api/docs/how-tos/fhir-resources#getting_a_fhir_resource.
+func Read(s beam.Scope, resourcePaths beam.PCollection) (beam.PCollection, beam.PCollection) {
+	// A nil client makes the DoFn create the default client during Setup.
+	return read(s.Scope("fhirio.Read"), resourcePaths, nil)
+}
+
+// read applies readResourceFn to resourcePaths using the given client and
+// returns the fetched-resource and dead-letter outputs, in that order.
+func read(s beam.Scope, resourcePaths beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
+	fn := &readResourceFn{client: client}
+	return beam.ParDo2(s, fn, resourcePaths)
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/read_test.go b/sdks/go/pkg/beam/io/fhirio/read_test.go
new file mode 100644
index 00000000000..4ed9bc5ab53
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/read_test.go
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+ "errors"
+ "net/http"
+ "strings"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+// TestRead runs the read transform against fake clients that fail in different
+// ways. For each failure mode it checks that no resource is emitted, that every
+// input path produces a dead-letter message containing the expected text, and
+// that only the error counter was used, once per input path.
+func TestRead(t *testing.T) {
+	tests := []struct {
+		name           string
+		client         fhirStoreClient
+		containedError string
+	}{
+		{
+			name: "Read Request Failed",
+			client: &fakeFhirStoreClient{
+				fakeReadResources: func(string) (*http.Response, error) {
+					return nil, errors.New("")
+				},
+			},
+			containedError: "failed fetching resource",
+		},
+		{
+			name: "Read Request Returns Bad Status",
+			client: &fakeFhirStoreClient{
+				fakeReadResources: func(string) (*http.Response, error) {
+					return &http.Response{StatusCode: 403}, nil
+				},
+			},
+			containedError: "returned bad status",
+		},
+		{
+			name: "Response body fails to be parsed",
+			client: &fakeFhirStoreClient{
+				fakeReadResources: func(string) (*http.Response, error) {
+					// The status is fine, but reading the body always errors.
+					failingBody := &fakeReaderCloser{
+						fakeRead: func([]byte) (int, error) {
+							return 0, errors.New("")
+						},
+					}
+					return &http.Response{Body: failingBody, StatusCode: 200}, nil
+				},
+			},
+			containedError: "error reading response body",
+		},
+	}
+
+	paths := []string{"foo", "bar"}
+	wantErrorCount := int64(len(paths))
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			p, s, input := ptest.CreateList(paths)
+			resources, deadLetters := read(s, input, tc.client)
+			passert.Empty(s, resources)
+			passert.Count(s, deadLetters, "", len(paths))
+			passert.True(s, deadLetters, func(errorMsg string) bool {
+				return strings.Contains(errorMsg, tc.containedError)
+			})
+
+			counters := ptest.RunAndValidate(t, p).Metrics().AllMetrics().Counters()
+			switch {
+			case len(counters) != 1:
+				t.Fatal("Only one counter should have been used")
+			case counters[0].Name() != "fhirio/read_resource_error_count":
+				t.Fatal("Only error counter should have been used")
+			case counters[0].Result() != wantErrorCount:
+				t.Fatal("Counter should have been incremented by the number of test resource paths")
+			}
+		})
+	}
+}