Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/21 00:20:37 UTC

[GitHub] [beam] chamikaramj commented on a diff in pull request #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

chamikaramj commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r854665303


##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -64,8 +65,9 @@
   private @Nullable Object @NonNull [] argsArray;
   private @Nullable Row providedKwargsRow;
 
-  private ExternalPythonTransform(String fullyQualifiedName) {
+  private ExternalPythonTransform(String fullyQualifiedName, int expansionPort) {

Review Comment:
   Can we change this to an address so that the expansion service can be hosted remotely (similar to Python)?
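
   To illustrate the suggestion above, here is a minimal sketch of taking a "host:port" address string instead of a bare port. `ExpansionAddress` and its methods are hypothetical names invented for this example; they are not part of Beam or of this PR, and the actual change may look quite different.

```java
/**
 * Hypothetical helper (not part of Beam): a parsed "host:port" address
 * for an expansion service, so the service does not have to be local.
 */
final class ExpansionAddress {
  private final String host;
  private final int port;

  private ExpansionAddress(String host, int port) {
    this.host = host;
    this.port = port;
  }

  /** Parses "host:port"; rejects a missing host, missing port, or out-of-range port. */
  static ExpansionAddress parse(String address) {
    int idx = address.lastIndexOf(':');
    if (idx <= 0 || idx == address.length() - 1) {
      throw new IllegalArgumentException("Expected host:port but got: " + address);
    }
    String host = address.substring(0, idx);
    int port;
    try {
      port = Integer.parseInt(address.substring(idx + 1));
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid port in address: " + address, e);
    }
    if (port < 1 || port > 65535) {
      throw new IllegalArgumentException("Port out of range in address: " + address);
    }
    return new ExpansionAddress(host, port);
  }

  String host() {
    return host;
  }

  int port() {
    return port;
  }

  @Override
  public String toString() {
    return host + ":" + port;
  }
}
```

   With something like this, a caller could pass `ExpansionAddress.parse("expansion.example.com:8097").toString()` wherever `"localhost:" + port` is built today (the hostname here is a placeholder).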



##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -271,29 +279,21 @@ public OutputT expand(InputT input) {
                   ByteString.copyFrom(
                       CoderUtils.encodeToByteArray(RowCoder.of(payloadSchema), payloadRow)))
               .build();
-      try (AutoCloseable p = service.start()) {
-        PythonService.waitForPort("localhost", port, 15000);
-        PTransform<PInput, PCollectionTuple> transform =
-            External.<PInput, Object>of(
-                    "beam:transforms:python:fully_qualified_named",
-                    payload.toByteArray(),
-                    "localhost:" + port)
-                .withMultiOutputs();
-        PCollectionTuple outputs;
-        if (input instanceof PCollection) {
-          outputs = ((PCollection<?>) input).apply(transform);
-        } else if (input instanceof PCollectionTuple) {
-          outputs = ((PCollectionTuple) input).apply(transform);
-        } else if (input instanceof PBegin) {
-          outputs = ((PBegin) input).apply(transform);
-        } else {
-          throw new RuntimeException("Unhandled input type " + input.getClass());
-        }
-        Set<TupleTag<?>> tags = outputs.getAll().keySet();
-        if (tags.size() == 1) {
-          return (OutputT) outputs.get(Iterables.getOnlyElement(tags));
-        } else {
-          return (OutputT) outputs;
+      if (expansionPort > 0) {
+        PythonService.waitForPort("localhost", expansionPort, 15000);

Review Comment:
   Why do we have to do this when a port/address is specified?
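
   One way to read the question above: waiting for the port only makes sense when the transform started the service itself. The sketch below shows that distinction under stated assumptions. `ExpansionTarget`, `resolve`, and `managedLocally` are hypothetical names for illustration only; they are not Beam APIs and not necessarily how the PR resolves this.

```java
/**
 * Hypothetical sketch (not part of Beam): decides which address to expand
 * against and whether the caller is responsible for starting the service.
 */
final class ExpansionTarget {
  final String address;
  // True only when no address was supplied, i.e. a local service must be
  // started (and waited for) by the caller.
  final boolean managedLocally;

  private ExpansionTarget(String address, boolean managedLocally) {
    this.address = address;
    this.managedLocally = managedLocally;
  }

  /**
   * @param userAddress address supplied by the user, or null/empty if none
   * @param localPort port to use for a locally started service
   */
  static ExpansionTarget resolve(String userAddress, int localPort) {
    if (userAddress == null || userAddress.isEmpty()) {
      // No override: fall back to a locally managed service.
      return new ExpansionTarget("localhost:" + localPort, true);
    }
    // Override given: assume the service is already running; no start, no wait.
    return new ExpansionTarget(userAddress, false);
  }
}
```

   Under this assumption, the start-and-wait step would be guarded by `managedLocally`, and a user-supplied address would be used as-is.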


