You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/07/01 01:04:57 UTC

[beam] branch master updated: Fix #21977: Add Search transform to Go FhirIO (#21979)

This is an automated email from the ASF dual-hosted git repository.

danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 32efddcf6d7 Fix #21977: Add Search transform to Go FhirIO (#21979)
32efddcf6d7 is described below

commit 32efddcf6d716412ef3c4878ea95229d35bba5f5
Author: Lucas Nogueira <ln...@uwaterloo.ca>
AuthorDate: Thu Jun 30 21:04:50 2022 -0400

    Fix #21977: Add Search transform to Go FhirIO (#21979)
    
    * parent 2574f9106c75e413310dcb7756952889da3fe7cd
    author Lucas Nogueira <ln...@uwaterloo.ca> 1655155614 +0000
    committer Lucas Nogueira <ln...@uwaterloo.ca> 1655882203 +0000
    
    parent 2574f9106c75e413310dcb7756952889da3fe7cd
    author Lucas Nogueira <ln...@uwaterloo.ca> 1655155614 +0000
    committer Lucas Nogueira <ln...@uwaterloo.ca> 1655882058 +0000
    
    squashed rebase conflict commits
    
    add execute bundles transform with integration test
    
    adjust import to follow convention
    
    fix variable scope
    
    adjust read unit tests
    
    add comment to execute bundles transform
    
    include error reason
    
    make variable exported to fix integration test
    
    adjust integration tests after merge
    
    update license comment
    
    add comment explaining the purpose of unexported transform function
    
    remove unnecessary generic usage
    
    remove code mistakenly added
    
    improve transaction vs batch bundle comment
    
    use net/http constants instead of hardcoded values
    
    adjust import spacing
    
    improve error message
    
    * add search transform
    
    * extract content of lambda to function
    
    * add integration tests
    
    * improve naming and fix IDE warnings
    
    * use more appropriate function to check for status
    
    * add documentation and improve member variable name
    
    * add unit tests
    
    * use while loop style to shorten line
    
    * run integration tests in same pipeline
    
    * fix integration test on dataflow and make it cleaner
    
    * add pagination test and improve test utils
    
    * fix search request when resourceType is empty
    
    * improve readability of pagination logic
    
    * log upon nondetrimental errors
    
    * add retry mechanism to prevent flaky test results
    
    * remove backoff retry on read transform test since it should never be flaky
    
    * add dummy content to resource to improve readability
    
    * adjust spacing for consistency
---
 sdks/go/data/fhir_bundles/transaction-success.json |  86 +++++++++--
 sdks/go/pkg/beam/io/fhirio/common.go               |  85 ++++++-----
 sdks/go/pkg/beam/io/fhirio/execute_bundles.go      |  20 ++-
 sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go |  24 +--
 sdks/go/pkg/beam/io/fhirio/read.go                 |   8 +-
 sdks/go/pkg/beam/io/fhirio/read_test.go            |   8 +-
 sdks/go/pkg/beam/io/fhirio/search.go               | 168 +++++++++++++++++++++
 sdks/go/pkg/beam/io/fhirio/search_test.go          | 106 +++++++++++++
 sdks/go/pkg/beam/io/fhirio/utils_test.go           |  63 +++++---
 sdks/go/test/integration/io/fhirio/fhirio_test.go  |  48 ++++++
 10 files changed, 526 insertions(+), 90 deletions(-)

diff --git a/sdks/go/data/fhir_bundles/transaction-success.json b/sdks/go/data/fhir_bundles/transaction-success.json
index c95e0b19344..a4946b36a2e 100644
--- a/sdks/go/data/fhir_bundles/transaction-success.json
+++ b/sdks/go/data/fhir_bundles/transaction-success.json
@@ -5,20 +5,48 @@
     {
       "request": {
         "method": "POST",
-        "url": "Patient"
+        "url": "Organization"
       },
       "resource": {
-        "name": [
+        "resourceType": "Organization",
+        "id": "b0e04623-b02c-3f8b-92ea-943fc4db60da",
+        "identifier": [
           {
-            "use": "official",
-            "given": [
-              "John"
-            ]
+            "system": "https://github.com/synthetichealth/synthea",
+            "value": "b0e04623-b02c-3f8b-92ea-943fc4db60da"
           }
         ],
-        "gender": "male",
-        "birthDate": "1973-01-21",
-        "resourceType": "Patient"
+        "active": true,
+        "type": [
+          {
+            "coding": [
+              {
+                "system": "http://terminology.hl7.org/CodeSystem/organization-type",
+                "code": "prov",
+                "display": "Healthcare Provider"
+              }
+            ],
+            "text": "Healthcare Provider"
+          }
+        ],
+        "name": "LOWELL GENERAL HOSPITAL",
+        "telecom": [
+          {
+            "system": "phone",
+            "value": "9789376000"
+          }
+        ],
+        "address": [
+          {
+            "line": [
+              "295 VARNUM AVENUE"
+            ],
+            "city": "LOWELL",
+            "state": "MA",
+            "postalCode": "01854",
+            "country": "US"
+          }
+        ]
       }
     },
     {
@@ -30,6 +58,7 @@
         "name": [
           {
             "use": "official",
+            "family": "Smith",
             "given": [
               "Alice"
             ]
@@ -39,6 +68,45 @@
         "birthDate": "1970-01-01",
         "resourceType": "Patient"
       }
+    },
+    {
+      "request": {
+        "method": "POST",
+        "url": "Practitioner"
+      },
+      "resource": {
+        "resourceType": "Practitioner",
+        "name": [
+          {
+            "family": "Tillman293",
+            "given": [
+              "Franklin857"
+            ],
+            "prefix": [
+              "Dr."
+            ]
+          }
+        ],
+        "telecom": [
+          {
+            "system": "email",
+            "value": "Franklin857.Tillman293@example.com",
+            "use": "work"
+          }
+        ],
+        "address": [
+          {
+            "line": [
+              "295 VARNUM AVENUE"
+            ],
+            "city": "LOWELL",
+            "state": "MA",
+            "postalCode": "01854",
+            "country": "US"
+          }
+        ],
+        "gender": "male"
+      }
     }
   ]
 }
diff --git a/sdks/go/pkg/beam/io/fhirio/common.go b/sdks/go/pkg/beam/io/fhirio/common.go
index 666f9d6f0e0..649bbbd9c6d 100644
--- a/sdks/go/pkg/beam/io/fhirio/common.go
+++ b/sdks/go/pkg/beam/io/fhirio/common.go
@@ -23,31 +23,35 @@ import (
 	"context"
 	"io"
 	"net/http"
-	"regexp"
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"google.golang.org/api/googleapi"
 	"google.golang.org/api/healthcare/v1"
 	"google.golang.org/api/option"
 )
 
 const (
-	UserAgent        = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
-	baseMetricPrefix = "fhirio/"
+	UserAgent             = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
+	baseMetricPrefix      = "fhirio/"
+	errorCounterName      = baseMetricPrefix + "resource_error_count"
+	successCounterName    = baseMetricPrefix + "resource_success_count"
+	pageTokenParameterKey = "_page_token"
 )
 
-func executeRequestAndRecordLatency(ctx context.Context, latencyMs *beam.Distribution, requestSupplier func() (*http.Response, error)) (*http.Response, error) {
+func executeAndRecordLatency[T any](ctx context.Context, latencyMs *beam.Distribution, executionSupplier func() (T, error)) (T, error) {
 	timeBeforeReadRequest := time.Now()
-	response, err := requestSupplier()
+	result, err := executionSupplier()
 	latencyMs.Update(ctx, time.Since(timeBeforeReadRequest).Milliseconds())
-	return response, err
+	return result, err
 }
 
 func extractBodyFrom(response *http.Response) (string, error) {
-	if isBadStatusCode(response.Status) {
-		return "", errors.Errorf("response contains bad status: [%v]", response.Status)
+	err := googleapi.CheckResponse(response)
+	if err != nil {
+		return "", errors.Wrapf(err, "response contains bad status: [%v]", response.Status)
 	}
 
 	bodyBytes, err := io.ReadAll(response.Body)
@@ -58,38 +62,39 @@ func extractBodyFrom(response *http.Response) (string, error) {
 	return string(bodyBytes), nil
 }
 
-func isBadStatusCode(status string) bool {
-	// 2XXs are successes, otherwise failure.
-	isMatch, err := regexp.MatchString("^2\\d{2}", status)
-	if err != nil {
-		return true
-	}
-	return !isMatch
+type fhirStoreClient interface {
+	readResource(resourcePath string) (*http.Response, error)
+	executeBundle(storePath string, bundle []byte) (*http.Response, error)
+	search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error)
 }
 
-type fhirioFnCommon struct {
-	client                fhirStoreClient
-	resourcesErrorCount   beam.Counter
-	resourcesSuccessCount beam.Counter
-	latencyMs             beam.Distribution
+type fhirStoreClientImpl struct {
+	fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
 }
 
-func (fnc *fhirioFnCommon) setup(namespace string) {
-	if fnc.client == nil {
-		fnc.client = newFhirStoreClient()
-	}
-	fnc.resourcesErrorCount = beam.NewCounter(namespace, baseMetricPrefix+"resource_error_count")
-	fnc.resourcesSuccessCount = beam.NewCounter(namespace, baseMetricPrefix+"resource_success_count")
-	fnc.latencyMs = beam.NewDistribution(namespace, baseMetricPrefix+"latency_ms")
+func (c *fhirStoreClientImpl) readResource(resourcePath string) (*http.Response, error) {
+	return c.fhirService.Read(resourcePath).Do()
 }
 
-type fhirStoreClient interface {
-	readResource(resourcePath string) (*http.Response, error)
-	executeBundle(storePath string, bundle []byte) (*http.Response, error)
+func (c *fhirStoreClientImpl) executeBundle(storePath string, bundle []byte) (*http.Response, error) {
+	return c.fhirService.ExecuteBundle(storePath, bytes.NewReader(bundle)).Do()
 }
 
-type fhirStoreClientImpl struct {
-	fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
+func (c *fhirStoreClientImpl) search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error) {
+	queryParams := make([]googleapi.CallOption, 0)
+	for key, value := range queries {
+		queryParams = append(queryParams, googleapi.QueryParameter(key, value))
+	}
+
+	if pageToken != "" {
+		queryParams = append(queryParams, googleapi.QueryParameter(pageTokenParameterKey, pageToken))
+	}
+
+	searchRequest := &healthcare.SearchResourcesRequest{}
+	if resourceType == "" {
+		return c.fhirService.Search(storePath, searchRequest).Do(queryParams...)
+	}
+	return c.fhirService.SearchType(storePath, resourceType, searchRequest).Do(queryParams...)
 }
 
 func newFhirStoreClient() *fhirStoreClientImpl {
@@ -100,10 +105,18 @@ func newFhirStoreClient() *fhirStoreClientImpl {
 	return &fhirStoreClientImpl{fhirService: healthcare.NewProjectsLocationsDatasetsFhirStoresFhirService(healthcareService)}
 }
 
-func (c *fhirStoreClientImpl) readResource(resourcePath string) (*http.Response, error) {
-	return c.fhirService.Read(resourcePath).Do()
+type fnCommonVariables struct {
+	client                fhirStoreClient
+	resourcesErrorCount   beam.Counter
+	resourcesSuccessCount beam.Counter
+	latencyMs             beam.Distribution
 }
 
-func (c *fhirStoreClientImpl) executeBundle(storePath string, bundle []byte) (*http.Response, error) {
-	return c.fhirService.ExecuteBundle(storePath, bytes.NewReader(bundle)).Do()
+func (fnc *fnCommonVariables) setup(namespace string) {
+	if fnc.client == nil {
+		fnc.client = newFhirStoreClient()
+	}
+	fnc.resourcesErrorCount = beam.NewCounter(namespace, errorCounterName)
+	fnc.resourcesSuccessCount = beam.NewCounter(namespace, successCounterName)
+	fnc.latencyMs = beam.NewDistribution(namespace, baseMetricPrefix+"latency_ms")
 }
diff --git a/sdks/go/pkg/beam/io/fhirio/execute_bundles.go b/sdks/go/pkg/beam/io/fhirio/execute_bundles.go
index c96d72fae39..4b7757bbf95 100644
--- a/sdks/go/pkg/beam/io/fhirio/execute_bundles.go
+++ b/sdks/go/pkg/beam/io/fhirio/execute_bundles.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"encoding/json"
 	"net/http"
+	"regexp"
 	"strings"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
@@ -41,7 +42,7 @@ func init() {
 }
 
 type executeBundleFn struct {
-	fhirioFnCommon
+	fnCommonVariables
 	successesCount beam.Counter
 	// Path to FHIR store where bundle requests will be executed on.
 	FhirStorePath string
@@ -52,12 +53,12 @@ func (fn executeBundleFn) String() string {
 }
 
 func (fn *executeBundleFn) Setup() {
-	fn.fhirioFnCommon.setup(fn.String())
+	fn.fnCommonVariables.setup(fn.String())
 	fn.successesCount = beam.NewCounter(fn.String(), baseMetricPrefix+"success_count")
 }
 
 func (fn *executeBundleFn) ProcessElement(ctx context.Context, inputBundleBody []byte, emitSuccess, emitFailure func(string)) {
-	response, err := executeRequestAndRecordLatency(ctx, &fn.latencyMs, func() (*http.Response, error) {
+	response, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() (*http.Response, error) {
 		return fn.client.executeBundle(fn.FhirStorePath, inputBundleBody)
 	})
 	if err != nil {
@@ -114,7 +115,7 @@ func (fn *executeBundleFn) processResponseBody(ctx context.Context, body string,
 				continue
 			}
 
-			if isBadStatusCode(entryFields.Response.Status) {
+			if batchResponseStatusIsBad(entryFields.Response.Status) {
 				fn.resourcesErrorCount.Inc(ctx, 1)
 				emitFailure(errors.Errorf("execute bundles entry contains bad status: [%v]", entryFields.Response.Status).Error())
 			} else {
@@ -127,6 +128,15 @@ func (fn *executeBundleFn) processResponseBody(ctx context.Context, body string,
 	fn.successesCount.Inc(ctx, 1)
 }
 
+func batchResponseStatusIsBad(status string) bool {
+	// A status beginning with a 2XX code is a success; anything else is a failure.
+	isMatch, err := regexp.MatchString("^2\\d{2}", status)
+	if err != nil {
+		return true
+	}
+	return !isMatch
+}
+
 // ExecuteBundles performs all the requests in the specified bundles on a given
 // FHIR store. This transform takes a path to a FHIR store and a PCollection of
 // bundles as JSON-encoded strings. It executes the requests defined on the
@@ -142,5 +152,5 @@ func ExecuteBundles(s beam.Scope, fhirStorePath string, bundles beam.PCollection
 
 // This is useful as an entry point for testing because we can provide a fake FHIR store client.
 func executeBundles(s beam.Scope, fhirStorePath string, bundles beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
-	return beam.ParDo2(s, &executeBundleFn{fhirioFnCommon: fhirioFnCommon{client: client}, FhirStorePath: fhirStorePath}, bundles)
+	return beam.ParDo2(s, &executeBundleFn{fnCommonVariables: fnCommonVariables{client: client}, FhirStorePath: fhirStorePath}, bundles)
 }
diff --git a/sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go b/sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go
index c0eb1174bb1..77a8fd85668 100644
--- a/sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go
+++ b/sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go
@@ -16,8 +16,9 @@
 package fhirio
 
 import (
-	"bytes"
+	"io"
 	"net/http"
+	"strconv"
 	"strings"
 	"testing"
 
@@ -39,7 +40,7 @@ func TestExecuteBundles(t *testing.T) {
 		{
 			name:           "Execute Bundles request returns bad status",
 			client:         badStatusFakeClient,
-			containedError: fakeBadStatus,
+			containedError: strconv.Itoa(http.StatusForbidden),
 		},
 		{
 			name:           "Execute Bundles request response body fails to be read",
@@ -47,18 +48,9 @@ func TestExecuteBundles(t *testing.T) {
 			containedError: fakeBodyReaderErrorMessage,
 		},
 		{
-			name: "Execute Bundles request response body failed to be decoded",
-			client: &fakeFhirStoreClient{
-				fakeExecuteBundles: func(storePath string, bundle []byte) (*http.Response, error) {
-					return &http.Response{
-						Body: &fakeReaderCloser{
-							fakeRead: func(t []byte) (int, error) {
-								return bytes.NewReader([]byte("")).Read(t)
-							},
-						}, Status: "200 Ok"}, nil
-				},
-			},
-			containedError: "EOF",
+			name:           "Execute Bundles request response body failed to be decoded",
+			client:         emptyResponseBodyFakeClient,
+			containedError: io.EOF.Error(),
 		},
 	}
 
@@ -73,9 +65,9 @@ func TestExecuteBundles(t *testing.T) {
 				return strings.Contains(errorMsg, testCase.containedError)
 			})
 			pipelineResult := ptest.RunAndValidate(t, p)
-			err := validateResourceErrorCounter(pipelineResult, len(testBundles))
+			err := validateCounter(pipelineResult, errorCounterName, len(testBundles))
 			if err != nil {
-				t.Fatalf("validateResourceErrorCounter returned error [%v]", err.Error())
+				t.Fatalf("validateCounter returned error [%v]", err.Error())
 			}
 		})
 	}
diff --git a/sdks/go/pkg/beam/io/fhirio/read.go b/sdks/go/pkg/beam/io/fhirio/read.go
index 41c53a540b6..d6041612ea5 100644
--- a/sdks/go/pkg/beam/io/fhirio/read.go
+++ b/sdks/go/pkg/beam/io/fhirio/read.go
@@ -33,7 +33,7 @@ func init() {
 }
 
 type readResourceFn struct {
-	fhirioFnCommon
+	fnCommonVariables
 }
 
 func (fn readResourceFn) String() string {
@@ -41,11 +41,11 @@ func (fn readResourceFn) String() string {
 }
 
 func (fn *readResourceFn) Setup() {
-	fn.fhirioFnCommon.setup(fn.String())
+	fn.fnCommonVariables.setup(fn.String())
 }
 
 func (fn *readResourceFn) ProcessElement(ctx context.Context, resourcePath string, emitResource, emitDeadLetter func(string)) {
-	response, err := executeRequestAndRecordLatency(ctx, &fn.latencyMs, func() (*http.Response, error) {
+	response, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() (*http.Response, error) {
 		return fn.client.readResource(resourcePath)
 	})
 	if err != nil {
@@ -79,5 +79,5 @@ func Read(s beam.Scope, resourcePaths beam.PCollection) (beam.PCollection, beam.
 
 // This is useful as an entry point for testing because we can provide a fake FHIR store client.
 func read(s beam.Scope, resourcePaths beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
-	return beam.ParDo2(s, &readResourceFn{fhirioFnCommon: fhirioFnCommon{client: client}}, resourcePaths)
+	return beam.ParDo2(s, &readResourceFn{fnCommonVariables: fnCommonVariables{client: client}}, resourcePaths)
 }
diff --git a/sdks/go/pkg/beam/io/fhirio/read_test.go b/sdks/go/pkg/beam/io/fhirio/read_test.go
index 39ae484b92b..e6a8cbd669c 100644
--- a/sdks/go/pkg/beam/io/fhirio/read_test.go
+++ b/sdks/go/pkg/beam/io/fhirio/read_test.go
@@ -16,6 +16,8 @@
 package fhirio
 
 import (
+	"net/http"
+	"strconv"
 	"strings"
 	"testing"
 
@@ -37,7 +39,7 @@ func TestRead(t *testing.T) {
 		{
 			name:           "Read request returns bad status",
 			client:         badStatusFakeClient,
-			containedError: fakeBadStatus,
+			containedError: strconv.Itoa(http.StatusForbidden),
 		},
 		{
 			name:           "Read request response body fails to be read",
@@ -57,9 +59,9 @@ func TestRead(t *testing.T) {
 				return strings.Contains(errorMsg, testCase.containedError)
 			})
 			pipelineResult := ptest.RunAndValidate(t, p)
-			err := validateResourceErrorCounter(pipelineResult, len(testResourcePaths))
+			err := validateCounter(pipelineResult, errorCounterName, len(testResourcePaths))
 			if err != nil {
-				t.Fatalf("validateResourceErrorCounter returned error [%v]", err.Error())
+				t.Fatalf("validateCounter returned error [%v]", err.Error())
 			}
 		})
 	}
diff --git a/sdks/go/pkg/beam/io/fhirio/search.go b/sdks/go/pkg/beam/io/fhirio/search.go
new file mode 100644
index 00000000000..4e88fecbab8
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/search.go
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query and is the element type
+// of the input PCollection consumed by the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {
+	resourcesFound, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() ([]string, error) {
+		return fn.searchResources(ctx, query)
+	})
+	if err != nil {
+		fn.resourcesErrorCount.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error occurred while performing search for query: [%v]", query).Error())
+		return
+	}
+
+	fn.resourcesSuccessCount.Inc(ctx, 1)
+	emitFoundResources(query.Identifier, resourcesFound)
+}
+
+func (fn *searchResourcesFn) searchResources(ctx context.Context, query SearchQuery) ([]string, error) {
+	resourcesInPage, nextPageToken, err := fn.searchResourcesPaginated(ctx, query, "")
+	allResources := resourcesInPage
+	for nextPageToken != "" {
+		resourcesInPage, nextPageToken, err = fn.searchResourcesPaginated(ctx, query, nextPageToken)
+		allResources = append(allResources, resourcesInPage...)
+	}
+	return allResources, err
+}
+
+// searchResourcesPaginated performs a search request and returns the resources
+// found in the page identified by `pageToken` along with the next page token
+// ("" when none is found). An empty `pageToken` retrieves the first page.
+func (fn *searchResourcesFn) searchResourcesPaginated(ctx context.Context, query SearchQuery, pageToken string) ([]string, string, error) {
+	response, err := fn.client.search(fn.FhirStorePath, query.ResourceType, query.Parameters, pageToken)
+	if err != nil {
+		return nil, "", err
+	}
+
+	body, err := extractBodyFrom(response)
+	if err != nil {
+		return nil, "", err
+	}
+
+	var bodyFields struct {
+		Entries []interface{}        `json:"entry"`
+		Links   []responseLinkFields `json:"link"`
+	}
+	err = json.NewDecoder(strings.NewReader(body)).Decode(&bodyFields)
+	if err != nil {
+		return nil, "", err
+	}
+
+	resourcesFoundInPage := mapEntryToString(ctx, bodyFields.Entries)
+	return resourcesFoundInPage, extractNextPageTokenFrom(ctx, bodyFields.Links), nil
+}
+
+func mapEntryToString(ctx context.Context, entries []interface{}) []string {
+	stringifiedEntries := make([]string, 0)
+	for _, entry := range entries {
+		entryBytes, err := json.Marshal(entry)
+		if err != nil {
+			log.Warnf(ctx, "Ignoring malformed entry resource. Error: %v", err)
+			continue
+		}
+		stringifiedEntries = append(stringifiedEntries, string(entryBytes))
+	}
+	return stringifiedEntries
+}
+
+func extractNextPageTokenFrom(ctx context.Context, searchResponseLinks []responseLinkFields) string {
+	for _, link := range searchResponseLinks {
+		// Only the link whose relation is "next" carries the next page token.
+		if link.Relation != "next" {
+			continue
+		}
+
+		parsedUrl, err := url.Parse(link.Url)
+		if err != nil {
+			log.Warnf(ctx, "Search next page token failed to be parsed from URL [%v]. Reason: %v", link.Url, err)
+			break
+		}
+		return parsedUrl.Query().Get(pageTokenParameterKey)
+	}
+	return ""
+}
+
+// Search transform searches for resources in a Google Cloud Healthcare FHIR
+// store based on input queries. It consumes a PCollection<fhirio.SearchQuery>
+// and outputs two PCollections. The first contains one (identifier,
+// searchResults) tuple per successful query, where `identifier` is the
+// SearchQuery Identifier field and `searchResults` is a slice holding each found
+// resource as a JSON-encoded string. The second is a dead-letter PCollection of
+// error messages for the input queries that failed while performing the search.
+// See: https://cloud.google.com/healthcare-api/docs/how-tos/fhir-search
+func Search(s beam.Scope, fhirStorePath string, searchQueries beam.PCollection) (beam.PCollection, beam.PCollection) {
+	s = s.Scope("fhirio.Search")
+	return search(s, fhirStorePath, searchQueries, nil)
+}
+
+// This is useful as an entry point for testing because we can provide a fake FHIR store client.
+func search(s beam.Scope, fhirStorePath string, searchQueries beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
+	return beam.ParDo2(s, &searchResourcesFn{fnCommonVariables: fnCommonVariables{client: client}, FhirStorePath: fhirStorePath}, searchQueries)
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/search_test.go b/sdks/go/pkg/beam/io/fhirio/search_test.go
new file mode 100644
index 00000000000..7b2a2287d9b
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/search_test.go
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+	"bytes"
+	"io"
+	"net/http"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+func TestSearch_Errors(t *testing.T) {
+	testCases := []struct {
+		name           string
+		client         fhirStoreClient
+		containedError string
+	}{
+		{
+			name:           "Search request returns error",
+			client:         requestReturnErrorFakeClient,
+			containedError: fakeRequestReturnErrorMessage,
+		},
+		{
+			name:           "Search request returns bad status",
+			client:         badStatusFakeClient,
+			containedError: strconv.Itoa(http.StatusForbidden),
+		},
+		{
+			name:           "Search request response body fails to be read",
+			client:         bodyReaderErrorFakeClient,
+			containedError: fakeBodyReaderErrorMessage,
+		},
+		{
+			name:           "Search request response body failed to be decoded",
+			client:         emptyResponseBodyFakeClient,
+			containedError: io.EOF.Error(),
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			p, s, testSearchQueryPCollection := ptest.CreateList([]SearchQuery{{}})
+			resources, failedSearches := search(s, "any", testSearchQueryPCollection, testCase.client)
+			passert.Empty(s, resources)
+			passert.Count(s, failedSearches, "", 1)
+			passert.True(s, failedSearches, func(errorMsg string) bool {
+				return strings.Contains(errorMsg, testCase.containedError)
+			})
+			pipelineResult := ptest.RunAndValidate(t, p)
+			err := validateCounter(pipelineResult, errorCounterName, 1)
+			if err != nil {
+				t.Fatalf("validateCounter returned error [%v]", err.Error())
+			}
+		})
+	}
+}
+
+func TestSearch_Pagination(t *testing.T) {
+	paginationFakeClient := &fakeFhirStoreClient{
+		fakeSearch: func(s, s2 string, m map[string]string, pageToken string) (*http.Response, error) {
+			if pageToken == "theNextPageToken" {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{"resourceType": "Patient", "id": "2"}}], "link": []}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			} else {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{"resourceType": "Patient", "id": "1"}}], "link": [{"relation":"next", "url":"https://healthcare.googleapis.com?_page_token=theNextPageToken"}]}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			}
+		},
+	}
+	p, s, testSearchQuery := ptest.CreateList([]SearchQuery{{}})
+	resourcesFound, failedSearches := search(s, "any", testSearchQuery, paginationFakeClient)
+	passert.Empty(s, failedSearches)
+	passert.Count(s, resourcesFound, "", 1)
+	resourcesFoundWithoutIdentifier := beam.DropKey(s, resourcesFound)
+	passert.True(s, resourcesFoundWithoutIdentifier, func(resourcesFound []string) bool {
+		return len(resourcesFound) == 2
+	})
+	pipelineResult := ptest.RunAndValidate(t, p)
+	err := validateCounter(pipelineResult, successCounterName, 1)
+	if err != nil {
+		t.Fatalf("validateCounter returned error [%v]", err.Error())
+	}
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/utils_test.go b/sdks/go/pkg/beam/io/fhirio/utils_test.go
index a8ffefd207c..d97364c5288 100644
--- a/sdks/go/pkg/beam/io/fhirio/utils_test.go
+++ b/sdks/go/pkg/beam/io/fhirio/utils_test.go
@@ -16,8 +16,10 @@
 package fhirio
 
 import (
+	"bytes"
 	"errors"
 	"fmt"
+	"io"
 	"net/http"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
@@ -26,21 +28,29 @@ import (
 var (
 	fakeRequestReturnErrorMessage = "internal error"
 	requestReturnErrorFakeClient  = &fakeFhirStoreClient{
-		fakeReadResources: func(resource string) (*http.Response, error) {
+		fakeReadResources: func(string) (*http.Response, error) {
 			return nil, errors.New(fakeRequestReturnErrorMessage)
 		},
-		fakeExecuteBundles: func(storePath string, bundle []byte) (*http.Response, error) {
+		fakeExecuteBundles: func(string, []byte) (*http.Response, error) {
+			return nil, errors.New(fakeRequestReturnErrorMessage)
+		},
+		fakeSearch: func(string, string, map[string]string, string) (*http.Response, error) {
 			return nil, errors.New(fakeRequestReturnErrorMessage)
 		},
 	}
 
-	fakeBadStatus         = "403 Forbidden"
-	badStatusFakeResponse = &http.Response{Status: fakeBadStatus}
-	badStatusFakeClient   = &fakeFhirStoreClient{
-		fakeReadResources: func(resource string) (*http.Response, error) {
+	badStatusFakeResponse = &http.Response{
+		Body:       io.NopCloser(bytes.NewBufferString("response")),
+		StatusCode: http.StatusForbidden,
+	}
+	badStatusFakeClient = &fakeFhirStoreClient{
+		fakeReadResources: func(string) (*http.Response, error) {
+			return badStatusFakeResponse, nil
+		},
+		fakeExecuteBundles: func(string, []byte) (*http.Response, error) {
 			return badStatusFakeResponse, nil
 		},
-		fakeExecuteBundles: func(storePath string, bundle []byte) (*http.Response, error) {
+		fakeSearch: func(string, string, map[string]string, string) (*http.Response, error) {
 			return badStatusFakeResponse, nil
 		},
 	}
@@ -51,20 +61,39 @@ var (
 			fakeRead: func([]byte) (int, error) {
 				return 0, errors.New(fakeBodyReaderErrorMessage)
 			},
-		}, Status: "200 Ok"}
+		},
+		StatusCode: http.StatusOK,
+	}
 	bodyReaderErrorFakeClient = &fakeFhirStoreClient{
-		fakeReadResources: func(resource string) (*http.Response, error) {
+		fakeReadResources: func(string) (*http.Response, error) {
 			return bodyReaderErrorFakeResponse, nil
 		},
-		fakeExecuteBundles: func(storePath string, bundle []byte) (*http.Response, error) {
+		fakeExecuteBundles: func(string, []byte) (*http.Response, error) {
+			return bodyReaderErrorFakeResponse, nil
+		},
+		fakeSearch: func(string, string, map[string]string, string) (*http.Response, error) {
 			return bodyReaderErrorFakeResponse, nil
 		},
 	}
+
+	emptyBodyReaderFakeResponse = &http.Response{
+		Body:       io.NopCloser(bytes.NewBuffer(nil)),
+		StatusCode: http.StatusOK,
+	}
+	emptyResponseBodyFakeClient = &fakeFhirStoreClient{
+		fakeExecuteBundles: func(string, []byte) (*http.Response, error) {
+			return emptyBodyReaderFakeResponse, nil
+		},
+		fakeSearch: func(string, string, map[string]string, string) (*http.Response, error) {
+			return emptyBodyReaderFakeResponse, nil
+		},
+	}
 )
 
 type fakeFhirStoreClient struct {
 	fakeReadResources  func(string) (*http.Response, error)
-	fakeExecuteBundles func(storePath string, bundle []byte) (*http.Response, error)
+	fakeExecuteBundles func(string, []byte) (*http.Response, error)
+	fakeSearch         func(string, string, map[string]string, string) (*http.Response, error)
 }
 
 func (c *fakeFhirStoreClient) executeBundle(storePath string, bundle []byte) (*http.Response, error) {
@@ -75,27 +104,27 @@ func (c *fakeFhirStoreClient) readResource(resourcePath string) (*http.Response,
 	return c.fakeReadResources(resourcePath)
 }
 
+func (c *fakeFhirStoreClient) search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error) {
+	return c.fakeSearch(storePath, resourceType, queries, pageToken) // forwards all arguments unchanged to the injected fakeSearch func
+}
+
 // Useful to fake the Body of a http.Response.
 type fakeReaderCloser struct {
+	io.Closer // NOTE(review): no visible code sets this; Close() on a nil embedded Closer panics — confirm the body is never closed
 	fakeRead func([]byte) (int, error)
 }
 
-func (*fakeReaderCloser) Close() error {
-	return nil
-}
-
 func (m *fakeReaderCloser) Read(b []byte) (int, error) {
 	return m.fakeRead(b)
 }
 
-func validateResourceErrorCounter(pipelineResult beam.PipelineResult, expectedCount int) error {
+func validateCounter(pipelineResult beam.PipelineResult, expectedCounterName string, expectedCount int) error {
 	counterResults := pipelineResult.Metrics().AllMetrics().Counters()
 	if len(counterResults) != 1 {
 		return fmt.Errorf("counterResults got length %v, expected %v", len(counterResults), 1)
 	}
 	counterResult := counterResults[0]
 
-	expectedCounterName := "fhirio/resource_error_count"
 	if counterResult.Name() != expectedCounterName {
 		return fmt.Errorf("counterResult.Name() is '%v', expected '%v'", counterResult.Name(), expectedCounterName)
 	}
diff --git a/sdks/go/test/integration/io/fhirio/fhirio_test.go b/sdks/go/test/integration/io/fhirio/fhirio_test.go
index b9274598f5a..023ec62290c 100644
--- a/sdks/go/test/integration/io/fhirio/fhirio_test.go
+++ b/sdks/go/test/integration/io/fhirio/fhirio_test.go
@@ -48,6 +48,7 @@ const (
 )
 
 var (
+	backoffDuration        = [...]time.Duration{time.Second, 5 * time.Second, 10 * time.Second}
 	storeService           *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
 	storeManagementService *healthcare.ProjectsLocationsDatasetsFhirStoresService
 )
@@ -177,6 +178,22 @@ func extractResourcePathFrom(resourceLocationURL string) (string, error) {
 	return resourceLocationURL[startIdx:endIdx], nil
 }
 
+// runWithBackoffRetries runs p up to len(backoffDuration) times, sleeping
+// between failed attempts. Useful to prevent flaky results.
+func runWithBackoffRetries(t *testing.T, p *beam.Pipeline) error {
+	t.Helper()
+
+	var err error
+	for attempt, backoff := range backoffDuration {
+		if err = ptest.Run(p); err == nil || attempt == len(backoffDuration)-1 {
+			break // succeeded, or out of attempts: no point sleeping before returning
+		}
+		t.Logf("backoff %v after failure", backoff)
+		time.Sleep(backoff)
+	}
+	return err
+}
+
 func TestFhirIO_Read(t *testing.T) {
 	integration.CheckFilters(t)
 	checkFlags(t)
@@ -225,9 +242,40 @@ func TestFhirIO_ExecuteBundles(t *testing.T) {
 	passert.True(s, failures, func(errorMsg string) bool {
 		return strings.Contains(errorMsg, strconv.Itoa(http.StatusBadRequest))
 	})
+
 	ptest.RunAndValidate(t, p)
 }
 
+// TestFhirIO_Search checks that fhirio.Search yields one result per query, with the expected resource counts and no dead letters.
+func TestFhirIO_Search(t *testing.T) {
+	integration.CheckFilters(t)
+	checkFlags(t)
+
+	storePath, _, teardown := setupFhirStoreWithData(t)
+	defer teardown()
+
+	queries := []fhirio.SearchQuery{
+		{},
+		{ResourceType: "Patient"},
+		{ResourceType: "Patient", Parameters: map[string]string{"gender": "female", "family:contains": "Smith"}},
+		{ResourceType: "Encounter"},
+	}
+
+	p, s, queriesCol := ptest.CreateList(queries)
+	results, failures := fhirio.Search(s, storePath, queriesCol)
+	passert.Empty(s, failures)
+	passert.Count(s, results, "", len(queries))
+
+	foundCounts := beam.ParDo(s, func(_ string, resources []string) int {
+		return len(resources)
+	}, results)
+	passert.Equals(s, foundCounts, 4, 2, 1, 0)
+
+	if err := runWithBackoffRetries(t, p); err != nil {
+		t.Fatalf("Pipeline assertions failed: %v", err)
+	}
+}
+
 func TestMain(m *testing.M) {
 	flag.Parse()
 	beam.Init()