Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/05 04:35:09 UTC

[GitHub] [beam] rezarokni opened a new pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

rezarokni opened a new pull request #11929:
URL: https://github.com/apache/beam/pull/11929


   Add deadletter support to JsonToRow. 
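As a rough illustration of what dead-letter handling means here, the stdlib-only sketch below routes each input line either to a parsed output or, if parsing throws, to a dead-letter list that keeps the raw line. It is a hypothetical stand-in, not the PR's code. `DeadLetterSketch` and `Outputs` are invented names, `Integer::parseInt` replaces JSON-to-`Row` parsing, and the PR itself achieves the split with a multi-output `ParDo` and `TupleTag`s.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, Beam-free illustration of the dead-letter pattern:
// successful parses go to one output, failures go to another instead of
// propagating an exception.
final class DeadLetterSketch {

  // Holds both outputs, mirroring a main output plus a dead-letter output.
  static final class Outputs<T> {
    final List<T> parsed = new ArrayList<>();
    final List<String> failedLines = new ArrayList<>();
  }

  static <T> Outputs<T> parseAll(List<String> lines, Function<String, T> parser) {
    Outputs<T> out = new Outputs<>();
    for (String line : lines) {
      try {
        out.parsed.add(parser.apply(line));
      } catch (RuntimeException e) {
        // Keep the raw line so it can be inspected or replayed later.
        out.failedLines.add(line);
      }
    }
    return out;
  }
}
```

The key design point is that a malformed line becomes data on a second output rather than an exception thrown from the `DoFn`.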
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-644306987


   Run Java PreCommit





[GitHub] [beam] TheNeuralBit commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-645497788


   Run Java PreCommit





[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r438508022



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing layer are returned as
+   * Row objects within a {@link ParseResult}
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@Code PCollection<Row> personRows = results.getResults()}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@Code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * <p>To access the reason for the failure you will need to first enable extended error reporting.
+   * {@Code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo()); }
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@Code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLines()}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+    return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends PTransform<PCollection<String>, ParseResult> {
+
+    private Pipeline pipeline;
+
+    private PCollection<Row> parsedLine;
+    private PCollection<Row> failedParse;
+    private PCollection<Row> failedParseWithErr;
+
+    private static final String LINE_FIELD_NAME = "line";
+    private static final String ERROR_FIELD_NAME = "err";
+
+    public static final Schema ERROR_ROW_SCHEMA =
+        Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+    public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+        Schema.of(
+            Field.of(LINE_FIELD_NAME, FieldType.STRING),
+            Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+    static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() {};
+
+    public abstract Schema getSchema();
+
+    public abstract String getLineFieldName();
+
+    public abstract String getErrorFieldName();
+
+    public abstract boolean getExtendedErrorInfo();
+
+    PCollection<Row> deadLetterCollection;
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setSchema(Schema value);
+
+      public abstract Builder setLineFieldName(String value);
+
+      public abstract Builder setErrorFieldName(String value);
+
+      public abstract Builder setExtendedErrorInfo(boolean value);
+
+      public abstract JsonToRowWithErrFn build();
+    }
+
+    public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+          .setSchema(rowSchema)
+          .setExtendedErrorInfo(false)
+          .setLineFieldName(LINE_FIELD_NAME)
+          .setErrorFieldName(ERROR_FIELD_NAME)
+          .build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn withExtendedErrorInfo() {
+      return this.toBuilder().setExtendedErrorInfo(true).build();
+    }
+
+    /**
+     * Sets the field name for the line field in the returned Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn setLineField(String lineField) {
+      return this.toBuilder().setLineFieldName(lineField).build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn setErrorField(String errorField) {
+      if (!this.getExtendedErrorInfo()) {
+        throw new IllegalArgumentException(
+            "This option is only available with Extended Error Info.");
+      }
+      return this.toBuilder().setErrorFieldName(errorField).build();
+    }
+
+    @Override
+    public ParseResult expand(PCollection<String> jsonStrings) {
+
+      PCollectionTuple result =
+          jsonStrings.apply(
+              ParDo.of(new ParseWithError(this.getSchema(), getExtendedErrorInfo()))
+                  .withOutputTags(
+                      PARSED_LINE,
+                      TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+      this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());

Review comment:
       Changed to be passed in via resultBuilder.







[GitHub] [beam] reuvenlax commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437509966



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing layer are returned as
+   * Row objects within a {@link ParseResult}
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@Code PCollection<Row> personRows = results.getResults()}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@Code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * <p>To access the reason for the failure you will need to first enable extended error reporting.
+   * {@Code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo()); }
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@Code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLines()}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+    return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends PTransform<PCollection<String>, ParseResult> {
+
+    private Pipeline pipeline;
+
+    private PCollection<Row> parsedLine;
+    private PCollection<Row> failedParse;
+    private PCollection<Row> failedParseWithErr;
+
+    private static final String LINE_FIELD_NAME = "line";
+    private static final String ERROR_FIELD_NAME = "err";
+
+    public static final Schema ERROR_ROW_SCHEMA =
+        Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+    public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+        Schema.of(
+            Field.of(LINE_FIELD_NAME, FieldType.STRING),
+            Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+    static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() {};
+
+    public abstract Schema getSchema();
+
+    public abstract String getLineFieldName();
+
+    public abstract String getErrorFieldName();
+
+    public abstract boolean getExtendedErrorInfo();
+
+    PCollection<Row> deadLetterCollection;
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setSchema(Schema value);
+
+      public abstract Builder setLineFieldName(String value);
+
+      public abstract Builder setErrorFieldName(String value);
+
+      public abstract Builder setExtendedErrorInfo(boolean value);
+
+      public abstract JsonToRowWithErrFn build();
+    }
+
+    public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+          .setSchema(rowSchema)
+          .setExtendedErrorInfo(false)
+          .setLineFieldName(LINE_FIELD_NAME)
+          .setErrorFieldName(ERROR_FIELD_NAME)
+          .build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn withExtendedErrorInfo() {
+      return this.toBuilder().setExtendedErrorInfo(true).build();
+    }
+
+    /**
+     * Sets the field name for the line field in the returned Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn setLineField(String lineField) {
+      return this.toBuilder().setLineFieldName(lineField).build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn setErrorField(String errorField) {
+      if (!this.getExtendedErrorInfo()) {
+        throw new IllegalArgumentException(
+            "This option is only available with Extended Error Info.");
+      }
+      return this.toBuilder().setErrorFieldName(errorField).build();
+    }
+
+    @Override
+    public ParseResult expand(PCollection<String> jsonStrings) {
+
+      PCollectionTuple result =
+          jsonStrings.apply(
+              ParDo.of(new ParseWithError(this.getSchema(), getExtendedErrorInfo()))
+                  .withOutputTags(
+                      PARSED_LINE,
+                      TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+      this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());

Review comment:
       Seems a bit roundabout. I would store these in the ParseResult object itself.
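A minimal sketch of the reviewer's suggestion, assuming plain Java lists in place of `PCollection`s: the outputs are handed to an immutable result object when it is built, so the transform itself carries no mutable `parsedLine`/`failedParse` state. `ParseResultSketch` is a hypothetical name; a real Beam result type would additionally have to implement `POutput` (for example `getPipeline()` and `expand()`), which this sketch omits.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: the result object owns its outputs, instead of the
// transform stashing them in mutable fields and passing `this` to a factory.
final class ParseResultSketch<T> {
  private final List<T> results;
  private final List<String> failedToParseLines;

  private ParseResultSketch(List<T> results, List<String> failedToParseLines) {
    // Defensive, read-only copies so later changes by the caller cannot leak in.
    this.results = Collections.unmodifiableList(new ArrayList<>(results));
    this.failedToParseLines = Collections.unmodifiableList(new ArrayList<>(failedToParseLines));
  }

  // Static factory: the caller hands over the outputs it just produced.
  static <T> ParseResultSketch<T> of(List<T> results, List<String> failedToParseLines) {
    return new ParseResultSketch<>(results, failedToParseLines);
  }

  List<T> getResults() {
    return results;
  }

  List<String> getFailedToParseLines() {
    return failedToParseLines;
  }
}
```

Keeping the transform stateless matters because the same `PTransform` instance could in principle be applied more than once; fields assigned inside `expand` would then be overwritten by the later application.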

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing layer are returned as
+   * Row objects within a {@link ParseResult}
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@Code PCollection<Row> personRows = results.getResults()}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@Code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * <p>To access the reason for the failure you will need to first enable extended error reporting.
+   * {@Code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo()); }
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@Code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLines()}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+    return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends PTransform<PCollection<String>, ParseResult> {
+
+    private Pipeline pipeline;
+
+    private PCollection<Row> parsedLine;
+    private PCollection<Row> failedParse;
+    private PCollection<Row> failedParseWithErr;
+
+    private static final String LINE_FIELD_NAME = "line";
+    private static final String ERROR_FIELD_NAME = "err";
+
+    public static final Schema ERROR_ROW_SCHEMA =
+        Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+    public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+        Schema.of(
+            Field.of(LINE_FIELD_NAME, FieldType.STRING),
+            Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+    static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() {};
+
+    public abstract Schema getSchema();
+
+    public abstract String getLineFieldName();
+
+    public abstract String getErrorFieldName();
+
+    public abstract boolean getExtendedErrorInfo();
+
+    PCollection<Row> deadLetterCollection;
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setSchema(Schema value);
+
+      public abstract Builder setLineFieldName(String value);
+
+      public abstract Builder setErrorFieldName(String value);
+
+      public abstract Builder setExtendedErrorInfo(boolean value);
+
+      public abstract JsonToRowWithErrFn build();
+    }
+
+    public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+          .setSchema(rowSchema)
+          .setExtendedErrorInfo(false)
+          .setLineFieldName(LINE_FIELD_NAME)
+          .setErrorFieldName(ERROR_FIELD_NAME)
+          .build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn withExtendedErrorInfo() {
+      return this.toBuilder().setExtendedErrorInfo(true).build();
+    }
+
+    /**
+     * Sets the field name for the line field in the returned Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn setLineField(String lineField) {
+      return this.toBuilder().setLineFieldName(lineField).build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn setErrorField(String errorField) {
+      if (!this.getExtendedErrorInfo()) {
+        throw new IllegalArgumentException(
+            "This option is only available with Extended Error Info.");
+      }
+      return this.toBuilder().setErrorFieldName(errorField).build();
+    }
+
+    @Override
+    public ParseResult expand(PCollection<String> jsonStrings) {
+
+      PCollectionTuple result =
+          jsonStrings.apply(
+              ParDo.of(new ParseWithError(this.getSchema(), getExtendedErrorInfo()))
+                  .withOutputTags(
+                      PARSED_LINE,
+                      TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+      this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());
+      this.failedParse =
+          result.get(PARSE_ERROR_LINE).setRowSchema(JsonToRowWithErrFn.ERROR_ROW_SCHEMA);
+      this.failedParseWithErr =
+          result
+              .get(PARSE_ERROR_LINE_WITH_MSG)
+              .setRowSchema(JsonToRowWithErrFn.ERROR_ROW_WITH_ERR_MSG_SCHEMA);
+
+      return ParseResult.result(this);
+    }
+
+    private static class ParseWithError extends DoFn<String, Row> {
+      private transient volatile @Nullable ObjectMapper objectMapper;

Review comment:
       why volatile?
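For context on the question: `volatile` only matters when a lazily initialized field may be read by several threads, as in double-checked locking, where it prevents another thread from observing a partially constructed object. The plain-Java sketch below shows that idiom; `LazyMapper` and its `Supplier` factory are hypothetical stand-ins for creating the `ObjectMapper`, not code from the PR.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for lazily building an expensive object such as a
// Jackson ObjectMapper. `volatile` is what makes double-checked locking safe:
// other threads never see a partially constructed instance.
final class LazyMapper<T> {
  private final Supplier<T> factory;
  private volatile T instance;

  LazyMapper(Supplier<T> factory) {
    this.factory = factory;
  }

  T get() {
    T local = instance; // single volatile read on the fast path
    if (local == null) {
      synchronized (this) {
        local = instance; // re-check under the lock
        if (local == null) {
          local = factory.get();
          instance = local;
        }
      }
    }
    return local;
  }
}
```

In the Beam case, `transient` keeps the mapper out of the serialized `DoFn`, which is why it is created lazily on the worker. Since Beam's programming guide states that a `DoFn` instance is accessed by a single thread at a time (unless user code starts its own threads), creating the mapper in a `@Setup` method and storing it in a plain field is a common alternative that makes `volatile` unnecessary.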

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing layer are returned as
+   * Row objects within a {@link ParseResult}
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@Code PCollection<Row> personRows = results.getResults()}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@Code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * <p>To access the reason for the failure you will need to first enable extended error reporting.
+   * {@Code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo()); }
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@Code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLines()}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+    return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends PTransform<PCollection<String>, ParseResult> {
+
+    private Pipeline pipeline;
+
+    private PCollection<Row> parsedLine;
+    private PCollection<Row> failedParse;
+    private PCollection<Row> failedParseWithErr;
+
+    private static final String LINE_FIELD_NAME = "line";
+    private static final String ERROR_FIELD_NAME = "err";
+
+    public static final Schema ERROR_ROW_SCHEMA =
+        Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+    public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+        Schema.of(
+            Field.of(LINE_FIELD_NAME, FieldType.STRING),
+            Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+    static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() {};
+
+    public abstract Schema getSchema();
+
+    public abstract String getLineFieldName();
+
+    public abstract String getErrorFieldName();
+
+    public abstract boolean getExtendedErrorInfo();
+
+    PCollection<Row> deadLetterCollection;
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setSchema(Schema value);
+
+      public abstract Builder setLineFieldName(String value);
+
+      public abstract Builder setErrorFieldName(String value);
+
+      public abstract Builder setExtendedErrorInfo(boolean value);
+
+      public abstract JsonToRowWithErrFn build();
+    }
+
+    public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+          .setSchema(rowSchema)
+          .setExtendedErrorInfo(false)
+          .setLineFieldName(LINE_FIELD_NAME)
+          .setErrorFieldName(ERROR_FIELD_NAME)
+          .build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn withExtendedErrorInfo() {
+      return this.toBuilder().setExtendedErrorInfo(true).build();
+    }
+
+    /**
+     * Sets the field name for the line field in the returned Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn setLineField(String lineField) {
+      return this.toBuilder().setLineFieldName(lineField).build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn setErrorField(String errorField) {
+      if (!this.getExtendedErrorInfo()) {
+        throw new IllegalArgumentException(
+            "This option is only available with Extended Error Info.");
+      }
+      return this.toBuilder().setErrorFieldName(errorField).build();
+    }
+
+    @Override
+    public ParseResult expand(PCollection<String> jsonStrings) {
+
+      PCollectionTuple result =
+          jsonStrings.apply(
+              ParDo.of(new ParseWithError(this.getSchema(), getExtendedErrorInfo()))
+                  .withOutputTags(
+                      PARSED_LINE,
+                      TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+      this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());
+      this.failedParse =
+          result.get(PARSE_ERROR_LINE).setRowSchema(JsonToRowWithErrFn.ERROR_ROW_SCHEMA);
+      this.failedParseWithErr =
+          result
+              .get(PARSE_ERROR_LINE_WITH_MSG)
+              .setRowSchema(JsonToRowWithErrFn.ERROR_ROW_WITH_ERR_MSG_SCHEMA);
+
+      return ParseResult.result(this);
+    }
+
+    private static class ParseWithError extends DoFn<String, Row> {
+      private transient volatile @Nullable ObjectMapper objectMapper;
+      Schema schema;
+      Boolean withExtendedErrorInfo;
+
+      ParseWithError(Schema schema, Boolean withExtendedErrorInfo) {
+        this.schema = schema;
+        this.withExtendedErrorInfo = withExtendedErrorInfo;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        try {
+
+          context.output(jsonToRow(objectMapper(), context.element()));
+
+        } catch (Exception ex) {
+
+          if (withExtendedErrorInfo) {
+            context.output(
+                PARSE_ERROR_LINE_WITH_MSG,

Review comment:
       Instead of separate output tags, why not just add a nullable error-msg field to ERROR_ROW_SCHEMA that is left unset when it isn't asked for?
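A minimal plain-Java sketch of the single-error-output idea above, kept free of Beam dependencies so it runs standalone. `ErrorLine`, `ParseOutcome`, and `parseAll` are hypothetical names, and `Integer.parseInt` stands in for JSON-to-Row parsing. Every failure lands in one list with the same record shape; only the nullable `err` field depends on the flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical error record: one shape for every failure. The "err" field is
// nullable and is only populated when extended error info is requested.
final class ErrorLine {
  final String line;
  final String err; // null unless extended error info is enabled

  ErrorLine(String line, String err) {
    this.line = line;
    this.err = err;
  }
}

// Hypothetical container: one output for parsed values and one output for
// failures, instead of two separate error outputs.
final class ParseOutcome {
  final List<Integer> parsed = new ArrayList<>();
  final List<ErrorLine> failed = new ArrayList<>();

  // Integer.parseInt is a stand-in for JSON-to-Row parsing in this sketch.
  static ParseOutcome parseAll(List<String> lines, boolean withExtendedErrorInfo) {
    ParseOutcome out = new ParseOutcome();
    for (String line : lines) {
      try {
        out.parsed.add(Integer.parseInt(line));
      } catch (NumberFormatException ex) {
        // Same record type in both modes; only the err field differs.
        out.failed.add(new ErrorLine(line, withExtendedErrorInfo ? ex.getMessage() : null));
      }
    }
    return out;
  }
}
```

In Beam terms this would roughly correspond to a single error schema that declares the message field with `Schema.Field.nullable(...)`. One output tag and one accessor on `ParseResult` could then serve both modes. That mapping is an assumption; it is not something this revision of the PR implements.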

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enables dead-letter support. If this option is set, errors in the parsing layer are returned
+   * as {@link Row} objects within a {@link ParseResult}.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@code ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));}
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@code PCollection<Row> personRows = results.getResults();}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@code PCollection<Row> errorsLines = results.getFailedToParseLines();}
+   *
+   * <p>To access the reason for a failure, you must first enable extended error reporting:
+   * {@code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo());}
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLinesWithErr();}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+    return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends PTransform<PCollection<String>, ParseResult> {
+
+    private Pipeline pipeline;
+
+    private PCollection<Row> parsedLine;
+    private PCollection<Row> failedParse;
+    private PCollection<Row> failedParseWithErr;
+
+    private static final String LINE_FIELD_NAME = "line";
+    private static final String ERROR_FIELD_NAME = "err";
+
+    public static final Schema ERROR_ROW_SCHEMA =
+        Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+    public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+        Schema.of(
+            Field.of(LINE_FIELD_NAME, FieldType.STRING),
+            Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+    static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() {};
+
+    public abstract Schema getSchema();
+
+    public abstract String getLineFieldName();
+
+    public abstract String getErrorFieldName();
+
+    public abstract boolean getExtendedErrorInfo();
+
+    PCollection<Row> deadLetterCollection;
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setSchema(Schema value);
+
+      public abstract Builder setLineFieldName(String value);
+
+      public abstract Builder setErrorFieldName(String value);
+
+      public abstract Builder setExtendedErrorInfo(boolean value);
+
+      public abstract JsonToRowWithErrFn build();
+    }
+
+    public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+          .setSchema(rowSchema)
+          .setExtendedErrorInfo(false)
+          .setLineFieldName(LINE_FIELD_NAME)
+          .setErrorFieldName(ERROR_FIELD_NAME)
+          .build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn withExtendedErrorInfo() {
+      return this.toBuilder().setExtendedErrorInfo(true).build();
+    }
+
+    /**
+     * Sets the field name for the line field in the returned Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn setLineField(String lineField) {
+      return this.toBuilder().setLineFieldName(lineField).build();
+    }
+
+    /**
+     * Sets the field name for the error message field in the returned error Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn setErrorField(String errorField) {
+      if (!this.getExtendedErrorInfo()) {
+        throw new IllegalArgumentException(
+            "This option is only available with Extended Error Info.");
+      }
+      return this.toBuilder().setErrorFieldName(errorField).build();
+    }
+
+    @Override
+    public ParseResult expand(PCollection<String> jsonStrings) {
+
+      PCollectionTuple result =
+          jsonStrings.apply(
+              ParDo.of(new ParseWithError(this.getSchema(), getExtendedErrorInfo()))
+                  .withOutputTags(
+                      PARSED_LINE,
+                      TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+      this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());
+      this.failedParse =
+          result.get(PARSE_ERROR_LINE).setRowSchema(JsonToRowWithErrFn.ERROR_ROW_SCHEMA);
+      this.failedParseWithErr =
+          result
+              .get(PARSE_ERROR_LINE_WITH_MSG)
+              .setRowSchema(JsonToRowWithErrFn.ERROR_ROW_WITH_ERR_MSG_SCHEMA);
+
+      return ParseResult.result(this);
+    }
+
+    private static class ParseWithError extends DoFn<String, Row> {
+      private transient volatile @Nullable ObjectMapper objectMapper;
+      Schema schema;
+      Boolean withExtendedErrorInfo;
+
+      ParseWithError(Schema schema, Boolean withExtendedErrorInfo) {
+        this.schema = schema;
+        this.withExtendedErrorInfo = withExtendedErrorInfo;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        try {
+
+          context.output(jsonToRow(objectMapper(), context.element()));
+
+        } catch (Exception ex) {
+
+          if (withExtendedErrorInfo) {
+            context.output(
+                PARSE_ERROR_LINE_WITH_MSG,
+                Row.withSchema(ERROR_ROW_WITH_ERR_MSG_SCHEMA)
+                    .addValue(context.element())
+                    .addValue(ex.getMessage())
+                    .build());

Review comment:
       BTW you know that there's a more-readable builder now, right?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enables dead-letter support. If this option is set, errors in the parsing layer are returned
+   * as {@link Row} objects within a {@link ParseResult}.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@code ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));}
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@code PCollection<Row> personRows = results.getResults();}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@code PCollection<Row> errorsLines = results.getFailedToParseLines();}
+   *
+   * <p>To access the reason for a failure, you must first enable extended error reporting:
+   * {@code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo());}
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLinesWithErr();}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {

Review comment:
       I think we should name this something different, as not everyone knows the term dead letter. 

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enables dead-letter support. If this option is set, errors in the parsing layer are returned
+   * as {@link Row} objects within a {@link ParseResult}.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@code ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));}
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@code PCollection<Row> personRows = results.getResults();}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@code PCollection<Row> errorsLines = results.getFailedToParseLines();}
+   *
+   * <p>To access the reason for a failure, you must first enable extended error reporting:
+   * {@code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo());}
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLinesWithErr();}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+    return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends PTransform<PCollection<String>, ParseResult> {
+
+    private Pipeline pipeline;
+
+    private PCollection<Row> parsedLine;
+    private PCollection<Row> failedParse;
+    private PCollection<Row> failedParseWithErr;
+
+    private static final String LINE_FIELD_NAME = "line";
+    private static final String ERROR_FIELD_NAME = "err";
+
+    public static final Schema ERROR_ROW_SCHEMA =
+        Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+    public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+        Schema.of(
+            Field.of(LINE_FIELD_NAME, FieldType.STRING),
+            Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+    static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() {};
+
+    public abstract Schema getSchema();
+
+    public abstract String getLineFieldName();
+
+    public abstract String getErrorFieldName();
+
+    public abstract boolean getExtendedErrorInfo();
+
+    PCollection<Row> deadLetterCollection;
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setSchema(Schema value);
+
+      public abstract Builder setLineFieldName(String value);
+
+      public abstract Builder setErrorFieldName(String value);
+
+      public abstract Builder setExtendedErrorInfo(boolean value);
+
+      public abstract JsonToRowWithErrFn build();
+    }
+
+    public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+          .setSchema(rowSchema)
+          .setExtendedErrorInfo(false)
+          .setLineFieldName(LINE_FIELD_NAME)
+          .setErrorFieldName(ERROR_FIELD_NAME)
+          .build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn withExtendedErrorInfo() {
+      return this.toBuilder().setExtendedErrorInfo(true).build();
+    }
+
+    /**
+     * Sets the field name for the line field in the returned Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn setLineField(String lineField) {
+      return this.toBuilder().setLineFieldName(lineField).build();
+    }
+
+    /**
+     * Sets the field name for the error message field in the returned error Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn setErrorField(String errorField) {
+      if (!this.getExtendedErrorInfo()) {
+        throw new IllegalArgumentException(
+            "This option is only available with Extended Error Info.");
+      }
+      return this.toBuilder().setErrorFieldName(errorField).build();
+    }
+
+    @Override
+    public ParseResult expand(PCollection<String> jsonStrings) {
+
+      PCollectionTuple result =
+          jsonStrings.apply(
+              ParDo.of(new ParseWithError(this.getSchema(), getExtendedErrorInfo()))
+                  .withOutputTags(
+                      PARSED_LINE,
+                      TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+      this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());
+      this.failedParse =
+          result.get(PARSE_ERROR_LINE).setRowSchema(JsonToRowWithErrFn.ERROR_ROW_SCHEMA);
+      this.failedParseWithErr =
+          result
+              .get(PARSE_ERROR_LINE_WITH_MSG)
+              .setRowSchema(JsonToRowWithErrFn.ERROR_ROW_WITH_ERR_MSG_SCHEMA);
+
+      return ParseResult.result(this);
+    }
+
+    private static class ParseWithError extends DoFn<String, Row> {
+      private transient volatile @Nullable ObjectMapper objectMapper;
+      Schema schema;
+      Boolean withExtendedErrorInfo;
+
+      ParseWithError(Schema schema, Boolean withExtendedErrorInfo) {
+        this.schema = schema;
+        this.withExtendedErrorInfo = withExtendedErrorInfo;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        try {
+
+          context.output(jsonToRow(objectMapper(), context.element()));
+
+        } catch (Exception ex) {
+
+          if (withExtendedErrorInfo) {
+            context.output(
+                PARSE_ERROR_LINE_WITH_MSG,
+                Row.withSchema(ERROR_ROW_WITH_ERR_MSG_SCHEMA)
+                    .addValue(context.element())
+                    .addValue(ex.getMessage())
+                    .build());
+          } else {
+            context.output(
+                PARSE_ERROR_LINE,
+                Row.withSchema(ERROR_ROW_SCHEMA).addValue(context.element()).build());
+          }
+        }
+      }
+
+      private ObjectMapper objectMapper() {
+        if (this.objectMapper == null) {
+          synchronized (this) {
+            if (this.objectMapper == null) {
+              this.objectMapper = newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
+            }
+          }
+        }
+
+        return this.objectMapper;
+      }
+    }
+  }
+
+  /** The result of a {@link JsonToRow#withDeadLetter(Schema)} transform. */
+  public static final class ParseResult implements POutput {
+    private final JsonToRowWithErrFn jsonToRowWithErrFn;
+
+    private ParseResult(JsonToRowWithErrFn jsonToRowWithErrFn) {
+      this.jsonToRowWithErrFn = jsonToRowWithErrFn;
+    }
+
+    public static ParseResult result(JsonToRowWithErrFn jsonToRowWithErrFn) {
+      return new ParseResult(jsonToRowWithErrFn);
+    }
+
+    @Override
+    public Pipeline getPipeline() {
+      return jsonToRowWithErrFn.pipeline;
+    }
+
+    @Override
+    public Map<TupleTag<?>, PValue> expand() {
+      if (jsonToRowWithErrFn.getExtendedErrorInfo()) {
+        return ImmutableMap.of(
+            JsonToRowWithErrFn.PARSED_LINE,
+            jsonToRowWithErrFn.parsedLine,
+            JsonToRowWithErrFn.PARSE_ERROR_LINE_WITH_MSG,
+            jsonToRowWithErrFn.failedParseWithErr);
+      }
+      return ImmutableMap.of(
+          JsonToRowWithErrFn.PARSED_LINE,
+          jsonToRowWithErrFn.parsedLine,
+          JsonToRowWithErrFn.PARSE_ERROR_LINE,
+          jsonToRowWithErrFn.failedParse);
+    }
+
+    @Override
+    public void finishSpecifyingOutput(
+        String transformName, PInput input, PTransform<?, ?> transform) {}
+
+    /** Returns a {@link PCollection} containing the {@link Row}s that have been parsed. */
+    public PCollection<Row> getResults() {
+      return jsonToRowWithErrFn.parsedLine;
+    }
+
+    /**
+     * Returns a {@link PCollection} containing the {@link Row}s that didn't parse.
+     *
+     * <p>Only use this method if you haven't enabled {@link
+     * JsonToRowWithErrFn#withExtendedErrorInfo()}. Otherwise use {@link
+     * ParseResult#getFailedToParseLinesWithErr()}.
+     */
+    public PCollection<Row> getFailedToParseLines() {
+      checkArgument(
+          !jsonToRowWithErrFn.getExtendedErrorInfo(),
+          "Cannot use getFailedToParseLines as this ParseResult uses extended error"
+              + " information. Use getFailedToParseLinesWithErr instead");
+      return jsonToRowWithErrFn.failedParse;
+    }
+
+    /**
+     * Returns a {@link PCollection} containing {@link Row}s with detailed error information.
+     *
+     * <p>Only use this method if you have enabled {@link
+     * JsonToRowWithErrFn#withExtendedErrorInfo()}. Otherwise use {@link
+     * ParseResult#getFailedToParseLines()}.
+     */
+    public PCollection<Row> getFailedToParseLinesWithErr() {

Review comment:
       see above - I don't think we need both here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437762683



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enables dead-letter support. If this option is set, errors in the parsing layer are returned
+   * as {@link Row} objects within a {@link ParseResult}.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@code ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));}
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@code PCollection<Row> personRows = results.getResults();}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@code PCollection<Row> errorsLines = results.getFailedToParseLines();}
+   *
+   * <p>To access the reason for a failure, you must first enable extended error reporting:
+   * {@code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo());}
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLinesWithErr();}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+    return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends PTransform<PCollection<String>, ParseResult> {
+
+    private Pipeline pipeline;
+
+    private PCollection<Row> parsedLine;
+    private PCollection<Row> failedParse;
+    private PCollection<Row> failedParseWithErr;
+
+    private static final String LINE_FIELD_NAME = "line";
+    private static final String ERROR_FIELD_NAME = "err";
+
+    public static final Schema ERROR_ROW_SCHEMA =
+        Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+    public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+        Schema.of(
+            Field.of(LINE_FIELD_NAME, FieldType.STRING),
+            Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+    static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() {};
+
+    public abstract Schema getSchema();
+
+    public abstract String getLineFieldName();
+
+    public abstract String getErrorFieldName();
+
+    public abstract boolean getExtendedErrorInfo();
+
+    PCollection<Row> deadLetterCollection;
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setSchema(Schema value);
+
+      public abstract Builder setLineFieldName(String value);
+
+      public abstract Builder setErrorFieldName(String value);
+
+      public abstract Builder setExtendedErrorInfo(boolean value);
+
+      public abstract JsonToRowWithErrFn build();
+    }
+
+    public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+          .setSchema(rowSchema)
+          .setExtendedErrorInfo(false)
+          .setLineFieldName(LINE_FIELD_NAME)
+          .setErrorFieldName(ERROR_FIELD_NAME)
+          .build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn withExtendedErrorInfo() {
+      return this.toBuilder().setExtendedErrorInfo(true).build();
+    }
+
+    /**
+     * Sets the field name for the line field in the returned Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn setLineField(String lineField) {
+      return this.toBuilder().setLineFieldName(lineField).build();
+    }
+
+    /**
+     * Sets the field name for the error message field in the returned error Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn setErrorField(String errorField) {
+      if (!this.getExtendedErrorInfo()) {
+        throw new IllegalArgumentException(
+            "This option is only available with Extended Error Info.");
+      }
+      return this.toBuilder().setErrorFieldName(errorField).build();
+    }
+
+    @Override
+    public ParseResult expand(PCollection<String> jsonStrings) {
+
+      PCollectionTuple result =
+          jsonStrings.apply(
+              ParDo.of(new ParseWithError(this.getSchema(), getExtendedErrorInfo()))
+                  .withOutputTags(
+                      PARSED_LINE,
+                      TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+      this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());
+      this.failedParse =
+          result.get(PARSE_ERROR_LINE).setRowSchema(JsonToRowWithErrFn.ERROR_ROW_SCHEMA);
+      this.failedParseWithErr =
+          result
+              .get(PARSE_ERROR_LINE_WITH_MSG)
+              .setRowSchema(JsonToRowWithErrFn.ERROR_ROW_WITH_ERR_MSG_SCHEMA);
+
+      return ParseResult.result(this);
+    }
+
+    private static class ParseWithError extends DoFn<String, Row> {
+      private transient volatile @Nullable ObjectMapper objectMapper;
+      Schema schema;
+      Boolean withExtendedErrorInfo;
+
+      ParseWithError(Schema schema, Boolean withExtendedErrorInfo) {
+        this.schema = schema;
+        this.withExtendedErrorInfo = withExtendedErrorInfo;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        try {
+
+          context.output(jsonToRow(objectMapper(), context.element()));
+
+        } catch (Exception ex) {
+
+          if (withExtendedErrorInfo) {
+            context.output(
+                PARSE_ERROR_LINE_WITH_MSG,
+                Row.withSchema(ERROR_ROW_WITH_ERR_MSG_SCHEMA)
+                    .addValue(context.element())
+                    .addValue(ex.getMessage())
+                    .build());

Review comment:
       Not sure, do you mean using withFieldValue?




users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-644263372


   Run Java PreCommit


users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-643479659


   retest this please


users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-645525227


   Run Java PreCommit


users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-645146173


   Just gotta believe in ourselves....


users@infra.apache.org



[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437764105



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enables dead-letter support. If this option is set, errors in the parsing layer are returned
+   * as {@link Row} objects within a {@link ParseResult}.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@code ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));}
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@code PCollection<Row> personRows = results.getResults();}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@code PCollection<Row> errorsLines = results.getFailedToParseLines();}
+   *
+   * <p>To access the reason for a failure, you must first enable extended error reporting:
+   * {@code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo());}
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLinesWithErr();}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+    return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends PTransform<PCollection<String>, ParseResult> {
+
+    private Pipeline pipeline;
+
+    private PCollection<Row> parsedLine;
+    private PCollection<Row> failedParse;
+    private PCollection<Row> failedParseWithErr;
+
+    private static final String LINE_FIELD_NAME = "line";
+    private static final String ERROR_FIELD_NAME = "err";
+
+    public static final Schema ERROR_ROW_SCHEMA =
+        Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+    public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+        Schema.of(
+            Field.of(LINE_FIELD_NAME, FieldType.STRING),
+            Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+    static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() {};
+
+    public abstract Schema getSchema();
+
+    public abstract String getLineFieldName();
+
+    public abstract String getErrorFieldName();
+
+    public abstract boolean getExtendedErrorInfo();
+
+    PCollection<Row> deadLetterCollection;
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setSchema(Schema value);
+
+      public abstract Builder setLineFieldName(String value);
+
+      public abstract Builder setErrorFieldName(String value);
+
+      public abstract Builder setExtendedErrorInfo(boolean value);
+
+      public abstract JsonToRowWithErrFn build();
+    }
+
+    public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+          .setSchema(rowSchema)
+          .setExtendedErrorInfo(false)
+          .setLineFieldName(LINE_FIELD_NAME)
+          .setErrorFieldName(ERROR_FIELD_NAME)
+          .build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn withExtendedErrorInfo() {
+      return this.toBuilder().setExtendedErrorInfo(true).build();
+    }
+
+    /**
+     * Sets the field name for the line field in the returned Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn setLineField(String lineField) {
+      return this.toBuilder().setLineFieldName(lineField).build();
+    }
+
+    /**
+     * Sets the field name for the error message field in the returned error Row.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn setErrorField(String errorField) {
+      if (!this.getExtendedErrorInfo()) {
+        throw new IllegalArgumentException(
+            "This option is only available with Extended Error Info.");
+      }
+      return this.toBuilder().setErrorFieldName(errorField).build();
+    }
+
+    @Override
+    public ParseResult expand(PCollection<String> jsonStrings) {
+
+      PCollectionTuple result =
+          jsonStrings.apply(
+              ParDo.of(new ParseWithError(this.getSchema(), getExtendedErrorInfo()))
+                  .withOutputTags(
+                      PARSED_LINE,
+                      TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+      this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());
+      this.failedParse =
+          result.get(PARSE_ERROR_LINE).setRowSchema(JsonToRowWithErrFn.ERROR_ROW_SCHEMA);
+      this.failedParseWithErr =
+          result
+              .get(PARSE_ERROR_LINE_WITH_MSG)
+              .setRowSchema(JsonToRowWithErrFn.ERROR_ROW_WITH_ERR_MSG_SCHEMA);
+
+      return ParseResult.result(this);
+    }
+
+    private static class ParseWithError extends DoFn<String, Row> {
+      private transient volatile @Nullable ObjectMapper objectMapper;

Review comment:
       This was used in the original JsonToRow, but I didn't see any notes as to why. Also, this ObjectMapper was previously at the transform class level and used in an anonymous DoFn.
   In theory this is something I would instantiate in @Setup, but should we make that change in a separate PR, which could also include larger tests to make sure it works when multiple DoFn instances are active?
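The thread does not record why the field is `transient volatile`. One plausible reading: DoFns are serialized when the pipeline is shipped to workers, `transient` keeps the mapper out of that serialized form, and `volatile` makes a double-checked lazy getter safe if one instance is used from several threads. The plain-JDK sketch below shows that pattern; `Helper` is a hypothetical stand-in for Jackson's `ObjectMapper`, and no Beam or Jackson API is used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Sketch: a serializable function object that lazily rebuilds a helper it does not ship. */
final class LazyMapperSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  /** Hypothetical stand-in for an expensive helper such as a JSON mapper (not Serializable). */
  static final class Helper {
    String parse(String s) {
      return s.trim();
    }
  }

  // transient: skipped by Java serialization, so the field is null after deserialization.
  // volatile: makes the double-checked lazy initialization below safe across threads.
  private transient volatile Helper helper;

  Helper helper() {
    Helper local = helper;
    if (local == null) {
      synchronized (this) {
        local = helper;
        if (local == null) {
          local = new Helper();
          helper = local;
        }
      }
    }
    return local;
  }

  boolean helperInitialized() {
    return helper != null;
  }

  /** Serializes and deserializes the object, roughly like shipping it to a worker. */
  static LazyMapperSketch roundTrip(LazyMapperSketch fn)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(fn);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (LazyMapperSketch) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    LazyMapperSketch fn = new LazyMapperSketch();
    fn.helper(); // initialized on the "submitting" side
    LazyMapperSketch onWorker = roundTrip(fn);
    System.out.println(onWorker.helperInitialized()); // false: the transient field was not serialized
    System.out.println(onWorker.helper().parse("  {}  ")); // rebuilt lazily on first use
  }
}
```

The `@Setup` alternative mentioned above would build the helper once per DoFn instance before any elements are processed, which removes the need for the locking; the lazy getter has the advantage of working without any lifecycle hook.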




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r436215155



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      try {
+                        context.output(jsonToRow(objectMapper(), context.element()));
+                      } catch (Exception ex) {
+                        context.output(
+                            deadLetter,
+                            Row.withSchema(ERROR_ROW_SCHEMA)
+                                .addValue(context.element())
+                                .addValue(ex.getMessage())
+                                .build());

Review comment:
       (I am mostly leaning towards not doing this, but lmk what you think)







[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-643680583


   retest this please





[GitHub] [beam] pabloem commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r436214157



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      try {
+                        context.output(jsonToRow(objectMapper(), context.element()));
+                      } catch (Exception ex) {
+                        context.output(
+                            deadLetter,
+                            Row.withSchema(ERROR_ROW_SCHEMA)
+                                .addValue(context.element())
+                                .addValue(ex.getMessage())
+                                .build());

Review comment:
       I guess this may not make sense, but would it help to include the Row schema that we tried (and failed) to use for this JSON string? Some users may not need it, and others can add it themselves in a downstream ParDo, but it's possible it may help. Thoughts?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      try {
+                        context.output(jsonToRow(objectMapper(), context.element()));
+                      } catch (Exception ex) {
+                        context.output(
+                            deadLetter,
+                            Row.withSchema(ERROR_ROW_SCHEMA)
+                                .addValue(context.element())
+                                .addValue(ex.getMessage())
+                                .build());
+                      }
+                    }
+                  })
+              .withOutputTags(main, TupleTagList.of(deadLetter)));

Review comment:
       you can add the schema for the outputs here, so that users do not need to add them themselves?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {

Review comment:
       `PCollectionTuple` is not great (though it's what we have after all) - do you think it could make sense to return a custom, more elegant `Result` type? See this PR, where something like this was added for Map-type ptransforms: https://github.com/apache/beam/pull/7736/files#diff-895d512e486834dfd818eec82c75b2c3R122-R181
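A wrapper of this kind hides the output tags behind named accessors. The sketch below models the idea with in-memory lists rather than `PCollection`s, so it is not Beam code: `ParseResultSketch`, `Failure`, and `parseAll` are hypothetical names, the accessor names mirror the `getResults()` / `getFailedToParseLines()` accessors that appear later in this PR, and `Integer.parseInt` stands in for the JSON-to-Row conversion.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/** Sketch: a result wrapper exposing named accessors instead of raw output tags. */
final class ParseResultSketch<T> {
  /** One failed input line together with the parser's error message. */
  static final class Failure {
    final String line;
    final String err;

    Failure(String line, String err) {
      this.line = line;
      this.err = err;
    }
  }

  private final List<T> results;
  private final List<Failure> failures;

  private ParseResultSketch(List<T> results, List<Failure> failures) {
    this.results = Collections.unmodifiableList(results);
    this.failures = Collections.unmodifiableList(failures);
  }

  /** Applies the parser to every line and routes exceptions to the failure list (dead letter). */
  static <T> ParseResultSketch<T> parseAll(List<String> lines, Function<String, T> parser) {
    List<T> ok = new ArrayList<>();
    List<Failure> bad = new ArrayList<>();
    for (String line : lines) {
      try {
        ok.add(parser.apply(line));
      } catch (RuntimeException ex) {
        bad.add(new Failure(line, ex.getMessage()));
      }
    }
    return new ParseResultSketch<>(ok, bad);
  }

  List<T> getResults() {
    return results;
  }

  List<Failure> getFailedToParseLines() {
    return failures;
  }

  public static void main(String[] args) {
    // Integer.parseInt stands in for the JSON-to-Row conversion.
    ParseResultSketch<Integer> r =
        ParseResultSketch.parseAll(List.of("1", "two", "3"), Integer::parseInt);
    System.out.println(r.getResults()); // [1, 3]
    System.out.println(r.getFailedToParseLines().get(0).line); // two
  }
}
```

Callers depend only on the accessors, so the internal tags (or how many there are) can change without breaking them.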

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new TupleTag<Row>() {};
+
   public static PTransform<PCollection<String>, PCollection<Row>> withSchema(Schema rowSchema) {
     return JsonToRowFn.forSchema(rowSchema);
   }
 
+  /**
+   * Enables dead-letter support. If this is set, errors in the parsing layer are returned as Row
+   * objects of the form {@link JsonToRow#ERROR_ROW_SCHEMA}, where {@code line} is the original
+   * JSON string and {@code err} is the error message from the parsing function.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@link JsonToRow#MAIN_TUPLE_TAG}
+   *
+   * <p>{@Code PCollection<Row> personRows =
+   * results.get(JsonToRow.MAIN_TUPLE_TAG).setRowSchema(personSchema)}

Review comment:
       Maybe add the schema for the PCollections in `expand` so that users won't have to add them manually?







[GitHub] [beam] reuvenlax commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-642989434


   I think it would be cleaner to not have a separate output tag. You don't need to use a nullable field; you can simply add the failure reason field to the output schema only when it's needed.
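One way to read this suggestion: keep a single failure output and make the error-message field part of the record shape only when extended error info is requested, instead of always declaring a nullable field. In the sketch below a `Map` stands in for a Beam `Row` and a list of names stands in for its `Schema`; `ErrorRecordSketch` and its methods are hypothetical, while the `line` / `err` field names are the defaults used in this PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: one dead-letter record type whose fields depend on a flag, with no nullable field. */
final class ErrorRecordSketch {
  /** The "schema": "err" exists only when extended error info is requested. */
  static List<String> errorFieldNames(boolean withExtendedErrorInfo) {
    return withExtendedErrorInfo ? List.of("line", "err") : List.of("line");
  }

  /** Builds a record whose keys match errorFieldNames(withExtendedErrorInfo), in order. */
  static Map<String, String> errorRecord(
      String line, Exception ex, boolean withExtendedErrorInfo) {
    Map<String, String> record = new LinkedHashMap<>();
    record.put("line", line);
    if (withExtendedErrorInfo) {
      record.put("err", ex.getMessage());
    }
    return record;
  }

  public static void main(String[] args) {
    Exception ex = new IllegalArgumentException("bad json");
    System.out.println(errorRecord("not json", ex, false)); // {line=not json}
    System.out.println(errorRecord("not json", ex, true)); // {line=not json, err=bad json}
  }
}
```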





[GitHub] [beam] reuvenlax commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r436220739



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new TupleTag<Row>() {};
+
   public static PTransform<PCollection<String>, PCollection<Row>> withSchema(Schema rowSchema) {
     return JsonToRowFn.forSchema(rowSchema);
   }
 
+  /**
+   * Enables dead-letter support. If this is set, errors in the parsing layer are returned as Row
+   * objects of the form {@link JsonToRow#ERROR_ROW_SCHEMA}, where {@code line} is the original
+   * JSON string and {@code err} is the error message from the parsing function.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@link JsonToRow#MAIN_TUPLE_TAG}
+   *
+   * <p>{@Code PCollection<Row> personRows =
+   * results.get(JsonToRow.MAIN_TUPLE_TAG).setRowSchema(personSchema)}

Review comment:
       +1. If you output a Row, you should be setting the schema in your transform.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;

Review comment:
       make final

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+

Review comment:
       would be nicer to make these field names configurable, though with defaults.
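A common way to make the names configurable while keeping defaults is an immutable options object with copy-on-write `with...` methods, which is roughly what the later revision of this PR does with an AutoValue builder. The plain-Java sketch below is hypothetical (`ErrorFieldOptions` and its methods are not part of Beam); only the default names `line` and `err` come from the diff.

```java
/** Sketch: immutable options with default field names and copy-on-write setters. */
final class ErrorFieldOptions {
  static final String DEFAULT_LINE_FIELD = "line";
  static final String DEFAULT_ERROR_FIELD = "err";

  private final String lineField;
  private final String errorField;

  private ErrorFieldOptions(String lineField, String errorField) {
    this.lineField = lineField;
    this.errorField = errorField;
  }

  /** Starts from the defaults used in the diff ("line" and "err"). */
  static ErrorFieldOptions defaults() {
    return new ErrorFieldOptions(DEFAULT_LINE_FIELD, DEFAULT_ERROR_FIELD);
  }

  /** Returns a copy with a different line field name; the receiver is unchanged. */
  ErrorFieldOptions withLineField(String name) {
    return new ErrorFieldOptions(requireName(name), errorField);
  }

  /** Returns a copy with a different error field name; the receiver is unchanged. */
  ErrorFieldOptions withErrorField(String name) {
    return new ErrorFieldOptions(lineField, requireName(name));
  }

  String lineField() {
    return lineField;
  }

  String errorField() {
    return errorField;
  }

  // Fails fast at configuration time rather than when the first record is built.
  private static String requireName(String name) {
    if (name == null || name.isEmpty()) {
      throw new IllegalArgumentException("field name must be non-empty");
    }
    return name;
  }

  public static void main(String[] args) {
    ErrorFieldOptions base = ErrorFieldOptions.defaults();
    ErrorFieldOptions custom = base.withLineField("payload");
    // prints line,payload,err
    System.out.println(base.lineField() + "," + custom.lineField() + "," + custom.errorField());
  }
}
```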

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {

Review comment:
       I think it would be cleaner to wrap this in a custom result class and not expose the TupleTags to users. Look at org.apache.beam.sdk.io.gcp.bigquery.WriteResult for an example.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {

Review comment:
       Why not use injected parameters instead of ProcessContext?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new TupleTag<Row>() {};
+

Review comment:
       give these tuple tags real names







[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437762165



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enables dead-letter support. If this is set, errors in the parsing layer are returned as
+   * Row objects within a {@link ParseResult}.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@code ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));}
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@code PCollection<Row> personRows = results.getResults()}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * <p>To access the reason for the failure, you first need to enable extended error reporting:
+   * {@code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo()); }
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLinesWithErr()}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {
+    return JsonToRowWithErrFn.forSchema(rowSchema);
+  }
+
+  @AutoValue
+  abstract static class JsonToRowWithErrFn extends PTransform<PCollection<String>, ParseResult> {
+
+    private Pipeline pipeline;
+
+    private PCollection<Row> parsedLine;
+    private PCollection<Row> failedParse;
+    private PCollection<Row> failedParseWithErr;
+
+    private static final String LINE_FIELD_NAME = "line";
+    private static final String ERROR_FIELD_NAME = "err";
+
+    public static final Schema ERROR_ROW_SCHEMA =
+        Schema.of(Field.of(LINE_FIELD_NAME, FieldType.STRING));
+
+    public static final Schema ERROR_ROW_WITH_ERR_MSG_SCHEMA =
+        Schema.of(
+            Field.of(LINE_FIELD_NAME, FieldType.STRING),
+            Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+    static final TupleTag<Row> PARSED_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE = new TupleTag<Row>() {};
+    static final TupleTag<Row> PARSE_ERROR_LINE_WITH_MSG = new TupleTag<Row>() {};
+
+    public abstract Schema getSchema();
+
+    public abstract String getLineFieldName();
+
+    public abstract String getErrorFieldName();
+
+    public abstract boolean getExtendedErrorInfo();
+
+    PCollection<Row> deadLetterCollection;
+
+    public abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setSchema(Schema value);
+
+      public abstract Builder setLineFieldName(String value);
+
+      public abstract Builder setErrorFieldName(String value);
+
+      public abstract Builder setExtendedErrorInfo(boolean value);
+
+      public abstract JsonToRowWithErrFn build();
+    }
+
+    public static JsonToRowWithErrFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new AutoValue_JsonToRow_JsonToRowWithErrFn.Builder()
+          .setSchema(rowSchema)
+          .setExtendedErrorInfo(false)
+          .setLineFieldName(LINE_FIELD_NAME)
+          .setErrorFieldName(ERROR_FIELD_NAME)
+          .build();
+    }
+
+    /**
+     * Adds the error message to the returned error Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn withExtendedErrorInfo() {
+      return this.toBuilder().setExtendedErrorInfo(true).build();
+    }
+
+    /**
+     * Sets the field name for the line field in the returned Row.
+     *
+     * @return {@link JsonToRow}
+     */
+    public JsonToRowWithErrFn setLineField(String lineField) {
+      return this.toBuilder().setLineFieldName(lineField).build();
+    }
+
+    /**
+     * Sets the field name for the error message in the returned error Row. Only valid after
+     * {@link #withExtendedErrorInfo()} has been called.
+     *
+     * @return {@link JsonToRowWithErrFn}
+     */
+    public JsonToRowWithErrFn setErrorField(String errorField) {
+      if (!this.getExtendedErrorInfo()) {
+        throw new IllegalArgumentException(
+            "This option is only available with Extended Error Info.");
+      }
+      return this.toBuilder().setErrorFieldName(errorField).build();
+    }
+
+    @Override
+    public ParseResult expand(PCollection<String> jsonStrings) {
+
+      PCollectionTuple result =
+          jsonStrings.apply(
+              ParDo.of(new ParseWithError(this.getSchema(), getExtendedErrorInfo()))
+                  .withOutputTags(
+                      PARSED_LINE,
+                      TupleTagList.of(PARSE_ERROR_LINE).and(PARSE_ERROR_LINE_WITH_MSG)));
+
+      this.parsedLine = result.get(PARSED_LINE).setRowSchema(this.getSchema());
+      this.failedParse =
+          result.get(PARSE_ERROR_LINE).setRowSchema(JsonToRowWithErrFn.ERROR_ROW_SCHEMA);
+      this.failedParseWithErr =
+          result
+              .get(PARSE_ERROR_LINE_WITH_MSG)
+              .setRowSchema(JsonToRowWithErrFn.ERROR_ROW_WITH_ERR_MSG_SCHEMA);
+
+      return ParseResult.result(this);
+    }
+
+    private static class ParseWithError extends DoFn<String, Row> {
+      private transient volatile @Nullable ObjectMapper objectMapper;
+      Schema schema;
+      Boolean withExtendedErrorInfo;
+
+      ParseWithError(Schema schema, Boolean withExtendedErrorInfo) {
+        this.schema = schema;
+        this.withExtendedErrorInfo = withExtendedErrorInfo;
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        try {
+
+          context.output(jsonToRow(objectMapper(), context.element()));
+
+        } catch (Exception ex) {
+
+          if (withExtendedErrorInfo) {
+            context.output(
+                PARSE_ERROR_LINE_WITH_MSG,

Review comment:
       One advantage of being explicit is that the program will fail at pipeline creation time if the option is accidentally left out. Otherwise, they could get an NPE.
   
   But not sure if that protection is valuable enough, or if the simplification of the API is better? 







[GitHub] [beam] rezarokni commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-649232286


   @reuvenlax nudge :-)





[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-645146110


   Run Java PreCommit





[GitHub] [beam] pabloem merged pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #11929:
URL: https://github.com/apache/beam/pull/11929


   





[GitHub] [beam] rezarokni commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-643706786


   @pabloem it looks like the failures are from unrelated code. Flaky tests?





[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-643807255


   Run Java PreCommit





[GitHub] [beam] TheNeuralBit commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-644898722


   It looks like SQL PostCommit did pass but was never registered here: https://builds.apache.org/job/beam_PostCommit_SQL_PR/325/





[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-645560856


   Yup, same spotbugs issue. Reza, I think it's related to the AutoValue classes, but I'm not quite sure what it means. Can you take a look?





[GitHub] [beam] TheNeuralBit commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-642330786


   FYI `PubsubJsonTableProvider` has support for writing to a dead letter pubsub topic: https://github.com/apache/beam/blob/d5dd47b47cbdf0739ac1a28cb8fbd06becbdbae7/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java#L160. Maybe there's some duplicate logic we can remove after this PR.



[GitHub] [beam] TheNeuralBit commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-644312387


   Run SQL PostCommit



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-642983187


   I have no further comments. LGTM. Thanks @rezarokni 



[GitHub] [beam] rezarokni commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-645785563


   All passed. I assume the missing AutoValue pieces were what stopped the report from showing here. When I ran it locally, I got the report with the specific error.



[GitHub] [beam] rezarokni commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-645708355


   @pabloem It was actually a serious bug. 



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-647031913


   This LGTM, thanks for fixing it, Reza!
   @reuvenlax any more comments? 



[GitHub] [beam] rezarokni commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-642368157


   @reuvenlax @pabloem 
   The latest commit covers most of the points now, with the exception of the three open questions I raised.
   - Return of the Schema that failed. @pabloem 
   - Discussion about withExtendedErrMsg @reuvenlax 
   - use of transient with ObjectMapper @reuvenlax 
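On the third open question (`transient` on the `ObjectMapper` field), here is a plain-Java sketch of the usual reasoning. It assumes, as is typical for Beam's Java SDK, that the object holding the mapper is shipped to workers via Java serialization. It uses no Beam or Jackson classes: `ExpensiveParser` is a hypothetical stand-in for `ObjectMapper` and `ParsingFn` a hypothetical stand-in for the function object. A `transient` field is skipped by serialization, so the deserialized copy starts with `null` and lazily rebuilds the parser; `volatile` is what makes the double-checked locking safe.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Jackson's ObjectMapper: costly to build and NOT Serializable.
final class ExpensiveParser {
  static final AtomicInteger CREATED = new AtomicInteger();

  ExpensiveParser() {
    CREATED.incrementAndGet();
  }

  String parse(String s) {
    return s.trim();
  }
}

// Hypothetical serializable function object that keeps the parser out of its serialized form.
class ParsingFn implements Serializable {
  private static final long serialVersionUID = 1L;

  // transient: skipped by Java serialization, so it is null again after deserialization.
  // volatile: needed for the double-checked locking in parser() to be safe.
  private transient volatile ExpensiveParser parser;

  ExpensiveParser parser() {
    ExpensiveParser local = parser;
    if (local == null) {
      synchronized (this) {
        local = parser;
        if (local == null) {
          local = new ExpensiveParser();
          parser = local;
        }
      }
    }
    return local;
  }

  boolean hasParser() {
    return parser != null;
  }

  // Serializes and deserializes fn, roughly what happens when a function is shipped to a worker.
  static ParsingFn roundTrip(ParsingFn fn) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(fn);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (ParsingFn) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("serialization round trip failed", e);
    }
  }
}
```

With this pattern the parser never has to be `Serializable`; the trade-off is one construction per deserialized instance.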



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-639885056


   retest this please



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-645519718


   I'm not sure if this is due to this change, but there's a spotbugs error on precommit: https://scans.gradle.com/s/k6qd4pwlitiie/console-log?task=:sdks:java:core:spotbugsMain



[GitHub] [beam] TheNeuralBit commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-645444786


   Run SQL PostCommit



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-645008717


   Run Java PreCommit



[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437807053



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +131,267 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing layer are returned as
+   * Row objects within a {@link ParseResult}
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>ParseResult results = jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA));
+   *
+   * <p>{@link ParseResult#getResults()}
+   *
+   * <p>{@Code PCollection<Row> personRows = results.getResults()}
+   *
+   * <p>{@link ParseResult#getFailedToParseLines()}
+   *
+   * <p>{@Code PCollection<Row> errorsLines = results.getFailedToParseLines()}
+   *
+   * <p>To access the reason for the failure you will need to first enable extended error reporting.
+   * {@Code ParseResult results =
+   * jsonPersons.apply(JsonToRow.withDeadLetter(PERSON_SCHEMA).withExtendedErrorInfo()); }
+   *
+   * <p>{@link ParseResult#getFailedToParseLinesWithErr()}
+   *
+   * <p>{@Code PCollection<Row> errorsLinesWithErrMsg = results.getFailedToParseLines()}
+   *
+   * @return {@link JsonToRowWithErrFn}
+   */
+  @Experimental(Kind.SCHEMAS)
+  public static JsonToRowWithErrFn withDeadLetter(Schema rowSchema) {

Review comment:
       Changed to withExceptionReporting.
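The Javadoc in the hunk above describes a result object with separate accessors for parsed rows and for failed lines, plus an opt-in mode that also keeps the error message. A minimal plain-Java sketch of that shape follows. `ParseResultSketch` and its methods are hypothetical names that only mirror the accessors named in the Javadoc, `List`s stand in for `PCollection`s, and throwing when extended info was not enabled is this sketch's own choice, not necessarily what the PR does.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java analogue of the ParseResult in the Javadoc; Lists stand in for PCollections.
final class ParseResultSketch<T> {
  private final List<T> results = new ArrayList<>();
  private final List<String> failedLines = new ArrayList<>();
  // (line, error message) pairs, collected only when extended error info is enabled.
  private final List<Map.Entry<String, String>> failedLinesWithErr = new ArrayList<>();
  private final boolean extendedErrorInfo;

  private ParseResultSketch(boolean extendedErrorInfo) {
    this.extendedErrorInfo = extendedErrorInfo;
  }

  static <R> ParseResultSketch<R> parseAll(
      List<String> lines, Function<String, R> parser, boolean extendedErrorInfo) {
    ParseResultSketch<R> result = new ParseResultSketch<>(extendedErrorInfo);
    for (String line : lines) {
      try {
        result.results.add(parser.apply(line));
      } catch (RuntimeException e) {
        // Every failure is kept as a raw line; the message is kept only on request.
        result.failedLines.add(line);
        if (extendedErrorInfo) {
          result.failedLinesWithErr.add(
              new AbstractMap.SimpleImmutableEntry<>(line, String.valueOf(e.getMessage())));
        }
      }
    }
    return result;
  }

  List<T> getResults() {
    return Collections.unmodifiableList(results);
  }

  List<String> getFailedToParseLines() {
    return Collections.unmodifiableList(failedLines);
  }

  List<Map.Entry<String, String>> getFailedToParseLinesWithErr() {
    // Sketch-specific choice: fail loudly if the caller did not opt in.
    if (!extendedErrorInfo) {
      throw new IllegalStateException("extended error info was not enabled");
    }
    return Collections.unmodifiableList(failedLinesWithErr);
  }
}
```

For example, parsing `"1"`, `"x"`, `"3"` with `Integer::parseInt` and extended info enabled leaves `[1, 3]` in the results and `"x"`, paired with its `NumberFormatException` message, in the failed lines.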





[GitHub] [beam] rezarokni commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-643561830


   @pabloem Fixed checkstyle in test, sorry about that.



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-643505627


   retest this please



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-643680819


   retest this please



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-643476319


   retest this please



[GitHub] [beam] rezarokni commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-643021410


   @reuvenlax Ah, yes, very nice. Changed in the latest commit.



[GitHub] [beam] rezarokni commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-642367042


   @TheNeuralBit If you have time, it would be great if you could scan the ParseResult and see whether the API would suit everything PubSubIO needs.



[GitHub] [beam] TheNeuralBit commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-645444709


   Run Java PreCommit



[GitHub] [beam] TheNeuralBit commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-644315697


   Run SQL PostCommit



[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437806981



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {

Review comment:
       Done.





[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437760550



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      try {
+                        context.output(jsonToRow(objectMapper(), context.element()));
+                      } catch (Exception ex) {
+                        context.output(
+                            deadLetter,
+                            Row.withSchema(ERROR_ROW_SCHEMA)
+                                .addValue(context.element())
+                                .addValue(ex.getMessage())
+                                .build());
+                      }
+                    }
+                  })
+              .withOutputTags(main, TupleTagList.of(deadLetter)));

Review comment:
       Fixed.
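The `processElement` in the hunk above routes each input either to the main output or, on any exception, to the dead-letter tag as a row holding the original `line` and the `err` message. The sketch below reproduces that routing in plain Java under stated assumptions: `OutputTag` and `TaggedOutputs` are hypothetical stand-ins for `TupleTag` and `PCollectionTuple`, a `Map` stands in for a `Row` with `ERROR_ROW_SCHEMA`, a plain counter stands in for the `Distribution` metric, and `parseFlatObject` is a deliberately tiny toy parser (flat objects with quoted string values, no escaping), not a substitute for the Jackson-based parsing in the PR. One detail worth noting: `Throwable.getMessage()` can return `null`, so the sketch falls back to the exception class name rather than storing a null `err`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical typed output tag; tags are compared by identity, like distinct TupleTag instances.
final class OutputTag<T> {
  private final String id;

  OutputTag(String id) {
    this.id = id;
  }

  @Override
  public String toString() {
    return id;
  }
}

// Hypothetical stand-in for a PCollectionTuple: one list of emitted values per tag.
final class TaggedOutputs {
  private final Map<OutputTag<?>, List<Object>> byTag = new HashMap<>();

  <T> void output(OutputTag<T> tag, T value) {
    byTag.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
  }

  @SuppressWarnings("unchecked")
  <T> List<T> get(OutputTag<T> tag) {
    return (List<T>) (List<?>) byTag.getOrDefault(tag, Collections.emptyList());
  }
}

final class JsonLineRouter {
  static final OutputTag<Map<String, String>> MAIN = new OutputTag<>("main");
  // Error records mirror ERROR_ROW_SCHEMA in the PR: a "line" field and an "err" field.
  static final OutputTag<Map<String, String>> DEAD_LETTER = new OutputTag<>("deadLetter");

  // Plain counter standing in for the Distribution metric in the diff.
  int parseFailures = 0;

  void processElement(String line, TaggedOutputs out) {
    try {
      out.output(MAIN, parseFlatObject(line));
    } catch (Exception ex) {
      parseFailures++;
      Map<String, String> error = new LinkedHashMap<>();
      error.put("line", line);
      // Throwable.getMessage() may be null; fall back to the exception class name.
      String msg = ex.getMessage();
      error.put("err", msg != null ? msg : ex.getClass().getName());
      out.output(DEAD_LETTER, error);
    }
  }

  // Toy parser (hypothetical): flat objects with quoted string values only, no escapes or nesting.
  static Map<String, String> parseFlatObject(String json) {
    String s = json.trim();
    if (!s.startsWith("{") || !s.endsWith("}")) {
      throw new IllegalArgumentException("not a JSON object: " + json);
    }
    Map<String, String> row = new LinkedHashMap<>();
    String body = s.substring(1, s.length() - 1).trim();
    if (body.isEmpty()) {
      return row;
    }
    for (String pair : body.split(",")) {
      String[] kv = pair.split(":", 2);
      if (kv.length != 2) {
        throw new IllegalArgumentException("expected key:value but got: " + pair);
      }
      row.put(unquote(kv[0]), unquote(kv[1]));
    }
    return row;
  }

  private static String unquote(String token) {
    String t = token.trim();
    if (t.length() < 2 || !t.startsWith("\"") || !t.endsWith("\"")) {
      throw new IllegalArgumentException("expected a quoted string: " + token);
    }
    return t.substring(1, t.length() - 1);
  }
}
```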





[GitHub] [beam] TheNeuralBit commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-644361867


   Run Java PreCommit



[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r437411646



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new TupleTag<Row>() {};
+
   public static PTransform<PCollection<String>, PCollection<Row>> withSchema(Schema rowSchema) {
     return JsonToRowFn.forSchema(rowSchema);
   }
 
+  /**
+   * Enable Dead letter support. If this value is set errors in the parsing layer are returned as
+   * Row objects of form: {@link JsonToRow#ERROR_ROW_SCHEMA} line : The original json string err :
+   * The error message from the parsing function.
+   *
+   * <p>You can access the results by using:
+   *
+   * <p>{@link JsonToRow#MAIN_TUPLE_TAG}
+   *
+   * <p>{@Code PCollection<Row> personRows =
+   * results.get(JsonToRow.MAIN_TUPLE_TAG).setRowSchema(personSchema)}

Review comment:
       Fixed.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {
+
+      return jsonStrings.apply(
+          ParDo.of(
+                  new DoFn<String, Row>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext context) {
+                      try {
+                        context.output(jsonToRow(objectMapper(), context.element()));
+                      } catch (Exception ex) {
+                        context.output(
+                            deadLetter,
+                            Row.withSchema(ERROR_ROW_SCHEMA)
+                                .addValue(context.element())
+                                .addValue(ex.getMessage())
+                                .build());

Review comment:
       I think it would be nice to add it as a SideInput, but for this first version, maybe we stay with the basic error message?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {
+    private transient volatile @Nullable ObjectMapper objectMapper;
+    private Schema schema;
+    private static final String METRIC_NAMESPACE = "JsonToRowFn";
+    private static final String DEAD_LETTER_METRIC_NAME = "JsonToRowFn_ParseFailure";
+
+    private Distribution jsonConversionErrors =
+        Metrics.distribution(METRIC_NAMESPACE, DEAD_LETTER_METRIC_NAME);
+
+    public static final TupleTag<Row> main = MAIN_TUPLE_TAG;
+    public static final TupleTag<Row> deadLetter = DEAD_LETTER_TUPLE_TAG;
+
+    PCollection<Row> deadLetterCollection;
+
+    static JsonToRowWithFailureCaptureFn forSchema(Schema rowSchema) {
+      // Throw exception if this schema is not supported by RowJson
+      RowJson.verifySchemaSupported(rowSchema);
+      return new JsonToRowWithFailureCaptureFn(rowSchema);
+    }
+
+    private JsonToRowWithFailureCaptureFn(Schema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public PCollectionTuple expand(PCollection<String> jsonStrings) {

Review comment:
       Created a ParseResult object.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+

Review comment:
       Done.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -116,4 +157,65 @@ private ObjectMapper objectMapper() {
       return this.objectMapper;
     }
   }
+
+  static class JsonToRowWithFailureCaptureFn
+      extends PTransform<PCollection<String>, PCollectionTuple> {

Review comment:
       Done.





[GitHub] [beam] TheNeuralBit commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-644957521


   Run Java PreCommit



[GitHub] [beam] TheNeuralBit commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-644898842


   Run Java PreCommit



[GitHub] [beam] pabloem commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-642959812


   retest this please



[GitHub] [beam] rezarokni commented on a change in pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
rezarokni commented on a change in pull request #11929:
URL: https://github.com/apache/beam/pull/11929#discussion_r438470939



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
##########
@@ -72,10 +79,44 @@
 @Experimental(Kind.SCHEMAS)
 public class JsonToRow {
 
+  private static final String LINE_FIELD_NAME = "line";
+  private static final String ERROR_FIELD_NAME = "err";
+
+  public static final Schema ERROR_ROW_SCHEMA =
+      Schema.of(
+          Field.of(LINE_FIELD_NAME, FieldType.STRING),
+          Field.of(ERROR_FIELD_NAME, FieldType.STRING));
+
+  public static final TupleTag<Row> MAIN_TUPLE_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> DEAD_LETTER_TUPLE_TAG = new TupleTag<Row>() {};
+

Review comment:
       Done.
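The tags in the hunk above are created as anonymous subclasses, `new TupleTag<Row>() {}`, rather than with a plain constructor call. This is presumably so that the element type can be recovered at runtime (for example, for coder inference); the underlying Java mechanism is the "super type token" idiom. The sketch below shows that idiom in plain Java with a hypothetical `TypedTag` class, not Beam's `TupleTag`: the anonymous subclass records its concrete type argument in its generic superclass, while a tag created inside a generic method only records a type variable.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical tag that captures its type argument via the "super type token" idiom.
abstract class TypedTag<T> {
  final Type type;

  protected TypedTag() {
    // For `new TypedTag<Foo>() {}`, the anonymous class's generic superclass is TypedTag<Foo>.
    Type superclass = getClass().getGenericSuperclass();
    if (!(superclass instanceof ParameterizedType)) {
      throw new IllegalStateException(
          "create tags as anonymous subclasses, e.g. new TypedTag<Foo>() {}");
    }
    this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
  }

  // True when a real type was captured, false when only a type variable was recorded.
  boolean isConcrete() {
    return !(type instanceof TypeVariable);
  }

  // Inside a generic method the anonymous subclass can only record the variable X itself.
  static <X> TypedTag<X> createInGenericMethod() {
    return new TypedTag<X>() {};
  }
}
```

In this sketch the constructor rejects raw anonymous subclasses, and a tag created inside a generic method carries no usable type, which is why such tags are normally declared where the element type is concrete, as the hunk does with `Row`.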





[GitHub] [beam] TheNeuralBit commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #11929:
URL: https://github.com/apache/beam/pull/11929#issuecomment-644361692


   Run SQL PostCommit

