Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/06 00:27:36 UTC

[GitHub] [beam] pabloem commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

pabloem commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r436214157



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      try {
+                        context.output(jsonToRow(objectMapper(), context.element()));
+                      } catch (Exception ex) {
+                        context.output(
+                            deadLetter,
+                            Row.withSchema(ERROR_ROW_SCHEMA)
+                                .addValue(context.element())
+                                .addValue(ex.getMessage())
+                                .build());

Review comment:
       This may not make sense, but would it help to include the Row schema that we tried (and failed) to use for this JSON string? Some users may not need it, and others can add it themselves in a downstream ParDo, but it may help. Thoughts?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      try {
+                        context.output(jsonToRow(objectMapper(), context.element()));
+                      } catch (Exception ex) {
+                        context.output(
+                            deadLetter,
+                            Row.withSchema(ERROR_ROW_SCHEMA)
+                                .addValue(context.element())
+                                .addValue(ex.getMessage())
+                                .build());
+                      }
+                    }
+                  })
+              .withOutputTags(main, TupleTagList.of(deadLetter)));

Review comment:
       Could you add the schemas for the outputs here, so that users do not need to add them themselves?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {

Review comment:
       `PCollectionTuple` is not great (though it's what we have after all) - do you think it could make sense to return a custom, more elegant `Result` type? See this PR, where something like this was added for Map-type ptransforms: https://github.com/apache/beam/pull/7736/files#diff-895d512e486834dfd818eec82c75b2c3R122-R181
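
       As a minimal illustration of the idea only: the sketch below is plain Java with no Beam types, and `ParseResult`, `Failure`, `getResults`, `getFailedToParseLines`, and `parseAll` are hypothetical names, not anything in this PR or in the Beam API. It shows a result object that exposes the parsed output and the dead-letter output through named accessors instead of tag lookups, with each failure carrying the same two fields as `ERROR_ROW_SCHEMA` (`line` and `err`).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for a custom result type; not part of the Beam API. */
final class ParseResult<T> {

  /** One failed input: the original line plus the parser's error message. */
  static final class Failure {
    final String line;
    final String err;

    Failure(String line, String err) {
      this.line = line;
      this.err = err;
    }
  }

  private final List<T> results;
  private final List<Failure> failures;

  private ParseResult(List<T> results, List<Failure> failures) {
    this.results = Collections.unmodifiableList(results);
    this.failures = Collections.unmodifiableList(failures);
  }

  /** Successfully parsed values (the "main" output). */
  List<T> getResults() {
    return results;
  }

  /** Inputs that could not be parsed (the "dead letter" output). */
  List<Failure> getFailedToParseLines() {
    return failures;
  }

  /** Applies the parser to every line, routing runtime exceptions to the failure list. */
  static <T> ParseResult<T> parseAll(List<String> lines, Function<String, T> parser) {
    List<T> ok = new ArrayList<>();
    List<Failure> bad = new ArrayList<>();
    for (String line : lines) {
      try {
        ok.add(parser.apply(line));
      } catch (RuntimeException ex) {
        // getMessage() may be null, so convert defensively.
        bad.add(new Failure(line, String.valueOf(ex.getMessage())));
      }
    }
    return new ParseResult<>(ok, bad);
  }
}
```

       With a shape like this, callers would write `result.getResults()` and `result.getFailedToParseLines()` rather than `results.get(JsonToRow.MAIN_TUPLE_TAG)`; a Beam version would presumably wrap the two `PCollection<Row>` outputs in the same way.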

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new TupleTag<Row>() {};
+
   public static PTransform<PCollection<String>, PCollection<Row>> withSchema(Schema rowSchema) {
     return JsonToRowFn.forSchema(rowSchema);
   }
 
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing layer are returned as
+   * Row objects of form: {@link JsonToRow#ERROR_ROW_SCHEMA} line : The original json string err :
+   * The error message from the parsing function.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@link JsonToRow#MAIN_TUPLE_TAG}
+   *
+   * <p>{@Code PCollection<Row> personRows =
+   * results.get(JsonToRow.MAIN_TUPLE_TAG).setRowSchema(personSchema)}

Review comment:
       Maybe add the schema for the PCollections in `expand` so that users won't have to add them manually?



