Posted to github@beam.apache.org by "chamikaramj (via GitHub)" <gi...@apache.org> on 2023/09/13 04:32:29 UTC

[GitHub] [beam] chamikaramj commented on a diff in pull request #28210: Upgrade transforms without upgrading the pipelines

chamikaramj commented on code in PR #28210:
URL: https://github.com/apache/beam/pull/28210#discussion_r1323756130


##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ImpulseTranslation.java:
##########
@@ -46,6 +54,22 @@ public FunctionSpec translate(
         AppliedPTransform<?, ?, Impulse> application, SdkComponents components) throws IOException {
       return FunctionSpec.newBuilder().setUrn(getUrn(application.getTransform())).build();
     }
+
+    @Override
+    public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {
+      Impulse impulse = (Impulse) pTransform;
+      System.out.println("Found impulse transform: " + impulse);

Review Comment:
   Done.



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java:
##########
@@ -62,6 +64,11 @@ private CombinePerKeyPayloadTranslator() {}
 
     @Override
     public String getUrn(Combine.PerKey<?, ?, ?> transform) {
+      return getUrn();

Review Comment:
   Yeah, moved.



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java:
##########
@@ -84,6 +91,16 @@ public FunctionSpec translate(
       }
     }
 
+    @Override
+    public @Nullable Row toConfigRow(PTransform<?, ?> pTransform) {

Review Comment:
   I don't think these need a separate interface, but good point about providing default implementations. I made "toConfigRow" and "fromConfigRow" default methods of the interface so that subclasses can choose not to implement them.
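   A minimal sketch of the default-method approach described above. The interface and class names are hypothetical, and a Map stands in for Beam's Row so the sketch compiles without Beam. The throwing defaults are an assumption about what "choose not to implement" looks like in practice.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for a payload translator interface.
// A Map<String, Object> replaces Beam's Row so the sketch is self-contained.
interface ConfigTranslator<T> {
  String getUrn();

  // Default implementations let implementers skip config-row support entirely.
  default Map<String, Object> toConfigRow(T transform) {
    throw new UnsupportedOperationException("toConfigRow is not implemented for " + getUrn());
  }

  default T fromConfigRow(Map<String, Object> configRow) {
    throw new UnsupportedOperationException("fromConfigRow is not implemented for " + getUrn());
  }
}

// A translator that opts in by overriding both methods.
final class WindowSizeTranslator implements ConfigTranslator<Integer> {
  @Override
  public String getUrn() {
    return "example:window_size:v1";
  }

  @Override
  public Map<String, Object> toConfigRow(Integer windowSize) {
    return Map.of("size", windowSize);
  }

  @Override
  public Integer fromConfigRow(Map<String, Object> configRow) {
    return (Integer) configRow.get("size");
  }
}

// A translator that opts out by relying on the inherited defaults.
final class LegacyTranslator implements ConfigTranslator<String> {
  @Override
  public String getUrn() {
    return "example:legacy:v1";
  }
}
```

   With this design, existing implementations keep compiling unchanged; only translators that need to be round-tripped through a config row override the two methods.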



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -426,6 +437,85 @@ public String getUrn(PTransform transform) {
       return KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
     }
 
+    private int findAvailablePort() throws IOException {
+      ServerSocket s = new ServerSocket(0);
+      try {
+        return s.getLocalPort();
+      } finally {
+        s.close();
+        try {
+          // Some systems don't free the port for future use immediately.
+          Thread.sleep(100);
+        } catch (InterruptedException exn) {
+          // ignore
+        }
+      }
+    }
+
+    public FunctionSpec getSpecViaTransformService(
+        AppliedPTransform<?, ?, ?> originalAppliedPTransform,
+        TransformPayloadTranslator originalPayloadTranslator)
+        throws IOException {
+
+      ExternalTranslationOptions externalTranslationOptions =
+          originalAppliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+
+      Row configRow =
+          originalPayloadTranslator.toConfigRow(originalAppliedPTransform.getTransform());
+
+      ByteStringOutputStream outputStream = new ByteStringOutputStream();
+      try {
+        RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      String urn = originalPayloadTranslator.getUrn(originalAppliedPTransform.getTransform());
+      ExternalTransforms.ExternalConfigurationPayload payload =
+          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+              .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true))
+              .setPayload(outputStream.toByteString())
+              .build();
+
+      String serviceAddress = null;
+      TransformServiceLauncher service = null;
+
+      try {
+        if (externalTranslationOptions.getTransformServiceAddress() != null) {
+          serviceAddress = externalTranslationOptions.getTransformServiceAddress();
+        } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) {
+          String projectName = UUID.randomUUID().toString();
+          service = TransformServiceLauncher.forProject(projectName, this.findAvailablePort());
+          service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+          // Starting the transform service.
+          service.start();
+          // Waiting the service to be ready.
+          service.waitTillUp(15000);
+        } else {
+          throw new IllegalArgumentException(
+              "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service");
+        }
+
+        ExpandableTransform externalTransform =
+            External.of(urn, payload.toByteArray(), serviceAddress);
+
+        PCollectionTuple input = PCollectionTuple.empty(originalAppliedPTransform.getPipeline());
+        for (TupleTag<?> tag : originalAppliedPTransform.getInputs().keySet()) {
+          input = input.and(tag.getId(), originalAppliedPTransform.getInputs().get(tag));
+        }
+        externalTransform.expand(input);
+
+        return externalTransform.getExpandedTransform().getSpec();

Review Comment:
   Added logic to wire inputs and outputs correctly (in PipelineTranslator now).



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {

Review Comment:
   It seems that root nodes of the graph may not have a transform set.
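   A small sketch of the kind of null guard this implies when walking a graph. GraphNode and UrnCollector are hypothetical types, not Beam's TransformHierarchy API; the only assumption carried over is that a root node may have no transform.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified graph node; transformUrn is null for root nodes
// that carry no transform.
final class GraphNode {
  final String name;
  final String transformUrn;
  final List<GraphNode> children = new ArrayList<>();

  GraphNode(String name, String transformUrn) {
    this.name = name;
    this.transformUrn = transformUrn;
  }
}

final class UrnCollector {
  private UrnCollector() {}

  // Depth-first walk that collects URNs, skipping nodes whose transform is unset.
  static List<String> collectUrns(GraphNode node) {
    List<String> urns = new ArrayList<>();
    collect(node, urns);
    return urns;
  }

  private static void collect(GraphNode node, List<String> urns) {
    if (node.transformUrn != null) {
      urns.add(node.transformUrn);
    }
    for (GraphNode child : node.children) {
      collect(child, urns);
    }
  }
}
```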



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -435,10 +525,24 @@ public RunnerApi.PTransform translate(
       RunnerApi.PTransform.Builder transformBuilder =
           translateAppliedPTransform(appliedPTransform, subtransforms, components);
 
-      FunctionSpec spec =
-          KNOWN_PAYLOAD_TRANSLATORS
-              .get(appliedPTransform.getTransform().getClass())
-              .translate(appliedPTransform, components);
+      TransformPayloadTranslator payloadTranslator =
+          KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
+
+      ExternalTranslationOptions externalTranslationOptions =
+          appliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+      List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride();
+
+      PTransform<?, ?> transform = appliedPTransform.getTransform();
+
+      FunctionSpec spec = null;
+      if (getUrn(transform) != null && urnsToOverride.contains(getUrn(transform))) {
+        // Expand using the transform service.
+        spec = getSpecViaTransformService(appliedPTransform, payloadTranslator);
+      } else {
+        // Expand locally.

Review Comment:
   This line was reverted.



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -426,6 +437,85 @@ public String getUrn(PTransform transform) {
       return KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
     }
 
+    private int findAvailablePort() throws IOException {
+      ServerSocket s = new ServerSocket(0);
+      try {
+        return s.getLocalPort();
+      } finally {
+        s.close();
+        try {
+          // Some systems don't free the port for future use immediately.
+          Thread.sleep(100);
+        } catch (InterruptedException exn) {
+          // ignore
+        }
+      }
+    }
+
+    public FunctionSpec getSpecViaTransformService(
+        AppliedPTransform<?, ?, ?> originalAppliedPTransform,
+        TransformPayloadTranslator originalPayloadTranslator)
+        throws IOException {
+
+      ExternalTranslationOptions externalTranslationOptions =
+          originalAppliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+
+      Row configRow =
+          originalPayloadTranslator.toConfigRow(originalAppliedPTransform.getTransform());
+
+      ByteStringOutputStream outputStream = new ByteStringOutputStream();
+      try {
+        RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      String urn = originalPayloadTranslator.getUrn(originalAppliedPTransform.getTransform());
+      ExternalTransforms.ExternalConfigurationPayload payload =
+          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+              .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true))
+              .setPayload(outputStream.toByteString())
+              .build();
+
+      String serviceAddress = null;
+      TransformServiceLauncher service = null;
+
+      try {
+        if (externalTranslationOptions.getTransformServiceAddress() != null) {
+          serviceAddress = externalTranslationOptions.getTransformServiceAddress();
+        } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) {
+          String projectName = UUID.randomUUID().toString();
+          service = TransformServiceLauncher.forProject(projectName, this.findAvailablePort());
+          service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+          // Starting the transform service.
+          service.start();
+          // Waiting the service to be ready.
+          service.waitTillUp(15000);
+        } else {
+          throw new IllegalArgumentException(
+              "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service");
+        }
+
+        ExpandableTransform externalTransform =
+            External.of(urn, payload.toByteArray(), serviceAddress);

Review Comment:
   (Also this logic was moved to PipelineTranslator).



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -426,6 +437,85 @@ public String getUrn(PTransform transform) {
       return KNOWN_PAYLOAD_TRANSLATORS.get(transform.getClass()).getUrn(transform);
     }
 
+    private int findAvailablePort() throws IOException {
+      ServerSocket s = new ServerSocket(0);
+      try {
+        return s.getLocalPort();
+      } finally {
+        s.close();
+        try {
+          // Some systems don't free the port for future use immediately.
+          Thread.sleep(100);
+        } catch (InterruptedException exn) {
+          // ignore
+        }
+      }
+    }
+
+    public FunctionSpec getSpecViaTransformService(
+        AppliedPTransform<?, ?, ?> originalAppliedPTransform,
+        TransformPayloadTranslator originalPayloadTranslator)
+        throws IOException {
+
+      ExternalTranslationOptions externalTranslationOptions =
+          originalAppliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+
+      Row configRow =
+          originalPayloadTranslator.toConfigRow(originalAppliedPTransform.getTransform());
+
+      ByteStringOutputStream outputStream = new ByteStringOutputStream();
+      try {
+        RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      String urn = originalPayloadTranslator.getUrn(originalAppliedPTransform.getTransform());
+      ExternalTransforms.ExternalConfigurationPayload payload =
+          ExternalTransforms.ExternalConfigurationPayload.newBuilder()
+              .setSchema(SchemaTranslation.schemaToProto(configRow.getSchema(), true))
+              .setPayload(outputStream.toByteString())
+              .build();
+
+      String serviceAddress = null;
+      TransformServiceLauncher service = null;
+
+      try {
+        if (externalTranslationOptions.getTransformServiceAddress() != null) {
+          serviceAddress = externalTranslationOptions.getTransformServiceAddress();
+        } else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) {
+          String projectName = UUID.randomUUID().toString();
+          service = TransformServiceLauncher.forProject(projectName, this.findAvailablePort());
+          service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());
+
+          // Starting the transform service.
+          service.start();
+          // Waiting the service to be ready.
+          service.waitTillUp(15000);
+        } else {
+          throw new IllegalArgumentException(
+              "Either option TransformServiceAddress or option TransformServiceBeamVersion should be provided to override a transform using the transform service");
+        }
+
+        ExpandableTransform externalTransform =
+            External.of(urn, payload.toByteArray(), serviceAddress);

Review Comment:
   Added a check to confirm that the address is not null (it should either be provided by the user or be the address of the Transform Service we start here).
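   In the quoted diff, the branch that starts a Transform Service never assigns serviceAddress, so a null could reach External.of. A minimal sketch of the resolution order plus the null check, assuming a hypothetical ServiceAddressResolver helper and a Supplier that stands in for starting the service and returning its address:

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical helper mirroring the option precedence in the quoted diff:
// an explicit address wins; otherwise a service is started for the requested Beam version.
final class ServiceAddressResolver {
  private ServiceAddressResolver() {}

  static String resolve(
      String configuredAddress, String beamVersion, Supplier<String> startServiceForVersion) {
    String address;
    if (configuredAddress != null) {
      address = configuredAddress;
    } else if (beamVersion != null) {
      // Stand-in for launching a transform service and returning its address.
      address = startServiceForVersion.get();
    } else {
      throw new IllegalArgumentException(
          "Either TransformServiceAddress or TransformServiceBeamVersion must be provided");
    }
    // Never hand a null address to the external expansion step.
    return Objects.requireNonNull(address, "Transform service address must not be null");
  }
}
```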



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;
+          try {
+            urn = translator.getUrn();
+            if (urn == null) {
+              LOG.info(
+                  "Could not load the TransformPayloadTranslator "
+                      + translator
+                      + " to the Expansion Service.");
+              continue;
+            }
+          } catch (Exception e) {

Review Comment:
   This is a general catch-all to prevent a single incorrectly implemented PayloadTranslator available in the system from crashing the whole service. I think it's safer to catch such exceptions and continue loading the rest of the transforms (we do the same for schema-aware transforms). Note that some of these implementations may come from custom/user libraries.
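   A sketch of the catch-and-continue pattern described above, using only the JDK. Each Supplier is a hypothetical stand-in for one discovered translator (for example, one loaded through ServiceLoader) whose getUrn() may throw or return null; DefensiveLoader is not a Beam class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Defensive plugin loading: a failing provider is logged and skipped.
final class DefensiveLoader {
  private static final Logger LOG = Logger.getLogger(DefensiveLoader.class.getName());

  private DefensiveLoader() {}

  static List<String> loadUrns(List<Supplier<String>> urnProviders) {
    List<String> loaded = new ArrayList<>();
    for (Supplier<String> provider : urnProviders) {
      String urn;
      try {
        urn = provider.get();
      } catch (Exception e) {
        // Catch-all: a broken third-party implementation must not stop the others.
        LOG.log(Level.FINE, "Skipping translator that failed to report a URN", e);
        continue;
      }
      if (urn == null) {
        LOG.fine("Skipping translator with a null URN");
        continue;
      }
      loaded.add(urn);
    }
    return loaded;
  }
}
```

   Logging at a fine/debug level keeps expected skips out of normal service logs while still leaving a trail when diagnosing a missing transform.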



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;
+          try {
+            urn = translator.getUrn();
+            if (urn == null) {
+              LOG.info(
+                  "Could not load the TransformPayloadTranslator "
+                      + translator
+                      + " to the Expansion Service.");
+              continue;
+            }
+          } catch (Exception e) {
+            LOG.info(
+                "Could not load the TransformPayloadTranslator "
+                    + translator
+                    + " to the Expansion Service.");
+            continue;
+          }
+
+          if (deprecatedTransformURNs.contains(urn)) {
+            continue;
+          }
+          final String finalUrn = urn;
+          TransformProvider transformProvider =
+              spec -> {
+                try {
+                  ExternalConfigurationPayload payload =
+                      ExternalConfigurationPayload.parseFrom(spec.getPayload());
+                  Row configRow =
+                      RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema()))
+                          .decode(new ByteArrayInputStream(payload.getPayload().toByteArray()));
+                  @Nullable PTransform transformFromRow = translator.fromConfigRow(configRow);
+                  if (transformFromRow != null) {

Review Comment:
   Now these methods return non-nullable objects and provide default implementations that throw "UnsupportedOperationException".
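   With a non-null contract, callers no longer need a null branch; an unsupported translator shows up as an exception instead. A hypothetical caller-side sketch (RowTranslator and TransformRebuilder are illustrative names, and a Map stands in for Row):

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical translator contract: fromConfigRow never returns null; translators
// that do not support it inherit a default that throws UnsupportedOperationException.
interface RowTranslator {
  String getUrn();

  default Object fromConfigRow(Map<String, Object> configRow) {
    throw new UnsupportedOperationException("fromConfigRow is not supported for " + getUrn());
  }
}

final class TransformRebuilder {
  private TransformRebuilder() {}

  // Rebuilds a transform from its config row and turns "unsupported" into a descriptive error.
  static Object rebuild(RowTranslator translator, Map<String, Object> configRow) {
    try {
      return Objects.requireNonNull(
          translator.fromConfigRow(configRow),
          "fromConfigRow must not return null for " + translator.getUrn());
    } catch (UnsupportedOperationException e) {
      throw new IllegalArgumentException(
          "Transform " + translator.getUrn() + " cannot be rebuilt from a config row", e);
    }
  }
}
```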



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;
+          try {
+            urn = translator.getUrn();
+            if (urn == null) {
+              LOG.info(

Review Comment:
   Yeah, changed to debug and updated.



##########
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java:
##########
@@ -435,10 +525,24 @@ public RunnerApi.PTransform translate(
       RunnerApi.PTransform.Builder transformBuilder =
           translateAppliedPTransform(appliedPTransform, subtransforms, components);
 
-      FunctionSpec spec =
-          KNOWN_PAYLOAD_TRANSLATORS
-              .get(appliedPTransform.getTransform().getClass())
-              .translate(appliedPTransform, components);
+      TransformPayloadTranslator payloadTranslator =
+          KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
+
+      ExternalTranslationOptions externalTranslationOptions =
+          appliedPTransform.getPipeline().getOptions().as(ExternalTranslationOptions.class);
+      List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride();
+
+      PTransform<?, ?> transform = appliedPTransform.getTransform();
+
+      FunctionSpec spec = null;
+      if (getUrn(transform) != null && urnsToOverride.contains(getUrn(transform))) {

Review Comment:
   I think translation time is a good fit for this since we are modifying the translated pipeline, but you are right that this has to be done at a higher level since otherwise it's impossible to wire inputs and outputs correctly.
   
   I moved this logic to PipelineTranslator and fixed the wiring.
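   To illustrate why the rewiring needs a pipeline-level view, here is a simplified sketch: once an overridden transform is replaced by an externally expanded one whose outputs have new PCollection IDs, every downstream consumer must be pointed at the new IDs. The map-based pipeline model and PipelineRewirer are hypothetical, not the PipelineTranslator code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model: transform id -> (local input tag -> PCollection id).
final class PipelineRewirer {
  private PipelineRewirer() {}

  static Map<String, Map<String, String>> rewireInputs(
      Map<String, Map<String, String>> transformInputs, Map<String, String> oldToNewPCollection) {
    Map<String, Map<String, String>> rewired = new LinkedHashMap<>();
    for (Map.Entry<String, Map<String, String>> transform : transformInputs.entrySet()) {
      Map<String, String> inputs = new LinkedHashMap<>();
      for (Map.Entry<String, String> input : transform.getValue().entrySet()) {
        // Redirect references to outputs of the overridden transform; keep all others.
        String pcollection = input.getValue();
        inputs.put(input.getKey(), oldToNewPCollection.getOrDefault(pcollection, pcollection));
      }
      rewired.put(transform.getKey(), inputs);
    }
    return rewired;
  }
}
```

   A single transform's translator only sees its own inputs and outputs, so it cannot update consumers elsewhere in the graph; that is the limitation the comment above describes.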



##########
sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java:
##########
@@ -178,6 +184,65 @@ public List<String> getDependencies(
         }
       }
 
+      List<String> deprecatedTransformURNs = ImmutableList.of(READ_TRANSFORM_URN);
+      for (TransformPayloadTranslatorRegistrar registrar :
+          ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) {
+        for (Map.Entry<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
+            entry : registrar.getTransformPayloadTranslators().entrySet()) {
+          @Initialized TransformPayloadTranslator translator = entry.getValue();
+          if (translator == null) {
+            continue;
+          }
+
+          String urn = null;

Review Comment:
   This code was reverted.


