You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/03 19:04:07 UTC

[GitHub] [beam] lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.

lostluck commented on a change in pull request #11305: [BEAM-9577] Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403251324
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize.go
 ##########
 @@ -31,15 +31,148 @@ import (
 
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+	"github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"github.com/golang/protobuf/proto"
+)
+
+// TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the pipeline_v1 proto
+const (
+	// URNStagingTo is the artifact role URN whose role payload carries the
+	// staged file name; extractStagingToPath rejects artifacts with any other
+	// role.
+	URNStagingTo      = "beam:artifact:role:staging_to:v1"
+	// NoArtifactsStaged is the value of Materialize's rt argument that, when no
+	// dependencies are supplied, makes Materialize return an empty result
+	// instead of retrieving anything.
+	NoArtifactsStaged = "__no_artifacts_staged__"
 )
 
 // Materialize is a convenience helper for ensuring that all artifacts are
 // present and uncorrupted. It interprets each artifact name as a relative
 // path under the dest directory. It does not retrieve valid artifacts already
 // present.
-func Materialize(ctx context.Context, endpoint string, rt string, dest string) ([]*pb.ArtifactMetadata, error) {
+//
+// When dependencies is non-empty they are materialized via newMaterialize and
+// rt is ignored. Otherwise an empty rt, or rt equal to NoArtifactsStaged,
+// yields an empty (non-nil) result without contacting the endpoint, and any
+// other rt is handed to legacyMaterialize.
+//
+// NOTE(review): on the dependencies path, artifact.retrieve removes an existing
+// file at the destination and fetches it again, so the "already present"
+// statement above appears to describe only the legacy path — confirm.
+// TODO(BEAM-9577): Return a mapping of filename to dependency, rather than []*pb.ArtifactMetadata.
+// TODO(BEAM-9577): Leverage richness of roles rather than magic names to understand artifacts.
+func Materialize(ctx context.Context, endpoint string, dependencies []*pipeline_v1.ArtifactInformation, rt string, dest string) ([]*pb.ArtifactMetadata, error) {
+	if len(dependencies) > 0 {
+		return newMaterialize(ctx, endpoint, dependencies, dest)
+	} else if rt == "" || rt == NoArtifactsStaged {
+		return []*pb.ArtifactMetadata{}, nil
+	} else {
+		return legacyMaterialize(ctx, endpoint, rt, dest)
+	}
+}
+
+// newMaterialize connects to endpoint via grpcx.Dial (passing a 2-minute
+// duration, presumably the dial timeout — confirm against grpcx) and
+// materializes dependencies under dest using an artifact retrieval client
+// built on that connection. The connection is closed when the function returns.
+func newMaterialize(ctx context.Context, endpoint string, dependencies []*pipeline_v1.ArtifactInformation, dest string) ([]*pb.ArtifactMetadata, error) {
+	conn, dialErr := grpcx.Dial(ctx, endpoint, 2*time.Minute)
+	if dialErr != nil {
+		return nil, dialErr
+	}
+	defer conn.Close()
+
+	retrieval := pb.NewArtifactRetrievalServiceClient(conn)
+	return newMaterializeWithClient(ctx, retrieval, dependencies, dest)
+}
+
+// newMaterializeWithClient asks client to resolve dependencies, then retrieves
+// every resolved replacement into dest via MultiRetrieve.
+//
+// The returned metadata holds one entry per replacement, named by its staged
+// path. It is returned alongside any MultiRetrieve error; a resolution failure
+// or a replacement with an unsupported role yields a nil slice and the error.
+func newMaterializeWithClient(ctx context.Context, client pb.ArtifactRetrievalServiceClient, dependencies []*pipeline_v1.ArtifactInformation, dest string) ([]*pb.ArtifactMetadata, error) {
+	req := &pb.ResolveArtifactRequest{Artifacts: dependencies}
+	resp, err := client.ResolveArtifact(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+
+	var (
+		metadata []*pb.ArtifactMetadata
+		pending  []retrievable
+	)
+	for _, info := range resp.Replacements {
+		stagedName, err := extractStagingToPath(info)
+		if err != nil {
+			return nil, err
+		}
+		metadata = append(metadata, &pb.ArtifactMetadata{Name: stagedName})
+		pending = append(pending, &artifact{client: client, dep: info})
+	}
+
+	// 10 is passed through to MultiRetrieve unchanged (presumably its
+	// parallelism — confirm against MultiRetrieve).
+	fetchErr := MultiRetrieve(ctx, 10, pending, dest)
+	return metadata, fetchErr
+}
+
+func extractStagingToPath(artifact *pipeline_v1.ArtifactInformation) (string, error) {
+	if artifact.RoleUrn != URNStagingTo {
+		return "", errors.Errorf("Unsupported artifact role %s", artifact.RoleUrn)
+	}
+	role := pipeline_v1.ArtifactStagingToRolePayload{}
+	if err := proto.Unmarshal(artifact.RolePayload, &role); err != nil {
+		return "", err
+	}
+	return role.StagedName, nil
+}
+
+// artifact pairs a resolved dependency (dep) with the retrieval client used to
+// fetch it. newMaterializeWithClient appends *artifact values to its
+// []retrievable list; retrieve derives the destination file from dep's
+// staging_to role payload and requests the content through client.GetArtifact.
+type artifact struct {
+	client pb.ArtifactRetrievalServiceClient
+	dep    *pipeline_v1.ArtifactInformation
+}
+
+func (a artifact) retrieve(ctx context.Context, dest string) error {
+	path, err := extractStagingToPath(a.dep)
+	if err != nil {
+		return err
+	}
+
+	filename := filepath.Join(dest, filepath.FromSlash(path))
+
+	_, err = os.Stat(filename)
+	if err == nil {
+		if err = os.Remove(filename); err != nil {
+			return errors.Errorf("failed to delete: %v (remove: %v)", filename, err)
+		}
+	} else if !os.IsNotExist(err) {
+		return errors.Wrapf(err, "failed to stat %v", filename)
+	}
+
+	stream, err := a.client.GetArtifact(ctx, &pb.GetArtifactRequest{Artifact: a.dep})
+	if err != nil {
+		return err
+	}
+
+	fd, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0755)
+	if err != nil {
+		return err
+	}
+	w := bufio.NewWriter(fd)
+
+	err = writeChunks(stream, w)
+	if err != nil {
+		fd.Close() // drop any buffered content
+		return errors.Wrapf(err, "failed to retrieve chunk for %v", filename)
+	}
+	if err := w.Flush(); err != nil {
+		fd.Close()
+		return errors.Wrapf(err, "failed to flush chunks for %v", filename)
+	}
+	if err := fd.Close(); err != nil {
+		return err
+	}
+
+	return nil
 
 Review comment:
   ```suggestion
   	return fd.Close()
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services