Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/22 07:30:17 UTC

[GitHub] [beam] lnogueir opened a new pull request, #21979: Add Search transform to Go FhirIO

lnogueir opened a new pull request, #21979:
URL: https://github.com/apache/beam/pull/21979

   Ports the [Search](https://github.com/apache/beam/blob/2574f9106c75e413310dcb7756952889da3fe7cd/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java#L1807) transform from the Java FhirIO to the Go FhirIO.
   
   Buganizer ticket: [b/236784940](https://b.corp.google.com/issues/236784940)
   
   Fixes #21977
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lnogueir commented on pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
lnogueir commented on PR #21979:
URL: https://github.com/apache/beam/pull/21979#issuecomment-1169251706

   R: @DanKotowski



[GitHub] [beam] asf-ci commented on pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21979:
URL: https://github.com/apache/beam/pull/21979#issuecomment-1162750231

   Can one of the admins verify this patch?



[GitHub] [beam] youngoli commented on a diff in pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
youngoli commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r908028045


##########
sdks/go/test/integration/io/fhirio/fhirio_test.go:
##########
@@ -228,6 +229,101 @@ func TestFhirIO_ExecuteBundles(t *testing.T) {
 	ptest.RunAndValidate(t, p)
 }
 
+func TestFhirIO_Search_SelectAll(t *testing.T) {

Review Comment:
   Integration tests are expensive to spin up and run for every pipeline, so I would encourage keeping their number as low as possible. Here, I think you can consolidate everything you're testing into one pipeline, since all of it uses the same FhirStore data: either apply multiple Search transforms, or cover everything through a single Search transform (e.g. testing ResourceType, parameters, and pagination with one search query). And if any of these tests work just as well as unit tests, that's a better alternative (pagination, for example, strikes me as well suited to a unit test).



##########
sdks/go/pkg/beam/io/fhirio/common.go:
##########
@@ -23,31 +23,33 @@ import (
 	"context"
 	"io"
 	"net/http"
-	"regexp"
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"google.golang.org/api/googleapi"
 	"google.golang.org/api/healthcare/v1"
 	"google.golang.org/api/option"
 )
 
 const (
-	UserAgent        = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
-	baseMetricPrefix = "fhirio/"
+	UserAgent             = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
+	baseMetricPrefix      = "fhirio/"
+	pageTokenParameterKey = "_page_token"
 )
 
-func executeRequestAndRecordLatency(ctx context.Context, latencyMs *beam.Distribution, requestSupplier func() (*http.Response, error)) (*http.Response, error) {
+func executeAndRecordLatency[T any](ctx context.Context, latencyMs *beam.Distribution, executionSupplier func() (T, error)) (T, error) {

Review Comment:
   @lostluck Are we good to use generics in Go SDK IOs?
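For context on the generics question: the benefit here is that one helper can time calls with any result type (`*http.Response` for reads and bundles, `[]string` for search) instead of needing one wrapper per type. Below is a minimal sketch, assuming Go 1.18+. `timeCall` and its `record func(int64)` parameter are hypothetical stand-ins for `executeAndRecordLatency` and the Beam latency distribution, used so the example runs without the SDK.

```go
package main

import (
	"fmt"
	"time"
)

// timeCall runs supplier, reports how long it took (in milliseconds) to
// record, and returns supplier's result and error unchanged. Because T is a
// type parameter, one helper covers every result type.
// record is a hypothetical stand-in for updating a latency distribution.
func timeCall[T any](record func(ms int64), supplier func() (T, error)) (T, error) {
	start := time.Now()
	result, err := supplier()
	record(time.Since(start).Milliseconds())
	return result, err
}

func main() {
	var latencies []int64
	record := func(ms int64) { latencies = append(latencies, ms) }

	// T is inferred as []string from the supplier's signature.
	resources, err := timeCall(record, func() ([]string, error) {
		return []string{"a", "b"}, nil
	})
	fmt.Println(resources, err, len(latencies))
}
```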



##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {
+	resourcesFound, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() ([]string, error) {
+		return fn.searchResources(query)
+	})
+	if err != nil {
+		fn.resourcesErrorCount.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error occurred while performing search for query: [%v]", query).Error())
+		return
+	}
+
+	fn.resourcesSuccessCount.Inc(ctx, 1)
+	emitFoundResources(query.Identifier, resourcesFound)
+}
+
+func (fn *searchResourcesFn) searchResources(query SearchQuery) ([]string, error) {
+	var (
+		allResources, resourcesInPage []string
+		nextPageToken                 string
+		err                           error
+	)
+	for resourcesInPage, nextPageToken, err = fn.searchResourcesPaginated(query, ""); nextPageToken != ""; resourcesInPage, nextPageToken, err = fn.searchResourcesPaginated(query, nextPageToken) {

Review Comment:
   style nit: This line is so long it impacts legibility.
   
   I think you should either split this into multiple lines (while-loop style) or use much shorter variable names (Go style conventions usually recommend local variables of only a few letters anyway). I think renaming to `res`, `pageRes`, and `token` would be just as descriptive.




[GitHub] [beam] msbukal commented on a diff in pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
msbukal commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r910184554


##########
sdks/go/pkg/beam/io/fhirio/common.go:
##########
@@ -58,38 +62,34 @@ func extractBodyFrom(response *http.Response) (string, error) {
 	return string(bodyBytes), nil
 }
 
-func isBadStatusCode(status string) bool {
-	// 2XXs are successes, otherwise failure.
-	isMatch, err := regexp.MatchString("^2\\d{2}", status)
-	if err != nil {
-		return true
-	}
-	return !isMatch
+type fhirStoreClient interface {
+	readResource(resourcePath string) (*http.Response, error)
+	executeBundle(storePath string, bundle []byte) (*http.Response, error)
+	search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error)
 }
 
-type fhirioFnCommon struct {
-	client                fhirStoreClient
-	resourcesErrorCount   beam.Counter
-	resourcesSuccessCount beam.Counter
-	latencyMs             beam.Distribution
+type fhirStoreClientImpl struct {
+	fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
 }
 
-func (fnc *fhirioFnCommon) setup(namespace string) {
-	if fnc.client == nil {
-		fnc.client = newFhirStoreClient()
-	}
-	fnc.resourcesErrorCount = beam.NewCounter(namespace, baseMetricPrefix+"resource_error_count")
-	fnc.resourcesSuccessCount = beam.NewCounter(namespace, baseMetricPrefix+"resource_success_count")
-	fnc.latencyMs = beam.NewDistribution(namespace, baseMetricPrefix+"latency_ms")
+func (c *fhirStoreClientImpl) readResource(resourcePath string) (*http.Response, error) {
+	return c.fhirService.Read(resourcePath).Do()
 }
 
-type fhirStoreClient interface {
-	readResource(resourcePath string) (*http.Response, error)
-	executeBundle(storePath string, bundle []byte) (*http.Response, error)
+func (c *fhirStoreClientImpl) executeBundle(storePath string, bundle []byte) (*http.Response, error) {
+	return c.fhirService.ExecuteBundle(storePath, bytes.NewReader(bundle)).Do()
 }
 
-type fhirStoreClientImpl struct {
-	fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
+func (c *fhirStoreClientImpl) search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error) {
+	queryParams := make([]googleapi.CallOption, 0)
+	for key, value := range queries {
+		queryParams = append(queryParams, googleapi.QueryParameter(key, value))
+	}
+
+	if pageToken != "" {
+		queryParams = append(queryParams, googleapi.QueryParameter(pageTokenParameterKey, pageToken))
+	}
+	return c.fhirService.Search(storePath, &healthcare.SearchResourcesRequest{ResourceType: resourceType}).Do(queryParams...)

Review Comment:
   Ah... did you get this pattern from an example somewhere? (If so, could you link it for me?)
   
   Being able to specify the resource type like this is unintended behaviour, and it caused a large outage for us a couple of months ago. I fixed this in the Java implementation (https://github.com/apache/beam/pull/17315; see b/227655182 for more details).
   
   Could you fix this? If resourceType is empty, use [.Search](https://pkg.go.dev/google.golang.org/api/healthcare/v1#ProjectsLocationsDatasetsFhirStoresFhirService.Search); otherwise, use [.SearchType](https://pkg.go.dev/google.golang.org/api/healthcare/v1#ProjectsLocationsDatasetsFhirStoresFhirService.SearchType).
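The requested fix amounts to choosing the endpoint from `resourceType` rather than passing the type inside the request body. A minimal sketch of that dispatch: the `searchEndpoints` interface, the `search` function, and the `recordingClient` fake are all hypothetical, and in the real client the two branches would call the `.Search` and `.SearchType` methods linked above.

```go
package main

import "fmt"

// searchEndpoints is a hypothetical abstraction over the two Healthcare API
// search methods (store-wide search and type-scoped search).
type searchEndpoints interface {
	searchAll(storePath string, params map[string]string) (string, error)
	searchType(storePath, resourceType string, params map[string]string) (string, error)
}

// search picks the endpoint based on resourceType instead of putting the
// resource type into the request body.
func search(c searchEndpoints, storePath, resourceType string, params map[string]string) (string, error) {
	if resourceType == "" {
		// No type given: search across all resource types in the store.
		return c.searchAll(storePath, params)
	}
	// Type given: use the type-scoped endpoint.
	return c.searchType(storePath, resourceType, params)
}

// recordingClient is a fake that reports which endpoint was called.
type recordingClient struct{}

func (recordingClient) searchAll(storePath string, _ map[string]string) (string, error) {
	return "Search(" + storePath + ")", nil
}

func (recordingClient) searchType(storePath, resourceType string, _ map[string]string) (string, error) {
	return "SearchType(" + storePath + ", " + resourceType + ")", nil
}

func main() {
	c := recordingClient{}
	all, _ := search(c, "store", "", nil)
	typed, _ := search(c, "store", "Patient", map[string]string{"name": "Smith"})
	fmt.Println(all)
	fmt.Println(typed)
}
```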



##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {
+	resourcesFound, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() ([]string, error) {
+		return fn.searchResources(query)
+	})
+	if err != nil {
+		fn.resourcesErrorCount.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error occurred while performing search for query: [%v]", query).Error())
+		return
+	}
+
+	fn.resourcesSuccessCount.Inc(ctx, 1)
+	emitFoundResources(query.Identifier, resourcesFound)
+}
+
+func (fn *searchResourcesFn) searchResources(query SearchQuery) ([]string, error) {
+	allResources := make([]string, 0)
+	resourcesInPage, nextPageToken, err := fn.searchResourcesPaginated(query, "")
+	for nextPageToken != "" {
+		allResources = append(allResources, resourcesInPage...)
+		resourcesInPage, nextPageToken, err = fn.searchResourcesPaginated(query, nextPageToken)
+	}
+	// Must add resources from last page.
+	allResources = append(allResources, resourcesInPage...)
+	return allResources, err
+}
+
+// Performs a search request retrieving results only from the page identified by
+// `pageToken`. If `pageToken` is the empty string it will retrieve the results
+// from the first page.
+func (fn *searchResourcesFn) searchResourcesPaginated(query SearchQuery, pageToken string) ([]string, string, error) {
+	response, err := fn.client.search(fn.FhirStorePath, query.ResourceType, query.Parameters, pageToken)
+	if err != nil {
+		return nil, "", err
+	}
+
+	body, err := extractBodyFrom(response)
+	if err != nil {
+		return nil, "", err
+	}
+
+	var bodyFields struct {
+		Entries []interface{}        `json:"entry"`
+		Links   []responseLinkFields `json:"link"`
+	}
+	err = json.NewDecoder(strings.NewReader(body)).Decode(&bodyFields)
+	if err != nil {
+		return nil, "", err
+	}
+
+	resourcesFoundInPage := mapEntryToString(bodyFields.Entries)
+	return resourcesFoundInPage, extractNextPageTokenFrom(bodyFields.Links), nil
+}
+
+func mapEntryToString(entries []interface{}) []string {
+	stringifiedEntries := make([]string, 0)
+	for _, entry := range entries {
+		entryBytes, err := json.Marshal(entry)
+		if err != nil {
+			continue
+		}
+		stringifiedEntries = append(stringifiedEntries, string(entryBytes))
+	}
+	return stringifiedEntries
+}
+
+func extractNextPageTokenFrom(searchResponseLinks []responseLinkFields) string {
+	for _, link := range searchResponseLinks {
+		// The link with relation field valued "next" contains the page token
+		if link.Relation != "next" {
+			continue
+		}
+
+		parsedUrl, err := url.Parse(link.Url)
+		if err != nil {
+			break

Review Comment:
   Same here: this should log the error.
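A minimal sketch of logging the parse failure instead of silently breaking out of the loop. It uses the standard library `log` package as a stand-in; inside the SDK, Beam's own Go `log` package (which takes a context) would presumably be the better fit. `responseLinkFields` and `pageTokenParameterKey` mirror the diff, while the rest of the function body, which the quoted diff cuts off, is assumed.

```go
package main

import (
	"fmt"
	"log"
	"net/url"
)

// pageTokenParameterKey mirrors the constant added to common.go in this PR.
const pageTokenParameterKey = "_page_token"

type responseLinkFields struct {
	Relation string `json:"relation"`
	Url      string `json:"url"`
}

// extractNextPageTokenFrom returns the page token from the "next" link, or ""
// if there is none. A malformed URL is logged rather than silently dropped.
func extractNextPageTokenFrom(links []responseLinkFields) string {
	for _, link := range links {
		// Only the link with relation "next" carries the page token.
		if link.Relation != "next" {
			continue
		}
		parsedURL, err := url.Parse(link.Url)
		if err != nil {
			log.Printf("failed to parse next page link %q: %v", link.Url, err)
			return ""
		}
		return parsedURL.Query().Get(pageTokenParameterKey)
	}
	return ""
}

func main() {
	links := []responseLinkFields{
		{Relation: "self", Url: "https://healthcare.googleapis.com"},
		{Relation: "next", Url: "https://healthcare.googleapis.com?_page_token=theNextPageToken"},
	}
	fmt.Println(extractNextPageTokenFrom(links))
}
```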



##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {

Review Comment:
   Sorry, I'm confused: where are `emitFoundResources` and `emitDeadLetter` defined and implemented? Are they provided automatically by Beam?
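On where the emitters come from: in the Beam Go SDK, function-typed parameters of `ProcessElement` act as emitters that the SDK supplies at runtime, one per output PCollection, so applying the DoFn with `beam.ParDo2` yields the two corresponding outputs (the `register.Emitter1`/`Emitter2` calls in `init` register those emitter types). Because they are ordinary Go funcs, a unit test can also pass closures directly. A minimal sketch without Beam; `processQuery` and `errStub` are hypothetical, with the search call replaced by a stub.

```go
package main

import (
	"errors"
	"fmt"
)

// errStub is a hypothetical failure returned by the stubbed search call.
var errStub = errors.New("stubbed search failure")

// processQuery mirrors the shape of searchResourcesFn.ProcessElement: found
// resources go to emitFound, failures go to emitDeadLetter. search is a stub
// standing in for the real FHIR store request.
func processQuery(id string, search func() ([]string, error),
	emitFound func(string, []string), emitDeadLetter func(string)) {
	resources, err := search()
	if err != nil {
		emitDeadLetter(fmt.Sprintf("search failed for query %q: %v", id, err))
		return
	}
	emitFound(id, resources)
}

func main() {
	found := map[string][]string{}
	var deadLetters []string

	// These closures play the role of the emitters the runner would supply.
	emitFound := func(id string, rs []string) { found[id] = rs }
	emitDeadLetter := func(msg string) { deadLetters = append(deadLetters, msg) }

	processQuery("q1", func() ([]string, error) { return []string{`{"resource":{}}`}, nil }, emitFound, emitDeadLetter)
	processQuery("q2", func() ([]string, error) { return nil, errStub }, emitFound, emitDeadLetter)

	fmt.Println(len(found["q1"]), deadLetters)
}
```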



##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {
+	resourcesFound, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() ([]string, error) {
+		return fn.searchResources(query)
+	})
+	if err != nil {
+		fn.resourcesErrorCount.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error occurred while performing search for query: [%v]", query).Error())
+		return
+	}
+
+	fn.resourcesSuccessCount.Inc(ctx, 1)
+	emitFoundResources(query.Identifier, resourcesFound)
+}
+
+func (fn *searchResourcesFn) searchResources(query SearchQuery) ([]string, error) {
+	allResources := make([]string, 0)
+	resourcesInPage, nextPageToken, err := fn.searchResourcesPaginated(query, "")

Review Comment:
   I think this would read cleaner if you did
   
   ```
   	resourcesInPage, nextPageToken, err := fn.searchResourcesPaginated(query, "")
   	allResources = append(allResources, resourcesInPage...)
   	for nextPageToken != "" {
   		resourcesInPage, nextPageToken, err = fn.searchResourcesPaginated(query, nextPageToken)
   		allResources = append(allResources, resourcesInPage...)
   	}
   ```
   
   That way, each iteration appends the resources it just queried, instead of pairing "the last search" with "the next search".
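The suggested loop shape as a self-contained sketch. `fetchAll`, `page`, `errPageFailed`, and the `fetch` callback are hypothetical stand-ins for `searchResources` and `searchResourcesPaginated`. One deliberate deviation: it returns as soon as a page fails, rather than relying on the failing call to also return an empty token.

```go
package main

import (
	"errors"
	"fmt"
)

// page is a hypothetical stand-in for one search response page.
type page struct {
	resources []string
	next      string // token for the next page; "" on the last page
}

// errPageFailed is a hypothetical error for a failed page request.
var errPageFailed = errors.New("page request failed")

// fetchAll appends the first page, then keeps requesting and appending while
// a next-page token is present. It stops at the first failing page.
func fetchAll(fetch func(token string) ([]string, string, error)) ([]string, error) {
	all := make([]string, 0)
	res, token, err := fetch("")
	if err != nil {
		return nil, err
	}
	all = append(all, res...)
	for token != "" {
		if res, token, err = fetch(token); err != nil {
			return nil, err
		}
		all = append(all, res...)
	}
	return all, nil
}

func main() {
	pages := map[string]page{
		"":   {resources: []string{"a", "b"}, next: "p2"},
		"p2": {resources: []string{"c"}},
	}
	all, err := fetchAll(func(token string) ([]string, string, error) {
		p := pages[token]
		return p.resources, p.next, nil
	})
	fmt.Println(all, err)
}
```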



##########
sdks/go/pkg/beam/io/fhirio/search_test.go:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+	"bytes"
+	"io"
+	"net/http"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+func TestSearch_Errors(t *testing.T) {
+	testCases := []struct {
+		name           string
+		client         fhirStoreClient
+		containedError string
+	}{
+		{
+			name:           "Search request returns error",
+			client:         requestReturnErrorFakeClient,
+			containedError: fakeRequestReturnErrorMessage,
+		},
+		{
+			name:           "Search request returns bad status",
+			client:         badStatusFakeClient,
+			containedError: strconv.Itoa(http.StatusForbidden),
+		},
+		{
+			name:           "Search request response body fails to be read",
+			client:         bodyReaderErrorFakeClient,
+			containedError: fakeBodyReaderErrorMessage,
+		},
+		{
+			name:           "Search request response body failed to be decoded",
+			client:         emptyResponseBodyFakeClient,
+			containedError: io.EOF.Error(),
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			p, s, testSearchQueryPCollection := ptest.CreateList([]SearchQuery{{}})
+			resources, failedSearches := search(s, "any", testSearchQueryPCollection, testCase.client)
+			passert.Empty(s, resources)
+			passert.Count(s, failedSearches, "", 1)
+			passert.True(s, failedSearches, func(errorMsg string) bool {
+				return strings.Contains(errorMsg, testCase.containedError)
+			})
+			pipelineResult := ptest.RunAndValidate(t, p)
+			err := validateCounter(pipelineResult, errorCounterName, 1)
+			if err != nil {
+				t.Fatalf("validateCounter returned error [%v]", err.Error())
+			}
+		})
+	}
+}
+
+func TestSearch_Pagination(t *testing.T) {
+	paginationFakeClient := &fakeFhirStoreClient{
+		fakeSearch: func(s, s2 string, m map[string]string, pageToken string) (*http.Response, error) {
+			if pageToken == "theNextPageToken" {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{}}], "link": []}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			} else {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{}}], "link": [{"relation":"next", "url":"https://healthcare.googleapis.com?_page_token=theNextPageToken"}]}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			}
+		},
+	}
+	p, s, testSearchQuery := ptest.CreateList([]SearchQuery{{}})
+	resourcesFound, failedSearches := search(s, "any", testSearchQuery, paginationFakeClient)
+	passert.Empty(s, failedSearches)
+	passert.Count(s, resourcesFound, "", 1)
+	resourcesFoundWithoutIdentifier := beam.DropKey(s, resourcesFound)
+	passert.True(s, resourcesFoundWithoutIdentifier, func(resourcesFound []string) bool {
+		return len(resourcesFound) == 2

Review Comment:
   How does resourcesFound end up with 2 elements when you don't return any resources in the fake client?
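For reference when reading this question: each body returned by `paginationFakeClient` contains one element in its `entry` array (`{"resource":{}}`), and the first body's `next` link points to the second, so following the pagination yields two entries in total. A minimal sketch that decodes the two fake bodies copied from the diff and counts the entries, independently of the fhirio code; `fakePages`, `searchBody`, and `countEntries` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// The two response bodies returned by paginationFakeClient, keyed by the page
// token that requests them ("" is the first page).
var fakePages = map[string]string{
	"":                 `{"entry": [{"resource":{}}], "link": [{"relation":"next", "url":"https://healthcare.googleapis.com?_page_token=theNextPageToken"}]}`,
	"theNextPageToken": `{"entry": [{"resource":{}}], "link": []}`,
}

// searchBody holds the fields of a search response that matter here.
type searchBody struct {
	Entries []json.RawMessage `json:"entry"`
	Links   []struct {
		Relation string `json:"relation"`
		URL      string `json:"url"`
	} `json:"link"`
}

// countEntries follows the "next" links through fakePages and returns the
// total number of "entry" elements seen.
func countEntries() (int, error) {
	total, token := 0, ""
	for {
		var body searchBody
		if err := json.Unmarshal([]byte(fakePages[token]), &body); err != nil {
			return 0, err
		}
		total += len(body.Entries)
		token = ""
		for _, l := range body.Links {
			if l.Relation != "next" {
				continue
			}
			u, err := url.Parse(l.URL)
			if err != nil {
				return 0, err
			}
			token = u.Query().Get("_page_token")
		}
		if token == "" {
			return total, nil
		}
	}
}

func main() {
	n, err := countEntries()
	fmt.Println(n, err) // two pages with one entry each
}
```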



##########
sdks/go/test/integration/io/fhirio/fhirio_test.go:
##########
@@ -153,6 +153,7 @@ func populateStore(storePath string) []string {
 			resourcePaths = append(resourcePaths, resourcePath)
 		}
 	}
+	time.Sleep(time.Second) // give some time for data to propagate. prevents flaky results

Review Comment:
   Now that we've added search: if indexing is overloaded or slow, this could take much longer than a second. Could we add a follow-up to make the search test retry with backoff? And maybe make the default here something like 30 seconds, to prevent flakes.
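A minimal sketch of the retry-with-backoff idea, using only the standard library. `retryWithBackoff` and `errNotIndexedYet` are hypothetical names, and the attempt count and initial delay are placeholders; how the integration test would wrap its search-and-check step in `op` is left open.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotIndexedYet is a hypothetical error for "search did not yet find the
// resources that were just written".
var errNotIndexedYet = errors.New("resources not yet searchable")

// retryWithBackoff calls op up to maxAttempts times, sleeping between failed
// attempts and doubling the delay each time (exponential backoff). It returns
// nil on the first success, or the last error wrapped if every attempt fails.
func retryWithBackoff(maxAttempts int, initialDelay time.Duration, op func() error) error {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	delay := initialDelay
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		// No point sleeping after the final attempt.
		if attempt < maxAttempts {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	err := retryWithBackoff(5, time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errNotIndexedYet
		}
		return nil
	})
	fmt.Println(calls, err)
}
```

Doubling the delay keeps the total wait bounded: with 5 attempts and a 1 s initial delay, the sleeps add up to 1 + 2 + 4 + 8 = 15 s.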



##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {
+	resourcesFound, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() ([]string, error) {
+		return fn.searchResources(query)
+	})
+	if err != nil {
+		fn.resourcesErrorCount.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error occurred while performing search for query: [%v]", query).Error())
+		return
+	}
+
+	fn.resourcesSuccessCount.Inc(ctx, 1)
+	emitFoundResources(query.Identifier, resourcesFound)
+}
+
+func (fn *searchResourcesFn) searchResources(query SearchQuery) ([]string, error) {
+	allResources := make([]string, 0)
+	resourcesInPage, nextPageToken, err := fn.searchResourcesPaginated(query, "")
+	for nextPageToken != "" {
+		allResources = append(allResources, resourcesInPage...)
+		resourcesInPage, nextPageToken, err = fn.searchResourcesPaginated(query, nextPageToken)
+	}
+	// Must add resources from last page.
+	allResources = append(allResources, resourcesInPage...)
+	return allResources, err
+}
+
+// Performs a search request retrieving results only from the page identified by
+// `pageToken`. If `pageToken` is the empty string it will retrieve the results
+// from the first page.
+func (fn *searchResourcesFn) searchResourcesPaginated(query SearchQuery, pageToken string) ([]string, string, error) {
+	response, err := fn.client.search(fn.FhirStorePath, query.ResourceType, query.Parameters, pageToken)
+	if err != nil {
+		return nil, "", err
+	}
+
+	body, err := extractBodyFrom(response)
+	if err != nil {
+		return nil, "", err
+	}
+
+	var bodyFields struct {
+		Entries []interface{}        `json:"entry"`
+		Links   []responseLinkFields `json:"link"`
+	}
+	err = json.NewDecoder(strings.NewReader(body)).Decode(&bodyFields)
+	if err != nil {
+		return nil, "", err
+	}
+
+	resourcesFoundInPage := mapEntryToString(bodyFields.Entries)
+	return resourcesFoundInPage, extractNextPageTokenFrom(bodyFields.Links), nil
+}
+
+func mapEntryToString(entries []interface{}) []string {
+	stringifiedEntries := make([]string, 0)
+	for _, entry := range entries {
+		entryBytes, err := json.Marshal(entry)
+		if err != nil {
+			continue

Review Comment:
   Hmm, we should at least log an error here somehow if we're silently dropping an entry.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lnogueir commented on a diff in pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
lnogueir commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r910242981


##########
sdks/go/pkg/beam/io/fhirio/search_test.go:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+	"bytes"
+	"io"
+	"net/http"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+func TestSearch_Errors(t *testing.T) {
+	testCases := []struct {
+		name           string
+		client         fhirStoreClient
+		containedError string
+	}{
+		{
+			name:           "Search request returns error",
+			client:         requestReturnErrorFakeClient,
+			containedError: fakeRequestReturnErrorMessage,
+		},
+		{
+			name:           "Search request returns bad status",
+			client:         badStatusFakeClient,
+			containedError: strconv.Itoa(http.StatusForbidden),
+		},
+		{
+			name:           "Search request response body fails to be read",
+			client:         bodyReaderErrorFakeClient,
+			containedError: fakeBodyReaderErrorMessage,
+		},
+		{
+			name:           "Search request response body failed to be decoded",
+			client:         emptyResponseBodyFakeClient,
+			containedError: io.EOF.Error(),
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			p, s, testSearchQueryPCollection := ptest.CreateList([]SearchQuery{{}})
+			resources, failedSearches := search(s, "any", testSearchQueryPCollection, testCase.client)
+			passert.Empty(s, resources)
+			passert.Count(s, failedSearches, "", 1)
+			passert.True(s, failedSearches, func(errorMsg string) bool {
+				return strings.Contains(errorMsg, testCase.containedError)
+			})
+			pipelineResult := ptest.RunAndValidate(t, p)
+			err := validateCounter(pipelineResult, errorCounterName, 1)
+			if err != nil {
+				t.Fatalf("validateCounter returned error [%v]", err.Error())
+			}
+		})
+	}
+}
+
+func TestSearch_Pagination(t *testing.T) {
+	paginationFakeClient := &fakeFhirStoreClient{
+		fakeSearch: func(s, s2 string, m map[string]string, pageToken string) (*http.Response, error) {
+			if pageToken == "theNextPageToken" {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{}}], "link": []}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			} else {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{}}], "link": [{"relation":"next", "url":"https://healthcare.googleapis.com?_page_token=theNextPageToken"}]}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			}
+		},
+	}
+	p, s, testSearchQuery := ptest.CreateList([]SearchQuery{{}})
+	resourcesFound, failedSearches := search(s, "any", testSearchQuery, paginationFakeClient)
+	passert.Empty(s, failedSearches)
+	passert.Count(s, resourcesFound, "", 1)
+	resourcesFoundWithoutIdentifier := beam.DropKey(s, resourcesFound)
+	passert.True(s, resourcesFoundWithoutIdentifier, func(resourcesFound []string) bool {
+		return len(resourcesFound) == 2

Review Comment:
   The fake client actually returns one dummy resource (i.e. `"entry": [{"resource":{}}]`) on each page, which is why it finds two.
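The count can be reproduced outside Beam. This sketch decodes the two fake bodies, follows the `next` link through its `_page_token` query parameter, and counts the entries. `nextPageToken` and `countEntries` are hypothetical stand-ins. The PR's own `extractNextPageTokenFrom` is not shown in this thread, so this is not its implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

type link struct {
	Relation string `json:"relation"`
	URL      string `json:"url"`
}

type page struct {
	Entries []json.RawMessage `json:"entry"`
	Links   []link            `json:"link"`
}

// nextPageToken returns the _page_token query parameter of the "next" link,
// or "" when there is no next page.
func nextPageToken(links []link) string {
	for _, l := range links {
		if l.Relation != "next" {
			continue
		}
		u, err := url.Parse(l.URL)
		if err != nil {
			return ""
		}
		return u.Query().Get("_page_token")
	}
	return ""
}

// The two bodies returned by the fake client in TestSearch_Pagination,
// keyed by the page token that requests them.
var fakePages = map[string]string{
	"":                 `{"entry": [{"resource":{}}], "link": [{"relation":"next", "url":"https://healthcare.googleapis.com?_page_token=theNextPageToken"}]}`,
	"theNextPageToken": `{"entry": [{"resource":{}}], "link": []}`,
}

// countEntries starts at the first page, follows "next" links, and counts entries.
func countEntries(pages map[string]string) (int, error) {
	total, token := 0, ""
	for {
		var p page
		if err := json.Unmarshal([]byte(pages[token]), &p); err != nil {
			return 0, err
		}
		total += len(p.Entries)
		if token = nextPageToken(p.Links); token == "" {
			return total, nil
		}
	}
}

func main() {
	n, err := countEntries(fakePages)
	fmt.Println(n, err) // 2 <nil>
}
```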





[GitHub] [beam] youngoli commented on a diff in pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
youngoli commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r911540966


##########
sdks/go/test/integration/io/fhirio/fhirio_test.go:
##########
@@ -177,6 +178,22 @@ func extractResourcePathFrom(resourceLocationUrl string) (string, error) {
 	return resourceLocationUrl[startIdx:endIdx], nil
 }
 
+// Useful to prevent flaky results.
+func runWithBackoffRetries(t *testing.T, p *beam.Pipeline) error {

Review Comment:
   I'm still hesitant about this one, but only because rerunning a pipeline on Dataflow is _slow_. This isn't a problem on the Direct runner, but on Dataflow, rerunning a pipeline takes around 5 minutes, even for a really simple pipeline.
   
   So this function will work, in the sense that 5 minutes is definitely long enough to fix the flakiness. But waiting a guaranteed 30 seconds before running the pipeline is probably still better overall.
   
   By the way, I should mention that tests within a package in Go run sequentially by default. That's why I'm being very conscious of every additional integration test in this file: each one makes the entire test suite take ~5 minutes longer to finish (on Dataflow, at least).
   
   Anyway, I'm still gonna merge this because it isn't bad enough to block anything. But if you can think of any way to fix the flakiness without rerunning the whole pipeline, that would probably be much better for performance on Dataflow.





[GitHub] [beam] asf-ci commented on pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21979:
URL: https://github.com/apache/beam/pull/21979#issuecomment-1162750237

   Can one of the admins verify this patch?




[GitHub] [beam] lnogueir commented on a diff in pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
lnogueir commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r910363672


##########
sdks/go/pkg/beam/io/fhirio/common.go:
##########
@@ -58,38 +62,34 @@ func extractBodyFrom(response *http.Response) (string, error) {
 	return string(bodyBytes), nil
 }
 
-func isBadStatusCode(status string) bool {
-	// 2XXs are successes, otherwise failure.
-	isMatch, err := regexp.MatchString("^2\\d{2}", status)
-	if err != nil {
-		return true
-	}
-	return !isMatch
+type fhirStoreClient interface {
+	readResource(resourcePath string) (*http.Response, error)
+	executeBundle(storePath string, bundle []byte) (*http.Response, error)
+	search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error)
 }
 
-type fhirioFnCommon struct {
-	client                fhirStoreClient
-	resourcesErrorCount   beam.Counter
-	resourcesSuccessCount beam.Counter
-	latencyMs             beam.Distribution
+type fhirStoreClientImpl struct {
+	fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
 }
 
-func (fnc *fhirioFnCommon) setup(namespace string) {
-	if fnc.client == nil {
-		fnc.client = newFhirStoreClient()
-	}
-	fnc.resourcesErrorCount = beam.NewCounter(namespace, baseMetricPrefix+"resource_error_count")
-	fnc.resourcesSuccessCount = beam.NewCounter(namespace, baseMetricPrefix+"resource_success_count")
-	fnc.latencyMs = beam.NewDistribution(namespace, baseMetricPrefix+"latency_ms")
+func (c *fhirStoreClientImpl) readResource(resourcePath string) (*http.Response, error) {
+	return c.fhirService.Read(resourcePath).Do()
 }
 
-type fhirStoreClient interface {
-	readResource(resourcePath string) (*http.Response, error)
-	executeBundle(storePath string, bundle []byte) (*http.Response, error)
+func (c *fhirStoreClientImpl) executeBundle(storePath string, bundle []byte) (*http.Response, error) {
+	return c.fhirService.ExecuteBundle(storePath, bytes.NewReader(bundle)).Do()
 }
 
-type fhirStoreClientImpl struct {
-	fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
+func (c *fhirStoreClientImpl) search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error) {
+	queryParams := make([]googleapi.CallOption, 0)
+	for key, value := range queries {
+		queryParams = append(queryParams, googleapi.QueryParameter(key, value))
+	}
+
+	if pageToken != "" {
+		queryParams = append(queryParams, googleapi.QueryParameter(pageTokenParameterKey, pageToken))
+	}
+	return c.fhirService.Search(storePath, &healthcare.SearchResourcesRequest{ResourceType: resourceType}).Do(queryParams...)

Review Comment:
   Ah, I didn't notice that. Good catch.
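For readers unfamiliar with `googleapi.QueryParameter`, the `search` method in this hunk turns each entry of `queries` into a URL query parameter and appends `_page_token` only when a token is present. The following is a stdlib-only analog using `net/url`, not the generated Healthcare client. `buildSearchQuery` is a hypothetical name.

```go
package main

import (
	"fmt"
	"net/url"
)

const pageTokenParameterKey = "_page_token"

// buildSearchQuery mirrors the shape of the search client using only
// net/url. Every search parameter becomes a query parameter, and the page
// token is added only when it is non-empty.
func buildSearchQuery(params map[string]string, pageToken string) string {
	values := url.Values{}
	for k, v := range params {
		values.Set(k, v)
	}
	if pageToken != "" {
		values.Set(pageTokenParameterKey, pageToken)
	}
	// Encode sorts by key, so the output is deterministic even though Go
	// map iteration order is not.
	return values.Encode()
}

func main() {
	fmt.Println(buildSearchQuery(map[string]string{"family": "Smith", "_count": "50"}, ""))
	fmt.Println(buildSearchQuery(map[string]string{"family": "Smith"}, "abc"))
}
```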





[GitHub] [beam] msbukal commented on a diff in pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
msbukal commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r911380472


##########
sdks/go/test/integration/io/fhirio/fhirio_test.go:
##########
@@ -190,7 +206,10 @@ func TestFhirIO_Read(t *testing.T) {
 	passert.Empty(s, failedReads)
 	passert.Count(s, resources, "", len(testResourcePaths))
 
-	ptest.RunAndValidate(t, p)
+	err := runWithBackoffRetries(t, p)

Review Comment:
   No need to retry in Read.



##########
sdks/go/pkg/beam/io/fhirio/search_test.go:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+	"bytes"
+	"io"
+	"net/http"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+func TestSearch_Errors(t *testing.T) {
+	testCases := []struct {
+		name           string
+		client         fhirStoreClient
+		containedError string
+	}{
+		{
+			name:           "Search request returns error",
+			client:         requestReturnErrorFakeClient,
+			containedError: fakeRequestReturnErrorMessage,
+		},
+		{
+			name:           "Search request returns bad status",
+			client:         badStatusFakeClient,
+			containedError: strconv.Itoa(http.StatusForbidden),
+		},
+		{
+			name:           "Search request response body fails to be read",
+			client:         bodyReaderErrorFakeClient,
+			containedError: fakeBodyReaderErrorMessage,
+		},
+		{
+			name:           "Search request response body failed to be decoded",
+			client:         emptyResponseBodyFakeClient,
+			containedError: io.EOF.Error(),
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			p, s, testSearchQueryPCollection := ptest.CreateList([]SearchQuery{{}})
+			resources, failedSearches := search(s, "any", testSearchQueryPCollection, testCase.client)
+			passert.Empty(s, resources)
+			passert.Count(s, failedSearches, "", 1)
+			passert.True(s, failedSearches, func(errorMsg string) bool {
+				return strings.Contains(errorMsg, testCase.containedError)
+			})
+			pipelineResult := ptest.RunAndValidate(t, p)
+			err := validateCounter(pipelineResult, errorCounterName, 1)
+			if err != nil {
+				t.Fatalf("validateCounter returned error [%v]", err.Error())
+			}
+		})
+	}
+}
+
+func TestSearch_Pagination(t *testing.T) {
+	paginationFakeClient := &fakeFhirStoreClient{
+		fakeSearch: func(s, s2 string, m map[string]string, pageToken string) (*http.Response, error) {
+			if pageToken == "theNextPageToken" {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{}}], "link": []}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			} else {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{}}], "link": [{"relation":"next", "url":"https://healthcare.googleapis.com?_page_token=theNextPageToken"}]}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			}
+		},
+	}
+	p, s, testSearchQuery := ptest.CreateList([]SearchQuery{{}})
+	resourcesFound, failedSearches := search(s, "any", testSearchQuery, paginationFakeClient)
+	passert.Empty(s, failedSearches)
+	passert.Count(s, resourcesFound, "", 1)
+	resourcesFoundWithoutIdentifier := beam.DropKey(s, resourcesFound)
+	passert.True(s, resourcesFoundWithoutIdentifier, func(resourcesFound []string) bool {
+		return len(resourcesFound) == 2

Review Comment:
   Ah, I see, the resource is empty.
   
   Just to make this a bit more readable (in case I miss the extra `{}`), could you make it something simple like
   
   first page: `{"entry": [{"resource":{"resourceType": "Patient", "id": "1"}}], "link": []}`
   second page: `{"entry": [{"resource":{"resourceType": "Patient", "id": "2"}}], "link": [{"relation":"next", "url":"https://healthcare.googleapis.com?_page_token=theNextPageToken"}]}`
   
   No need to verify the contents.
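This sketch plugs the reviewer's two bodies into a fake, keyed by page token in the same order as the test's if/else. The `theNextPageToken` branch gets the `"id": "1"` body with no `next` link. The initial request gets the `"id": "2"` body with the `next` link. `fakePagesByToken` and `patientIDs` are hypothetical helpers, and `fakeSearch` only mirrors the signature of the test's `fakeSearch` field. The ids are read back here only to show that the pages are distinguishable; the test itself need not verify them.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// fakePagesByToken maps a page token to the body returned for it.
var fakePagesByToken = map[string]string{
	"theNextPageToken": `{"entry": [{"resource":{"resourceType": "Patient", "id": "1"}}], "link": []}`,
	"":                 `{"entry": [{"resource":{"resourceType": "Patient", "id": "2"}}], "link": [{"relation":"next", "url":"https://healthcare.googleapis.com?_page_token=theNextPageToken"}]}`,
}

// fakeSearch has the same shape as the test's fakeSearch func field.
func fakeSearch(_, _ string, _ map[string]string, pageToken string) (*http.Response, error) {
	return &http.Response{
		Body:       io.NopCloser(strings.NewReader(fakePagesByToken[pageToken])),
		StatusCode: http.StatusOK,
	}, nil
}

// patientIDs decodes one page and returns the ids of its resources.
func patientIDs(resp *http.Response) ([]string, error) {
	defer resp.Body.Close()
	var body struct {
		Entries []struct {
			Resource struct {
				ID string `json:"id"`
			} `json:"resource"`
		} `json:"entry"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		return nil, err
	}
	ids := make([]string, 0, len(body.Entries))
	for _, e := range body.Entries {
		ids = append(ids, e.Resource.ID)
	}
	return ids, nil
}

func main() {
	for _, token := range []string{"", "theNextPageToken"} {
		resp, _ := fakeSearch("store", "Patient", nil, token)
		ids, err := patientIDs(resp)
		fmt.Printf("token=%q ids=%v err=%v\n", token, ids, err)
	}
}
```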





[GitHub] [beam] youngoli merged pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
youngoli merged PR #21979:
URL: https://github.com/apache/beam/pull/21979




[GitHub] [beam] lnogueir commented on a diff in pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
lnogueir commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r908662786


##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {
+	resourcesFound, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() ([]string, error) {
+		return fn.searchResources(query)
+	})
+	if err != nil {
+		fn.resourcesErrorCount.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error occurred while performing search for query: [%v]", query).Error())
+		return
+	}
+
+	fn.resourcesSuccessCount.Inc(ctx, 1)
+	emitFoundResources(query.Identifier, resourcesFound)
+}
+
+func (fn *searchResourcesFn) searchResources(query SearchQuery) ([]string, error) {
+	var (
+		allResources, resourcesInPage []string
+		nextPageToken                 string
+		err                           error
+	)
+	for resourcesInPage, nextPageToken, err = fn.searchResourcesPaginated(query, ""); nextPageToken != ""; resourcesInPage, nextPageToken, err = fn.searchResourcesPaginated(query, nextPageToken) {

Review Comment:
   Agreed. This line was bothering me too. Changed it to while-loop style.





[GitHub] [beam] lostluck commented on a diff in pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
lostluck commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r908907181


##########
sdks/go/pkg/beam/io/fhirio/common.go:
##########
@@ -23,31 +23,33 @@ import (
 	"context"
 	"io"
 	"net/http"
-	"regexp"
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"google.golang.org/api/googleapi"
 	"google.golang.org/api/healthcare/v1"
 	"google.golang.org/api/option"
 )
 
 const (
-	UserAgent        = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
-	baseMetricPrefix = "fhirio/"
+	UserAgent             = "apache-beam-io-google-cloud-platform-healthcare/" + core.SdkVersion
+	baseMetricPrefix      = "fhirio/"
+	pageTokenParameterKey = "_page_token"
 )
 
-func executeRequestAndRecordLatency(ctx context.Context, latencyMs *beam.Distribution, requestSupplier func() (*http.Response, error)) (*http.Response, error) {
+func executeAndRecordLatency[T any](ctx context.Context, latencyMs *beam.Distribution, executionSupplier func() (T, error)) (T, error) {

Review Comment:
   Yes! Especially for repetitive internal details. If the generics are exposed through the user-side API, clear documentation needs to set expectations.
   
   If things aren't repetitive, the main question is "why make it generic?". Generics are easy enough to add later, and difficult to remove if we decide against them.





[GitHub] [beam] lnogueir commented on a diff in pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
lnogueir commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r910244188


##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {
+	resourcesFound, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() ([]string, error) {
+		return fn.searchResources(query)
+	})
+	if err != nil {
+		fn.resourcesErrorCount.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error occurred while performing search for query: [%v]", query).Error())
+		return
+	}
+
+	fn.resourcesSuccessCount.Inc(ctx, 1)
+	emitFoundResources(query.Identifier, resourcesFound)
+}
+
+func (fn *searchResourcesFn) searchResources(query SearchQuery) ([]string, error) {
+	allResources := make([]string, 0)
+	resourcesInPage, nextPageToken, err := fn.searchResourcesPaginated(query, "")

Review Comment:
   Agreed.





[GitHub] [beam] github-actions[bot] commented on pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #21979:
URL: https://github.com/apache/beam/pull/21979#issuecomment-1162752591

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] asf-ci commented on pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #21979:
URL: https://github.com/apache/beam/pull/21979#issuecomment-1162750230

   Can one of the admins verify this patch?


[GitHub] [beam] lnogueir commented on pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
lnogueir commented on PR #21979:
URL: https://github.com/apache/beam/pull/21979#issuecomment-1162751693

   R: @youngoli 


[GitHub] [beam] lnogueir commented on pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
lnogueir commented on PR #21979:
URL: https://github.com/apache/beam/pull/21979#issuecomment-1162751810

   R: @msbukal 


[GitHub] [beam] codecov[bot] commented on pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #21979:
URL: https://github.com/apache/beam/pull/21979#issuecomment-1162753280

   # [Codecov](https://codecov.io/gh/apache/beam/pull/21979?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#21979](https://codecov.io/gh/apache/beam/pull/21979?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (be689e1) into [master](https://codecov.io/gh/apache/beam/commit/2574f9106c75e413310dcb7756952889da3fe7cd?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2574f91) will **decrease** coverage by `0.05%`.
   > The diff coverage is `10.58%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #21979      +/-   ##
   ==========================================
   - Coverage   73.99%   73.93%   -0.06%     
   ==========================================
     Files         703      704       +1     
     Lines       92934    93014      +80     
   ==========================================
   + Hits        68764    68768       +4     
   - Misses      22904    22980      +76     
     Partials     1266     1266              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `50.82% <10.58%> (-0.14%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/21979?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/io/fhirio/search.go](https://codecov.io/gh/apache/beam/pull/21979/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9pby9maGlyaW8vc2VhcmNoLmdv) | `5.63% <5.63%> (ø)` | |
   | [sdks/go/pkg/beam/io/fhirio/common.go](https://codecov.io/gh/apache/beam/pull/21979/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9pby9maGlyaW8vY29tbW9uLmdv) | `47.82% <25.00%> (-11.64%)` | :arrow_down: |
   | [sdks/go/pkg/beam/io/fhirio/execute\_bundles.go](https://codecov.io/gh/apache/beam/pull/21979/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9pby9maGlyaW8vZXhlY3V0ZV9idW5kbGVzLmdv) | `56.71% <100.00%> (ø)` | |
   | [sdks/go/pkg/beam/io/fhirio/read.go](https://codecov.io/gh/apache/beam/pull/21979/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9pby9maGlyaW8vcmVhZC5nbw==) | `82.75% <100.00%> (ø)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/21979?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/21979?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [2574f91...be689e1](https://codecov.io/gh/apache/beam/pull/21979?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


[GitHub] [beam] lnogueir commented on a diff in pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
lnogueir commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r910241054


##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {

Review Comment:
   Yep, it's provided by beam. Explanation can be found in the [beam programming guide](https://beam.apache.org/documentation/programming-guide) section 4.2.1.2.



[GitHub] [beam] youngoli commented on pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
youngoli commented on PR #21979:
URL: https://github.com/apache/beam/pull/21979#issuecomment-1169456820

   Looks good to me. I can merge whenever; just say the word.


[GitHub] [beam] youngoli commented on pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
youngoli commented on PR #21979:
URL: https://github.com/apache/beam/pull/21979#issuecomment-1169457012

   Run Go Postcommit


[GitHub] [beam] youngoli commented on pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
youngoli commented on PR #21979:
URL: https://github.com/apache/beam/pull/21979#issuecomment-1171660696

   Run Go Postcommit


[GitHub] [beam] msbukal commented on a diff in pull request #21979: Add Search transform to Go FhirIO

Posted by GitBox <gi...@apache.org>.
msbukal commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r911382704


##########
sdks/go/pkg/beam/io/fhirio/search_test.go:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+	"bytes"
+	"io"
+	"net/http"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+func TestSearch_Errors(t *testing.T) {
+	testCases := []struct {
+		name           string
+		client         fhirStoreClient
+		containedError string
+	}{
+		{
+			name:           "Search request returns error",
+			client:         requestReturnErrorFakeClient,
+			containedError: fakeRequestReturnErrorMessage,
+		},
+		{
+			name:           "Search request returns bad status",
+			client:         badStatusFakeClient,
+			containedError: strconv.Itoa(http.StatusForbidden),
+		},
+		{
+			name:           "Search request response body fails to be read",
+			client:         bodyReaderErrorFakeClient,
+			containedError: fakeBodyReaderErrorMessage,
+		},
+		{
+			name:           "Search request response body failed to be decoded",
+			client:         emptyResponseBodyFakeClient,
+			containedError: io.EOF.Error(),
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			p, s, testSearchQueryPCollection := ptest.CreateList([]SearchQuery{{}})
+			resources, failedSearches := search(s, "any", testSearchQueryPCollection, testCase.client)
+			passert.Empty(s, resources)
+			passert.Count(s, failedSearches, "", 1)
+			passert.True(s, failedSearches, func(errorMsg string) bool {
+				return strings.Contains(errorMsg, testCase.containedError)
+			})
+			pipelineResult := ptest.RunAndValidate(t, p)
+			err := validateCounter(pipelineResult, errorCounterName, 1)
+			if err != nil {
+				t.Fatalf("validateCounter returned error [%v]", err.Error())
+			}
+		})
+	}
+}
+
+func TestSearch_Pagination(t *testing.T) {
+	paginationFakeClient := &fakeFhirStoreClient{
+		fakeSearch: func(s, s2 string, m map[string]string, pageToken string) (*http.Response, error) {
+			if pageToken == "theNextPageToken" {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{}}], "link": []}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			} else {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{}}], "link": [{"relation":"next", "url":"https://healthcare.googleapis.com?_page_token=theNextPageToken"}]}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			}
+		},
+	}
+	p, s, testSearchQuery := ptest.CreateList([]SearchQuery{{}})
+	resourcesFound, failedSearches := search(s, "any", testSearchQuery, paginationFakeClient)
+	passert.Empty(s, failedSearches)
+	passert.Count(s, resourcesFound, "", 1)
+	resourcesFoundWithoutIdentifier := beam.DropKey(s, resourcesFound)
+	passert.True(s, resourcesFoundWithoutIdentifier, func(resourcesFound []string) bool {
+		return len(resourcesFound) == 2

Review Comment:
   Ah, I see: the resource is empty.
   
   Just to make this a bit more readable (in case I miss the extra `{}`), could you make it something simple, like:
   
   first page: `{"entry": [{"resource":{"resourceType": "Patient", "id": "1"}}], "link": [{"relation":"next", "url":"https://healthcare.googleapis.com?_page_token=theNextPageToken"}]}`
   second page: `{"entry": [{"resource":{"resourceType": "Patient", "id": "2"}}], "link": []}`
   
   Don't need to verify the contents.


