Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/08 22:32:11 UTC

[GitHub] [beam] robertwb commented on a diff in pull request #22004: [BEAM-22003] Allow merging consecutive external transforms in Java SDK

robertwb commented on code in PR #22004:
URL: https://github.com/apache/beam/pull/22004#discussion_r917163121


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java:
##########
@@ -76,25 +81,36 @@
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 public class External {
-  private static final String EXPANDED_TRANSFORM_BASE_NAME = "external";
+  protected static final String EXPANDED_TRANSFORM_BASE_NAME = "external";
   private static final String IMPULSE_PREFIX = "IMPULSE";
   private static AtomicInteger namespaceCounter = new AtomicInteger(0);
 
   private static final ExpansionServiceClientFactory DEFAULT =
       DefaultExpansionServiceClientFactory.create(
           endPoint -> ManagedChannelBuilder.forTarget(endPoint.getUrl()).usePlaintext().build());
 
-  private static int getFreshNamespaceIndex() {
+  public static int getFreshNamespaceIndex() {

Review Comment:
   I don't think this (or EXPANDED_TRANSFORM_BASE_NAME) should be exposed. If we have to expose something, we could expose a getFreshNamespace() that returns the concatenation, but it's best not to expose these internal implementation details at all.
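A minimal sketch of the narrower API the comment suggests, assuming a single `getFreshNamespace()` helper while the base name and the counter stay private. `NamespaceSketch` is a hypothetical stand-in for `External`. Only the constant name and the `AtomicInteger` counter mirror the diff above, and joining the two parts with `_` is an assumption rather than the format `External` actually uses.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for External: both implementation details stay private.
class NamespaceSketch {
  private static final String EXPANDED_TRANSFORM_BASE_NAME = "external";
  private static final AtomicInteger namespaceCounter = new AtomicInteger(0);

  private NamespaceSketch() {}

  // Private: callers never see the raw counter value on its own.
  private static int getFreshNamespaceIndex() {
    return namespaceCounter.getAndIncrement();
  }

  // The single exposed helper. The "_" separator is an assumption.
  public static String getFreshNamespace() {
    return EXPANDED_TRANSFORM_BASE_NAME + "_" + getFreshNamespaceIndex();
  }
}
```

Starting from a fresh counter, successive calls return `external_0`, `external_1`, and so on, so callers get unique names without depending on how they are assembled.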



##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java:
##########
@@ -435,6 +437,15 @@ public OutputT expand(InputT input) {
     }
   }
 
+  public List<External.ExpansionInfo> getExpansionInfoList() {

Review Comment:
   This shouldn't be needed (it doesn't override anything, right?)



##########
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java:
##########
@@ -221,6 +221,17 @@ protected void pythonDependenciesTest(Pipeline pipeline) {
               .apply(External.of(TEST_PYTHON_BS4_URN, new byte[] {}, expansionAddr));
       PAssert.that(col).containsInAnyOrder("The Dormouse's story");
     }
+
+    protected void combineMultipleTransformTest(Pipeline pipeline) throws IOException {
+      PCollection<String> col =
+          pipeline
+              .apply(Create.of(1L, 2L, 3L))
+              .apply(
+                  External.of(
+                      External.of("map_to_union_types", new byte[] {}, expansionAddr),

Review Comment:
   Reading this test, I wouldn't know where "map_to_union_types" is defined. Similarly for TEST_PREFIX_URN. Let's make this self-contained, e.g.
   
   ```java
   pipeline
       .apply(Create.of(-1, 0, 1))
       .apply(PythonMap.viaMapFn("lambda x: 'negative' if x < 0 else x"))
       .apply(PythonMap.viaMapFn("type"))
       .apply(PythonMap.viaMapFn("str"));
   ```
   
   which will result in "<class 'str'>", "<class 'int'>", "<class 'int'>" (the Python str() of each element's type).



##########
sdks/java/extensions/python/src/main/java/org/apache/beam/sdk/extensions/python/PythonExternalTransform.java:
##########
@@ -435,6 +437,15 @@ public OutputT expand(InputT input) {
     }
   }
 
+  public List<External.ExpansionInfo> getExpansionInfoList() {
+    return ImmutableList.of(
+        External.ExpansionInfo.create(
+            "beam:transforms:python:fully_qualified_named",
+            generatePayload().toByteArray(),
+            expansionService,
+            External.getFreshNamespaceIndex()));

Review Comment:
   This has side effects: it returns a different value each time, so it shouldn't be called multiple times (especially from a getter).
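One way to address this, sketched under the assumption that the transform can reserve its namespace index up front: draw the index exactly once (here, in a field initializer) and have every getter return the cached value. `CachedNamespaceTransform`, `getNamespaceIndex()`, and `getExpansionIndices()` are hypothetical names, not Beam API. The last one only stands in for a getter like `getExpansionInfoList()`, which would build `ExpansionInfo` objects from the cached index instead.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration: the fresh index is drawn once per instance,
// so repeated getter calls never touch the shared counter again.
class CachedNamespaceTransform {
  private static final AtomicInteger namespaceCounter = new AtomicInteger(0);

  // Assigned exactly once, when the instance is constructed.
  private final int namespaceIndex = namespaceCounter.getAndIncrement();

  // Side-effect-free getter: returns the same value on every call.
  int getNamespaceIndex() {
    return namespaceIndex;
  }

  // Stand-in for a list-returning getter; built from the cached index only.
  List<Integer> getExpansionIndices() {
    return Collections.singletonList(namespaceIndex);
  }
}
```

Drawing the index eagerly means every constructed instance consumes one index even if it is never expanded. A lazy variant would assign it on first use, at the cost of a little extra state handling.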


