Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/03 17:36:42 UTC

[GitHub] [beam] robertwb opened a new pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

robertwb opened a new pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305
 
 
   Falls back to legacy staging if no dependencies are present and the
   retrieval token is non-trivial.
   
   Currently the dependencies must have the role staging_to. Handling more
   expressive roles is left for future work.
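
The decision described above can be sketched roughly as follows. This is only an illustration, not the boot code from the pull request: the `artifact` type, `chooseStaging`, the `stagingMode` constants, and the placeholder URN value are all hypothetical names, and "non-trivial" is assumed here to mean a non-empty retrieval token.

```go
package main

import "fmt"

// artifact is a hypothetical stand-in for pipeline_v1.ArtifactInformation.
type artifact struct {
	RoleURN string
	Name    string
}

// urnStagingTo is a placeholder value for illustration only; the real
// code refers to a URNStagingTo constant.
const urnStagingTo = "example:role:staging_to"

// stagingMode names the three outcomes of the (hypothetical) decision.
type stagingMode int

const (
	stageNothing stagingMode = iota
	stageFromDependencies
	stageLegacy
)

// chooseStaging prefers dependencies when any are present, and otherwise
// falls back to legacy staging if the retrieval token is non-empty
// (assumed meaning of "non-trivial").
func chooseStaging(deps []artifact, retrievalToken string) (stagingMode, error) {
	if len(deps) > 0 {
		for _, d := range deps {
			// Only the staging_to role is handled for now.
			if d.RoleURN != urnStagingTo {
				return stageNothing, fmt.Errorf("unsupported role %q for artifact %q", d.RoleURN, d.Name)
			}
		}
		return stageFromDependencies, nil
	}
	if retrievalToken != "" {
		return stageLegacy, nil
	}
	return stageNothing, nil
}

func main() {
	mode, err := chooseStaging(nil, "some-token")
	fmt.Println(mode == stageLegacy, err)
}
```

Rejecting unknown roles with an error, rather than skipping them silently, is a design choice made for this sketch; the description above does not say how the real code treats them.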
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] robertwb commented on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
robertwb commented on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#issuecomment-608763196
 
 
   Yeah, I'm tracking that bug. This doesn't make things worse. 


[GitHub] [beam] lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403290897
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize_test.go
 ##########
 @@ -148,6 +153,156 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te
 	return md
 }
 
+// Test for new artifact retrieval.
+
+func TestNewRetrieveWithManyFiles(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func TestNewRetrieveWithResolution(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) {
+	if len(mds) != len(expected) {
+		t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected))
+	}
+	for _, md := range mds {
+		filename := filepath.Join(dest, filepath.FromSlash(md.Name))
+		fd, err := os.Open(filename)
+		if err != nil {
+			t.Errorf("error opening file %v", err)
+		}
+		defer fd.Close()
+
+		data := make([]byte, 1<<20)
+		n, err := fd.Read(data)
+		if err != nil {
+			t.Errorf("error reading file %v", err)
+		}
+
+		if string(data[:n]) != expected[md.Name] {
+			t.Errorf("mismatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name])
+		}
+	}
+}
+
+type fakeRetrievalService struct {
+	artifacts map[string]string // name -> content
+}
+
+func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	var artifacts []*pipeline_v1.ArtifactInformation
+	for name, contents := range fake.artifacts {
+		payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{
+			StagedName: name})
+		artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{
+			TypeUrn:     "resolved",
+			TypePayload: []byte(contents),
+			RoleUrn:     URNStagingTo,
+			RolePayload: payload,
+		})
+	}
+	return artifacts
+}
+
+func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	return []*pipeline_v1.ArtifactInformation{
+		&pipeline_v1.ArtifactInformation{
+			TypeUrn: "unresolved",
+		},
+	}
+}
+
+func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) {
+	response := pb.ResolveArtifactResponse{}
+	for _, dep := range request.Artifacts {
+		if dep.TypeUrn == "unresolved" {
+			response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...)
+		} else {
+			response.Replacements = append(response.Replacements, dep)
+		}
+	}
+	return &response, nil
+}
+
+func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) {
+	var index int
+	if request.Artifact.TypeUrn == "resolved" {
+		return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil
+	} else {
+		return nil, errors.Errorf("Unsupported artifact %v", request.Artifact)
+	}
+}
+
+func (fake fakeGetArtifactResponse) Recv() (*pb.GetArtifactResponse, error) {
 
 Review comment:
   Yes.


[GitHub] [beam] lostluck commented on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#issuecomment-608705326
 
 
   That appears to have been the case since https://github.com/apache/beam/pull/11305


[GitHub] [beam] lostluck commented on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#issuecomment-608587473
 
 
   Run Go Postcommit


[GitHub] [beam] lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403292561
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize_test.go
 ##########
 @@ -148,6 +153,154 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te
 	return md
 }
 
+// Test for new artifact retrieval.
+
+func TestNewRetrieveWithManyFiles(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func TestNewRetrieveWithResolution(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) {
+	if len(mds) != len(expected) {
+		t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected))
+	}
+	for _, md := range mds {
+		filename := filepath.Join(dest, filepath.FromSlash(md.Name))
+		fd, err := os.Open(filename)
+		if err != nil {
+			t.Errorf("error opening file %v", err)
+		}
+		defer fd.Close()
+
+		data := make([]byte, 1<<20)
+		n, err := fd.Read(data)
+		if err != nil {
+			t.Errorf("error reading file %v", err)
+		}
+
+		if string(data[:n]) != expected[md.Name] {
+			t.Errorf("mismatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name])
+		}
+	}
+}
+
+type fakeRetrievalService struct {
+	artifacts map[string]string // name -> content
+}
+
+func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	var artifacts []*pipeline_v1.ArtifactInformation
+	for name, contents := range fake.artifacts {
+		payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{
+			StagedName: name})
+		artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{
+			TypeUrn:     "resolved",
+			TypePayload: []byte(contents),
+			RoleUrn:     URNStagingTo,
+			RolePayload: payload,
+		})
+	}
+	return artifacts
+}
+
+func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	return []*pipeline_v1.ArtifactInformation{
+		&pipeline_v1.ArtifactInformation{
+			TypeUrn: "unresolved",
+		},
+	}
+}
+
+func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) {
+	response := pb.ResolveArtifactResponse{}
+	for _, dep := range request.Artifacts {
+		if dep.TypeUrn == "unresolved" {
+			response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...)
+		} else {
+			response.Replacements = append(response.Replacements, dep)
+		}
+	}
+	return &response, nil
+}
+
+func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) {
+	var index int
+	if request.Artifact.TypeUrn == "resolved" {
+		return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil
 
 Review comment:
   You need to return the pointer instance here so that the pointer methods can satisfy the interface.
   
   Value methods are "promoted" to the derived pointer type, but not the other way around.
   
   From Effective Go:
   
   > The rule about pointers vs. values for receivers is that value methods can be invoked on pointers and values, but pointer methods can only be invoked on pointers.
   >
   > This rule arises because pointer methods can modify the receiver; invoking them on a value would cause the method to receive a copy of the value, so any modifications would be discarded. The language therefore disallows this mistake. There is a handy exception, though. When the value is addressable, the language takes care of the common case of invoking a pointer method on a value by inserting the address operator automatically. In our example, the variable b is addressable, so we can call its Write method with just b.Write. The compiler will rewrite that to (&b).Write for us.
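
A small, self-contained illustration of the rule may help. The `incrementer` interface and both counter types below are hypothetical and exist only for this sketch; they are not part of the pull request.

```go
package main

import "fmt"

// incrementer is a hypothetical interface used only for this illustration.
type incrementer interface {
	Inc() int
}

// valueCounter uses a value receiver: Inc works on a copy, so the
// stored count never advances between calls.
type valueCounter struct{ n int }

func (c valueCounter) Inc() int {
	c.n++
	return c.n
}

// pointerCounter uses a pointer receiver: Inc mutates the original.
type pointerCounter struct{ n int }

func (c *pointerCounter) Inc() int {
	c.n++
	return c.n
}

// twice calls Inc two times through the interface and returns the
// result of the second call.
func twice(i incrementer) int {
	i.Inc()
	return i.Inc()
}

func main() {
	fmt.Println(twice(valueCounter{}))    // prints 1: valueCounter satisfies the interface
	fmt.Println(twice(&valueCounter{}))   // prints 1: value methods are promoted to *valueCounter
	fmt.Println(twice(&pointerCounter{})) // prints 2: only *pointerCounter has Inc
	// twice(pointerCounter{}) would not compile: pointerCounter does not
	// implement incrementer because Inc has a pointer receiver.
}
```

The addressability exception quoted above applies to direct method calls on a variable. It does not apply when a value is converted to an interface, which is why the commented-out call is rejected by the compiler.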
   


[GitHub] [beam] robertwb commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403281683
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize_test.go
 ##########
 @@ -148,6 +153,156 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te
 	return md
 }
 
+// Test for new artifact retrieval.
+
+func TestNewRetrieveWithManyFiles(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func TestNewRetrieveWithResolution(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) {
+	if len(mds) != len(expected) {
+		t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected))
+	}
+	for _, md := range mds {
+		filename := filepath.Join(dest, filepath.FromSlash(md.Name))
+		fd, err := os.Open(filename)
+		if err != nil {
+			t.Errorf("error opening file %v", err)
+		}
+		defer fd.Close()
+
+		data := make([]byte, 1<<20)
+		n, err := fd.Read(data)
+		if err != nil {
+			t.Errorf("error reading file %v", err)
+		}
+
+		if string(data[:n]) != expected[md.Name] {
+			t.Errorf("mismatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name])
+		}
+	}
+}
+
+type fakeRetrievalService struct {
+	artifacts map[string]string // name -> content
+}
+
+func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	var artifacts []*pipeline_v1.ArtifactInformation
+	for name, contents := range fake.artifacts {
+		payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{
+			StagedName: name})
+		artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{
+			TypeUrn:     "resolved",
+			TypePayload: []byte(contents),
+			RoleUrn:     URNStagingTo,
+			RolePayload: payload,
+		})
+	}
+	return artifacts
+}
+
+func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	return []*pipeline_v1.ArtifactInformation{
+		&pipeline_v1.ArtifactInformation{
+			TypeUrn: "unresolved",
+		},
+	}
+}
+
+func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) {
+	response := pb.ResolveArtifactResponse{}
+	for _, dep := range request.Artifacts {
+		if dep.TypeUrn == "unresolved" {
+			response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...)
+		} else {
+			response.Replacements = append(response.Replacements, dep)
+		}
+	}
+	return &response, nil
+}
+
+func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) {
+	var index int
+	if request.Artifact.TypeUrn == "resolved" {
+		return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil
+	} else {
+		return nil, errors.Errorf("Unsupported artifact %v", request.Artifact)
+	}
+}
+
+func (fake fakeGetArtifactResponse) Recv() (*pb.GetArtifactResponse, error) {
+	if *fake.index < len(fake.data) {
+		*fake.index += 1
+		return &pb.GetArtifactResponse{Data: fake.data[*fake.index-1 : *fake.index]}, nil
+	} else {
+		return nil, io.EOF
+	}
+}
+
+type fakeGetArtifactResponse struct {
+	data  []byte
+	index *int
 
 Review comment:
   That is what I tried at first, but it complained that it didn't satisfy the interface. 
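
For comparison, here is a minimal sketch of the alternative under discussion: a fake stream with a plain `int` index and a pointer receiver, which satisfies the interface once the pointer is returned. The `chunkStream` interface is a hypothetical, simplified stand-in for the generated gRPC client stream type, and `fakeStream`, `newFakeStream`, and `readAll` are names invented for this example.

```go
package main

import (
	"fmt"
	"io"
)

// chunkStream is a hypothetical, simplified stand-in for the generated
// streaming client interface; it only models Recv.
type chunkStream interface {
	Recv() ([]byte, error)
}

// fakeStream serves its data one byte per Recv call.
type fakeStream struct {
	data  []byte
	index int // a plain int suffices because Recv has a pointer receiver
}

// Recv returns the next one-byte chunk, or io.EOF when data is exhausted.
func (f *fakeStream) Recv() ([]byte, error) {
	if f.index >= len(f.data) {
		return nil, io.EOF
	}
	f.index++
	return f.data[f.index-1 : f.index], nil
}

// newFakeStream returns the pointer, since only *fakeStream has Recv
// in its method set.
func newFakeStream(data []byte) chunkStream {
	return &fakeStream{data: data}
}

// readAll drains the stream until io.EOF and concatenates the chunks.
func readAll(s chunkStream) ([]byte, error) {
	var out []byte
	for {
		chunk, err := s.Recv()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, chunk...)
	}
}

func main() {
	got, err := readAll(newFakeStream([]byte("bbb")))
	fmt.Println(string(got), err)
}
```

Returning `fakeStream{...}` by value from `newFakeStream` would reproduce the "does not satisfy the interface" complaint, because the value type has no `Recv` method in its method set.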


[GitHub] [beam] lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403265533
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize_test.go
 ##########
 @@ -148,6 +153,156 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te
 	return md
 }
 
+// Test for new artifact retrieval.
+
+func TestNewRetrieveWithManyFiles(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func TestNewRetrieveWithResolution(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) {
+	if len(mds) != len(expected) {
+		t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected))
+	}
+	for _, md := range mds {
+		filename := filepath.Join(dest, filepath.FromSlash(md.Name))
+		fd, err := os.Open(filename)
+		if err != nil {
+			t.Errorf("error opening file %v", err)
+		}
+		defer fd.Close()
+
+		data := make([]byte, 1<<20)
+		n, err := fd.Read(data)
+		if err != nil {
+			t.Errorf("error reading file %v", err)
+		}
+
+		if string(data[:n]) != expected[md.Name] {
+			t.Errorf("mismatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name])
+		}
+	}
+}
+
+type fakeRetrievalService struct {
+	artifacts map[string]string // name -> content
+}
+
+func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	var artifacts []*pipeline_v1.ArtifactInformation
+	for name, contents := range fake.artifacts {
+		payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{
+			StagedName: name})
+		artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{
+			TypeUrn:     "resolved",
+			TypePayload: []byte(contents),
+			RoleUrn:     URNStagingTo,
+			RolePayload: payload,
+		})
+	}
+	return artifacts
+}
+
+func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	return []*pipeline_v1.ArtifactInformation{
+		&pipeline_v1.ArtifactInformation{
+			TypeUrn: "unresolved",
+		},
+	}
+}
+
+func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) {
+	response := pb.ResolveArtifactResponse{}
+	for _, dep := range request.Artifacts {
+		if dep.TypeUrn == "unresolved" {
+			response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...)
+		} else {
+			response.Replacements = append(response.Replacements, dep)
+		}
+	}
+	return &response, nil
+}
+
+func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) {
+	var index int
+	if request.Artifact.TypeUrn == "resolved" {
+		return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil
+	} else {
+		return nil, errors.Errorf("Unsupported artifact %v", request.Artifact)
+	}
 
 Review comment:
   ```suggestion
   	}
   	return nil, errors.Errorf("Unsupported artifact %v", request.Artifact)
   ```


[GitHub] [beam] lostluck commented on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#issuecomment-608678863
 
 
   Run Go PostCommit


[GitHub] [beam] lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403292561
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize_test.go
 ##########
 @@ -148,6 +153,154 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te
 	return md
 }
 
+// Test for new artifact retrieval.
+
+func TestNewRetrieveWithManyFiles(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func TestNewRetrieveWithResolution(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) {
+	if len(mds) != len(expected) {
+		t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected))
+	}
+	for _, md := range mds {
+		filename := filepath.Join(dest, filepath.FromSlash(md.Name))
+		fd, err := os.Open(filename)
+		if err != nil {
+			t.Errorf("error opening file %v", err)
+		}
+		defer fd.Close()
+
+		data := make([]byte, 1<<20)
+		n, err := fd.Read(data)
+		if err != nil {
+			t.Errorf("error reading file %v", err)
+		}
+
+		if string(data[:n]) != expected[md.Name] {
+			t.Errorf("missmatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name])
+		}
+	}
+}
+
+type fakeRetrievalService struct {
+	artifacts map[string]string // name -> content
+}
+
+func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	var artifacts []*pipeline_v1.ArtifactInformation
+	for name, contents := range fake.artifacts {
+		payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{
+			StagedName: name})
+		artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{
+			TypeUrn:     "resolved",
+			TypePayload: []byte(contents),
+			RoleUrn:     URNStagingTo,
+			RolePayload: payload,
+		})
+	}
+	return artifacts
+}
+
+func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	return []*pipeline_v1.ArtifactInformation{
+		&pipeline_v1.ArtifactInformation{
+			TypeUrn: "unresolved",
+		},
+	}
+}
+
+func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) {
+	response := pb.ResolveArtifactResponse{}
+	for _, dep := range request.Artifacts {
+		if dep.TypeUrn == "unresolved" {
+			response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...)
+		} else {
+			response.Replacements = append(response.Replacements, dep)
+		}
+	}
+	return &response, nil
+}
+
+func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) {
+	var index int
+	if request.Artifact.TypeUrn == "resolved" {
+		return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil
 
 Review comment:
   You need to return the pointer instance here so that the pointer methods can satisfy the interface.
   
   Value methods are "promoted" to the derived pointer type, but not the other way around.
   
   From Effective Go:
    The rule about pointers vs. values for receivers is that value methods can be invoked on pointers and values, but pointer methods can only be invoked on pointers.
   

----------------------------------------------------------------

[GitHub] [beam] lostluck edited a comment on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck edited a comment on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#issuecomment-608705326
 
 
   That appears to have been the case since https://github.com/apache/beam/pull/11305
   
   Edit: Whoops, wrong queue. That's this PR.
   
   It's been broken since this PostCommit run:
   https://builds.apache.org/job/beam_PostCommit_Go/6624/
   
   But then it was complaining about the legacy service missing. This at least moves things forward a bit.

----------------------------------------------------------------

[GitHub] [beam] lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403265387
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize_test.go
 ##########
 @@ -148,6 +153,156 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te
 	return md
 }
 
+// Test for new artifact retrieval.
+
+func TestNewRetrieveWithManyFiles(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func TestNewRetrieveWithResolution(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) {
+	if len(mds) != len(expected) {
+		t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected))
+	}
+	for _, md := range mds {
+		filename := filepath.Join(dest, filepath.FromSlash(md.Name))
+		fd, err := os.Open(filename)
+		if err != nil {
+			t.Errorf("error opening file %v", err)
+		}
+		defer fd.Close()
+
+		data := make([]byte, 1<<20)
+		n, err := fd.Read(data)
+		if err != nil {
+			t.Errorf("error reading file %v", err)
+		}
+
+		if string(data[:n]) != expected[md.Name] {
+			t.Errorf("missmatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name])
+		}
+	}
+}
+
+type fakeRetrievalService struct {
+	artifacts map[string]string // name -> content
+}
+
+func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	var artifacts []*pipeline_v1.ArtifactInformation
+	for name, contents := range fake.artifacts {
+		payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{
+			StagedName: name})
+		artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{
+			TypeUrn:     "resolved",
+			TypePayload: []byte(contents),
+			RoleUrn:     URNStagingTo,
+			RolePayload: payload,
+		})
+	}
+	return artifacts
+}
+
+func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	return []*pipeline_v1.ArtifactInformation{
+		&pipeline_v1.ArtifactInformation{
+			TypeUrn: "unresolved",
+		},
+	}
+}
+
+func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) {
+	response := pb.ResolveArtifactResponse{}
+	for _, dep := range request.Artifacts {
+		if dep.TypeUrn == "unresolved" {
+			response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...)
+		} else {
+			response.Replacements = append(response.Replacements, dep)
+		}
+	}
+	return &response, nil
+}
+
+func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) {
+	var index int
+	if request.Artifact.TypeUrn == "resolved" {
+		return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil
+	} else {
+		return nil, errors.Errorf("Unsupported artifact %v", request.Artifact)
+	}
+}
+
+func (fake fakeGetArtifactResponse) Recv() (*pb.GetArtifactResponse, error) {
+	if *fake.index < len(fake.data) {
+		*fake.index += 1
+		return &pb.GetArtifactResponse{Data: fake.data[*fake.index-1 : *fake.index]}, nil
+	} else {
+		return nil, io.EOF
+	}
 
 Review comment:
   ```suggestion
   	} 
   	return nil, io.EOF
   ```

----------------------------------------------------------------

[GitHub] [beam] robertwb commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403284304
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize_test.go
 ##########
 @@ -148,6 +153,156 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te
 	return md
 }
 
+// Test for new artifact retrieval.
+
+func TestNewRetrieveWithManyFiles(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func TestNewRetrieveWithResolution(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) {
+	if len(mds) != len(expected) {
+		t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected))
+	}
+	for _, md := range mds {
+		filename := filepath.Join(dest, filepath.FromSlash(md.Name))
+		fd, err := os.Open(filename)
+		if err != nil {
+			t.Errorf("error opening file %v", err)
+		}
+		defer fd.Close()
+
+		data := make([]byte, 1<<20)
+		n, err := fd.Read(data)
+		if err != nil {
+			t.Errorf("error reading file %v", err)
+		}
+
+		if string(data[:n]) != expected[md.Name] {
+			t.Errorf("missmatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name])
+		}
+	}
+}
+
+type fakeRetrievalService struct {
+	artifacts map[string]string // name -> content
+}
+
+func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	var artifacts []*pipeline_v1.ArtifactInformation
+	for name, contents := range fake.artifacts {
+		payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{
+			StagedName: name})
+		artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{
+			TypeUrn:     "resolved",
+			TypePayload: []byte(contents),
+			RoleUrn:     URNStagingTo,
+			RolePayload: payload,
+		})
+	}
+	return artifacts
+}
+
+func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	return []*pipeline_v1.ArtifactInformation{
+		&pipeline_v1.ArtifactInformation{
+			TypeUrn: "unresolved",
+		},
+	}
+}
+
+func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) {
+	response := pb.ResolveArtifactResponse{}
+	for _, dep := range request.Artifacts {
+		if dep.TypeUrn == "unresolved" {
+			response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...)
+		} else {
+			response.Replacements = append(response.Replacements, dep)
+		}
+	}
+	return &response, nil
+}
+
+func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) {
+	var index int
+	if request.Artifact.TypeUrn == "resolved" {
+		return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil
+	} else {
+		return nil, errors.Errorf("Unsupported artifact %v", request.Artifact)
+	}
+}
+
+func (fake fakeGetArtifactResponse) Recv() (*pb.GetArtifactResponse, error) {
 
 Review comment:
   It's the object that represents the response stream (and is returned by the actual client). Would naming it `fakeGetArtifactResponseStream` be better? 

----------------------------------------------------------------

[GitHub] [beam] robertwb merged pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
robertwb merged pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305
 
 
   

----------------------------------------------------------------

[GitHub] [beam] lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403268545
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize_test.go
 ##########
 @@ -148,6 +153,156 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te
 	return md
 }
 
+// Test for new artifact retrieval.
+
+func TestNewRetrieveWithManyFiles(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func TestNewRetrieveWithResolution(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) {
+	if len(mds) != len(expected) {
+		t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected))
+	}
+	for _, md := range mds {
+		filename := filepath.Join(dest, filepath.FromSlash(md.Name))
+		fd, err := os.Open(filename)
+		if err != nil {
+			t.Errorf("error opening file %v", err)
+		}
+		defer fd.Close()
+
+		data := make([]byte, 1<<20)
+		n, err := fd.Read(data)
+		if err != nil {
+			t.Errorf("error reading file %v", err)
+		}
+
+		if string(data[:n]) != expected[md.Name] {
+			t.Errorf("missmatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name])
+		}
+	}
+}
+
+type fakeRetrievalService struct {
+	artifacts map[string]string // name -> content
+}
+
+func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	var artifacts []*pipeline_v1.ArtifactInformation
+	for name, contents := range fake.artifacts {
+		payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{
+			StagedName: name})
+		artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{
+			TypeUrn:     "resolved",
+			TypePayload: []byte(contents),
+			RoleUrn:     URNStagingTo,
+			RolePayload: payload,
+		})
+	}
+	return artifacts
+}
+
+func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	return []*pipeline_v1.ArtifactInformation{
+		&pipeline_v1.ArtifactInformation{
+			TypeUrn: "unresolved",
+		},
+	}
+}
+
+func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) {
+	response := pb.ResolveArtifactResponse{}
+	for _, dep := range request.Artifacts {
+		if dep.TypeUrn == "unresolved" {
+			response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...)
+		} else {
+			response.Replacements = append(response.Replacements, dep)
+		}
+	}
+	return &response, nil
+}
+
+func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) {
+	var index int
+	if request.Artifact.TypeUrn == "resolved" {
+		return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil
+	} else {
+		return nil, errors.Errorf("Unsupported artifact %v", request.Artifact)
+	}
+}
+
+func (fake fakeGetArtifactResponse) Recv() (*pb.GetArtifactResponse, error) {
 
 Review comment:
   I think it's a bit confusing that this is called a response when it's a fake client that returns responses.

----------------------------------------------------------------

[GitHub] [beam] robertwb commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403320150
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize_test.go
 ##########
 @@ -148,6 +153,154 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te
 	return md
 }
 
+// Test for new artifact retrieval.
+
+func TestNewRetrieveWithManyFiles(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func TestNewRetrieveWithResolution(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) {
+	if len(mds) != len(expected) {
+		t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected))
+	}
+	for _, md := range mds {
+		filename := filepath.Join(dest, filepath.FromSlash(md.Name))
+		fd, err := os.Open(filename)
+		if err != nil {
+			t.Errorf("error opening file %v", err)
+		}
+		defer fd.Close()
+
+		data := make([]byte, 1<<20)
+		n, err := fd.Read(data)
+		if err != nil {
+			t.Errorf("error reading file %v", err)
+		}
+
+		if string(data[:n]) != expected[md.Name] {
+			t.Errorf("missmatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name])
+		}
+	}
+}
+
+type fakeRetrievalService struct {
+	artifacts map[string]string // name -> content
+}
+
+func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	var artifacts []*pipeline_v1.ArtifactInformation
+	for name, contents := range fake.artifacts {
+		payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{
+			StagedName: name})
+		artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{
+			TypeUrn:     "resolved",
+			TypePayload: []byte(contents),
+			RoleUrn:     URNStagingTo,
+			RolePayload: payload,
+		})
+	}
+	return artifacts
+}
+
+func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	return []*pipeline_v1.ArtifactInformation{
+		&pipeline_v1.ArtifactInformation{
+			TypeUrn: "unresolved",
+		},
+	}
+}
+
+func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) {
+	response := pb.ResolveArtifactResponse{}
+	for _, dep := range request.Artifacts {
+		if dep.TypeUrn == "unresolved" {
+			response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...)
+		} else {
+			response.Replacements = append(response.Replacements, dep)
+		}
+	}
+	return &response, nil
+}
+
+func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) {
+	var index int
+	if request.Artifact.TypeUrn == "resolved" {
+		return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil
 
 Review comment:
   Thanks for the helpful explanation.

----------------------------------------------------------------

[GitHub] [beam] lostluck edited a comment on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck edited a comment on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#issuecomment-608705326
 
 
   That appears to have been the case since https://github.com/apache/beam/pull/11305
   
   Edit: Whoops, wrong queue. That's this PR.

----------------------------------------------------------------

[GitHub] [beam] lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403251324
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize.go
 ##########
 @@ -31,15 +31,148 @@ import (
 
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"github.com/golang/protobuf/proto"
+)
+
+// TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto
+const (
+	URNStagingTo      = "beam:artifact:role:staging_to:v1"
+	NoArtifactsStaged = "__no_artifacts_staged__"
 )
 
 // Materialize is a convenience helper for ensuring that all artifacts are
 // present and uncorrupted. It interprets each artifact name as a relative
 // path under the dest directory. It does not retrieve valid artifacts already
 // present.
-func Materialize(ctx context.Context, endpoint string, rt string, dest string) ([]*pb.ArtifactMetadata, error) {
+// TODO(BEAM-9577): Return a mapping of filename to dependency, rather than []*pb.ArtifactMetadata.
+// TODO(BEAM-9577): Leverage richness of roles rather than magic names to understand artifacts.
+func Materialize(ctx context.Context, endpoint string, dependencies []*pipeline_v1.ArtifactInformation, rt string, dest string) ([]*pb.ArtifactMetadata, error) {
+	if len(dependencies) > 0 {
+		return newMaterialize(ctx, endpoint, dependencies, dest)
+	} else if rt == "" || rt == NoArtifactsStaged {
+		return []*pb.ArtifactMetadata{}, nil
+	} else {
+		return legacyMaterialize(ctx, endpoint, rt, dest)
+	}
+}
+
+func newMaterialize(ctx context.Context, endpoint string, dependencies []*pipeline_v1.ArtifactInformation, dest string) ([]*pb.ArtifactMetadata, error) {
+	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
+	if err != nil {
+		return nil, err
+	}
+	defer cc.Close()
+
+	return newMaterializeWithClient(ctx, pb.NewArtifactRetrievalServiceClient(cc), dependencies, dest)
+}
+
+func newMaterializeWithClient(ctx context.Context, client pb.ArtifactRetrievalServiceClient, dependencies []*pipeline_v1.ArtifactInformation, dest string) ([]*pb.ArtifactMetadata, error) {
+	resolution, err := client.ResolveArtifact(ctx, &pb.ResolveArtifactRequest{Artifacts: dependencies})
+	if err != nil {
+		return nil, err
+	}
+
+	var md []*pb.ArtifactMetadata
+	var list []retrievable
+	for _, dep := range resolution.Replacements {
+		path, err := extractStagingToPath(dep)
+		if err != nil {
+			return nil, err
+		}
+		md = append(md, &pb.ArtifactMetadata{
+			Name: path,
+		})
+
+		list = append(list, &artifact{
+			client: client,
+			dep:    dep,
+		})
+	}
+
+	return md, MultiRetrieve(ctx, 10, list, dest)
+}
+
+func extractStagingToPath(artifact *pipeline_v1.ArtifactInformation) (string, error) {
+	if artifact.RoleUrn != URNStagingTo {
+		return "", errors.Errorf("Unsupported artifact role %s", artifact.RoleUrn)
+	}
+	role := pipeline_v1.ArtifactStagingToRolePayload{}
+	if err := proto.Unmarshal(artifact.RolePayload, &role); err != nil {
+		return "", err
+	}
+	return role.StagedName, nil
+}
+
+type artifact struct {
+	client pb.ArtifactRetrievalServiceClient
+	dep    *pipeline_v1.ArtifactInformation
+}
+
+func (a artifact) retrieve(ctx context.Context, dest string) error {
+	path, err := extractStagingToPath(a.dep)
+	if err != nil {
+		return err
+	}
+
+	filename := filepath.Join(dest, filepath.FromSlash(path))
+
+	_, err = os.Stat(filename)
+	if err == nil {
+		if err = os.Remove(filename); err != nil {
+			return errors.Errorf("failed to delete: %v (remove: %v)", filename, err)
+		}
+	} else if !os.IsNotExist(err) {
+		return errors.Wrapf(err, "failed to stat %v", filename)
+	}
+
+	stream, err := a.client.GetArtifact(ctx, &pb.GetArtifactRequest{Artifact: a.dep})
+	if err != nil {
+		return err
+	}
+
+	fd, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0755)
+	if err != nil {
+		return err
+	}
+	w := bufio.NewWriter(fd)
+
+	err = writeChunks(stream, w)
+	if err != nil {
+		fd.Close() // drop any buffered content
+		return errors.Wrapf(err, "failed to retrieve chunk for %v", filename)
+	}
+	if err := w.Flush(); err != nil {
+		fd.Close()
+		return errors.Wrapf(err, "failed to flush chunks for %v", filename)
+	}
+	if err := fd.Close(); err != nil {
+		return err
+	}
+
+	return nil
 
 Review comment:
   ```suggestion
   	return fd.Close()
   ```

----------------------------------------------------------------

[GitHub] [beam] lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403292756
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize_test.go
 ##########
 @@ -148,6 +153,156 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te
 	return md
 }
 
+// Test for new artifact retrieval.
+
+func TestNewRetrieveWithManyFiles(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func TestNewRetrieveWithResolution(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) {
+	if len(mds) != len(expected) {
+		t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected))
+	}
+	for _, md := range mds {
+		filename := filepath.Join(dest, filepath.FromSlash(md.Name))
+		fd, err := os.Open(filename)
+		if err != nil {
+			t.Errorf("error opening file %v", err)
+		}
+		defer fd.Close()
+
+		data := make([]byte, 1<<20)
+		n, err := fd.Read(data)
+		if err != nil {
+			t.Errorf("error reading file %v", err)
+		}
+
+		if string(data[:n]) != expected[md.Name] {
+			t.Errorf("missmatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name])
+		}
+	}
+}
+
+type fakeRetrievalService struct {
+	artifacts map[string]string // name -> content
+}
+
+func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	var artifacts []*pipeline_v1.ArtifactInformation
+	for name, contents := range fake.artifacts {
+		payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{
+			StagedName: name})
+		artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{
+			TypeUrn:     "resolved",
+			TypePayload: []byte(contents),
+			RoleUrn:     URNStagingTo,
+			RolePayload: payload,
+		})
+	}
+	return artifacts
+}
+
+func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	return []*pipeline_v1.ArtifactInformation{
+		&pipeline_v1.ArtifactInformation{
+			TypeUrn: "unresolved",
+		},
+	}
+}
+
+func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) {
+	response := pb.ResolveArtifactResponse{}
+	for _, dep := range request.Artifacts {
+		if dep.TypeUrn == "unresolved" {
+			response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...)
+		} else {
+			response.Replacements = append(response.Replacements, dep)
+		}
+	}
+	return &response, nil
+}
+
+func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) {
+	var index int
+	if request.Artifact.TypeUrn == "resolved" {
+		return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil
+	} else {
+		return nil, errors.Errorf("Unsupported artifact %v", request.Artifact)
+	}
+}
+
+func (fake fakeGetArtifactResponse) Recv() (*pb.GetArtifactResponse, error) {
+	if *fake.index < len(fake.data) {
+		*fake.index += 1
+		return &pb.GetArtifactResponse{Data: fake.data[*fake.index-1 : *fake.index]}, nil
+	} else {
+		return nil, io.EOF
+	}
+}
+
+type fakeGetArtifactResponse struct {
+	data  []byte
+	index *int
 
 Review comment:
   Answered at the definition of the instance.


[GitHub] [beam] lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403270941
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize_test.go
 ##########
 @@ -148,6 +153,156 @@ func stage(ctx context.Context, scl pb.LegacyArtifactStagingServiceClient, t *te
 	return md
 }
 
+// Test for new artifact retrieval.
+
+func TestNewRetrieveWithManyFiles(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.resolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func TestNewRetrieveWithResolution(t *testing.T) {
+	expected := map[string]string{"a.txt": "a", "b.txt": "bbb", "c.txt": "cccccccc"}
+
+	client := fakeRetrievalService{
+		artifacts: expected,
+	}
+
+	dest := makeTempDir(t)
+	defer os.RemoveAll(dest)
+	ctx := grpcx.WriteWorkerID(context.Background(), "worker")
+
+	mds, err := newMaterializeWithClient(ctx, client, client.unresolvedArtifacts(), dest)
+	if err != nil {
+		t.Fatalf("materialize failed: %v", err)
+	}
+
+	checkStagedFiles(mds, dest, expected, t)
+}
+
+func checkStagedFiles(mds []*pb.ArtifactMetadata, dest string, expected map[string]string, t *testing.T) {
+	if len(mds) != len(expected) {
+		t.Errorf("wrong number of artifacts staged %v vs %v", len(mds), len(expected))
+	}
+	for _, md := range mds {
+		filename := filepath.Join(dest, filepath.FromSlash(md.Name))
+		fd, err := os.Open(filename)
+		if err != nil {
+			t.Errorf("error opening file %v", err)
+		}
+		defer fd.Close()
+
+		data := make([]byte, 1<<20)
+		n, err := fd.Read(data)
+		if err != nil {
+			t.Errorf("error reading file %v", err)
+		}
+
+		if string(data[:n]) != expected[md.Name] {
+			t.Errorf("missmatched contents for %v: '%s' vs '%s'", md.Name, string(data[:n]), expected[md.Name])
+		}
+	}
+}
+
+type fakeRetrievalService struct {
+	artifacts map[string]string // name -> content
+}
+
+func (fake fakeRetrievalService) resolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	var artifacts []*pipeline_v1.ArtifactInformation
+	for name, contents := range fake.artifacts {
+		payload, _ := proto.Marshal(&pipeline_v1.ArtifactStagingToRolePayload{
+			StagedName: name})
+		artifacts = append(artifacts, &pipeline_v1.ArtifactInformation{
+			TypeUrn:     "resolved",
+			TypePayload: []byte(contents),
+			RoleUrn:     URNStagingTo,
+			RolePayload: payload,
+		})
+	}
+	return artifacts
+}
+
+func (fake fakeRetrievalService) unresolvedArtifacts() []*pipeline_v1.ArtifactInformation {
+	return []*pipeline_v1.ArtifactInformation{
+		&pipeline_v1.ArtifactInformation{
+			TypeUrn: "unresolved",
+		},
+	}
+}
+
+func (fake fakeRetrievalService) ResolveArtifact(ctx context.Context, request *pb.ResolveArtifactRequest, opts ...grpc.CallOption) (*pb.ResolveArtifactResponse, error) {
+	response := pb.ResolveArtifactResponse{}
+	for _, dep := range request.Artifacts {
+		if dep.TypeUrn == "unresolved" {
+			response.Replacements = append(response.Replacements, fake.resolvedArtifacts()...)
+		} else {
+			response.Replacements = append(response.Replacements, dep)
+		}
+	}
+	return &response, nil
+}
+
+func (fake fakeRetrievalService) GetArtifact(ctx context.Context, request *pb.GetArtifactRequest, opts ...grpc.CallOption) (pb.ArtifactRetrievalService_GetArtifactClient, error) {
+	var index int
+	if request.Artifact.TypeUrn == "resolved" {
+		return fakeGetArtifactResponse{data: request.Artifact.TypePayload, index: &index}, nil
+	} else {
+		return nil, errors.Errorf("Unsupported artifact %v", request.Artifact)
+	}
+}
+
+func (fake fakeGetArtifactResponse) Recv() (*pb.GetArtifactResponse, error) {
+	if *fake.index < len(fake.data) {
+		*fake.index += 1
+		return &pb.GetArtifactResponse{Data: fake.data[*fake.index-1 : *fake.index]}, nil
+	} else {
+		return nil, io.EOF
+	}
+}
+
+type fakeGetArtifactResponse struct {
+	data  []byte
+	index *int
 
 Review comment:
   Why keep a pointer to the index rather than defining the methods on a pointer to the struct? The implementation seems to be working around the copy that a value receiver makes on each call, instead of just using pointer receivers.
   
   See https://golang.org/doc/effective_go.html#methods for more information.
   
   TL;DR: replacing every (fake fakeGetArtifactResponse) with (fake *fakeGetArtifactResponse) puts the method on the pointer, so repeated calls see the same state and you don't need the indirection for index to increment as expected.
   
   This also applies to the other fakes and interface implementations you've written, but since those only modify reference state, if anything (e.g. maps or pointers), using a value receiver matters less there.
   
   The general rule: when in doubt, use a pointer receiver.
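
   The difference can be seen in a small standalone sketch. The types `valueStream` and `pointerStream` and the helper `drain` are hypothetical names made up for illustration, not code from the PR. They mirror the shape of `fakeGetArtifactResponse.Recv` but keep `index` as a plain `int`.

```go
package main

import (
	"fmt"
	"io"
)

// valueStream has a value receiver: each call to Recv works on a copy,
// so the increment of index is lost when the call returns.
type valueStream struct {
	data  []byte
	index int
}

func (s valueStream) Recv() ([]byte, error) {
	if s.index < len(s.data) {
		s.index++
		return s.data[s.index-1 : s.index], nil
	}
	return nil, io.EOF
}

// pointerStream has a pointer receiver: every call sees and updates
// the same struct, so index advances without any extra indirection.
type pointerStream struct {
	data  []byte
	index int
}

func (s *pointerStream) Recv() ([]byte, error) {
	if s.index < len(s.data) {
		s.index++
		return s.data[s.index-1 : s.index], nil
	}
	return nil, io.EOF
}

// drain calls recv at most limit times and concatenates the chunks,
// stopping early on the first error (including io.EOF).
func drain(recv func() ([]byte, error), limit int) string {
	var out []byte
	for i := 0; i < limit; i++ {
		chunk, err := recv()
		if err != nil {
			break
		}
		out = append(out, chunk...)
	}
	return string(out)
}

func main() {
	v := valueStream{data: []byte("abc")}
	p := &pointerStream{data: []byte("abc")}

	fmt.Println(drain(v.Recv, 5)) // index never advances: "aaaaa"
	fmt.Println(drain(p.Recv, 5)) // reaches io.EOF after three chunks: "abc"
}
```

   With the value receiver the stream never reaches `io.EOF`, which is presumably why the fake in the diff stores `index` behind a pointer. Switching to a pointer receiver removes the need for that workaround.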


[GitHub] [beam] lostluck commented on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#issuecomment-608704872
 
 
   In the post-commit, it looks like the Dataflow jobs still don't like the current situation. They claim "no artifacts staged".
