You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/31 17:44:01 UTC

[GitHub] [beam] lostluck commented on a change in pull request #17216: Simplify specifying additional dependencies in Go SDK in XLang IOs

lostluck commented on a change in pull request #17216:
URL: https://github.com/apache/beam/pull/17216#discussion_r839821647



##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
##########
@@ -68,6 +70,225 @@ func SetDefaultRepositoryURL(repoURL string) error {
 	return defaultJarGetter.setRepositoryURL(repoURL)
 }
 
+func buildJarName(artifactId, version string) string {
+	return fmt.Sprintf("%s-%s.jar", artifactId, version)
+}
+
+func getMavenJar(artifactID, groupID, version string) string {
+	return strings.Join([]string{string(apacheRepository), strings.ReplaceAll(groupID, ".", "/"), artifactID, version, buildJarName(artifactID, version)}, "/")
+}
+
+func expandJar(jar string) string {
+	if jarExists(jar) {
+		return jar
+	} else if strings.HasPrefix(jar, "http://") || strings.HasPrefix(jar, "https://") {
+		return jar
+	} else {
+		components := strings.Split(jar, ":")
+		groupID, artifactID, version := components[0], components[1], components[2]
+		path := getMavenJar(artifactID, groupID, version)
+		return path
+	}
+}
+
+func getLocalJar(url string) (string, error) {
+	jarName := path.Base(url)
+	usr, _ := user.Current()
+	cacheDir := filepath.Join(usr.HomeDir, jarCache[2:])
+	jarPath := filepath.Join(cacheDir, jarName)
+
+	if jarExists(jarPath) {
+		return jarPath, nil
+	}
+
+	resp, err := http.Get(string(url))
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return "", fmt.Errorf("failed to connect to %v: received non 200 response code, got %v", url, resp.StatusCode)
+	}
+
+	file, err := os.Create(jarPath)
+	if err != nil {
+		return "", errors.New(fmt.Sprintf("error in create: %v", err))

Review comment:
       Prefer using `fmt.Errorf` directly instead of layering `errors.New(fmt.Sprintf(...))`.
   Please also use `%w` for the error parameter.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
##########
@@ -68,6 +70,225 @@ func SetDefaultRepositoryURL(repoURL string) error {
 	return defaultJarGetter.setRepositoryURL(repoURL)
 }
 
+func buildJarName(artifactId, version string) string {
+	return fmt.Sprintf("%s-%s.jar", artifactId, version)
+}
+
+func getMavenJar(artifactID, groupID, version string) string {
+	return strings.Join([]string{string(apacheRepository), strings.ReplaceAll(groupID, ".", "/"), artifactID, version, buildJarName(artifactID, version)}, "/")
+}
+
+func expandJar(jar string) string {
+	if jarExists(jar) {
+		return jar
+	} else if strings.HasPrefix(jar, "http://") || strings.HasPrefix(jar, "https://") {
+		return jar
+	} else {
+		components := strings.Split(jar, ":")
+		groupID, artifactID, version := components[0], components[1], components[2]
+		path := getMavenJar(artifactID, groupID, version)
+		return path
+	}
+}
+
+func getLocalJar(url string) (string, error) {
+	jarName := path.Base(url)
+	usr, _ := user.Current()
+	cacheDir := filepath.Join(usr.HomeDir, jarCache[2:])
+	jarPath := filepath.Join(cacheDir, jarName)
+
+	if jarExists(jarPath) {
+		return jarPath, nil
+	}
+
+	resp, err := http.Get(string(url))
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return "", fmt.Errorf("failed to connect to %v: received non 200 response code, got %v", url, resp.StatusCode)
+	}
+
+	file, err := os.Create(jarPath)
+	if err != nil {
+		return "", errors.New(fmt.Sprintf("error in create: %v", err))
+	}
+
+	_, err = io.Copy(file, resp.Body)
+	if err != nil {
+		return "", errors.New(fmt.Sprintf("error in copy: %v", err))
+	}
+
+	return jarPath, nil
+}
+
+func extractJar(source, dest string) error {
+	reader, err := zip.OpenReader(source)
+	if err != nil {
+		return fmt.Errorf("error extracting jar extractJar(%s,%s)= %v", source, dest, err)
+	}
+
+	if err := os.MkdirAll(dest, 0700); err != nil {
+		return fmt.Errorf("error creating directory %s in extractJar(%s,%s)= %v", dest, source, dest, err)

Review comment:
       This one is very good; just replace the `=` with `:`, and the `%v` for the error parameter with `%w`.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go
##########
@@ -143,18 +144,26 @@ func QueryExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.Expans
 	return res, nil
 }
 
-func startAutomatedJavaExpansionService(gradleTarget string) (stopFunc func() error, address string, err error) {
+func startAutomatedJavaExpansionService(gradleTarget string, classpath []string) (stopFunc func() error, address string, err error) {
 	jarPath, err := expansionx.GetBeamJar(gradleTarget, core.SdkVersion)
 	if err != nil {
 		return nil, "", err
 	}
+
+	if len(classpath) > 0 {
+		jarPath, err = expansionx.MakeJar(jarPath, classpath)
+		if err != nil {
+			return nil, "", err
+		}
+	}
+
 	serviceRunner, err := expansionx.NewExpansionServiceRunner(jarPath, "")
 	if err != nil {
-		return nil, "", err
+		return nil, "", fmt.Errorf("error in new expansion service: %v", err)
 	}
 	err = serviceRunner.StartService()
 	if err != nil {
-		return nil, "", err
+		return nil, "", fmt.Errorf("error in start: %v", err)

Review comment:
       As added, the extra information is OK, but it doesn't help explain where things are going wrong.
   Consider also including the gradle target, or the jar path that the call was trying to use.
   
   This is the superpower of Go's explicit approach to error handling over just a stack trace: You can include the parameters used along the way to improve the context in the error messages.
   
   

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go
##########
@@ -143,18 +144,26 @@ func QueryExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.Expans
 	return res, nil
 }
 
-func startAutomatedJavaExpansionService(gradleTarget string) (stopFunc func() error, address string, err error) {
+func startAutomatedJavaExpansionService(gradleTarget string, classpath []string) (stopFunc func() error, address string, err error) {

Review comment:
       Question: does the `classpath` slice represent a single classpath, or does each element in the slice represent a separate classpath?
   
   If it's the former, leave it as is.
   If it's the latter, rename it to `classpaths` to indicate that it represents more than one.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expand.go
##########
@@ -143,18 +144,26 @@ func QueryExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.Expans
 	return res, nil
 }
 
-func startAutomatedJavaExpansionService(gradleTarget string) (stopFunc func() error, address string, err error) {
+func startAutomatedJavaExpansionService(gradleTarget string, classpath []string) (stopFunc func() error, address string, err error) {
 	jarPath, err := expansionx.GetBeamJar(gradleTarget, core.SdkVersion)
 	if err != nil {
 		return nil, "", err
 	}
+
+	if len(classpath) > 0 {
+		jarPath, err = expansionx.MakeJar(jarPath, classpath)
+		if err != nil {
+			return nil, "", err
+		}
+	}
+
 	serviceRunner, err := expansionx.NewExpansionServiceRunner(jarPath, "")
 	if err != nil {
-		return nil, "", err
+		return nil, "", fmt.Errorf("error in new expansion service: %v", err)

Review comment:
       When producing an error with `fmt.Errorf` prefer `%w` over `%v` for the error parameter, so the error is wrapped.
   
   Here and everywhere.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
##########
@@ -68,6 +70,225 @@ func SetDefaultRepositoryURL(repoURL string) error {
 	return defaultJarGetter.setRepositoryURL(repoURL)
 }
 
+func buildJarName(artifactId, version string) string {
+	return fmt.Sprintf("%s-%s.jar", artifactId, version)
+}
+
+func getMavenJar(artifactID, groupID, version string) string {
+	return strings.Join([]string{string(apacheRepository), strings.ReplaceAll(groupID, ".", "/"), artifactID, version, buildJarName(artifactID, version)}, "/")
+}
+
+func expandJar(jar string) string {
+	if jarExists(jar) {
+		return jar
+	} else if strings.HasPrefix(jar, "http://") || strings.HasPrefix(jar, "https://") {
+		return jar
+	} else {
+		components := strings.Split(jar, ":")
+		groupID, artifactID, version := components[0], components[1], components[2]
+		path := getMavenJar(artifactID, groupID, version)
+		return path
+	}
+}
+
+func getLocalJar(url string) (string, error) {
+	jarName := path.Base(url)
+	usr, _ := user.Current()
+	cacheDir := filepath.Join(usr.HomeDir, jarCache[2:])
+	jarPath := filepath.Join(cacheDir, jarName)
+
+	if jarExists(jarPath) {
+		return jarPath, nil
+	}
+
+	resp, err := http.Get(string(url))
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return "", fmt.Errorf("failed to connect to %v: received non 200 response code, got %v", url, resp.StatusCode)
+	}
+
+	file, err := os.Create(jarPath)
+	if err != nil {
+		return "", errors.New(fmt.Sprintf("error in create: %v", err))
+	}
+
+	_, err = io.Copy(file, resp.Body)
+	if err != nil {
+		return "", errors.New(fmt.Sprintf("error in copy: %v", err))

Review comment:
       Same here.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
##########
@@ -68,6 +70,225 @@ func SetDefaultRepositoryURL(repoURL string) error {
 	return defaultJarGetter.setRepositoryURL(repoURL)
 }
 
+func buildJarName(artifactId, version string) string {
+	return fmt.Sprintf("%s-%s.jar", artifactId, version)
+}
+
+func getMavenJar(artifactID, groupID, version string) string {
+	return strings.Join([]string{string(apacheRepository), strings.ReplaceAll(groupID, ".", "/"), artifactID, version, buildJarName(artifactID, version)}, "/")
+}
+
+func expandJar(jar string) string {
+	if jarExists(jar) {
+		return jar
+	} else if strings.HasPrefix(jar, "http://") || strings.HasPrefix(jar, "https://") {
+		return jar
+	} else {
+		components := strings.Split(jar, ":")
+		groupID, artifactID, version := components[0], components[1], components[2]
+		path := getMavenJar(artifactID, groupID, version)
+		return path
+	}
+}
+
+func getLocalJar(url string) (string, error) {
+	jarName := path.Base(url)
+	usr, _ := user.Current()
+	cacheDir := filepath.Join(usr.HomeDir, jarCache[2:])
+	jarPath := filepath.Join(cacheDir, jarName)
+
+	if jarExists(jarPath) {
+		return jarPath, nil
+	}
+
+	resp, err := http.Get(string(url))
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return "", fmt.Errorf("failed to connect to %v: received non 200 response code, got %v", url, resp.StatusCode)
+	}
+
+	file, err := os.Create(jarPath)
+	if err != nil {
+		return "", errors.New(fmt.Sprintf("error in create: %v", err))
+	}
+
+	_, err = io.Copy(file, resp.Body)
+	if err != nil {
+		return "", errors.New(fmt.Sprintf("error in copy: %v", err))
+	}
+
+	return jarPath, nil
+}
+
+func extractJar(source, dest string) error {
+	reader, err := zip.OpenReader(source)
+	if err != nil {
+		return fmt.Errorf("error extracting jar extractJar(%s,%s)= %v", source, dest, err)
+	}
+
+	if err := os.MkdirAll(dest, 0700); err != nil {
+		return fmt.Errorf("error creating directory %s in extractJar(%s,%s)= %v", dest, source, dest, err)
+	}
+
+	for _, file := range reader.File {
+		fileName := filepath.Join(dest, file.Name)
+		if file.FileInfo().IsDir() {
+			os.MkdirAll(fileName, 0700)
+			continue
+		}
+
+		f, err := file.Open()
+		if err != nil {
+			return fmt.Errorf("error opening file %s: %v", file.Name, err)
+		}
+		defer f.Close()
+
+		tf, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0777)
+		if err != nil {
+			return fmt.Errorf("error opening file %s: %v", fileName, err)
+		}
+		defer tf.Close()
+
+		if _, err := io.Copy(tf, f); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func packJar(source, dest string) error {
+	jar, err := os.Create(dest)
+	if err != nil {
+		return fmt.Errorf("error creating jar packJar(%s,%s)=%v", source, dest, err)
+	}
+	defer jar.Close()
+
+	jarFile := zip.NewWriter(jar)
+	defer jarFile.Close()
+
+	fileInfo, err := os.Stat(source)
+	if err != nil {
+		return fmt.Errorf("source path %s doesn't exist: %v", source, err)
+	}
+
+	var sourceDir string
+	if fileInfo.IsDir() {
+		sourceDir = filepath.Base(source)
+	}
+
+	err = filepath.Walk(source, func(path string, fileInfo os.FileInfo, err error) error {
+		if err != nil {
+			return fmt.Errorf("error accesing path %s: %v", path, err)
+		}
+		fileHeader, err := zip.FileInfoHeader(fileInfo)
+		if err != nil {
+			return fmt.Errorf("error getting FileInfoHeader: %v", err)
+		}
+
+		if sourceDir != "" {
+			fileHeader.Name = filepath.Join(sourceDir, strings.TrimPrefix(path, source))
+		}
+
+		if fileInfo.IsDir() {
+			fileHeader.Name += "/"
+		} else {
+			fileHeader.Method = zip.Deflate
+		}
+
+		writer, err := jarFile.CreateHeader(fileHeader)
+		if err != nil {
+			return fmt.Errorf("error creating jarFile header: %v", err)
+		}
+		if fileInfo.IsDir() {
+			return nil
+		}
+
+		f, err := os.Open(path)
+		if err != nil {
+			return fmt.Errorf("error opening file %s: %v", path, err)
+		}
+		defer f.Close()
+
+		if _, err = io.Copy(writer, f); err != nil {
+			return fmt.Errorf("error copying file %s: %v", path, err)
+		}
+		return nil
+	})
+	return err
+}
+
+// MakeJar fetches additional classpath JARs and adds it to the classpath of
+// main JAR file and compiles a fresh JAR.
+func MakeJar(mainJar string, classpath []string) (string, error) {
+	usr, _ := user.Current()
+	cacheDir := filepath.Join(usr.HomeDir, jarCache[2:])
+
+	// fetch jars required in classpath
+	classpathJars := []string{}
+	for _, jar := range classpath {
+		path := expandJar(jar)
+		if j, err := getLocalJar(path); err == nil {
+			classpathJars = append(classpathJars, j)
+		} else {
+			return "", errors.New(fmt.Sprintf("error in getLocal(): %v", err))
+		}
+	}
+
+	// classpath jars should have relative path
+	relClasspath := []string{}
+	for _, path := range classpathJars {
+		relPath, err := filepath.Rel(cacheDir, path)
+		if err != nil {
+			return "", fmt.Errorf("error in creating relative path: %v", err)
+		}
+		relClasspath = append(relClasspath, relPath)
+	}
+
+	tmpDir := filepath.Join(cacheDir, "tmpDir")
+
+	if err := extractJar(mainJar, tmpDir); err != nil {
+		return "", errors.New(fmt.Sprintf("error in extractJar(): %v", err))
+	}
+
+	b, err := os.ReadFile(tmpDir + "/META-INF/MANIFEST.MF")
+	if err != nil {
+		return "", fmt.Errorf("error readingf: %v", err)
+	}
+
+	// trim the empty lines present at the end of MANIFEST.MF file.
+	str := strings.Split(string(b), "\n")
+	str = str[:len(str)-2]

Review comment:
       While Go is pretty terse, it generally avoids bare names like `b` and `str` in contexts where they're ambiguous, or where a more descriptive name is readily available.
   
   `m` would be better for `b`, but there's no harm in going with `manifest`, as that's what the bytes are.
   `str` doesn't explain anything beyond the fact that it's a string, and it isn't even a string: it's a `[]string`. The name would be improved as `lines` or `manifestLines`, since that's what they are.

##########
File path: sdks/go/pkg/beam/core/graph/xlang.go
##########
@@ -66,6 +66,7 @@ type ExternalTransform struct {
 	Urn           string
 	Payload       []byte
 	ExpansionAddr string
+	Classpath     []string

Review comment:
       Same question here about whether this is a single classpath, or multiple. If multiple, please make the field plural.
   

##########
File path: sdks/go/pkg/beam/xlang.go
##########
@@ -173,6 +212,7 @@ func TryCrossLanguage(
 	expansionAddr string,
 	namedInputs map[string]PCollection,
 	namedOutputTypes map[string]FullType,
+	classpath []string,

Review comment:
       This is a breaking change we shouldn't make for a one-off. If anything, we should break it harder than this so we can avoid breaking it again in the future.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
##########
@@ -68,6 +70,225 @@ func SetDefaultRepositoryURL(repoURL string) error {
 	return defaultJarGetter.setRepositoryURL(repoURL)
 }
 
+func buildJarName(artifactId, version string) string {
+	return fmt.Sprintf("%s-%s.jar", artifactId, version)
+}
+
+func getMavenJar(artifactID, groupID, version string) string {
+	return strings.Join([]string{string(apacheRepository), strings.ReplaceAll(groupID, ".", "/"), artifactID, version, buildJarName(artifactID, version)}, "/")
+}
+
+func expandJar(jar string) string {
+	if jarExists(jar) {
+		return jar
+	} else if strings.HasPrefix(jar, "http://") || strings.HasPrefix(jar, "https://") {
+		return jar
+	} else {
+		components := strings.Split(jar, ":")
+		groupID, artifactID, version := components[0], components[1], components[2]
+		path := getMavenJar(artifactID, groupID, version)
+		return path
+	}
+}
+
+func getLocalJar(url string) (string, error) {
+	jarName := path.Base(url)
+	usr, _ := user.Current()
+	cacheDir := filepath.Join(usr.HomeDir, jarCache[2:])
+	jarPath := filepath.Join(cacheDir, jarName)
+
+	if jarExists(jarPath) {
+		return jarPath, nil
+	}
+
+	resp, err := http.Get(string(url))
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return "", fmt.Errorf("failed to connect to %v: received non 200 response code, got %v", url, resp.StatusCode)
+	}
+
+	file, err := os.Create(jarPath)
+	if err != nil {
+		return "", errors.New(fmt.Sprintf("error in create: %v", err))
+	}
+
+	_, err = io.Copy(file, resp.Body)
+	if err != nil {
+		return "", errors.New(fmt.Sprintf("error in copy: %v", err))
+	}
+
+	return jarPath, nil
+}
+
+func extractJar(source, dest string) error {
+	reader, err := zip.OpenReader(source)
+	if err != nil {
+		return fmt.Errorf("error extracting jar extractJar(%s,%s)= %v", source, dest, err)
+	}
+
+	if err := os.MkdirAll(dest, 0700); err != nil {
+		return fmt.Errorf("error creating directory %s in extractJar(%s,%s)= %v", dest, source, dest, err)
+	}
+
+	for _, file := range reader.File {
+		fileName := filepath.Join(dest, file.Name)
+		if file.FileInfo().IsDir() {
+			os.MkdirAll(fileName, 0700)
+			continue
+		}
+
+		f, err := file.Open()
+		if err != nil {
+			return fmt.Errorf("error opening file %s: %v", file.Name, err)
+		}
+		defer f.Close()
+
+		tf, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0777)
+		if err != nil {
+			return fmt.Errorf("error opening file %s: %v", fileName, err)

Review comment:
       These are good error messages, but consider clarifying which one is the source, and which one is the dest.

##########
File path: sdks/go.mod
##########
@@ -23,33 +23,38 @@ module github.com/apache/beam/sdks/v2
 go 1.16
 
 require (
-	cloud.google.com/go/bigquery v1.17.0
-	cloud.google.com/go/datastore v1.5.0
-	cloud.google.com/go/pubsub v1.11.0-beta.schemas
-	cloud.google.com/go/storage v1.15.0
+	cloud.google.com/go/bigquery v1.29.0

Review comment:
       What is your workflow? I wouldn't expect go.mod to change in every PR. One should run `go mod tidy` when a new import is added, but that shouldn't force upgrades of arbitrary deps on nearly every PR.

##########
File path: sdks/go/pkg/beam/core/runtime/xlangx/expansionx/download.go
##########
@@ -68,6 +70,225 @@ func SetDefaultRepositoryURL(repoURL string) error {
 	return defaultJarGetter.setRepositoryURL(repoURL)
 }
 
+func buildJarName(artifactId, version string) string {
+	return fmt.Sprintf("%s-%s.jar", artifactId, version)
+}
+
+func getMavenJar(artifactID, groupID, version string) string {
+	return strings.Join([]string{string(apacheRepository), strings.ReplaceAll(groupID, ".", "/"), artifactID, version, buildJarName(artifactID, version)}, "/")
+}
+
+func expandJar(jar string) string {
+	if jarExists(jar) {
+		return jar
+	} else if strings.HasPrefix(jar, "http://") || strings.HasPrefix(jar, "https://") {
+		return jar
+	} else {
+		components := strings.Split(jar, ":")
+		groupID, artifactID, version := components[0], components[1], components[2]
+		path := getMavenJar(artifactID, groupID, version)
+		return path
+	}
+}
+
+func getLocalJar(url string) (string, error) {
+	jarName := path.Base(url)
+	usr, _ := user.Current()
+	cacheDir := filepath.Join(usr.HomeDir, jarCache[2:])
+	jarPath := filepath.Join(cacheDir, jarName)
+
+	if jarExists(jarPath) {
+		return jarPath, nil
+	}
+
+	resp, err := http.Get(string(url))
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return "", fmt.Errorf("failed to connect to %v: received non 200 response code, got %v", url, resp.StatusCode)
+	}
+
+	file, err := os.Create(jarPath)
+	if err != nil {
+		return "", errors.New(fmt.Sprintf("error in create: %v", err))
+	}
+
+	_, err = io.Copy(file, resp.Body)
+	if err != nil {
+		return "", errors.New(fmt.Sprintf("error in copy: %v", err))
+	}
+
+	return jarPath, nil
+}
+
+func extractJar(source, dest string) error {
+	reader, err := zip.OpenReader(source)
+	if err != nil {
+		return fmt.Errorf("error extracting jar extractJar(%s,%s)= %v", source, dest, err)

Review comment:
       This is a fairly good error message, but it's a bit redundant, as "extracting jar" is already clear from `extractJar` being mentioned. However, it's not clear that what failed was opening the jar file.
   
   The '=' doesn't help much in this context. A reader seeing this in their logs won't necessarily know that error is the only output.
   
   Finally, %w, so that the error is properly wrapped. This allows the helpers in the [errors](https://pkg.go.dev/errors) package to work, for us and our users.
   
   ```suggestion
   		return fmt.Errorf("error opening jar for extractJar(%s,%s): %w", source, dest, err)
   ```

##########
File path: sdks/go/pkg/beam/xlang.go
##########
@@ -68,6 +68,45 @@ func CrossLanguagePayload(pl interface{}) []byte {
 	return bytes
 }
 
+// CrossLanguageWithClasspath is a low-level transform for executing cross-language transforms written in other
+// SDKs. Because this is low-level, it is recommended to use one of the higher-level IO-specific
+// wrappers where available. These can be found in the pkg/beam/io/xlang subdirectory.
+// CrossLanguage is useful for executing cross-language transforms which do not have any existing
+// IO wrappers. The functions beam.CrossLanguage() and beam.CrossLanguageWithClasspath() are similar except for
+// additional classpath dependencies that the later one accepts.
+//
+// Note: This should be specifically used when using an automated expansion service where you want to add additional
+// custom dependencies in the form of classpath to the expansion service.
+//
+// Usage requires an address for an expansion service accessible during pipeline construction, a
+// URN identifying the desired transform, an optional payload with configuration information,
+// input and output names, and classpath dependencies as an array of string. It outputs a map of named output PCollections.
+//
+// Example
+//
+// This example shows using CrossLanguageWithClasspath to execute the JDBC cross-language transform.
+//
+//    pl := beam.CrossLanguagePayload(JdbcConfigSchema)
+//    expansionAddr := xlang.UseAutomatedJavaExpansionService(gradleTargetOfExpansionService)
+//    readURN := ""
+//    classpath := []string{"org.postgresql:postgresql:42.3.3"}
+// 	  result := beam.CrossLanguageWithClasspath(s, readURN, pl, expansionAddr, nil, beam.UnnamedOutput(typex.New(outT)), classpath)
+func CrossLanguageWithClasspath(
+	s Scope,
+	urn string,
+	payload []byte,
+	expansionAddr string,
+	namedInputs map[string]PCollection,
+	namedOutputTypes map[string]FullType,
+	classpath []string,
+) map[string]PCollection {

Review comment:
       Instead of a new specific purpose method (in this case same as before but with a classpath parameter), lets try building it out as a new general purpose one. 
   
   It should always have the scope, urn, payload, namedInputs, namedOutputs parameters, and a variadic xlangOption typed parameter.
   
   This way, expansionAddr and classpath can become *optional* parameters, and we can enable some of the automatic start-up behavior more directly, rather than having it hacked in as it is presently.
   
   No need to fully replace things under the hood for the others at this time, but I wouldn't say no.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org