You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/06/01 17:21:02 UTC

[beam] branch master updated: [BEAM-14513] Add read transform and initial healthcare client (#17748)

This is an automated email from the ASF dual-hosted git repository.

danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e2565a2848 [BEAM-14513] Add read transform and initial healthcare client (#17748)
4e2565a2848 is described below

commit 4e2565a2848f84a28746ace0287800af3e93a2e4
Author: Lucas Nogueira <ln...@uwaterloo.ca>
AuthorDate: Wed Jun 1 17:20:56 2022 +0000

    [BEAM-14513] Add read transform and initial healthcare client (#17748)
    
    * initial fhirio go sdk commit - client + read transform
    
    * add license comment
    
    * improve error message
    
    * rename import to follow convention
    
    * add metrics
    
    * follow import convention
    
    * renames and simplify folder structure
    
    * fix import to follow standard
    
    * add comments
    
    * improve variable naming
    
    * adjust comment formatting
    
    * remove unnecessary named import
    
    * adjust naming for consistency
    
    * use forward declarations for metrics
    
    * improve error messages to follow go style
    
    * improve metrics assertions
    
    * simplify function signature
    
    * use optimized register function
    
    * rollback settings changes commit mistakenly
    
    * use time.Since for staticcheck
---
 sdks/go/pkg/beam/io/fhirio/common.go     | 53 +++++++++++++++++
 sdks/go/pkg/beam/io/fhirio/fakes_test.go | 39 +++++++++++++
 sdks/go/pkg/beam/io/fhirio/read.go       | 98 ++++++++++++++++++++++++++++++++
 sdks/go/pkg/beam/io/fhirio/read_test.go  | 90 +++++++++++++++++++++++++++++
 4 files changed, 280 insertions(+)

diff --git a/sdks/go/pkg/beam/io/fhirio/common.go b/sdks/go/pkg/beam/io/fhirio/common.go
new file mode 100644
index 00000000000..8156b42ab35
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/common.go
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare FHIR stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"net/http"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
+	"google.golang.org/api/healthcare/v1"
+	"google.golang.org/api/option"
+)
+
+// Constants shared by the transforms in this package.
+//
+// baseMetricPrefix is prepended to the name of every metric this package
+// creates. userAgent is the User-Agent value handed to the Google Cloud
+// Healthcare service client; it ends with the Beam SDK version.
+const (
+	baseMetricPrefix = "fhirio/"
+	userAgent        = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
+)
+
+// fhirStoreClient is the seam between the transforms and the FHIR store
+// backend. It lets tests substitute a fake for the real service client.
+type fhirStoreClient interface {
+	// readResource fetches the resource identified by resourcePath and returns
+	// the raw HTTP response.
+	readResource(resourcePath string) (*http.Response, error)
+}
+
+// fhirStoreClientImpl is the fhirStoreClient implementation backed by the
+// Google Cloud Healthcare API.
+type fhirStoreClientImpl struct {
+	// fhirService is the Healthcare API handle used to issue FHIR requests.
+	fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
+}
+
+// newFhirStoreClient creates a Healthcare service that identifies itself with
+// the package user agent and wraps its FHIR sub-service in a
+// fhirStoreClientImpl. It panics when the service cannot be created.
+func newFhirStoreClient() *fhirStoreClientImpl {
+	svc, err := healthcare.NewService(context.Background(), option.WithUserAgent(userAgent))
+	if err != nil {
+		panic("Failed to initialize Google Cloud Healthcare Service. Reason: " + err.Error())
+	}
+
+	fhirSvc := healthcare.NewProjectsLocationsDatasetsFhirStoresFhirService(svc)
+	return &fhirStoreClientImpl{fhirService: fhirSvc}
+}
+
+// readResource issues a FHIR read for resourcePath through the Healthcare
+// service and returns the response and error exactly as the call produced
+// them.
+func (c *fhirStoreClientImpl) readResource(resourcePath string) (*http.Response, error) {
+	readCall := c.fhirService.Read(resourcePath)
+	return readCall.Do()
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/fakes_test.go b/sdks/go/pkg/beam/io/fhirio/fakes_test.go
new file mode 100644
index 00000000000..4e1a51aeb23
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/fakes_test.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import "net/http"
+
+// fakeFhirStoreClient is a test double for fhirStoreClient whose behaviour is
+// supplied by the test through a function field.
+type fakeFhirStoreClient struct {
+	// fakeReadResources is invoked by readResource with the requested path.
+	fakeReadResources func(string) (*http.Response, error)
+}
+
+// readResource forwards the call to the injected fakeReadResources function.
+func (c *fakeFhirStoreClient) readResource(resourcePath string) (*http.Response, error) {
+	response, err := c.fakeReadResources(resourcePath)
+	return response, err
+}
+
+// fakeReaderCloser stands in for the Body of an http.Response in tests. Reads
+// are served by the injected fakeRead function and Close always succeeds.
+type fakeReaderCloser struct {
+	fakeRead func([]byte) (int, error)
+}
+
+// Read hands the buffer to the injected fakeRead function and returns its
+// result unchanged.
+func (r *fakeReaderCloser) Read(b []byte) (int, error) {
+	n, err := r.fakeRead(b)
+	return n, err
+}
+
+// Close is a no-op that never fails.
+func (r *fakeReaderCloser) Close() error {
+	return nil
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/read.go b/sdks/go/pkg/beam/io/fhirio/read.go
new file mode 100644
index 00000000000..a710c92a869
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/read.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare FHIR stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"io"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+// init registers readResourceFn and the string emitter it uses with the Beam
+// register package. The type parameters mirror the ProcessElement signature:
+// a context, the input string and two string emitters.
+func init() {
+	register.DoFn4x0[context.Context, string, func(string), func(string)]((*readResourceFn)(nil))
+	register.Emitter1[string]()
+}
+
+// readResourceFn is the DoFn behind Read. For each incoming resource path it
+// fetches the resource and emits either its body or an error message.
+type readResourceFn struct {
+	// client performs the reads. When it is nil, Setup replaces it with a real
+	// client.
+	client                fhirStoreClient
+	// readResourceErrors counts elements that ended up on the dead-letter output.
+	readResourceErrors    beam.Counter
+	// readResourceSuccess counts elements whose resource was emitted.
+	readResourceSuccess   beam.Counter
+	// readResourceLatencyMs records the duration of each read request in
+	// milliseconds.
+	readResourceLatencyMs beam.Distribution
+}
+
+// String returns the fixed name of this DoFn. Setup uses it as the namespace
+// of the metrics it creates.
+func (fn readResourceFn) String() string {
+	return "readResourceFn"
+}
+
+// Setup prepares the DoFn for processing: it creates a real FHIR store client
+// unless one was injected, and instantiates the error counter, success counter
+// and latency distribution under the DoFn's name.
+func (fn *readResourceFn) Setup() {
+	if fn.client == nil {
+		fn.client = newFhirStoreClient()
+	}
+
+	namespace := fn.String()
+	fn.readResourceErrors = beam.NewCounter(namespace, baseMetricPrefix+"read_resource_error_count")
+	fn.readResourceSuccess = beam.NewCounter(namespace, baseMetricPrefix+"read_resource_success_count")
+	fn.readResourceLatencyMs = beam.NewDistribution(namespace, baseMetricPrefix+"read_resource_latency_ms")
+}
+
+// ProcessElement fetches the resource at resourcePath from the FHIR store.
+//
+// On success the response body is emitted as a string through emitResource and
+// the success counter is incremented. If the request fails, the status code is
+// not 200, or the body cannot be read, a descriptive message is emitted through
+// emitDeadLetter and the error counter is incremented instead. The latency of
+// the read request is recorded for every element, whatever the outcome.
+func (fn *readResourceFn) ProcessElement(ctx context.Context, resourcePath string, emitResource, emitDeadLetter func(string)) {
+	timeBeforeReadRequest := time.Now()
+	response, err := fn.client.readResource(resourcePath)
+	fn.readResourceLatencyMs.Update(ctx, time.Since(timeBeforeReadRequest).Milliseconds())
+
+	if err != nil {
+		fn.readResourceErrors.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "failed fetching resource [%s]", resourcePath).Error())
+		return
+	}
+
+	// The caller owns the response body and must close it on every path (bad
+	// status, read failure and success), otherwise the underlying connection
+	// is leaked. Body can be nil on responses built by hand, such as the ones
+	// test fakes return, so guard before deferring.
+	if response.Body != nil {
+		defer response.Body.Close()
+	}
+
+	if response.StatusCode != 200 {
+		fn.readResourceErrors.Inc(ctx, 1)
+		emitDeadLetter(errors.Errorf("fetched resource [%s] returned bad status [%d]", resourcePath, response.StatusCode).Error())
+		return
+	}
+
+	bytes, err := io.ReadAll(response.Body)
+	if err != nil {
+		fn.readResourceErrors.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error reading response body of resource [%s]", resourcePath).Error())
+		return
+	}
+
+	fn.readResourceSuccess.Inc(ctx, 1)
+	emitResource(string(bytes))
+}
+
+// Read retrieves resources from Google Cloud Healthcare FHIR stores by their
+// resource path. The input is a PCollection<string> of resource paths, as
+// delivered in notifications from the FHIR store. Two PCollection<string> are
+// returned: the first holds each fetched resource as a JSON-encoded string, the
+// second is a dead-letter collection holding an error message for every
+// resource that could not be fetched.
+// See: https://cloud.google.com/healthcare-api/docs/how-tos/fhir-resources#getting_a_fhir_resource.
+func Read(s beam.Scope, resourcePaths beam.PCollection) (beam.PCollection, beam.PCollection) {
+	// A nil client makes the DoFn create the real one during Setup.
+	return read(s.Scope("fhirio.Read"), resourcePaths, nil)
+}
+
+// read applies readResourceFn to resourcePaths using the given client and
+// returns the resource and dead-letter outputs. Tests call it directly to
+// inject a fake client.
+func read(s beam.Scope, resourcePaths beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
+	fn := &readResourceFn{client: client}
+	return beam.ParDo2(s, fn, resourcePaths)
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/read_test.go b/sdks/go/pkg/beam/io/fhirio/read_test.go
new file mode 100644
index 00000000000..4ed9bc5ab53
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/read_test.go
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+	"errors"
+	"net/http"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+// TestRead drives the read transform with fake clients that fail in different
+// ways. For every case it checks that no resource is emitted, that each input
+// path produces one dead-letter message containing the expected text, and that
+// the error counter is the only counter reported, with a value equal to the
+// number of inputs.
+func TestRead(t *testing.T) {
+	testCases := []struct {
+		name           string
+		client         fhirStoreClient
+		containedError string
+	}{
+		{
+			// The client itself returns an error.
+			name: "Read Request Failed",
+			client: &fakeFhirStoreClient{
+				fakeReadResources: func(resource string) (*http.Response, error) {
+					return nil, errors.New("")
+				},
+			},
+			containedError: "failed fetching resource",
+		},
+		{
+			// The request succeeds but the status code is not 200.
+			name: "Read Request Returns Bad Status",
+			client: &fakeFhirStoreClient{
+				fakeReadResources: func(resource string) (*http.Response, error) {
+					return &http.Response{StatusCode: 403}, nil
+				},
+			},
+			containedError: "returned bad status",
+		},
+		{
+			// The status is 200 but reading the body returns an error.
+			name: "Response body fails to be parsed",
+			client: &fakeFhirStoreClient{
+				fakeReadResources: func(resource string) (*http.Response, error) {
+					return &http.Response{Body: &fakeReaderCloser{
+						fakeRead: func([]byte) (int, error) {
+							return 0, errors.New("")
+						},
+					}, StatusCode: 200}, nil
+				},
+			},
+			containedError: "error reading response body",
+		},
+	}
+
+	testResourcePaths := []string{"foo", "bar"}
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			p, s, resourcePaths := ptest.CreateList(testResourcePaths)
+			resources, failedReads := read(s, resourcePaths, testCase.client)
+			// Every input must land on the dead-letter output, none on the main one.
+			passert.Empty(s, resources)
+			passert.Count(s, failedReads, "", len(testResourcePaths))
+			passert.True(s, failedReads, func(errorMsg string) bool {
+				return strings.Contains(errorMsg, testCase.containedError)
+			})
+			pipelineResult := ptest.RunAndValidate(t, p)
+			// Only the error counter should have been touched, once per input path.
+			counterResults := pipelineResult.Metrics().AllMetrics().Counters()
+			if len(counterResults) != 1 {
+				t.Fatal("Only one counter should have been used")
+			}
+			if counterResults[0].Name() != "fhirio/read_resource_error_count" {
+				t.Fatal("Only error counter should have been used")
+			}
+			if counterResults[0].Result() != int64(len(testResourcePaths)) {
+				t.Fatal("Counter should have been incremented by the number of test resource paths")
+			}
+		})
+	}
+}