Posted to github@beam.apache.org by "robertwb (via GitHub)" <gi...@apache.org> on 2023/08/31 16:01:46 UTC

[GitHub] [beam] robertwb commented on a diff in pull request #28210: Upgrade transforms without upgrading the pipelines

robertwb commented on code in PR #28210:
URL: https://github.com/apache/beam/pull/28210#discussion_r1311827147


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java:
##########
@@ -84,6 +91,16 @@ public FunctionSpec translate(
       }
     }
 
+    @Override
+    public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {

Review Comment:
   I wonder if these should be in an optional interface, or if default null-returning implementations should be provided. (We could argue that it should be strongly encouraged.)
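A minimal plain-Java sketch of the two options. It assumes nothing about Beam's actual types: `Map<String, Object>` stands in for `Row`, and `TranslatorWithDefaults`, `Translator`, `ConfigRowTranslator` and `SupportingTranslator` are hypothetical names. Option A keeps one interface and supplies a null-returning default. Option B moves the capability into an optional interface, so callers test for it with `instanceof`.

```java
import java.util.Collections;
import java.util.Map;

public class TranslatorOptions {
  /** Option A (hypothetical): a default, null-returning implementation on the main interface. */
  interface TranslatorWithDefaults<T> {
    String getUrn(T transform);

    /** Returns null when the translator does not support config-row translation. */
    default Map<String, Object> toConfigRow(T transform) {
      return null;
    }
  }

  /** Option B (hypothetical): the base interface stays minimal... */
  interface Translator<T> {
    String getUrn(T transform);
  }

  /** ...and an optional interface is implemented only by translators that support config rows. */
  interface ConfigRowTranslator<T> {
    Map<String, Object> toConfigRow(T transform);
  }

  /** A translator that opts in to config-row support under option B. */
  static final class SupportingTranslator
      implements Translator<String>, ConfigRowTranslator<String> {
    @Override
    public String getUrn(String transform) {
      return "example:supporting:v1"; // hypothetical URN
    }

    @Override
    public Map<String, Object> toConfigRow(String transform) {
      return Collections.emptyMap();
    }
  }

  /** Under option B, callers check for the capability rather than for a null result. */
  static boolean supportsConfigRow(Translator<?> translator) {
    return translator instanceof ConfigRowTranslator;
  }
}
```

Option B makes support visible in the type system. Option A keeps existing translators compiling unchanged, but it pushes a null check onto every caller.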



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java:
##########
@@ -62,6 +64,11 @@ private CombinePerKeyPayloadTranslator() {}
 
     @Override
     public String getUrn(Combine.PerKey<?, ?, ?> transform) {
+      return getUrn();

Review Comment:
   Should this be in the base class?
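One way to read the suggestion, sketched with hypothetical names (`TranslatorBase`, `CombinePerKeyLikeTranslator`, and the URN string are illustrative, not Beam's). The base type declares the transform-independent `getUrn()` and implements `getUrn(T)` by delegating to it. Subclasses then override the per-transform overload only when their URN actually varies by instance. If the shared type is an interface, a `default` method plays the same role.

```java
public class BaseUrnSketch {
  /** Hypothetical base type: the per-transform URN defaults to the transform-independent one. */
  abstract static class TranslatorBase<T> {
    /** The URN shared by every instance of the transform. */
    public abstract String getUrn();

    /** Subclasses override this only when the URN depends on the transform instance. */
    public String getUrn(T transform) {
      return getUrn();
    }
  }

  /** No per-transform override needed: the delegation comes from the base class. */
  static final class CombinePerKeyLikeTranslator extends TranslatorBase<Object> {
    @Override
    public String getUrn() {
      return "example:combine_per_key:v1"; // hypothetical URN
    }
  }
}
```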



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -426,6 +437,85 @@ public String getUrn(PTransform transform) {
       return KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
     }
 
+    private int findAvailablePort() throws IOException {
+      ServerSocket s = new ServerSocket(0);
+      try {
+        return s.getLocalPort();
+      } finally {
+        s.close();
+        try {
+          // Some systems don't free the port for future use immediately.
+          Thread.sleep(100);
+        } catch (InterruptedException exn) {
+          // ignore
+        }
+      }
+    }
+
+    public FunctionSpec getSpecViaTransformService(
+        AppliedPTransform<?, ?, ?> originalAppliedPTransform,
+        TransformPayloadTranslator originalPayloadTranslator)
+        throws IOException {
+
+      ExternalTranslationOptions externalTranslationOptions =
+          originalAppliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+
+      Row configRow =
+          originalPayloadTranslator.toConfigRow(originalAppliedPTransform.getTransform());
+
+      ByteStringOutputStream outputStream = new ByteStringOutputStream();
+      try {
+        RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      String urn = originalPayloadTranslator.getUrn(originalAppliedPTransform.getTransform());
+      ExternalTransforms.ExternalConfigurationPayload payload =
+          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+              .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true))
+              .setPayload(outputStream.toByteString())
+              .build();
+
+      String serviceAddress = null;
+      TransformServiceLauncher service = null;
+
+      try {
+        if (externalTranslationOptions.getTransformServiceAddress() != null) {
+          serviceAddress = externalTranslationOptions.getTransformServiceAddress();
+        } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) {
+          String projectName = UUID.randomUUID().toString();
+          service = TransformServiceLauncher.forProject(projectName, this.findAvailablePort());
+          service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+          // Starting the transform service.
+          service.start();
+          // Waiting for the service to be ready.
+          service.waitTillUp(15000);
+        } else {
+          throw new IllegalArgumentException(
+              "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service");
+        }
+
+        ExpandableTransform externalTransform =
+            External.of(urn, payload.toByteArray(), serviceAddress);
+
+        PCollectionTuple input = PCollectionTuple.empty(originalAppliedPTransform.getPipeline());
+        for (TupleTag<?> tag : originalAppliedPTransform.getInputs().keySet()) {
+          input = input.and(tag.getId(), originalAppliedPTransform.getInputs().get(tag));
+        }
+        externalTransform.expand(input);
+
+        return externalTransform.getExpandedTransform().getSpec();

Review Comment:
   How do the outputs get wired up correctly?



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;
+          try {
+            urn = translator.getUrn();
+            if (urn == null) {
+              LOG.info(
+                  "Could not load the TransformPayloadTranslator "
+                      + translator
+                      + " to the Expansion Service.");
+              continue;
+            }
+          } catch (Exception e) {

Review Comment:
   Why would this happen? It feels like something exceptional enough to propagate up.
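A minimal sketch of the alternative the comment suggests, using `Supplier<String>` as a hypothetical stand-in for `translator.getUrn()` and a map key as the translator's name. A null URN still means "skip this translator", but an exception thrown by a translator propagates with context instead of being logged and swallowed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class RegisterUrns {
  /**
   * Collects transform-independent URNs. A null URN is skipped; a failing translator is a bug,
   * so its exception is rethrown with the translator's name attached.
   */
  static List<String> collectUrns(Map<String, Supplier<String>> urnGetters) {
    List<String> urns = new ArrayList<>();
    for (Map.Entry<String, Supplier<String>> entry : urnGetters.entrySet()) {
      String urn;
      try {
        urn = entry.getValue().get();
      } catch (RuntimeException e) {
        throw new IllegalStateException(
            "Translator " + entry.getKey() + " failed to report its URN", e);
      }
      if (urn == null) {
        continue; // No transform-independent URN; nothing to register.
      }
      urns.add(urn);
    }
    return urns;
  }
}
```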



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {

Review Comment:
   When would this happen?



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;
+          try {
+            urn = translator.getUrn();
+            if (urn == null) {
+              LOG.info(
+                  "Could not load the TransformPayloadTranslator "
+                      + translator
+                      + " to the Expansion Service.");
+              continue;
+            }
+          } catch (Exception e) {
+            LOG.info(
+                "Could not load the TransformPayloadTranslator "
+                    + translator
+                    + " to the Expansion Service.");
+            continue;
+          }
+
+          if (deprecatedTransformURNs.contains(urn)) {
+            continue;
+          }
+          final String finalUrn = urn;
+          TransformProvider transformProvider =
+              spec -> {
+                try {
+                  ExternalConfigurationPayload payload =
+                      ExternalConfigurationPayload.parseFrom(spec.getPayload());
+                  Row configRow =
+                      RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema()))
+                          .decode(new ByteArrayInputStream(payload.getPayload().toByteArray()));
+                  @Nullable PTransform transformFromRow = translator.fromConfigRow(configRow);
+                  if (transformFromRow != null) {

Review Comment:
   Prefer making this not nullable and throwing a NotImplementedError rather than having to both check for null and catch the exception? 
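A sketch of the suggested shape, with hypothetical names and `Map<String, Object>` standing in for `Row`. Java has no standard `NotImplementedError`, so this assumes `UnsupportedOperationException` as the Java counterpart. `fromConfigRow` never returns null, translators without support throw, and the caller handles a single failure mode.

```java
import java.util.Map;

public class FromConfigRowSketch {
  /** Hypothetical translator interface: fromConfigRow never returns null. */
  interface ConfigRowTranslator<T> {
    default T fromConfigRow(Map<String, Object> configRow) {
      throw new UnsupportedOperationException(
          getClass().getSimpleName() + " does not support fromConfigRow");
    }
  }

  /** Relies on the throwing default. */
  static final class NoSupport implements ConfigRowTranslator<String> {}

  /** Rebuilds a (toy) transform from the "name" field of the row. */
  static final class Echo implements ConfigRowTranslator<String> {
    @Override
    public String fromConfigRow(Map<String, Object> configRow) {
      return String.valueOf(configRow.get("name"));
    }
  }

  /** The caller has no null check: it only handles the exception. */
  static String expand(ConfigRowTranslator<String> translator, Map<String, Object> row) {
    try {
      return translator.fromConfigRow(row);
    } catch (UnsupportedOperationException e) {
      throw new IllegalArgumentException("Cannot expand transform: " + e.getMessage(), e);
    }
  }
}
```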



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -426,6 +437,85 @@ public String getUrn(PTransform transform) {
       return KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
     }
 
+    private int findAvailablePort() throws IOException {
+      ServerSocket s = new ServerSocket(0);
+      try {
+        return s.getLocalPort();
+      } finally {
+        s.close();
+        try {
+          // Some systems don't free the port for future use immediately.
+          Thread.sleep(100);
+        } catch (InterruptedException exn) {
+          // ignore
+        }
+      }
+    }
+
+    public FunctionSpec getSpecViaTransformService(
+        AppliedPTransform<?, ?, ?> originalAppliedPTransform,
+        TransformPayloadTranslator originalPayloadTranslator)
+        throws IOException {
+
+      ExternalTranslationOptions externalTranslationOptions =
+          originalAppliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+
+      Row configRow =
+          originalPayloadTranslator.toConfigRow(originalAppliedPTransform.getTransform());
+
+      ByteStringOutputStream outputStream = new ByteStringOutputStream();
+      try {
+        RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      String urn = originalPayloadTranslator.getUrn(originalAppliedPTransform.getTransform());
+      ExternalTransforms.ExternalConfigurationPayload payload =
+          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+              .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true))
+              .setPayload(outputStream.toByteString())
+              .build();
+
+      String serviceAddress = null;
+      TransformServiceLauncher service = null;
+
+      try {
+        if (externalTranslationOptions.getTransformServiceAddress() != null) {
+          serviceAddress = externalTranslationOptions.getTransformServiceAddress();
+        } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) {
+          String projectName = UUID.randomUUID().toString();
+          service = TransformServiceLauncher.forProject(projectName, this.findAvailablePort());
+          service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+          // Starting the transform service.
+          service.start();
+          // Waiting for the service to be ready.
+          service.waitTillUp(15000);
+        } else {
+          throw new IllegalArgumentException(
+              "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service");
+        }
+
+        ExpandableTransform externalTransform =
+            External.of(urn, payload.toByteArray(), serviceAddress);

Review Comment:
   serviceAddress might be unset. (Perhaps don't set it to null above to let the compiler check this for you.)
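The compiler check the comment alludes to is Java's definite-assignment analysis. A minimal sketch with hypothetical parameters in place of the pipeline options: because `serviceAddress` is declared `final` with no initializer, javac reports an error if any non-throwing branch leaves it unassigned, which is the gap in the launcher branch above. The `localhost:<port>` address is an assumption about where a locally launched service would listen.

```java
public class ServiceAddressSketch {
  /**
   * Resolves the transform service address. Removing any assignment below makes the
   * method fail to compile, because serviceAddress would not be definitely assigned.
   */
  static String resolveAddress(String configuredAddress, String beamVersion, int freePort) {
    final String serviceAddress;
    if (configuredAddress != null) {
      serviceAddress = configuredAddress;
    } else if (beamVersion != null) {
      // Hypothetical: assume a locally launched service listens on localhost:<freePort>.
      serviceAddress = "localhost:" + freePort;
    } else {
      throw new IllegalArgumentException(
          "Either TransformServiceAddress or TransformServiceBeamVersion must be set");
    }
    return serviceAddress;
  }
}
```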



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -435,10 +525,24 @@ public RunnerApi.PTransform translate(
       RunnerApi.PTransform.Builder transformBuilder =
           translateAppliedPTransform(appliedPTransform, subtransforms, components);
 
-      FunctionSpec spec =
-          KNOWN_PAYLOAD_TRANSLATORS
-              .get(appliedPTransform.getTransform().getClass())
-              .translate(appliedPTransform, components);
+      TransformPayloadTranslator payloadTranslator =
+          KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
+
+      ExternalTranslationOptions externalTranslationOptions =
+          appliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+      List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride();
+
+      PTransform<?, ?> transform = appliedPTransform.getTransform();
+
+      FunctionSpec spec = null;
+      if (getUrn(transform) != null && urnsToOverride.contains(getUrn(transform))) {
+        // Expand using the transform service.
+        spec = getSpecViaTransformService(appliedPTransform, payloadTranslator);
+      } else {
+        // Expand locally.

Review Comment:
   This isn't an expand. Maybe "translate directly."



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -435,10 +525,24 @@ public RunnerApi.PTransform translate(
       RunnerApi.PTransform.Builder transformBuilder =
           translateAppliedPTransform(appliedPTransform, subtransforms, components);
 
-      FunctionSpec spec =
-          KNOWN_PAYLOAD_TRANSLATORS
-              .get(appliedPTransform.getTransform().getClass())
-              .translate(appliedPTransform, components);
+      TransformPayloadTranslator payloadTranslator =
+          KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
+
+      ExternalTranslationOptions externalTranslationOptions =
+          appliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+      List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride();
+
+      PTransform<?, ?> transform = appliedPTransform.getTransform();
+
+      FunctionSpec spec = null;
+      if (getUrn(transform) != null && urnsToOverride.contains(getUrn(transform))) {

Review Comment:
   I have to say it does feel a bit odd to do this substitution at translation time. Did you consider doing this as a separate pass post-translation rather than intertwining the two?
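A hedged sketch of what a separate pass could look like, assuming translation has already produced a map from transform id to spec. `Spec` is a hypothetical stand-in for `FunctionSpec`, and the `UnaryOperator<Spec>` stands in for the round trip to the transform service; neither is Beam API. Translation stays unaware of overrides, and the override pass can be tested on its own.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class OverridePassSketch {
  /** Minimal stand-in for a translated transform spec (hypothetical). */
  static final class Spec {
    final String urn;
    final String payload;

    Spec(String urn, String payload) {
      this.urn = urn;
      this.payload = payload;
    }
  }

  /**
   * Post-translation pass: returns a copy of the translated transforms in which every spec
   * whose URN is listed in urnsToOverride has been replaced by the service's result.
   */
  static Map<String, Spec> overrideViaService(
      Map<String, Spec> translated, List<String> urnsToOverride, UnaryOperator<Spec> service) {
    Map<String, Spec> result = new LinkedHashMap<>();
    for (Map.Entry<String, Spec> entry : translated.entrySet()) {
      Spec spec = entry.getValue();
      result.put(entry.getKey(), urnsToOverride.contains(spec.urn) ? service.apply(spec) : spec);
    }
    return result;
  }
}
```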



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java:
##########
@@ -46,6 +54,22 @@ public FunctionSpec translate(
         AppliedPTransform<?, ?, Impulse> application, SdkComponents components) throws IOException {
       return FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build();
     }
+
+    @Override
+    public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {
+      Impulse impulse = (Impulse) pTransform;
+      System.out.println("Found impulse transform: " + impulse);

Review Comment:
   Remove debugging.



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;
+          try {
+            urn = translator.getUrn();
+            if (urn == null) {
+              LOG.info(

Review Comment:
   Maybe the warning should be softer, i.e. say that this translator does not have a transform-independent URN?



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;

Review Comment:
   I think we can leave this unset and the compiler is smart enough to see the assignment or continue. 
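A sketch of the pattern the comment describes, using `Supplier<String>` as a hypothetical stand-in for `translator.getUrn()`. `urn` has no initializer, yet javac accepts the later `urns.add(urn)`: every path either assigns it or ends in `continue`, and the definite-assignment rules (JLS chapter 16) treat a variable as assigned after a `continue` statement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class UnsetUrnSketch {
  static List<String> collect(List<Supplier<String>> getters) {
    List<String> urns = new ArrayList<>();
    for (Supplier<String> getter : getters) {
      String urn; // No "= null": the compiler proves it is assigned wherever it is read.
      try {
        urn = getter.get();
        if (urn == null) {
          continue; // Translator has no transform-independent URN.
        }
      } catch (RuntimeException e) {
        continue; // After continue, urn is vacuously definitely assigned.
      }
      urns.add(urn);
    }
    return urns;
  }
}
```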



-- 
This is an automated message from the Apache Git Service.