You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/22 01:36:42 UTC

[GitHub] [beam] lostluck commented on a change in pull request #11490: [BEAM-9577] Use new artifact API to stage go artifacts.

lostluck commented on a change in pull request #11490:
URL: https://github.com/apache/beam/pull/11490#discussion_r412599227



##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -17,27 +17,128 @@ package runnerlib
 
 import (
 	"context"
+	"io"
+	"os"
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
 )
 
 // Stage stages the worker binary and any additional files to the given
 // artifact staging endpoint. It returns the retrieval token if successful.
-func Stage(ctx context.Context, id, endpoint, binary, st string, files ...artifact.KeyedFile) (retrievalToken string, err error) {
+func Stage(ctx context.Context, id, endpoint, binary, st string) (retrievalToken string, err error) {
 	ctx = grpcx.WriteWorkerID(ctx, id)
 	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
 	if err != nil {
 		return "", errors.WithContext(err, "connecting to artifact service")
 	}
 	defer cc.Close()
 
+	err = StageViaPorableApi(ctx, cc, binary, st)
+
+	if err == nil {
+		return "", err
+	} else {
+		return StageViaLegacyApi(ctx, cc, binary, st)
+	}
+}
+
+func StageViaPorableApi(ctx context.Context, cc *grpc.ClientConn, binary, st string) error {

Review comment:
       Typo: Portable

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -93,14 +96,32 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig
 			panic(fmt.Sprintf(
 				"Failed to serialize Environment payload %v for config %v: %v", payload, config, err))
 		}
+
 		return &pipepb.Environment{
 			Urn:          urn,
 			Payload:      serializedPayload,
 			Capabilities: goCapabilities(),
+			Dependencies: []*pipepb.ArtifactInformation{
+				&pipepb.ArtifactInformation{
+					TypeUrn: URNArtifactGoWorker,
+					RoleUrn: URNArtifactStagingTo,
+					RolePayload: MarshalOrPanic(&pipepb.ArtifactStagingToRolePayload{
+						StagedName: "worker",
+					}),
+				},
+			},
 		}
 	}
 }
 
+func MarshalOrPanic(msg proto.Message) []byte {

Review comment:
       Conventionally this should be named MustMarshal if it's going to panic.

##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -17,27 +17,128 @@ package runnerlib
 
 import (
 	"context"
+	"io"
+	"os"
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
 )
 
 // Stage stages the worker binary and any additional files to the given
 // artifact staging endpoint. It returns the retrieval token if successful.
-func Stage(ctx context.Context, id, endpoint, binary, st string, files ...artifact.KeyedFile) (retrievalToken string, err error) {
+func Stage(ctx context.Context, id, endpoint, binary, st string) (retrievalToken string, err error) {
 	ctx = grpcx.WriteWorkerID(ctx, id)
 	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
 	if err != nil {
 		return "", errors.WithContext(err, "connecting to artifact service")
 	}
 	defer cc.Close()
 
+	err = StageViaPorableApi(ctx, cc, binary, st)
+
+	if err == nil {
+		return "", err
+	} else {
+		return StageViaLegacyApi(ctx, cc, binary, st)
+	}
+}
+
+func StageViaPorableApi(ctx context.Context, cc *grpc.ClientConn, binary, st string) error {
+	client := jobpb.NewArtifactStagingServiceClient(cc)
+
+	stream, err := client.ReverseArtifactRetrievalService(context.Background())
+	if err != nil {
+		return err
+	}
+
+	if err := stream.Send(&jobpb.ArtifactResponseWrapper{StagingToken: st}); err != nil {
+		return err
+	}
+
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		switch request := in.Request.(type) {
+		case *jobpb.ArtifactRequestWrapper_ResolveArtifact:
+			err = stream.Send(&jobpb.ArtifactResponseWrapper{
+				Response: &jobpb.ArtifactResponseWrapper_ResolveArtifactResponse{
+					&jobpb.ResolveArtifactsResponse{
+						Replacements: request.ResolveArtifact.Artifacts,
+					},
+				}})
+			if err != nil {
+				return err
+			}
+
+		case *jobpb.ArtifactRequestWrapper_GetArtifact:
+			TypeUrn := request.GetArtifact.Artifact.TypeUrn

Review comment:
       While not incorrect, it's a bit odd to see a capitalized variable name in Go, because a leading capital letter indicates that the identifier is exported from the package.
   
   From a readability standpoint the capital initial tells the reader that the scope of this variable extends beyond this package, which isn't the case for a local variable in function scope. At the definition line (here) this is fine, but at later uses it would be easy to make that mistake and end up with mismatched assumptions.
   
   MixedCaps are correct though.
   
   Consider typeUrn instead.
   
   As a second note: you can put the definition inline in the switch header
   
   switch typeUrn := ... ; typeUrn {
   case ...
   
   which limits its scope to just the switch.

##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -17,27 +17,128 @@ package runnerlib
 
 import (
 	"context"
+	"io"
+	"os"
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
 )
 
 // Stage stages the worker binary and any additional files to the given
 // artifact staging endpoint. It returns the retrieval token if successful.
-func Stage(ctx context.Context, id, endpoint, binary, st string, files ...artifact.KeyedFile) (retrievalToken string, err error) {
+func Stage(ctx context.Context, id, endpoint, binary, st string) (retrievalToken string, err error) {
 	ctx = grpcx.WriteWorkerID(ctx, id)
 	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
 	if err != nil {
 		return "", errors.WithContext(err, "connecting to artifact service")
 	}
 	defer cc.Close()
 
+	err = StageViaPorableApi(ctx, cc, binary, st)
+
+	if err == nil {
+		return "", err
+	} else {
+		return StageViaLegacyApi(ctx, cc, binary, st)
+	}
+}
+
+func StageViaPorableApi(ctx context.Context, cc *grpc.ClientConn, binary, st string) error {
+	client := jobpb.NewArtifactStagingServiceClient(cc)
+
+	stream, err := client.ReverseArtifactRetrievalService(context.Background())
+	if err != nil {
+		return err
+	}
+
+	if err := stream.Send(&jobpb.ArtifactResponseWrapper{StagingToken: st}); err != nil {
+		return err
+	}
+
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		switch request := in.Request.(type) {
+		case *jobpb.ArtifactRequestWrapper_ResolveArtifact:
+			err = stream.Send(&jobpb.ArtifactResponseWrapper{
+				Response: &jobpb.ArtifactResponseWrapper_ResolveArtifactResponse{
+					&jobpb.ResolveArtifactsResponse{
+						Replacements: request.ResolveArtifact.Artifacts,
+					},
+				}})
+			if err != nil {
+				return err
+			}
+
+		case *jobpb.ArtifactRequestWrapper_GetArtifact:
+			TypeUrn := request.GetArtifact.Artifact.TypeUrn
+			switch TypeUrn {
+			case graphx.URNArtifactGoWorker:
+				StageFile(binary, stream)
+
+			default:
+				return errors.Errorf("Request has unexpected artifact type %s", TypeUrn)
+			}
+
+		default:
+			return errors.Errorf("Request has unexpected type %T", request)

Review comment:
       Lint nit: because errors in Go are usually wrapped by prepending context, it's preferred not to capitalize error strings, so that they read better after the prepended context.
   
   Here and everywhere else Errorf is used.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org