Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/10 10:20:22 UTC

[GitHub] [beam] brachi-wernick commented on a change in pull request #15915: [beam-12737] add API to handle failed rows in the collection due to sql query runtime error

brachi-wernick commented on a change in pull request #15915:
URL: https://github.com/apache/beam/pull/15915#discussion_r746445391



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -192,9 +207,17 @@ public Calc copy(RelTraitSet traitSet, RelNode input, RexProgram program) {
               outputSchema,
               options.getVerifyRowValues(),
               getJarPaths(program),
-              inputGetter.getFieldAccess());
-
-      return upstream.apply(ParDo.of(calcFn)).setRowSchema(outputSchema);
+              inputGetter.getFieldAccess(),
+              errorsTransformer != null);
+
+      PCollectionTuple tuple =
+          upstream.apply(ParDo.of(calcFn).withOutputTags(rows, TupleTagList.of(errors)));
+      PCollection<BeamCalcRelError> errorPCollection =
+          tuple.get(errors).setCoder(BeamCalcRelErrorCoder.of(RowCoder.of(upstream.getSchema())));

Review comment:
       Sure, I can change it to a JSON string instead of a row, but before I do, I want to make sure we aren't missing something here, because I would prefer to send the row itself to the error transformer.
   
   The error row is not the row *after* running the expression and hitting the error; it is the original row, as it was *before* running the DoFn. That is why I set the schema to `upstream.getSchema()`.
   The row *after* the expression executes is a different one and has `outputSchema`.
   
   Also, each node in the same SqlTransform creates this chaining and is assigned its own input and output schema; these schemas are per node, not per SqlTransform as a whole.
    
   This is how it looks when expanded in DF (ignore the failed icons; they appear because I aborted the execution, but it worked well):
   ![image](https://user-images.githubusercontent.com/3658085/141094730-77671b5b-1cc5-4839-8610-80eb6b379843.png)
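
A minimal sketch of the point about schemas, in plain Java with no Beam dependencies. `FailedRow`, `CalcResult`, and `CalcSketch` are hypothetical names used only for illustration and are not classes from this PR. The sketch assumes one node that applies an expression to each input row: successful results go to the main output with the output type, and failures go to a second output that carries the untouched input row, which is why the error side is typed (and, in the PR, coded) with the upstream shape.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the error element: it holds the ORIGINAL input row
// (upstream shape), not a partially computed output row.
final class FailedRow<I> {
  final I inputRow;
  final String error;

  FailedRow(I inputRow, String error) {
    this.inputRow = inputRow;
    this.error = error;
  }
}

// Hypothetical holder for the two outputs of a single node.
final class CalcResult<I, O> {
  final List<O> rows = new ArrayList<>();              // output shape
  final List<FailedRow<I>> errors = new ArrayList<>(); // input shape
}

final class CalcSketch {
  // Applies the expression per row; a runtime failure routes the input row to errors.
  static <I, O> CalcResult<I, O> apply(List<I> upstream, Function<I, O> expression) {
    CalcResult<I, O> result = new CalcResult<>();
    for (I row : upstream) {
      try {
        result.rows.add(expression.apply(row));
      } catch (RuntimeException e) {
        result.errors.add(new FailedRow<>(row, String.valueOf(e.getMessage())));
      }
    }
    return result;
  }
}
```

Because the input and output types are parameters of each `apply` call, two chained nodes would each get their own pair of types, mirroring the per-node schemas described above.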
   

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -112,11 +119,19 @@
   private static final long MILLIS_PER_DAY = 86400000L;
 
   private static final ParameterExpression rowParam = Expressions.parameter(Row.class, "row");
+  private PTransform<PCollection<BeamCalcRelError>, POutput> errorsTransformer;
+  private static final TupleTag<Row> rows = new TupleTag<Row>() {};
+  private static final TupleTag<BeamCalcRelError> errors = new TupleTag<BeamCalcRelError>() {};
 
   public BeamCalcRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexProgram program) {
     super(cluster, traits, input, program);
   }
 
+  @Override
+  public void withErrorsTransformer(PTransform<PCollection<BeamCalcRelError>, POutput> ptransform) {

Review comment:
       Yes, I can add it to `buildPTransform`, but that would break the API for all `BeamRelNode` implementations.
   This way, because I add it with a default implementation, any implementation that doesn't need to handle errors can continue as before, and any implementation that wants error handling overrides this method and uses this variable.
   Does that make sense, or do you still prefer to break the API?
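
A minimal sketch of the compatibility argument, in plain Java with no Beam dependencies. `RelNodeSketch`, `ScanNodeSketch`, and `CalcNodeSketch` are hypothetical, simplified stand-ins and not the real `BeamRelNode` types; the `Consumer<String>` parameter and the `String` return value are placeholders for the actual transform types. The sketch assumes the new method is a no-op default on the interface, so existing implementations compile unchanged and only nodes that care about errors override it.

```java
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the node interface.
interface RelNodeSketch {
  String buildPTransform();

  // No-op by default: implementations that ignore errors need no changes.
  default void withErrorsTransformer(Consumer<String> errorsTransformer) {}
}

// An existing implementation: untouched, still compiles against the interface.
final class ScanNodeSketch implements RelNodeSketch {
  @Override
  public String buildPTransform() {
    return "scan";
  }
}

// An implementation that opts in by overriding the default and storing the transformer.
final class CalcNodeSketch implements RelNodeSketch {
  private Consumer<String> errorsTransformer;

  @Override
  public void withErrorsTransformer(Consumer<String> errorsTransformer) {
    this.errorsTransformer = errorsTransformer;
  }

  @Override
  public String buildPTransform() {
    // Error output is wired only when a transformer was supplied.
    return errorsTransformer != null ? "calc+errors" : "calc";
  }
}
```

The alternative, adding a parameter to `buildPTransform`, would force every implementation to change its signature, which is the API break discussed above.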



