Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/06 00:53:41 UTC

[GitHub] [beam] reuvenlax commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

reuvenlax commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r436220739



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new TupleTag<Row>() {};
+
   public static PTransform<PCollection<String>, PCollection<Row>> withSchema(Schema rowSchema) {
     return JsonToRowFn.forSchema(rowSchema);
   }
 
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing layer are returned as
+   * Row objects of form: {@link JsonToRow#ERROR_ROW_SCHEMA} line : The original json string err :
+   * The error message from the parsing function.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@link JsonToRow#MAIN_TUPLE_TAG}
+   *
+   * <p>{@Code PCollection<Row> personRows =
+   * results.get(JsonToRow.MAIN_TUPLE_TAG).setRowSchema(personSchema)}

Review comment:
       +1. If you output a Row, you should be setting the schema in your transform.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;

Review comment:
       make final

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+

Review comment:
       It would be nicer to make these field names configurable, though with defaults.
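
       One way to read this suggestion, as a minimal plain-Java sketch rather than the change the PR ended up with: keep "line" and "err" as defaults and let callers override either name. ErrorFieldNames and its methods are hypothetical names invented for illustration; only the two default strings come from the diff above.

```java
import java.util.Objects;

/**
 * Hypothetical holder for the dead-letter field names. The default values mirror
 * LINE_FIELD_NAME and ERROR_FIELD_NAME from the diff; everything else is illustrative.
 */
final class ErrorFieldNames {
  static final String DEFAULT_LINE_FIELD_NAME = "line";
  static final String DEFAULT_ERROR_FIELD_NAME = "err";

  private final String lineFieldName;
  private final String errorFieldName;

  private ErrorFieldNames(String lineFieldName, String errorFieldName) {
    this.lineFieldName = Objects.requireNonNull(lineFieldName, "lineFieldName");
    this.errorFieldName = Objects.requireNonNull(errorFieldName, "errorFieldName");
  }

  /** Starts from the defaults, so callers only override what they need. */
  static ErrorFieldNames defaults() {
    return new ErrorFieldNames(DEFAULT_LINE_FIELD_NAME, DEFAULT_ERROR_FIELD_NAME);
  }

  /** Returns a copy with a different name for the field holding the original JSON string. */
  ErrorFieldNames withLineFieldName(String name) {
    return new ErrorFieldNames(name, errorFieldName);
  }

  /** Returns a copy with a different name for the field holding the error message. */
  ErrorFieldNames withErrorFieldName(String name) {
    return new ErrorFieldNames(lineFieldName, name);
  }

  String lineFieldName() {
    return lineFieldName;
  }

  String errorFieldName() {
    return errorFieldName;
  }
}
```

       With something like this, the error schema could be built from an ErrorFieldNames instance instead of the two private constants, and existing callers would keep the current names by using defaults().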

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {

Review comment:
       I think it would be cleaner to wrap this in a custom result class and not expose the TupleTags to users. Look at org.apache.beam.sdk.io.gcp.bigquery.WriteResult for an example.
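
       The shape of that idea can be sketched in plain Java, with ordinary lists standing in for PCollections so the snippet runs without Beam. ParseResult, ParseFailure, parseAll and the accessor names are all hypothetical; this is not the class the PR added, and a real Beam version would wrap a PCollectionTuple instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Hypothetical error record with the same two fields as ERROR_ROW_SCHEMA in the diff. */
final class ParseFailure {
  final String line; // the original input string
  final String err; // the error message from the parsing function

  ParseFailure(String line, String err) {
    this.line = line;
    this.err = err;
  }
}

/**
 * Hypothetical result wrapper. Callers use named accessors instead of looking up
 * outputs by tag, so the tags can stay private to the transform.
 */
final class ParseResult<T> {
  private final List<T> results;
  private final List<ParseFailure> failures;

  private ParseResult(List<T> results, List<ParseFailure> failures) {
    this.results = Collections.unmodifiableList(results);
    this.failures = Collections.unmodifiableList(failures);
  }

  /** Applies the parser to every line, routing thrown exceptions to the failure output. */
  static <T> ParseResult<T> parseAll(List<String> lines, Function<String, T> parser) {
    List<T> ok = new ArrayList<>();
    List<ParseFailure> bad = new ArrayList<>();
    for (String line : lines) {
      try {
        ok.add(parser.apply(line));
      } catch (RuntimeException e) {
        bad.add(new ParseFailure(line, String.valueOf(e.getMessage())));
      }
    }
    return new ParseResult<>(ok, bad);
  }

  /** Successfully parsed elements (the "main" output). */
  List<T> getResults() {
    return results;
  }

  /** Inputs that failed to parse, with their error messages (the "dead letter" output). */
  List<ParseFailure> getFailedToParseLines() {
    return failures;
  }
}
```

       A caller would then write result.getResults() and result.getFailedToParseLines() rather than results.get(JsonToRow.MAIN_TUPLE_TAG), which also gives the transform one place to attach the right schema to each output.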

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {

Review comment:
       Why not use injected parameters instead of ProcessContext?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new TupleTag<Row>() {};
+

Review comment:
       give these tuple tags real names



