Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/22 00:17:07 UTC

[GitHub] [beam] robertwb opened a new pull request #11490: [BEAM-9577] Use new artifact API to stage go artifacts.

robertwb opened a new pull request #11490:
URL: https://github.com/apache/beam/pull/11490


   Also fixes a bug that this change exposed in the Python provisioning service.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lostluck commented on issue #11490: [BEAM-9577] Use new artifact API to stage go artifacts.

Posted by GitBox <gi...@apache.org>.
lostluck commented on issue #11490:
URL: https://github.com/apache/beam/pull/11490#issuecomment-617920039


   Otherwise LGTM.





[GitHub] [beam] robertwb commented on issue #11490: [BEAM-9577] Use new artifact API to stage go artifacts.

Posted by GitBox <gi...@apache.org>.
robertwb commented on issue #11490:
URL: https://github.com/apache/beam/pull/11490#issuecomment-617476116


   R: @lostluck 





[GitHub] [beam] lostluck commented on a change in pull request #11490: [BEAM-9577] Use new artifact API to stage go artifacts.

Posted by GitBox <gi...@apache.org>.
lostluck commented on a change in pull request #11490:
URL: https://github.com/apache/beam/pull/11490#discussion_r412599227



##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -17,27 +17,128 @@ package runnerlib
 
 import (
 	"context"
+	"io"
+	"os"
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
 )
 
 // Stage stages the worker binary and any additional files to the given
 // artifact staging endpoint. It returns the retrieval token if successful.
-func Stage(ctx context.Context, id, endpoint, binary, st string, files ...artifact.KeyedFile) (retrievalToken string, err error) {
+func Stage(ctx context.Context, id, endpoint, binary, st string) (retrievalToken string, err error) {
 	ctx = grpcx.WriteWorkerID(ctx, id)
 	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
 	if err != nil {
 		return "", errors.WithContext(err, "connecting to artifact service")
 	}
 	defer cc.Close()
 
+	err = StageViaPorableApi(ctx, cc, binary, st)
+
+	if err == nil {
+		return "", err
+	} else {
+		return StageViaLegacyApi(ctx, cc, binary, st)
+	}
+}
+
+func StageViaPorableApi(ctx context.Context, cc *grpc.ClientConn, binary, st string) error {

Review comment:
       Typo: Portable

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -93,14 +96,32 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig
 			panic(fmt.Sprintf(
 				"Failed to serialize Environment payload %v for config %v: %v", payload, config, err))
 		}
+
 		return &pipepb.Environment{
 			Urn:          urn,
 			Payload:      serializedPayload,
 			Capabilities: goCapabilities(),
+			Dependencies: []*pipepb.ArtifactInformation{
+				&pipepb.ArtifactInformation{
+					TypeUrn: URNArtifactGoWorker,
+					RoleUrn: URNArtifactStagingTo,
+					RolePayload: MarshalOrPanic(&pipepb.ArtifactStagingToRolePayload{
+						StagedName: "worker",
+					}),
+				},
+			},
 		}
 	}
 }
 
+func MarshalOrPanic(msg proto.Message) []byte {

Review comment:
       Conventionally this should be named MustMarshal if it's going to panic.
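A minimal sketch of the suggested naming, assuming the helper keeps the panic-on-error behavior. The "Must" prefix matches standard-library precedents such as `regexp.MustCompile` and `template.Must`, so callers can tell from the name that it panics instead of returning an error. To stay stdlib-only, this sketch swaps in `encoding/json` for the `proto.Marshal` call that the Beam helper would use, and `stagingPayload` is a hypothetical stand-in for `pipepb.ArtifactStagingToRolePayload`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MustMarshal serializes v and panics if serialization fails.
// encoding/json stands in for proto.Marshal in this sketch.
func MustMarshal(v interface{}) []byte {
	b, err := json.Marshal(v)
	if err != nil {
		panic(fmt.Sprintf("MustMarshal: %v", err))
	}
	return b
}

// stagingPayload is a hypothetical stand-in for the proto message
// used in the diff.
type stagingPayload struct {
	StagedName string `json:"staged_name"`
}

func main() {
	// Prints {"staged_name":"worker"}
	fmt.Println(string(MustMarshal(stagingPayload{StagedName: "worker"})))
}
```

The panic is reasonable here only because the input is a fixed, program-constructed value; for data that can legitimately fail to serialize, returning the error is the better choice.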

##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -17,27 +17,128 @@ package runnerlib
 
 import (
 	"context"
+	"io"
+	"os"
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
 )
 
 // Stage stages the worker binary and any additional files to the given
 // artifact staging endpoint. It returns the retrieval token if successful.
-func Stage(ctx context.Context, id, endpoint, binary, st string, files ...artifact.KeyedFile) (retrievalToken string, err error) {
+func Stage(ctx context.Context, id, endpoint, binary, st string) (retrievalToken string, err error) {
 	ctx = grpcx.WriteWorkerID(ctx, id)
 	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
 	if err != nil {
 		return "", errors.WithContext(err, "connecting to artifact service")
 	}
 	defer cc.Close()
 
+	err = StageViaPorableApi(ctx, cc, binary, st)
+
+	if err == nil {
+		return "", err
+	} else {
+		return StageViaLegacyApi(ctx, cc, binary, st)
+	}
+}
+
+func StageViaPorableApi(ctx context.Context, cc *grpc.ClientConn, binary, st string) error {
+	client := jobpb.NewArtifactStagingServiceClient(cc)
+
+	stream, err := client.ReverseArtifactRetrievalService(context.Background())
+	if err != nil {
+		return err
+	}
+
+	if err := stream.Send(&jobpb.ArtifactResponseWrapper{StagingToken: st}); err != nil {
+		return err
+	}
+
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		switch request := in.Request.(type) {
+		case *jobpb.ArtifactRequestWrapper_ResolveArtifact:
+			err = stream.Send(&jobpb.ArtifactResponseWrapper{
+				Response: &jobpb.ArtifactResponseWrapper_ResolveArtifactResponse{
+					&jobpb.ResolveArtifactsResponse{
+						Replacements: request.ResolveArtifact.Artifacts,
+					},
+				}})
+			if err != nil {
+				return err
+			}
+
+		case *jobpb.ArtifactRequestWrapper_GetArtifact:
+			TypeUrn := request.GetArtifact.Artifact.TypeUrn

Review comment:
       While not incorrect, it's a bit odd to see a capitalized variable name in Go, because a capital initial indicates that the identifier is exported from the package.
   
   From a readability standpoint, the capital initial tells the reader that the scope of this variable extends beyond this package, which isn't the case for a local variable in function scope. At the definition line (here) this is fine, but at later uses it would be easy to make a mistake based on mismatched assumptions.
   
   MixedCaps are correct though.
   
   Consider typeUrn instead.
   
   As a second note, you can put the definition inline in the switch header:
   
   switch typeUrn := ... ; typeUrn {
   case ...
   
   which limits its scope to just the switch.
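A minimal sketch of both suggestions together: a lower-case local name, declared in the switch header so it is only visible inside the switch. The types `artifact` and `getArtifactRequest` and the constant `urnGoWorker` (with its placeholder value) are hypothetical stand-ins for the generated `jobpb` types and `graphx.URNArtifactGoWorker`.

```go
package main

import "fmt"

// Hypothetical stand-ins for the generated protobuf types and URN constant.
const urnGoWorker = "example:artifact:go_worker"

type artifact struct{ TypeUrn string }

type getArtifactRequest struct{ Artifact artifact }

// describe declares typeUrn in the switch header, so the variable's scope
// ends with the switch. Every case returns, so no trailing return is needed.
func describe(req getArtifactRequest) (string, error) {
	switch typeUrn := req.Artifact.TypeUrn; typeUrn {
	case urnGoWorker:
		return "stage worker binary", nil
	default:
		return "", fmt.Errorf("request has unexpected artifact type %s", typeUrn)
	}
}

func main() {
	for _, urn := range []string{urnGoWorker, "example:artifact:unknown"} {
		msg, err := describe(getArtifactRequest{Artifact: artifact{TypeUrn: urn}})
		fmt.Println(msg, err)
	}
}
```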

##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -17,27 +17,128 @@ package runnerlib
 
 import (
 	"context"
+	"io"
+	"os"
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
 )
 
 // Stage stages the worker binary and any additional files to the given
 // artifact staging endpoint. It returns the retrieval token if successful.
-func Stage(ctx context.Context, id, endpoint, binary, st string, files ...artifact.KeyedFile) (retrievalToken string, err error) {
+func Stage(ctx context.Context, id, endpoint, binary, st string) (retrievalToken string, err error) {
 	ctx = grpcx.WriteWorkerID(ctx, id)
 	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
 	if err != nil {
 		return "", errors.WithContext(err, "connecting to artifact service")
 	}
 	defer cc.Close()
 
+	err = StageViaPorableApi(ctx, cc, binary, st)
+
+	if err == nil {
+		return "", err
+	} else {
+		return StageViaLegacyApi(ctx, cc, binary, st)
+	}
+}
+
+func StageViaPorableApi(ctx context.Context, cc *grpc.ClientConn, binary, st string) error {
+	client := jobpb.NewArtifactStagingServiceClient(cc)
+
+	stream, err := client.ReverseArtifactRetrievalService(context.Background())
+	if err != nil {
+		return err
+	}
+
+	if err := stream.Send(&jobpb.ArtifactResponseWrapper{StagingToken: st}); err != nil {
+		return err
+	}
+
+	for {
+		in, err := stream.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		switch request := in.Request.(type) {
+		case *jobpb.ArtifactRequestWrapper_ResolveArtifact:
+			err = stream.Send(&jobpb.ArtifactResponseWrapper{
+				Response: &jobpb.ArtifactResponseWrapper_ResolveArtifactResponse{
+					&jobpb.ResolveArtifactsResponse{
+						Replacements: request.ResolveArtifact.Artifacts,
+					},
+				}})
+			if err != nil {
+				return err
+			}
+
+		case *jobpb.ArtifactRequestWrapper_GetArtifact:
+			TypeUrn := request.GetArtifact.Artifact.TypeUrn
+			switch TypeUrn {
+			case graphx.URNArtifactGoWorker:
+				StageFile(binary, stream)
+
+			default:
+				return errors.Errorf("Request has unexpected artifact type %s", TypeUrn)
+			}
+
+		default:
+			return errors.Errorf("Request has unexpected type %T", request)

Review comment:
       Lint nit: because of how errors are used in Go (callers prepend context to them), it's preferred not to capitalize error strings, so that they read better after the prepended context.
   
   Here and everywhere else Errorf is used.
   







[GitHub] [beam] robertwb commented on issue #11490: [BEAM-9577] Use new artifact API to stage go artifacts.

Posted by GitBox <gi...@apache.org>.
robertwb commented on issue #11490:
URL: https://github.com/apache/beam/pull/11490#issuecomment-617909296


   Thanks for the comments. Still getting used to the capitalization conventions. PTAL.





[GitHub] [beam] robertwb commented on a change in pull request #11490: [BEAM-9577] Use new artifact API to stage go artifacts.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #11490:
URL: https://github.com/apache/beam/pull/11490#discussion_r413201320



##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -17,27 +17,127 @@ package runnerlib
 
 import (
 	"context"
+	"io"
+	"os"
 	"time"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/artifact"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+	"google.golang.org/grpc"
 )
 
 // Stage stages the worker binary and any additional files to the given
 // artifact staging endpoint. It returns the retrieval token if successful.
-func Stage(ctx context.Context, id, endpoint, binary, st string, files ...artifact.KeyedFile) (retrievalToken string, err error) {
+func Stage(ctx context.Context, id, endpoint, binary, st string) (retrievalToken string, err error) {
 	ctx = grpcx.WriteWorkerID(ctx, id)
 	cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
 	if err != nil {
 		return "", errors.WithContext(err, "connecting to artifact service")
 	}
 	defer cc.Close()
 
+	err = StageViaPortableApi(ctx, cc, binary, st)
+
+	if err == nil {
+		return "", err
+	} else {

Review comment:
       To me it depends on whether it is "one of two+ options" (in which case an explicit else clause is clearer) or "exit early vs. fallback" (in which case not having the else is clearer). Removing the else clause for now; hopefully soon we'll be able to simply remove the fallback altogether. 
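A minimal sketch of the "exit early vs. fallback" shape without the else clause. `stageViaPortable` and `stageViaLegacy` are hypothetical stand-ins for the diff's `StageViaPortableApi` and `StageViaLegacyApi`, and the boolean parameter only simulates whether the portable path succeeds.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-in: portableOK controls whether this path succeeds.
func stageViaPortable(portableOK bool) error {
	if !portableOK {
		return errors.New("portable artifact API unavailable")
	}
	return nil
}

// Hypothetical stand-in for the legacy path, which returns a token.
func stageViaLegacy() (string, error) {
	return "legacy-retrieval-token", nil
}

// stage returns early on success and otherwise falls through to the
// fallback, so no else clause is needed. As in the diff, the portable
// path yields an empty retrieval token and its error is dropped on fallback.
func stage(portableOK bool) (string, error) {
	if err := stageViaPortable(portableOK); err == nil {
		return "", nil
	}
	return stageViaLegacy()
}

func main() {
	tok, err := stage(true)
	fmt.Printf("%q %v\n", tok, err) // "" <nil>
	tok, err = stage(false)
	fmt.Printf("%q %v\n", tok, err) // "legacy-retrieval-token" <nil>
}
```

Once the fallback is removed, the function collapses to a single call, which is part of why the early-return form is easier to delete later.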



