You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/29 16:46:20 UTC

[GitHub] [beam] msbukal commented on a diff in pull request #21979: Add Search transform to Go FhirIO

msbukal commented on code in PR #21979:
URL: https://github.com/apache/beam/pull/21979#discussion_r910184554


##########
sdks/go/pkg/beam/io/fhirio/common.go:
##########
@@ -58,38 +62,34 @@ func extractBodyFrom(response *http.Response) (string, error) {
 	return string(bodyBytes), nil
 }
 
-func isBadStatusCode(status string) bool {
-	// 2XXs are successes, otherwise failure.
-	isMatch, err := regexp.MatchString("^2\\d{2}", status)
-	if err != nil {
-		return true
-	}
-	return !isMatch
+type fhirStoreClient interface {
+	readResource(resourcePath string) (*http.Response, error)
+	executeBundle(storePath string, bundle []byte) (*http.Response, error)
+	search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error)
 }
 
-type fhirioFnCommon struct {
-	client                fhirStoreClient
-	resourcesErrorCount   beam.Counter
-	resourcesSuccessCount beam.Counter
-	latencyMs             beam.Distribution
+type fhirStoreClientImpl struct {
+	fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
 }
 
-func (fnc *fhirioFnCommon) setup(namespace string) {
-	if fnc.client == nil {
-		fnc.client = newFhirStoreClient()
-	}
-	fnc.resourcesErrorCount = beam.NewCounter(namespace, baseMetricPrefix+"resource_error_count")
-	fnc.resourcesSuccessCount = beam.NewCounter(namespace, baseMetricPrefix+"resource_success_count")
-	fnc.latencyMs = beam.NewDistribution(namespace, baseMetricPrefix+"latency_ms")
+func (c *fhirStoreClientImpl) readResource(resourcePath string) (*http.Response, error) {
+	return c.fhirService.Read(resourcePath).Do()
 }
 
-type fhirStoreClient interface {
-	readResource(resourcePath string) (*http.Response, error)
-	executeBundle(storePath string, bundle []byte) (*http.Response, error)
+func (c *fhirStoreClientImpl) executeBundle(storePath string, bundle []byte) (*http.Response, error) {
+	return c.fhirService.ExecuteBundle(storePath, bytes.NewReader(bundle)).Do()
 }
 
-type fhirStoreClientImpl struct {
-	fhirService *healthcare.ProjectsLocationsDatasetsFhirStoresFhirService
+func (c *fhirStoreClientImpl) search(storePath, resourceType string, queries map[string]string, pageToken string) (*http.Response, error) {
+	queryParams := make([]googleapi.CallOption, 0)
+	for key, value := range queries {
+		queryParams = append(queryParams, googleapi.QueryParameter(key, value))
+	}
+
+	if pageToken != "" {
+		queryParams = append(queryParams, googleapi.QueryParameter(pageTokenParameterKey, pageToken))
+	}
+	return c.fhirService.Search(storePath, &healthcare.SearchResourcesRequest{ResourceType: resourceType}).Do(queryParams...)

Review Comment:
   Ah... did you get this pattern from an example somewhere? (If you did, could you send me the link?)
   
   It's unintended behaviour to be able to specify the resource type like this, and this caused a large outage for us a couple of months back. I fixed this in the Java implementation (https://github.com/apache/beam/pull/17315, see more details at b/227655182).
   
   Could you fix this? If resourceType is empty, use [.Search](https://pkg.go.dev/google.golang.org/api/healthcare/v1#ProjectsLocationsDatasetsFhirStoresFhirService.Search); otherwise, use [.SearchType](https://pkg.go.dev/google.golang.org/api/healthcare/v1#ProjectsLocationsDatasetsFhirStoresFhirService.SearchType).



##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {
+	resourcesFound, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() ([]string, error) {
+		return fn.searchResources(query)
+	})
+	if err != nil {
+		fn.resourcesErrorCount.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error occurred while performing search for query: [%v]", query).Error())
+		return
+	}
+
+	fn.resourcesSuccessCount.Inc(ctx, 1)
+	emitFoundResources(query.Identifier, resourcesFound)
+}
+
+func (fn *searchResourcesFn) searchResources(query SearchQuery) ([]string, error) {
+	allResources := make([]string, 0)
+	resourcesInPage, nextPageToken, err := fn.searchResourcesPaginated(query, "")
+	for nextPageToken != "" {
+		allResources = append(allResources, resourcesInPage...)
+		resourcesInPage, nextPageToken, err = fn.searchResourcesPaginated(query, nextPageToken)
+	}
+	// Must add resources from last page.
+	allResources = append(allResources, resourcesInPage...)
+	return allResources, err
+}
+
+// Performs a search request retrieving results only from the page identified by
+// `pageToken`. If `pageToken` is the empty string it will retrieve the results
+// from the first page.
+func (fn *searchResourcesFn) searchResourcesPaginated(query SearchQuery, pageToken string) ([]string, string, error) {
+	response, err := fn.client.search(fn.FhirStorePath, query.ResourceType, query.Parameters, pageToken)
+	if err != nil {
+		return nil, "", err
+	}
+
+	body, err := extractBodyFrom(response)
+	if err != nil {
+		return nil, "", err
+	}
+
+	var bodyFields struct {
+		Entries []interface{}        `json:"entry"`
+		Links   []responseLinkFields `json:"link"`
+	}
+	err = json.NewDecoder(strings.NewReader(body)).Decode(&bodyFields)
+	if err != nil {
+		return nil, "", err
+	}
+
+	resourcesFoundInPage := mapEntryToString(bodyFields.Entries)
+	return resourcesFoundInPage, extractNextPageTokenFrom(bodyFields.Links), nil
+}
+
+func mapEntryToString(entries []interface{}) []string {
+	stringifiedEntries := make([]string, 0)
+	for _, entry := range entries {
+		entryBytes, err := json.Marshal(entry)
+		if err != nil {
+			continue
+		}
+		stringifiedEntries = append(stringifiedEntries, string(entryBytes))
+	}
+	return stringifiedEntries
+}
+
+func extractNextPageTokenFrom(searchResponseLinks []responseLinkFields) string {
+	for _, link := range searchResponseLinks {
+		// The link with relation field valued "next" contains the page token
+		if link.Relation != "next" {
+			continue
+		}
+
+		parsedUrl, err := url.Parse(link.Url)
+		if err != nil {
+			break

Review Comment:
   Same here: we should log the error.



##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {

Review Comment:
   Sorry, I'm confused: where are `emitFoundResources` and `emitDeadLetter` defined and implemented? Are they provided automatically by Beam?



##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {
+	resourcesFound, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() ([]string, error) {
+		return fn.searchResources(query)
+	})
+	if err != nil {
+		fn.resourcesErrorCount.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error occurred while performing search for query: [%v]", query).Error())
+		return
+	}
+
+	fn.resourcesSuccessCount.Inc(ctx, 1)
+	emitFoundResources(query.Identifier, resourcesFound)
+}
+
+func (fn *searchResourcesFn) searchResources(query SearchQuery) ([]string, error) {
+	allResources := make([]string, 0)
+	resourcesInPage, nextPageToken, err := fn.searchResourcesPaginated(query, "")

Review Comment:
   I think this would read cleaner if you did
   
   ```
   	resourcesInPage, nextPageToken, err := fn.searchResourcesPaginated(query, "")
   	allResources = append(allResources, resourcesInPage...)
   	for nextPageToken != "" {
   		resourcesInPage, nextPageToken, err = fn.searchResourcesPaginated(query, nextPageToken)
   		allResources = append(allResources, resourcesInPage...)
   	}
   ```
   
   That way we add the resources we just queried right after each search, instead of pairing "the last search" and "the next search".



##########
sdks/go/pkg/beam/io/fhirio/search_test.go:
##########
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+	"bytes"
+	"io"
+	"net/http"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+func TestSearch_Errors(t *testing.T) {
+	testCases := []struct {
+		name           string
+		client         fhirStoreClient
+		containedError string
+	}{
+		{
+			name:           "Search request returns error",
+			client:         requestReturnErrorFakeClient,
+			containedError: fakeRequestReturnErrorMessage,
+		},
+		{
+			name:           "Search request returns bad status",
+			client:         badStatusFakeClient,
+			containedError: strconv.Itoa(http.StatusForbidden),
+		},
+		{
+			name:           "Search request response body fails to be read",
+			client:         bodyReaderErrorFakeClient,
+			containedError: fakeBodyReaderErrorMessage,
+		},
+		{
+			name:           "Search request response body failed to be decoded",
+			client:         emptyResponseBodyFakeClient,
+			containedError: io.EOF.Error(),
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			p, s, testSearchQueryPCollection := ptest.CreateList([]SearchQuery{{}})
+			resources, failedSearches := search(s, "any", testSearchQueryPCollection, testCase.client)
+			passert.Empty(s, resources)
+			passert.Count(s, failedSearches, "", 1)
+			passert.True(s, failedSearches, func(errorMsg string) bool {
+				return strings.Contains(errorMsg, testCase.containedError)
+			})
+			pipelineResult := ptest.RunAndValidate(t, p)
+			err := validateCounter(pipelineResult, errorCounterName, 1)
+			if err != nil {
+				t.Fatalf("validateCounter returned error [%v]", err.Error())
+			}
+		})
+	}
+}
+
+func TestSearch_Pagination(t *testing.T) {
+	paginationFakeClient := &fakeFhirStoreClient{
+		fakeSearch: func(s, s2 string, m map[string]string, pageToken string) (*http.Response, error) {
+			if pageToken == "theNextPageToken" {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{}}], "link": []}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			} else {
+				return &http.Response{
+					Body:       io.NopCloser(bytes.NewBufferString(`{"entry": [{"resource":{}}], "link": [{"relation":"next", "url":"https://healthcare.googleapis.com?_page_token=theNextPageToken"}]}`)),
+					StatusCode: http.StatusOK,
+				}, nil
+			}
+		},
+	}
+	p, s, testSearchQuery := ptest.CreateList([]SearchQuery{{}})
+	resourcesFound, failedSearches := search(s, "any", testSearchQuery, paginationFakeClient)
+	passert.Empty(s, failedSearches)
+	passert.Count(s, resourcesFound, "", 1)
+	resourcesFoundWithoutIdentifier := beam.DropKey(s, resourcesFound)
+	passert.True(s, resourcesFoundWithoutIdentifier, func(resourcesFound []string) bool {
+		return len(resourcesFound) == 2

Review Comment:
   How is the number of resources found equal to 2 when you don't return any resources in the fake client?



##########
sdks/go/test/integration/io/fhirio/fhirio_test.go:
##########
@@ -153,6 +153,7 @@ func populateStore(storePath string) []string {
 			resourcePaths = append(resourcePaths, resourcePath)
 		}
 	}
+	time.Sleep(time.Second) // give some time for data to propagate. prevents flaky results

Review Comment:
   Now that we've added search: if indexing is overloaded or slow, this could take a lot longer than a second. Could we add a follow-up to make the search test retry with a backoff? In the meantime, maybe make the default sleep here something like 30 seconds to prevent flakes.



##########
sdks/go/pkg/beam/io/fhirio/search.go:
##########
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+	"context"
+	"encoding/json"
+	"net/url"
+	"strings"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+func init() {
+	register.DoFn4x0[context.Context, SearchQuery, func(string, []string), func(string)]((*searchResourcesFn)(nil))
+	register.Emitter1[string]()
+	register.Emitter2[string, []string]()
+}
+
+// SearchQuery concisely represents a FHIR search query, and should be used as
+// input type for the Search transform.
+type SearchQuery struct {
+	// An identifier for the query, if there is source information to propagate
+	// through the pipeline.
+	Identifier string
+	// Search will be performed only on resources of type ResourceType. If not set
+	// (i.e. ""), the search is performed across all resources.
+	ResourceType string
+	// Query parameters for a FHIR search request as per https://www.hl7.org/fhir/search.html.
+	Parameters map[string]string
+}
+
+type responseLinkFields struct {
+	Relation string `json:"relation"`
+	Url      string `json:"url"`
+}
+
+type searchResourcesFn struct {
+	fnCommonVariables
+	// Path to FHIR store where search will be performed.
+	FhirStorePath string
+}
+
+func (fn searchResourcesFn) String() string {
+	return "searchResourcesFn"
+}
+
+func (fn *searchResourcesFn) Setup() {
+	fn.fnCommonVariables.setup(fn.String())
+}
+
+func (fn *searchResourcesFn) ProcessElement(ctx context.Context, query SearchQuery, emitFoundResources func(string, []string), emitDeadLetter func(string)) {
+	resourcesFound, err := executeAndRecordLatency(ctx, &fn.latencyMs, func() ([]string, error) {
+		return fn.searchResources(query)
+	})
+	if err != nil {
+		fn.resourcesErrorCount.Inc(ctx, 1)
+		emitDeadLetter(errors.Wrapf(err, "error occurred while performing search for query: [%v]", query).Error())
+		return
+	}
+
+	fn.resourcesSuccessCount.Inc(ctx, 1)
+	emitFoundResources(query.Identifier, resourcesFound)
+}
+
+func (fn *searchResourcesFn) searchResources(query SearchQuery) ([]string, error) {
+	allResources := make([]string, 0)
+	resourcesInPage, nextPageToken, err := fn.searchResourcesPaginated(query, "")
+	for nextPageToken != "" {
+		allResources = append(allResources, resourcesInPage...)
+		resourcesInPage, nextPageToken, err = fn.searchResourcesPaginated(query, nextPageToken)
+	}
+	// Must add resources from last page.
+	allResources = append(allResources, resourcesInPage...)
+	return allResources, err
+}
+
+// Performs a search request retrieving results only from the page identified by
+// `pageToken`. If `pageToken` is the empty string it will retrieve the results
+// from the first page.
+func (fn *searchResourcesFn) searchResourcesPaginated(query SearchQuery, pageToken string) ([]string, string, error) {
+	response, err := fn.client.search(fn.FhirStorePath, query.ResourceType, query.Parameters, pageToken)
+	if err != nil {
+		return nil, "", err
+	}
+
+	body, err := extractBodyFrom(response)
+	if err != nil {
+		return nil, "", err
+	}
+
+	var bodyFields struct {
+		Entries []interface{}        `json:"entry"`
+		Links   []responseLinkFields `json:"link"`
+	}
+	err = json.NewDecoder(strings.NewReader(body)).Decode(&bodyFields)
+	if err != nil {
+		return nil, "", err
+	}
+
+	resourcesFoundInPage := mapEntryToString(bodyFields.Entries)
+	return resourcesFoundInPage, extractNextPageTokenFrom(bodyFields.Links), nil
+}
+
+func mapEntryToString(entries []interface{}) []string {
+	stringifiedEntries := make([]string, 0)
+	for _, entry := range entries {
+		entryBytes, err := json.Marshal(entry)
+		if err != nil {
+			continue

Review Comment:
   Hmm, if we are going to drop an entry, we should at least log an error here rather than dropping it silently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org