Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/02/02 19:38:24 UTC

[GitHub] [beam] youngoli commented on a change in pull request #16671: [BEAM-13399, BEAM-13683] Eagerly materialize artifacts for automated expansion service, add feature to SQL transform

youngoli commented on a change in pull request #16671:
URL: https://github.com/apache/beam/pull/16671#discussion_r797930535



##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go
##########
@@ -140,3 +142,67 @@ func QueryExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.Expans
 	}
 	return res, nil
 }
+
+func startAutomatedExpansionService(gradleTarget string) (func() error, string, error) {

Review comment:
       Nit: I think for documentation purposes, some named return values would be really nice here. Naming the return values `stopFunc` and `address` would make it very clear what they do without requiring a comment.
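A minimal sketch of what the suggested signature could look like. It assumes a hypothetical `fakeRunner` type in place of the real `expansionx` runner. Its `StartService`, `StopService`, and `Endpoint` method names mirror the calls in the diff, but the type itself is invented for illustration. With named results, the signature documents itself: `stopFunc` shuts the service down and `address` is the endpoint to query.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeRunner is a hypothetical stand-in for the expansion service runner;
// it only records whether it is running.
type fakeRunner struct {
	endpoint string
	running  bool
}

func (r *fakeRunner) StartService() error {
	if r.running {
		return errors.New("already running")
	}
	r.running = true
	return nil
}

func (r *fakeRunner) StopService() error {
	r.running = false
	return nil
}

func (r *fakeRunner) Endpoint() string { return r.endpoint }

// startService uses named results so callers can see what each value means:
// stopFunc stops the service and address is the endpoint to query.
func startService(r *fakeRunner) (stopFunc func() error, address string, err error) {
	if err = r.StartService(); err != nil {
		return nil, "", err
	}
	// r.StopService is a method value bound to r.
	return r.StopService, r.Endpoint(), nil
}

func main() {
	r := &fakeRunner{endpoint: "localhost:8097"}
	stop, addr, err := startService(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(addr)
	if err := stop(); err != nil {
		panic(err)
	}
}
```

The names also appear in generated documentation (`go doc`), which is the main benefit the nit is after. The body can still use explicit `return` values rather than bare returns.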

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/registry.go
##########
@@ -257,9 +262,21 @@ func Require(expansionAddr string) string {
 	return hardOverrideNamespace + Separator + expansionAddr
 }
 
+// UseAutomatedExpansionService takes a gradle target and creates a
+// tagged string to indicate that it should be used to start up an
+// automated expansion service for a gross-language expansion.

Review comment:
       Typo: `gross-language expansion` should be `cross-language expansion`. 😂  
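For context on what the doc comment describes, here is a minimal sketch of tagging a Gradle target and stripping the tag again. The constant values `autoTag` and `separator` and the helpers `useAutomated` and `parseAutomated` are hypothetical. They mirror how `Require` prefixes an address with a namespace and the `// Strip auto: tag` step in `expand.go`, but they are not the package's actual `parseAddr`.

```go
package main

import (
	"fmt"
	"strings"
)

// Hypothetical values for illustration; the real constants in the xlangx
// package may differ.
const (
	autoTag   = "auto"
	separator = ":"
)

// useAutomated prefixes a Gradle target with the automated-service tag,
// in the same style as Require prefixing an address with its namespace.
func useAutomated(gradleTarget string) string {
	return autoTag + separator + gradleTarget
}

// parseAutomated reports whether addr carries the automated-service tag
// and, if so, returns the Gradle target with the tag removed. Only the
// leading tag is stripped, so colons inside the target are preserved.
func parseAutomated(addr string) (target string, ok bool) {
	prefix := autoTag + separator
	if !strings.HasPrefix(addr, prefix) {
		return "", false
	}
	return strings.TrimPrefix(addr, prefix), true
}

func main() {
	// Hypothetical Gradle target; real targets also start with ':'.
	tagged := useAutomated(":example:expansion-service:shadowJar")
	target, ok := parseAutomated(tagged)
	fmt.Println(tagged)     // auto::example:expansion-service:shadowJar
	fmt.Println(target, ok) // :example:expansion-service:shadowJar true
	_, ok = parseAutomated("localhost:8097")
	fmt.Println(ok) // false
}
```

Checking the full prefix, rather than splitting on the first colon, keeps a plain `host:port` address from being misread as a tag named after the host.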

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go
##########
@@ -140,3 +142,67 @@ func QueryExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.Expans
 	}
 	return res, nil
 }
+
+func startAutomatedExpansionService(gradleTarget string) (func() error, string, error) {
+	jarPath, err := expansionx.GetBeamJar(gradleTarget, core.SdkVersion)
+	if err != nil {
+		return nil, "", err
+	}
+	serviceRunner, err := expansionx.NewExpansionServiceRunner(jarPath, "")
+	if err != nil {
+		return nil, "", err
+	}
+	err = serviceRunner.StartService()
+	if err != nil {
+		return nil, "", err
+	}
+	return serviceRunner.StopService, serviceRunner.Endpoint(), nil
+}
+
+// QueryAutomatedExpansionService submits an external transform to be expanded by the
+// expansion service and then eagerly materializes the artifacts for staging. The given
+// transform should be the external transform, and the components are any additional
+// components necessary for the pipeline snippet.
+//
+// The address to be queried is determined by the Config field of the HandlerParams after
+// the prefix tag indicating the automated service is in use.
+func QueryAutomatedExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.ExpansionResponse, error) {
+	// Strip auto: tag to get Gradle target
+	tag, target := parseAddr(p.Config)
+
+	stopFunc, address, err := startAutomatedExpansionService(target)
+	if err != nil {
+		return nil, err
+	}
+	defer stopFunc()
+
+	p.Config = address
+
+	res, err := QueryExpansionService(ctx, p)
+	if err != nil {
+		return nil, err
+	}
+
+	exp := &graph.ExpandedTransform{
+		Components:   res.GetComponents(),
+		Transform:    res.GetTransform(),
+		Requirements: res.GetRequirements(),
+	}
+
+	p.ext.Expanded = exp
+	// Put correct expansion address into edge
+	p.edge.External.ExpansionAddr = address

Review comment:
       (No change required) Changing the address here and then changing it back afterward feels a little hacky to me, but I don't see any good alternatives at the moment. It's another sign that at some point we'll need to come back and polish this further, along with retaining expansion services so that using the same transform multiple times doesn't result in handling duplicate jars and artifacts.
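One possible shape for the follow-up mentioned above (retaining expansion services across uses of the same transform) is a cache keyed by Gradle target. This is a hypothetical sketch, not part of this PR. `serviceCache`, `service`, `Address`, and `StopAll` are invented names, and the `start` field stands in for something like `startAutomatedExpansionService`.

```go
package main

import (
	"fmt"
	"sync"
)

// service is a hypothetical handle to a running expansion service.
type service struct {
	address string
	stop    func() error
}

// serviceCache starts at most one service per Gradle target and reuses it
// for later expansions. StopAll shuts every cached service down.
type serviceCache struct {
	mu       sync.Mutex
	services map[string]service
	start    func(target string) (stopFunc func() error, address string, err error)
}

// Address returns the endpoint for target, starting a service only if one
// is not already cached.
func (c *serviceCache) Address(target string) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if s, ok := c.services[target]; ok {
		return s.address, nil
	}
	stop, addr, err := c.start(target)
	if err != nil {
		return "", err
	}
	if c.services == nil {
		c.services = make(map[string]service)
	}
	c.services[target] = service{address: addr, stop: stop}
	return addr, nil
}

// StopAll stops every cached service, returning the first error seen.
func (c *serviceCache) StopAll() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	var firstErr error
	for target, s := range c.services {
		if err := s.stop(); err != nil && firstErr == nil {
			firstErr = err
		}
		delete(c.services, target) // deleting during range is safe in Go
	}
	return firstErr
}

func main() {
	starts := 0
	c := &serviceCache{
		// Hypothetical starter standing in for startAutomatedExpansionService.
		start: func(target string) (func() error, string, error) {
			starts++
			return func() error { return nil }, fmt.Sprintf("localhost:%d", 8096+starts), nil
		},
	}
	a1, _ := c.Address(":example:shadowJar")
	a2, _ := c.Address(":example:shadowJar")
	fmt.Println(a1 == a2, starts) // true 1
	_ = c.StopAll()
}
```

Because the service would outlive a single expansion, its shutdown moves from a per-call `defer stopFunc()` to an explicit `StopAll` at the end of pipeline construction. That is the trade-off such a change would need to settle.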




-- 
This is an automated message from the Apache Git Service.