Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/20 22:41:22 UTC

[GitHub] [beam] ihji opened a new pull request, #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

ihji opened a new pull request, #17418:
URL: https://github.com/apache/beam/pull/17418

   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] ihji merged pull request #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
ihji merged PR #17418:
URL: https://github.com/apache/beam/pull/17418




[GitHub] [beam] ihji commented on pull request #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17418:
URL: https://github.com/apache/beam/pull/17418#issuecomment-1106944958

   @chamikaramj PTAL




[GitHub] [beam] ihji commented on a diff in pull request #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r858163647


##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -268,20 +268,31 @@ public OutputT expand(InputT input) {
     Row argsRow = buildOrGetArgsRow();
     Row kwargsRow = buildOrGetKwargsRow();
     try {
-      Schema payloadSchema =
-          Schema.of(
-              Schema.Field.of("constructor", Schema.FieldType.STRING),
-              Schema.Field.of("args", Schema.FieldType.row(argsRow.getSchema())),
-              Schema.Field.of("kwargs", Schema.FieldType.row(kwargsRow.getSchema())));
+      Schema.Builder schemaBuilder = Schema.builder();
+      schemaBuilder.addStringField("constructor");
+      if (argsRow.getValues().size() > 0) {
+        schemaBuilder.addRowField("args", argsRow.getSchema());
+      }
+      if (kwargsRow.getValues().size() > 0) {
+        schemaBuilder.addRowField("kwargs", kwargsRow.getSchema());
+      }
+      Schema payloadSchema = schemaBuilder.build();
       payloadSchema.setUUID(UUID.randomUUID());
-      Row payloadRow =
-          Row.withSchema(payloadSchema).addValues(fullyQualifiedName, argsRow, kwargsRow).build();
+      Row.Builder payloadRowBuilder = Row.withSchema(payloadSchema);
+      payloadRowBuilder.addValue(fullyQualifiedName);
+      if (argsRow.getValues().size() > 0) {

Review Comment:
   Done.



##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -65,8 +66,10 @@
   private @Nullable Object @NonNull [] argsArray;
   private @Nullable Row providedKwargsRow;
 
-  private ExternalPythonTransform(String fullyQualifiedName, int expansionPort) {
+  private ExternalPythonTransform(
+      String fullyQualifiedName, String expansionHost, int expansionPort) {

Review Comment:
   Done.





[GitHub] [beam] ihji commented on a diff in pull request #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r854698330


##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -271,29 +279,21 @@ public OutputT expand(InputT input) {
                   ByteString.copyFrom(
                       CoderUtils.encodeToByteArray(RowCoder.of(payloadSchema), payloadRow)))
               .build();
-      try (AutoCloseable p = service.start()) {
-        PythonService.waitForPort("localhost", port, 15000);
-        PTransform<PInput, PCollectionTuple> transform =
-            External.<PInput, Object>of(
-                    "beam:transforms:python:fully_qualified_named",
-                    payload.toByteArray(),
-                    "localhost:" + port)
-                .withMultiOutputs();
-        PCollectionTuple outputs;
-        if (input instanceof PCollection) {
-          outputs = ((PCollection<?>) input).apply(transform);
-        } else if (input instanceof PCollectionTuple) {
-          outputs = ((PCollectionTuple) input).apply(transform);
-        } else if (input instanceof PBegin) {
-          outputs = ((PBegin) input).apply(transform);
-        } else {
-          throw new RuntimeException("Unhandled input type " + input.getClass());
-        }
-        Set<TupleTag<?>> tags = outputs.getAll().keySet();
-        if (tags.size() == 1) {
-          return (OutputT) outputs.get(Iterables.getOnlyElement(tags));
-        } else {
-          return (OutputT) outputs;
+      if (expansionPort > 0) {
+        PythonService.waitForPort("localhost", expansionPort, 15000);

Review Comment:
   There is still no guarantee that the specified service is already up when we send the expansion request. Expansion services sometimes need a few seconds to launch fully.
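
The readiness concern in this comment can be illustrated with a small polling helper. This is a minimal sketch in plain Java, not Beam's `PythonService.waitForPort` implementation; the class name `PortWaiter`, the 500 ms connect timeout, and the 100 ms retry interval are all assumptions made for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: polls a TCP port until it accepts connections. */
public class PortWaiter {

  /**
   * Tries to connect to host:port until a connection succeeds or
   * timeoutMillis has elapsed. Returns true if a connection succeeded.
   */
  public static boolean waitForPort(String host, int port, long timeoutMillis)
      throws InterruptedException {
    long start = System.nanoTime();
    long timeoutNanos = timeoutMillis * 1_000_000L;
    while (true) {
      try (Socket socket = new Socket()) {
        // Assumed per-attempt connect timeout of 500 ms.
        socket.connect(new InetSocketAddress(host, port), 500);
        return true;
      } catch (IOException e) {
        // Nothing is accepting connections yet; fall through and retry.
      }
      if (System.nanoTime() - start >= timeoutNanos) {
        return false;
      }
      // Assumed pause between attempts.
      Thread.sleep(100);
    }
  }
}
```

A successful TCP connect only shows that something is listening on the port. It does not prove that the service behind it has finished initializing, so a caller may still want to retry the first request.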





[GitHub] [beam] chamikaramj commented on a diff in pull request #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r856943846


##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -65,8 +66,10 @@
   private @Nullable Object @NonNull [] argsArray;
   private @Nullable Row providedKwargsRow;
 
-  private ExternalPythonTransform(String fullyQualifiedName, int expansionPort) {
+  private ExternalPythonTransform(
+      String fullyQualifiedName, String expansionHost, int expansionPort) {

Review Comment:
   Still unsure why the API is different from Python where we take a single "expansion_service" address: https://github.com/apache/beam/blob/3f2e3c7c9eccb9d40370cbc70e9a451a4b5573f5/sdks/python/apache_beam/transforms/external.py#L417
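
For comparison, a single-address API of the kind the reviewer describes could split a `host:port` string as follows. This is a sketch under stated assumptions: `ExpansionAddress` and `parse` are hypothetical names that do not come from the PR, and bracketed IPv6 literals are deliberately not handled.

```java
/** Hypothetical value type holding a parsed "host:port" address. */
public final class ExpansionAddress {
  public final String host;
  public final int port;

  private ExpansionAddress(String host, int port) {
    this.host = host;
    this.port = port;
  }

  /**
   * Splits an address such as "localhost:8097" at the last colon.
   * Throws IllegalArgumentException if the host or port is missing,
   * the port is not a number, or the port is outside 1..65535.
   */
  public static ExpansionAddress parse(String address) {
    int idx = address.lastIndexOf(':');
    if (idx <= 0 || idx == address.length() - 1) {
      throw new IllegalArgumentException("Expected host:port but got " + address);
    }
    String host = address.substring(0, idx);
    // NumberFormatException is a subclass of IllegalArgumentException.
    int port = Integer.parseInt(address.substring(idx + 1));
    if (port < 1 || port > 65535) {
      throw new IllegalArgumentException("Port out of range: " + port);
    }
    return new ExpansionAddress(host, port);
  }
}
```

With a helper like this, a separate host and port pair can still be derived internally while callers pass one string, which is the shape of the Python `expansion_service` argument referenced in the comment.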





[GitHub] [beam] codecov[bot] commented on pull request #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #17418:
URL: https://github.com/apache/beam/pull/17418#issuecomment-1107634579

   # [Codecov](https://codecov.io/gh/apache/beam/pull/17418?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#17418](https://codecov.io/gh/apache/beam/pull/17418?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3f2e3c7) into [master](https://codecov.io/gh/apache/beam/commit/c6972f4b8d9a03f4639395f60f18e740dce91398?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c6972f4) will **increase** coverage by `0.09%`.
   > The diff coverage is `82.03%`.
   
   > :exclamation: Current head 3f2e3c7 differs from pull request most recent head 4c4ca1b. Consider uploading reports for the commit 4c4ca1b to get more accurate results
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #17418      +/-   ##
   ==========================================
   + Coverage   73.83%   73.92%   +0.09%     
   ==========================================
     Files         686      689       +3     
     Lines       90143    90397     +254     
   ==========================================
   + Hits        66555    66826     +271     
   + Misses      22406    22387      -19     
   - Partials     1182     1184       +2     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `49.87% <92.85%> (+0.31%)` | :arrow_up: |
   | python | `83.64% <81.40%> (-0.04%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/17418?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/core/metrics/metrics.go](https://codecov.io/gh/apache/beam/pull/17418/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL21ldHJpY3MvbWV0cmljcy5nbw==) | `49.36% <0.00%> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/exec/fn\_arity.go](https://codecov.io/gh/apache/beam/pull/17418/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9mbl9hcml0eS5nbw==) | `7.69% <ø> (+1.21%)` | :arrow_up: |
   | [sdks/go/pkg/beam/core/runtime/harness/session.go](https://codecov.io/gh/apache/beam/pull/17418/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvaGFybmVzcy9zZXNzaW9uLmdv) | `19.51% <ø> (ø)` | |
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/17418/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `90.49% <ø> (-0.03%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/17418/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `63.76% <63.63%> (+0.13%)` | :arrow_up: |
   | [sdks/python/apache\_beam/transforms/core.py](https://codecov.io/gh/apache/beam/pull/17418/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb3JlLnB5) | `91.84% <70.21%> (-0.81%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/17418/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.44% <75.00%> (-0.63%)` | :arrow_down: |
   | [sdks/python/apache\_beam/typehints/batch.py](https://codecov.io/gh/apache/beam/pull/17418/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL2JhdGNoLnB5) | `82.83% <82.83%> (ø)` | |
   | [.../python/apache\_beam/ml/inference/sklearn\_loader.py](https://codecov.io/gh/apache/beam/pull/17418/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvaW5mZXJlbmNlL3NrbGVhcm5fbG9hZGVyLnB5) | `92.68% <92.68%> (ø)` | |
   | [sdks/go/pkg/beam/core/graph/coder/panes.go](https://codecov.io/gh/apache/beam/pull/17418/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL2dyYXBoL2NvZGVyL3BhbmVzLmdv) | `81.25% <100.00%> (+81.25%)` | :arrow_up: |
   | ... and [20 more](https://codecov.io/gh/apache/beam/pull/17418/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/17418?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/17418?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [c6972f4...4c4ca1b](https://codecov.io/gh/apache/beam/pull/17418?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [beam] ihji commented on pull request #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
ihji commented on PR #17418:
URL: https://github.com/apache/beam/pull/17418#issuecomment-1104525642

   R: @chamikaramj 




[GitHub] [beam] chamikaramj commented on a diff in pull request #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r854665303


##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -64,8 +65,9 @@
   private @Nullable Object @NonNull [] argsArray;
   private @Nullable Row providedKwargsRow;
 
-  private ExternalPythonTransform(String fullyQualifiedName) {
+  private ExternalPythonTransform(String fullyQualifiedName, int expansionPort) {

Review Comment:
   Can we change this to an address so that the expansion service can be hosted remotely (similar to Python)?



##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -271,29 +279,21 @@ public OutputT expand(InputT input) {
                   ByteString.copyFrom(
                       CoderUtils.encodeToByteArray(RowCoder.of(payloadSchema), payloadRow)))
               .build();
-      try (AutoCloseable p = service.start()) {
-        PythonService.waitForPort("localhost", port, 15000);
-        PTransform<PInput, PCollectionTuple> transform =
-            External.<PInput, Object>of(
-                    "beam:transforms:python:fully_qualified_named",
-                    payload.toByteArray(),
-                    "localhost:" + port)
-                .withMultiOutputs();
-        PCollectionTuple outputs;
-        if (input instanceof PCollection) {
-          outputs = ((PCollection<?>) input).apply(transform);
-        } else if (input instanceof PCollectionTuple) {
-          outputs = ((PCollectionTuple) input).apply(transform);
-        } else if (input instanceof PBegin) {
-          outputs = ((PBegin) input).apply(transform);
-        } else {
-          throw new RuntimeException("Unhandled input type " + input.getClass());
-        }
-        Set<TupleTag<?>> tags = outputs.getAll().keySet();
-        if (tags.size() == 1) {
-          return (OutputT) outputs.get(Iterables.getOnlyElement(tags));
-        } else {
-          return (OutputT) outputs;
+      if (expansionPort > 0) {
+        PythonService.waitForPort("localhost", expansionPort, 15000);

Review Comment:
   Why do we have to do this when a port/address is specified?





[GitHub] [beam] ihji commented on a diff in pull request #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
ihji commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r854696465


##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -64,8 +65,9 @@
   private @Nullable Object @NonNull [] argsArray;
   private @Nullable Row providedKwargsRow;
 
-  private ExternalPythonTransform(String fullyQualifiedName) {
+  private ExternalPythonTransform(String fullyQualifiedName, int expansionPort) {

Review Comment:
   Also added an `expansionHost` parameter.





[GitHub] [beam] chamikaramj commented on a diff in pull request #17418: [BEAM-14343] Allow expansion service override in ExternalPythonTransform

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #17418:
URL: https://github.com/apache/beam/pull/17418#discussion_r856943846


##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -65,8 +66,10 @@
   private @Nullable Object @NonNull [] argsArray;
   private @Nullable Row providedKwargsRow;
 
-  private ExternalPythonTransform(String fullyQualifiedName, int expansionPort) {
+  private ExternalPythonTransform(
+      String fullyQualifiedName, String expansionHost, int expansionPort) {

Review Comment:
   Still unsure why the API is different from Python where we take a single "expansion_service" address: https://github.com/apache/beam/blob/3f2e3c7c9eccb9d40370cbc70e9a451a4b5573f5/sdks/python/apache_beam/transforms/external.py#L417



##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/ExternalPythonTransform.java:
##########
@@ -268,20 +268,31 @@ public OutputT expand(InputT input) {
     Row argsRow = buildOrGetArgsRow();
     Row kwargsRow = buildOrGetKwargsRow();
     try {
-      Schema payloadSchema =
-          Schema.of(
-              Schema.Field.of("constructor", Schema.FieldType.STRING),
-              Schema.Field.of("args", Schema.FieldType.row(argsRow.getSchema())),
-              Schema.Field.of("kwargs", Schema.FieldType.row(kwargsRow.getSchema())));
+      Schema.Builder schemaBuilder = Schema.builder();
+      schemaBuilder.addStringField("constructor");
+      if (argsRow.getValues().size() > 0) {
+        schemaBuilder.addRowField("args", argsRow.getSchema());
+      }
+      if (kwargsRow.getValues().size() > 0) {
+        schemaBuilder.addRowField("kwargs", kwargsRow.getSchema());
+      }
+      Schema payloadSchema = schemaBuilder.build();
       payloadSchema.setUUID(UUID.randomUUID());
-      Row payloadRow =
-          Row.withSchema(payloadSchema).addValues(fullyQualifiedName, argsRow, kwargsRow).build();
+      Row.Builder payloadRowBuilder = Row.withSchema(payloadSchema);
+      payloadRowBuilder.addValue(fullyQualifiedName);
+      if (argsRow.getValues().size() > 0) {

Review Comment:
   Please add a comment and a unit test so that this doesn't get broken by a future change.
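
The behavior the reviewer asks to pin down is that the payload carries `args` and `kwargs` fields only when they are non-empty. The sketch below restates that rule in plain Java as a stand-in; it does not use Beam's `Schema` or `Row` classes, and `PayloadFields` and `fieldNames` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the conditional payload schema in the diff. */
public class PayloadFields {

  /**
   * Returns the payload field names in order. "constructor" is always
   * present; "args" and "kwargs" are omitted when they are empty,
   * mirroring the size checks in the diff above.
   */
  public static List<String> fieldNames(List<?> args, Map<String, ?> kwargs) {
    List<String> names = new ArrayList<>();
    names.add("constructor");
    if (!args.isEmpty()) {
      names.add("args");
    }
    if (!kwargs.isEmpty()) {
      names.add("kwargs");
    }
    return names;
  }
}
```

A unit test for the real transform would presumably assert the same three cases against the built payload schema: no arguments, positional arguments only, and keyword arguments only.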


