Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/06 22:01:56 UTC

[GitHub] [beam] robertwb commented on a diff in pull request #22132: Support dependencies and remote registration in the typescript SDK.

robertwb commented on code in PR #22132:
URL: https://github.com/apache/beam/pull/22132#discussion_r915284957


##########
sdks/typescript/boot.go:
##########
@@ -87,7 +90,71 @@ func main() {
 		log.Fatalf("Failed to convert pipeline options: %v", err)
 	}
 
-	// (2) Invoke the Node entrypoint, passing the Fn API container contract info as flags.
+	// (2) Retrieve and install the staged packages.
+
+	dir := filepath.Join(*semiPersistDir, *id, "staged")
+	artifacts, err := artifact.Materialize(ctx, *artifactEndpoint, info.GetDependencies(), info.GetRetrievalToken(), dir)
+	if err != nil {
+		log.Fatalf("Failed to retrieve staged files: %v", err)
+	}
+
+	// Create a package.json that names given dependencies as overrides.
+	npmOverrides := make(map[string]string)
+	for _, v := range artifacts {
+		name, _ := artifact.MustExtractFilePayload(v)
+		path := filepath.Join(dir, name)
+		if v.RoleUrn == "beam:artifact:type:npm_dep:v1" {
+			// Npm cannot handle arbitrary suffixes.
+			suffixedPath := path + ".tar"
+			e := os.Rename(path, suffixedPath)
+			if e != nil {
+				log.Fatal(e)
+			}
+			npmOverrides[string(v.RolePayload)] = suffixedPath
+		}
+	}
+	if len(npmOverrides) > 0 {

Review Comment:
   This handles the case where the "version" of a dependency is a local path such as `file:some/path/on/the/user/machine`. Notably, until we release official npm packages (and likely afterwards, for development), this includes apache-beam itself.



##########
sdks/python/apache_beam/runners/dataflow/internal/apiclient.py:
##########
@@ -660,35 +665,42 @@ def _stage_resources(self, pipeline, options):
               'Found duplicated artifact sha256: %s (%s)',
               type_payload.path,
               type_payload.sha256)
-          staged_name = staged_hashes[type_payload.sha256]
-          dep.role_payload = beam_runner_api_pb2.ArtifactStagingToRolePayload(
-              staged_name=staged_name).SerializeToString()
+          remote_name = staged_hashes[type_payload.sha256]
+          if is_staged_role:
+            # We should not be overriding this, as dep.role_payload.staged_name
+            # refers to the desired name on the worker, whereas staged_name
+            # refers to its placement in a distributed filesystem.

Review Comment:
   We're doing the wrong thing here per the spec, but I'm not sure it's safe to change given Dataflow's expectations (especially if this path is used in the Runner v1 case), so I'm leaving it as is. Good point about adding a TODO; I'm adding one here.
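The distinction the TODO describes, between a dependency's desired name on the worker and its placement in a distributed filesystem, can be illustrated with a minimal Go sketch of SHA-256 deduplication that never overwrites the worker-side name. This is an illustration of the spec-conforming behavior under stated assumptions, not what apiclient.py does today and not Dataflow's API; `dependency`, `stagingPlan`, `planStaging`, and the `artifacts/<hash>` remote naming scheme are all hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// dependency is a hypothetical, simplified artifact: the name it should
// have on the worker (the staged_name in its role payload) and its bytes.
type dependency struct {
	StagedName string
	Contents   []byte
}

// stagingPlan records where a dependency's bytes live remotely while
// keeping its worker-side StagedName untouched.
type stagingPlan struct {
	StagedName string // desired name on the worker
	RemoteName string // placement in the (hypothetical) remote store
	Upload     bool   // false when identical bytes were already uploaded
}

// planStaging deduplicates uploads by SHA-256: the first dependency with a
// given hash is uploaded under a content-addressed remote name, and later
// duplicates reuse that remote location but keep their own StagedName.
func planStaging(deps []dependency) []stagingPlan {
	remoteByHash := make(map[string]string)
	plans := make([]stagingPlan, 0, len(deps))
	for _, d := range deps {
		sum := sha256.Sum256(d.Contents)
		hash := hex.EncodeToString(sum[:])
		remote, seen := remoteByHash[hash]
		if !seen {
			remote = "artifacts/" + hash // hypothetical remote naming scheme
			remoteByHash[hash] = remote
		}
		plans = append(plans, stagingPlan{
			StagedName: d.StagedName,
			RemoteName: remote,
			Upload:     !seen,
		})
	}
	return plans
}

func main() {
	plans := planStaging([]dependency{
		{StagedName: "a.tar", Contents: []byte("same bytes")},
		{StagedName: "b.tar", Contents: []byte("same bytes")},
	})
	for _, p := range plans {
		fmt.Printf("%s -> %s (upload=%v)\n", p.StagedName, p.RemoteName, p.Upload)
	}
}
```

Keeping the two names in separate fields is what allows duplicates to share one upload without the worker losing the file name it expects.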


