Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:24:44 UTC

[GitHub] [beam] zhoufek opened a new pull request #15037: [BEAM-12474] Write parsing errors to dead-letter topic

zhoufek opened a new pull request #15037:
URL: https://github.com/apache/beam/pull/15037


   The current behavior of PubsubIO is to ignore parsing errors, relying on the Pub/Sub dead-letter configuration to eventually send the message to a dead-letter topic. However, the message sent to the dead-letter topic loses all information about the error (though error logs should exist). Since parsing errors are also unlikely to be transient, retrying them wastes a lot of resources, and it can lead to infinite retry loops if no dead-letter topic is set up. This PR attempts to mitigate these issues by allowing PubsubIO.Read to be configured with a dead-letter topic.
   
   PubsubTestClient also needed to be updated to support testing this feature.
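
The routing idea in the description can be illustrated without Beam. The sketch below is a hypothetical, stdlib-only model, not PubsubIO's implementation: `Message` stands in for a Pub/Sub message, `DeadLetterRouter` collects dead-letter messages in a list instead of publishing them to a topic, and the `errorClass`/`errorMessage` attribute names are assumptions. It shows the key difference from relying on Pub/Sub's own dead-lettering: the failed message keeps its payload and attributes and also carries the error details.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in for a Pub/Sub message: a payload plus string attributes. */
final class Message {
  final String payload;
  final Map<String, String> attributes;

  Message(String payload, Map<String, String> attributes) {
    this.payload = payload;
    this.attributes = attributes;
  }
}

/** Splits parse results into successes and dead-letter messages that keep the error details. */
final class DeadLetterRouter<T> {
  final List<T> parsed = new ArrayList<>();
  final List<Message> deadLetters = new ArrayList<>();

  void route(List<Message> inputs, Function<Message, T> parseFn) {
    for (Message message : inputs) {
      try {
        parsed.add(parseFn.apply(message));
      } catch (RuntimeException e) {
        // A parse error is unlikely to be transient, so route it instead of retrying.
        // Keep the original payload and attributes, and record what went wrong (hypothetical keys).
        Map<String, String> attributes = new HashMap<>(message.attributes);
        attributes.put("errorClass", e.getClass().getName());
        attributes.put("errorMessage", String.valueOf(e.getMessage()));
        deadLetters.add(new Message(message.payload, attributes));
      }
    }
  }
}
```

For example, routing the payloads `"42"` and `"oops"` with `m -> Integer.parseInt(m.payload)` leaves `42` in `parsed` and one dead-letter message whose `errorClass` attribute is `java.lang.NumberFormatException`.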
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   `ValidatesRunner` compliance status (on master branch)
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Examples testing status on various runners
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Post-Commit SDK/Transform Integration Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Go</th>
         <th>Java</th>
         <th>Python</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
           </a>
         </td>
       </tr>
     </tbody>
   </table>
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>---</th>
         <th>Java</th>
         <th>Python</th>
         <th>Go</th>
         <th>Website</th>
         <th>Whitespace</th>
         <th>Typescript</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Non-portable</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Portable</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for the trigger phrases, statuses, and links of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-866069521


   Run Whitespace PreCommit





[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r665623749



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
##########
@@ -92,6 +93,24 @@
     }
   }
 
+  /**
+   * A handler that holds onto the {@link Throwable} that led to the exception, returning it along
+   * with the original value as a {@link KV}.
+   *
+   * <p>Extends {@link SimpleFunction} so that full type information is captured. {@link KV} and
+   * {@link ComparableThrowable} coders can be easily inferred by Beam, so coder inference can be
+   * successfully applied if the consuming transform passes type information to the failure
+   * collection's {@link TupleTag}. This may require creating an instance of an anonymous inherited
+   * class rather than of this class directly.
+   */
+  public static class ThrowableHandler<T>

Review comment:
       Yeah, this is just for dealing with boilerplate. More specifically, since we offer a way to get the exception's values as a map, it seemed good to also provide a handler that preserves the original exception. Otherwise, it seems like we'd be heavily encouraging use of the map even when it isn't the best option.
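
To make the trade-off in this comment concrete, here is a plain-Java sketch of two failure handlers: one that flattens the exception into a map of strings, and one that keeps the original `Throwable` next to the input. It uses no Beam types: `Map.Entry` stands in for `KV`, the input pair loosely mirrors what a `WithFailures` exception handler receives, and the map keys are illustrative assumptions rather than the keys Beam's own map-based handler uses.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.function.Function;

/**
 * Two hypothetical failure handlers. Each receives (input, exception) and returns
 * (input, failure info); Map.Entry stands in for Beam's KV.
 */
final class FailureHandlers {
  private FailureHandlers() {}

  /** Flattens the exception into strings: simple to encode, but the exception object is gone. */
  static <T> Function<Map.Entry<T, Exception>, Map.Entry<T, Map<String, String>>> asMap() {
    return failure -> {
      Exception e = failure.getValue();
      // Illustrative keys only; String.valueOf turns a null message into "null" for Map.of.
      Map<String, String> info =
          Map.of("className", e.getClass().getName(), "message", String.valueOf(e.getMessage()));
      return new SimpleImmutableEntry<>(failure.getKey(), info);
    };
  }

  /** Keeps the original Throwable, so its type, cause, and stack trace stay available downstream. */
  static <T> Function<Map.Entry<T, Exception>, Map.Entry<T, Throwable>> preservingThrowable() {
    return failure -> new SimpleImmutableEntry<>(failure.getKey(), failure.getValue());
  }
}
```

The map version is easy to encode but loses the exception's type hierarchy and cause chain; the preserving version keeps them, at the cost of needing a coder that can handle a `Throwable`.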




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-868563944


   Run Java PreCommit





[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r670510940



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;

Review comment:
       Done.







[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-882515656


   Run Java PreCommit





[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r670488798



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
##########
@@ -881,8 +930,60 @@ public String toString() {
               getIdAttribute(),
               getNeedsAttributes(),
               getNeedsMessageId());
-      PCollection<T> read =
-          input.apply(source).apply(MapElements.into(new TypeDescriptor<T>() {}).via(getParseFn()));
+
+      PCollection<T> read;
+      PCollection<PubsubMessage> preParse = input.apply(source);
+      TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
+      if (getDeadLetterTopicProvider() == null) {
+        read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
+      } else {
+        Result<PCollection<T>, KV<PubsubMessage, ComparableThrowable>> result =
+            preParse.apply(
+                "PubsubIO.Read/Map/Parse-Incoming-Messages",
+                MapElements.into(typeDescriptor)
+                    .via(getParseFn())
+                    .exceptionsVia(new WithFailures.ThrowableHandler<PubsubMessage>() {}));
+        read = result.output();
+
+        // Write out failures to the provided dead-letter topic.
+        result
+            .failures()

Review comment:
       Ack.
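
The null check in the `PubsubIO.java` diff above keeps the old behavior unless a dead-letter topic is configured. A hypothetical, Beam-free model of that branch follows; `parseAll`, the string-valued sink, and its entry format are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical, Beam-free model of the branch in the diff: failures are captured only when a
 * dead-letter topic has been configured; otherwise parsing behaves exactly as before.
 */
final class OptionalDeadLetterParse {
  private OptionalDeadLetterParse() {}

  static <T> List<T> parseAll(
      List<String> inputs,
      Function<String, T> parseFn,
      String deadLetterTopic, // null means "not configured"
      List<String> deadLetterSink) {
    List<T> output = new ArrayList<>();
    for (String input : inputs) {
      if (deadLetterTopic == null) {
        // Old behavior: the exception propagates to the caller (in a pipeline, the bundle fails).
        output.add(parseFn.apply(input));
        continue;
      }
      try {
        output.add(parseFn.apply(input));
      } catch (RuntimeException e) {
        // New behavior: record the failure for the dead-letter topic and keep going.
        deadLetterSink.add(deadLetterTopic + " <- " + input + " (" + e.getClass().getSimpleName() + ")");
      }
    }
    return output;
  }
}
```

With `deadLetterTopic` set to `null`, a bad input such as `"x"` makes `parseAll` throw `NumberFormatException`; with it set to `"dlq"`, the same input yields the sink entry `dlq <- x (NumberFormatException)` and parsing continues.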







[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-880738062


   Moved `EncodableThrowable` to values/ and squashed the commits.





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-875788050


   The failure is related to BEAM-10955. PR #15119 needs to be merged to disable the test.





[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r665629900



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with many subclasses such as
+ * {@link Exception}, does not override {@link Object#equals(Object)}. As a result, t1.equals(t2)
+ * returns false if t2 was obtained by serializing and then deserializing t1. This makes it appear
+ * as if coders are mutating the value, which can lead to problems such as log spam. This wrapper overrides
+ * {@link Object#equals(Object)} to get around this issue. The hash code remains the same as the
+ * underlying {@link Throwable}.
+ *
+ * <p>The equality comparison is applied recursively: for two {@link Throwable}s t1 and t2 to be
+ * considered equal, their causes must also be equal, either because both are null or because they
+ * are equal after being wrapped.
+ *
+ * <p>Note that this is simply a best effort based on properties like instance type, stack trace,
+ * message, and cause. It cannot guarantee that the state that led to the exception is the same,
+ * unless it is fully captured in the message, nor can it differentiate between two exceptions
+ * thrown at different times or in different processes. For this reason, this is not suitable for a
+ * general-purpose {@link Throwable} equality comparison. It merely exists to try to avoid false
+ * positives in a mutation check for coders.
+ *
+ * <p>Due to the above, this does not support comparison to a raw {@link Throwable}.
+ */
+public class ComparableThrowable implements Serializable {
+  private Throwable throwable;
+
+  private ComparableThrowable() {
+    // Can't set this to null without adding a pointless @Nullable annotation to the field. It also
+    // needs to be set from the constructor to avoid a checkstyle violation.
+    this.throwable = new Throwable();
+  }
+
+  /** Wraps {@code throwable} and returns the result. */
+  public static ComparableThrowable forThrowable(Throwable throwable) {
+    ComparableThrowable comparable = new ComparableThrowable();
+    comparable.throwable = throwable;
+    return comparable;
+  }
+
+  /** Returns the underlying {@link Throwable}. */
+  public Throwable throwable() {
+    return throwable;
+  }
+
+  @Override
+  public int hashCode() {
+    return throwable.hashCode();
+  }
+
+  @Override
+  public boolean equals(@Nullable Object obj) {
+    if (!(obj instanceof ComparableThrowable)) {
+      return false;
+    }
+    Throwable other = ((ComparableThrowable) obj).throwable;
+
+    boolean currentLevelEqual =
+        throwable.getClass().isInstance(other)

Review comment:
       So just `return throwable.getClass().equals(other.getClass())` would be enough? I guess it could work, since this is just for after a built-in type has been deserialized. (And if Java is messing up serialization/deserialization of built-in types, we have far greater issues.)
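A minimal stdlib-only sketch of the exact-class variant discussed here, assuming equality is judged on runtime class, message, stack trace, and cause chain as the Javadoc above describes. `ThrowableEqualitySketch` and `sameThrowable` are hypothetical names, not the code that was merged. Using `getClass().equals(...)` instead of `isInstance` keeps the check symmetric.

```java
import java.util.Arrays;
import java.util.Objects;

/** Hypothetical helper: best-effort equality for Throwables, not the merged Beam code. */
final class ThrowableEqualitySketch {
  private ThrowableEqualitySketch() {}

  /**
   * Compares exact runtime class, message, and stack trace, then recurses into the causes.
   * Two null causes count as equal. Assumes the cause chain contains no cycles.
   */
  static boolean sameThrowable(Throwable a, Throwable b) {
    if (a == b) {
      return true; // same instance, or both causes are null
    }
    if (a == null || b == null) {
      return false;
    }
    return a.getClass().equals(b.getClass()) // symmetric, unlike isInstance
        && Objects.equals(a.getMessage(), b.getMessage())
        && Arrays.equals(a.getStackTrace(), b.getStackTrace())
        && sameThrowable(a.getCause(), b.getCause());
  }
}
```

Because the class check is exact, a `RuntimeException` and an `IllegalArgumentException` with identical messages and traces compare unequal in both directions.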




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-880908256


   The only failure is related to BEAM-10955. It seems to be coming in and out of sickbay, so I don't think it should block this PR.





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-882514160









[GitHub] [beam] kennknowles commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-882726208


   Thanks! Sorry for the slow review here.





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-882514160


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] kennknowles commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r669908020



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
##########
@@ -92,6 +93,24 @@
     }
   }
 
+  /**
+   * A handler that holds onto the {@link Throwable} that led to the exception, returning it along
+   * with the original value as a {@link KV}.
+   *
+   * <p>Extends {@link SimpleFunction} so that full type information is captured. {@link KV} and
+   * {@link ComparableThrowable} coders can be easily inferred by Beam, so coder inference can be
+   * successfully applied if the consuming transform passes type information to the failure
+   * collection's {@link TupleTag}. This may require creating an instance of an anonymous inherited
+   * class rather than of this class directly.
+   */
+  public static class ThrowableHandler<T>
+      extends SimpleFunction<ExceptionElement<T>, KV<T, ComparableThrowable>> {
+    @Override
+    public KV<T, ComparableThrowable> apply(ExceptionElement<T> f) {
+      return KV.of(f.element(), ComparableThrowable.forThrowable(f.exception()));

Review comment:
       Pipeline update checks whether all coders are the same (technically, only the coders used for state persisted in a shuffle). So if the binary encoding of a coder changes, the update will be rejected. It was my mistake: it is actually `CustomCoder` that may have spurious incompatibilities. For `SerializableCoder`, there may be a similar risk because it contains the serialized `Class` object.
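To make the `SerializableCoder` point concrete, here is a small plain-Java sketch. It uses standard Java serialization as a stand-in for how an object holding a `Class` reference might be persisted (an assumption; Beam's actual update-compatibility check is not modeled). The serialized bytes carry the referenced class's name and stream descriptor, so renaming or moving that class changes the encoding. `ClassEncodingSketch` and `HoldsClass` are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: serialized bytes of an object depend on any Class it references. */
final class ClassEncodingSketch {
  private ClassEncodingSketch() {}

  /** Hypothetical stand-in for a coder that, like SerializableCoder, keeps a Class field. */
  static final class HoldsClass implements Serializable {
    private static final long serialVersionUID = 1L;
    final Class<?> type;

    HoldsClass(Class<?> type) {
      this.type = type;
    }
  }

  /** Serializes a value with standard Java serialization. */
  static byte[] serialize(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** True if the ASCII text appears in the bytes (ASCII class names are written byte-for-byte). */
  static boolean containsAscii(byte[] data, String text) {
    return new String(data, StandardCharsets.ISO_8859_1).contains(text);
  }
}
```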







[GitHub] [beam] kennknowles commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r669906553



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with many child classes like
+ * {@link Exception}, does not override {@link Object#equals(Object)}. As a result, t1.equals(t2)
+ * will return false if t2 is a deserialized instance of serialized t1. This makes it appear as if
+ * coders are mutating the value, which can lead to things like log spam. This wrapper overrides
+ * {@link Object#equals(Object)} to get around this issue. The hash code remains the same as the
+ * underlying {@link Throwable}.
+ *
+ * <p>The equality comparison is recursive: for two {@link Throwable}s t1 and t2 to be equal, their
+ * causes must also be equal, either because both are null or because they are equal after being
+ * wrapped.
+ *
+ * <p>Note that this is simply a best effort based on properties like instance type, stack trace,
+ * message, and cause. It cannot guarantee that the state that led to the exception is the same,
+ * unless it is fully captured in the message, nor can it differentiate between two exceptions
+ * thrown at different times or in different processes. For this reason, this is not suitable for a
+ * general-purpose {@link Throwable} equality comparison. It merely exists to try to avoid false
+ * positives in a mutation check for coders.
+ *
+ * <p>Due to the above, this does not support comparison to a raw {@link Throwable}.
+ */
+public class ComparableThrowable implements Serializable {
+  private Throwable throwable;
+
+  private ComparableThrowable() {
+    // Can't set this to null without adding a pointless @Nullable annotation to the field. It also
+    // needs to be set from the constructor to avoid a checkstyle violation.
+    this.throwable = new Throwable();
+  }
+
+  /** Wraps {@code throwable} and returns the result. */
+  public static ComparableThrowable forThrowable(Throwable throwable) {
+    ComparableThrowable comparable = new ComparableThrowable();
+    comparable.throwable = throwable;
+    return comparable;
+  }
+
+  /** Returns the underlying {@link Throwable}. */
+  public Throwable throwable() {
+    return throwable;
+  }
+
+  @Override
+  public int hashCode() {
+    return throwable.hashCode();
+  }
+
+  @Override
+  public boolean equals(@Nullable Object obj) {
+    if (!(obj instanceof ComparableThrowable)) {
+      return false;
+    }
+    Throwable other = ((ComparableThrowable) obj).throwable;
+
+    boolean currentLevelEqual =
+        throwable.getClass().isInstance(other)

Review comment:
       Yea







[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-880864790


   Run Java PreCommit





[GitHub] [beam] kennknowles commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r669908356



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
##########
@@ -881,8 +930,60 @@ public String toString() {
               getIdAttribute(),
               getNeedsAttributes(),
               getNeedsMessageId());
-      PCollection<T> read =
-          input.apply(source).apply(MapElements.into(new TypeDescriptor<T>() {}).via(getParseFn()));
+
+      PCollection<T> read;
+      PCollection<PubsubMessage> preParse = input.apply(source);
+      TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
+      if (getDeadLetterTopicProvider() == null) {
+        read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
+      } else {
+        Result<PCollection<T>, KV<PubsubMessage, ComparableThrowable>> result =
+            preParse.apply(
+                "PubsubIO.Read/Map/Parse-Incoming-Messages",
+                MapElements.into(typeDescriptor)
+                    .via(getParseFn())
+                    .exceptionsVia(new WithFailures.ThrowableHandler<PubsubMessage>() {}));
+        read = result.output();
+
+        // Write out failures to the provided dead-letter topic.
+        result
+            .failures()

Review comment:
       Yea the `DirectRunner` does a lot of extra encoding/decoding to exercise more code paths.
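A simplified, hypothetical sketch of why extra encode/decode passes surface the problem: in this sketch, a check that decodes a copy of an element and compares it to the original with `equals` flags any raw `Throwable`, because `Throwable` keeps `Object`'s identity-based `equals`. This is not the `DirectRunner`'s actual mutation detector; Java serialization stands in for the coder, and `MutationCheckSketch` and `looksMutated` are made-up names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.BiPredicate;

/** Hypothetical sketch of an encode/decode/compare check, not Beam's implementation. */
final class MutationCheckSketch {
  private MutationCheckSketch() {}

  /** Encodes and decodes a value with Java serialization (standing in for a coder). */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Returns true when the decoded copy is not considered equal to the original. */
  static <T extends Serializable> boolean looksMutated(T value, BiPredicate<T, T> equality) {
    return !equality.test(value, roundTrip(value));
  }
}
```

Passing `Object::equals` reproduces the false positive for exceptions, while a structural comparison (class plus message, for example) does not.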







[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-866392855


   Run RAT PreCommit





[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r665636511



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with many child classes like
+ * {@link Exception}, does not override {@link Object#equals(Object)}. As a result, t1.equals(t2)
+ * will return false if t2 is a deserialized instance of serialized t1. This makes it appear as if
+ * coders are mutating the value, which can lead to things like log spam. This wrapper overrides
+ * {@link Object#equals(Object)} to get around this issue. The hash code remains the same as the
+ * underlying {@link Throwable}.
+ *
+ * <p>The equality comparison is recursive: for two {@link Throwable}s t1 and t2 to be equal, their
+ * causes must also be equal, either because both are null or because they are equal after being
+ * wrapped.
+ *
+ * <p>Note that this is simply a best effort based on properties like instance type, stack trace,
+ * message, and cause. It cannot guarantee that the state that led to the exception is the same,
+ * unless it is fully captured in the message, nor can it differentiate between two exceptions
+ * thrown at different times or in different processes. For this reason, this is not suitable for a
+ * general-purpose {@link Throwable} equality comparison. It merely exists to try to avoid false
+ * positives in a mutation check for coders.
+ *
+ * <p>Due to the above, this does not support comparison to a raw {@link Throwable}.
+ */
+public class ComparableThrowable implements Serializable {

Review comment:
       Maybe `EncodableThrowable`? Since `Throwable` is already a serializable type, and the real purpose is for safer use with coders, which use encode/decode.







[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r665634872



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
##########
@@ -881,8 +930,60 @@ public String toString() {
               getIdAttribute(),
               getNeedsAttributes(),
               getNeedsMessageId());
-      PCollection<T> read =
-          input.apply(source).apply(MapElements.into(new TypeDescriptor<T>() {}).via(getParseFn()));
+
+      PCollection<T> read;
+      PCollection<PubsubMessage> preParse = input.apply(source);
+      TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
+      if (getDeadLetterTopicProvider() == null) {
+        read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
+      } else {
+        Result<PCollection<T>, KV<PubsubMessage, ComparableThrowable>> result =
+            preParse.apply(
+                "PubsubIO.Read/Map/Parse-Incoming-Messages",
+                MapElements.into(typeDescriptor)
+                    .via(getParseFn())
+                    .exceptionsVia(new WithFailures.ThrowableHandler<PubsubMessage>() {}));
+        read = result.output();
+
+        // Write out failures to the provided dead-letter topic.
+        result
+            .failures()

Review comment:
       At least during testing, I noticed a few log messages related to coder mutations when using some raw exception type, which is why I added the `ComparableThrowable`. So I think encoding/decoding was occurring, or could that just be attributed to the testing pipeline?
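The `else` branch in the diff above splits parsing into a successful output and a failure collection of (message, exception) pairs, which is what gets written to the dead-letter topic. A plain-Java sketch of that split, assuming a string payload and `Integer::parseInt` as a hypothetical parse function; `ParseWithFailuresSketch` and its `Result` are invented for illustration and are not Beam's `WithFailures.Result`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of "parse, or keep the input and the error", not Beam's API. */
final class ParseWithFailuresSketch {
  private ParseWithFailuresSketch() {}

  /** Parsed outputs plus (input, exception) pairs destined for a dead-letter sink. */
  static final class Result<I, O> {
    final List<O> outputs = new ArrayList<>();
    final List<Map.Entry<I, Exception>> failures = new ArrayList<>();
  }

  static <I, O> Result<I, O> parseAll(List<I> inputs, Function<I, O> parseFn) {
    Result<I, O> result = new Result<>();
    for (I input : inputs) {
      try {
        result.outputs.add(parseFn.apply(input));
      } catch (RuntimeException e) {
        // Keep the original input and its error instead of dropping or retrying it.
        result.failures.add(new AbstractMap.SimpleImmutableEntry<I, Exception>(input, e));
      }
    }
    return result;
  }
}
```

Keeping the input alongside the exception is what lets the dead-letter output preserve the error details that a Pub/Sub-level dead-letter policy would lose.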







[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-875693142


   Run Java PreCommit






[GitHub] [beam] kennknowles merged pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
kennknowles merged pull request #15037:
URL: https://github.com/apache/beam/pull/15037


   





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-868533581


   Added a comment to BEAM-10955 related to the Flink runner failures.





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-866973006


   Run Java PreCommit





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-882573763


   Failing tests seem related to BEAM-5172 and BEAM-10955





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-866842227


   Run Java PreCommit





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-866392620


   Run Java PreCommit





[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r665636511



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with many child classes like
+ * {@link Exception}, does not override {@link Object#equals(Object)}. As a result, t1.equals(t2)
+ * will return false if t2 is a deserialized instance of serialized t1. This makes it appear as if
+ * coders are mutating the value, which can lead to things like log spam. This wrapper overrides
+ * {@link Object#equals(Object)} to get around this issue. The hash code remains the same as the
+ * underlying {@link Throwable}.
+ *
+ * <p>The equality comparison is recursive: for two {@link Throwable}s t1 and t2 to be equal, their
+ * causes must also be equal, either because both are null or because they are equal after being
+ * wrapped.
+ *
+ * <p>Note that this is simply a best effort based on properties like instance type, stack trace,
+ * message, and cause. It cannot guarantee that the state that led to the exception is the same,
+ * unless it is fully captured in the message, nor can it differentiate between two exceptions
+ * thrown at different times or in different processes. For this reason, this is not suitable for a
+ * general-purpose {@link Throwable} equality comparison. It merely exists to try to avoid false
+ * positives in a mutation check for coders.
+ *
+ * <p>Due to the above, this does not support comparison to a raw {@link Throwable}.
+ */
+public class ComparableThrowable implements Serializable {

Review comment:
       Maybe `EncodableThrowable`? Since `Throwable` is already a serializable type, and the real purpose is for safer use with coders, which use encode/decode.
   
   Also, I'll be changing the class to be final to avoid potential issues with `equals` inheritance. (I don't think I had any reason to not mark it final. I think I just forgot to do so.)
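For context on the `final` change: if a non-final class implements `equals` with `instanceof`, a subclass that adds state and overrides `equals` can break symmetry. The classes below are a hypothetical illustration, not Beam code; marking the wrapper `final` rules this case out.

```java
import java.util.Objects;

/** Hypothetical illustration of the equals/inheritance pitfall, not Beam code. */
final class EqualsSymmetrySketch {
  private EqualsSymmetrySketch() {}

  /** Non-final class whose equals accepts any instance of itself or a subclass. */
  static class Point {
    final int x;

    Point(int x) {
      this.x = x;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Point && ((Point) o).x == x;
    }

    @Override
    public int hashCode() {
      return Integer.hashCode(x);
    }
  }

  /** Subclass that adds state and only accepts other LabeledPoints. */
  static final class LabeledPoint extends Point {
    final String label;

    LabeledPoint(int x, String label) {
      super(x);
      this.label = label;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof LabeledPoint
          && super.equals(o)
          && Objects.equals(((LabeledPoint) o).label, label);
    }

    @Override
    public int hashCode() {
      return Objects.hash(x, label);
    }
  }
}
```

Here `point.equals(labeledPoint)` is true while `labeledPoint.equals(point)` is false, which violates the symmetry required by the `Object.equals` contract.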







[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r665631991



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
##########
@@ -92,6 +93,24 @@
     }
   }
 
+  /**
+   * A handler that holds onto the {@link Throwable} that led to the exception, returning it along
+   * with the original value as a {@link KV}.
+   *
+   * <p>Extends {@link SimpleFunction} so that full type information is captured. {@link KV} and
+   * {@link ComparableThrowable} coders can be easily inferred by Beam, so coder inference can be
+   * successfully applied if the consuming transform passes type information to the failure
+   * collection's {@link TupleTag}. This may require creating an instance of an anonymous inherited
+   * class rather than of this class directly.
+   */
+  public static class ThrowableHandler<T>
+      extends SimpleFunction<ExceptionElement<T>, KV<T, ComparableThrowable>> {
+    @Override
+    public KV<T, ComparableThrowable> apply(ExceptionElement<T> f) {
+      return KV.of(f.element(), ComparableThrowable.forThrowable(f.exception()));

Review comment:
       Yes, it's using `SerializableCoder`. Should there be a custom coder to handle this, or should it be explicitly registered?
   
   Sorry, I'm not that familiar with how pipeline updates can be affected by changes like this.
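The Javadoc's note about instantiating an anonymous subclass (as in `new WithFailures.ThrowableHandler<PubsubMessage>() {}` in the `PubsubIO` diff) relies on general Java behavior: a generic type argument survives erasure only when it is written into a subclass declaration. A hypothetical `TypedHandler` shows the difference. This illustrates the Java mechanism only and is not Beam's `SimpleFunction` or `TypeDescriptor` implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical generic handler used to show how a type argument can be recovered. */
class TypedHandler<T> {
  /**
   * Returns the concrete type argument written in a direct subclass declaration, or null when
   * nothing concrete was captured (a plain instance, or an unresolved type variable).
   */
  Type capturedType() {
    Type superType = getClass().getGenericSuperclass();
    if (!(superType instanceof ParameterizedType)) {
      return null; // e.g. `new TypedHandler<String>()`: the superclass is plain Object
    }
    Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
    return (arg instanceof Class<?> || arg instanceof ParameterizedType) ? arg : null;
  }
}
```

`new TypedHandler<String>()` yields `null`, while the anonymous `new TypedHandler<String>() {}` yields `String.class`, which is why the handler is created as an anonymous subclass when coder inference needs the type.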







[GitHub] [beam] kennknowles commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r669906276



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;

Review comment:
       Yea I think under values is an OK place.







[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-880823168









[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-870580912


   Run Java PreCommit





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-867036091


   Something seems wrong with pre-commit. It won't run. The only failure on the last run was a flake in the FlinkRunner test.





[GitHub] [beam] kennknowles commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
kennknowles commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-882726208


   Thanks! Sorry for the slow review here.





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-868455191


   Run Java PreCommit





[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r665639355



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like

Review comment:
       Haha! It was supposed to be "many".







[GitHub] [beam] kennknowles merged pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
kennknowles merged pull request #15037:
URL: https://github.com/apache/beam/pull/15037


   





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-867039758


   R: @TheNeuralBit





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-882514222


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-864238756


   Run Java PreCommit





[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r657330871



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like
+ * {@link Exception}, does not override {@link Object#equals(Object)}. As a result, t1.equals(t2)
+ * will return false if t2 is a deserialized instance of serialized t1. This makes it appear as if
+ * coders are mutating the value, which can lead to things like log spam. This wrapper overrides
+ * {@link Object#equals(Object)} to get around this issue. The hash code remains the same as the
+ * underlying {@link Throwable}.
+ *
+ * <p>The equality comparison is transitive, meaning that for two {@link Throwable}s t1 and t2, the
+ * causes must be equal. This can occur either because they are both null or because they are both
+ * equal after being wrapped.
+ *
+ * <p>Note that this is simply a best effort based on properties like instance type, stack trace,
+ * message, and cause. It cannot guarantee that the state that led to the exception is the same,
+ * unless it is fully captured in the message, nor can it differentiate between two exceptions
+ * thrown at different times or in different processes. For this reason, this is not suitable for a
+ * general-purpose {@link Throwable} equality comparison. It merely exists to try to avoid false
+ * positives in a mutation check for coders.
+ *
+ * <p>Due to the above, this does not support comparison to a raw {@link Throwable}.
+ */
+public class ComparableThrowable implements Serializable {

Review comment:
       I'm open to suggestions for a different name. The best I could come up with is `ComparableThrowable`, but that might be confusing because it doesn't implement the `Comparable` interface. Implementing the interface is itself an option, but I don't think there's much value in that.
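The quoted javadoc makes a specific claim: a serialization round trip yields a `Throwable` that is not `equals` to the original, even though its observable properties match. Plain Java serialization is enough to check this. The sketch below is an illustration only. `ThrowableRoundTrip` and `roundTrip` are hypothetical names that are not part of the PR, and `ObjectOutputStream`/`ObjectInputStream` stand in for a serialization-based coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class ThrowableRoundTrip {
  // Hypothetical helper: encodes and decodes a value with plain Java serialization,
  // similar in spirit to what a serialization-based coder does.
  @SuppressWarnings("unchecked")
  public static <T> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    Exception original = new IllegalStateException("could not parse message");
    Exception copy = roundTrip(original);
    // Throwable inherits Object.equals (reference identity), so the copy is "not equal".
    System.out.println(original.equals(copy)); // false
    // The properties a mutation check would care about do survive the round trip.
    System.out.println(copy.getClass() == original.getClass()); // true
    System.out.println(copy.getMessage().equals(original.getMessage())); // true
    System.out.println(Arrays.equals(copy.getStackTrace(), original.getStackTrace())); // true
  }
}
```

A check that compares only identity reports a "mutation" here. A field-by-field comparison of class, message, and stack trace does not, and that is the gap the wrapper is meant to close.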







[GitHub] [beam] TheNeuralBit commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-867249312


   I think I'm going to be swamped until I go on leave for a while on July 1. Maybe @kennknowles could review?





[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r670510740



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
##########
@@ -92,6 +93,24 @@
     }
   }
 
+  /**
+   * A handler that holds onto the {@link Throwable} that led to the exception, returning it along
+   * with the original value as a {@link KV}.
+   *
+   * <p>Extends {@link SimpleFunction} so that full type information is captured. {@link KV} and
+   * {@link ComparableThrowable} coders can be easily inferred by Beam, so coder inference can be
+   * successfully applied if the consuming transform passes type information to the failure
+   * collection's {@link TupleTag}. This may require creating an instance of an anonymous inherited
+   * class rather than of this class directly.
+   */
+  public static class ThrowableHandler<T>
+      extends SimpleFunction<ExceptionElement<T>, KV<T, ComparableThrowable>> {
+    @Override
+    public KV<T, ComparableThrowable> apply(ExceptionElement<T> f) {
+      return KV.of(f.element(), ComparableThrowable.forThrowable(f.exception()));

Review comment:
       Ack.







[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-867747918


   Sure. Enjoy your leave!





[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r665622634



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;

Review comment:
       Is there a better place for it? It's technically user-facing through the `WithFailures` transform. I was thinking of putting it under `org.apache.beam.sdk.values`, but everything under there seemed more generally useful than this, which is targeting a very specific use case.







[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-875689459


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r666196403



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like

Review comment:
       Updated the doc comment, so this is no longer relevant.







[GitHub] [beam] kennknowles commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r665587222



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;

Review comment:
       Noting that this namespace is for non-user-facing classes that have no backwards compatibility guarantees.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
##########
@@ -92,6 +93,24 @@
     }
   }
 
+  /**
+   * A handler that holds onto the {@link Throwable} that led to the exception, returning it along
+   * with the original value as a {@link KV}.
+   *
+   * <p>Extends {@link SimpleFunction} so that full type information is captured. {@link KV} and
+   * {@link ComparableThrowable} coders can be easily inferred by Beam, so coder inference can be
+   * successfully applied if the consuming transform passes type information to the failure
+   * collection's {@link TupleTag}. This may require creating an instance of an anonymous inherited
+   * class rather than of this class directly.
+   */
+  public static class ThrowableHandler<T>

Review comment:
       I'm not sure if you know this, but you can do `new SimpleFunction<Foo, Bar>(<lambda>) {}` and the curly braces at the end cause it to be a subclass with type information preserved. I think having this class is fine, too. But if it is just boilerplate to preserve types then you might be able to use the inline anonymous subclass trick.
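The anonymous-subclass trick can be shown without Beam. `TypedFunction` below is a hypothetical stand-in for `SimpleFunction`, included only to illustrate the mechanism. The trailing `{}` creates an anonymous subclass whose class file records the concrete type arguments, so `getGenericSuperclass()` can recover them. A direct instance only sees erased types.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Optional;
import java.util.function.Function;

public class TypeCaptureDemo {
  // Hypothetical stand-in for a SimpleFunction-like class; only type capture is shown.
  public static class TypedFunction<InputT, OutputT> {
    private final Function<InputT, OutputT> fn;

    public TypedFunction(Function<InputT, OutputT> fn) {
      this.fn = fn;
    }

    public OutputT apply(InputT input) {
      return fn.apply(input);
    }

    // Returns OutputT as recorded in the class file of a concrete subclass, if there is one.
    public Optional<Type> outputType() {
      Type superclass = getClass().getGenericSuperclass();
      if (superclass instanceof ParameterizedType) {
        return Optional.of(((ParameterizedType) superclass).getActualTypeArguments()[1]);
      }
      // A direct instance has Object as its superclass, so the type arguments were erased.
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    TypedFunction<String, Integer> plain = new TypedFunction<>(String::length);
    TypedFunction<String, Integer> captured =
        new TypedFunction<String, Integer>(String::length) {}; // note the trailing {}
    System.out.println(plain.outputType()); // Optional.empty
    System.out.println(captured.outputType()); // Optional[class java.lang.Integer]
    System.out.println(captured.apply("beam")); // 4
  }
}
```

Beam's `TypeDescriptor`-based inference is assumed to rely on the same reflection idea. The exact way `SimpleFunction` resolves its descriptors is not reproduced here.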

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like
+ * {@link Exception}, does not override {@link Object#equals(Object)}. As a result, t1.equals(t2)
+ * will return false if t2 is a deserialized instance of serialized t1. This makes it appear as if
+ * coders are mutating the value, which can lead to things like log spam. This wrapper overrides
+ * {@link Object#equals(Object)} to get around this issue. The hash code remains the same as the
+ * underlying {@link Throwable}.
+ *
+ * <p>The equality comparison is transitive, meaning that for two {@link Throwable}s t1 and t2, the
+ * causes must be equal. This can occur either because they are both null or because they are both
+ * equal after being wrapped.
+ *
+ * <p>Note that this is simply a best effort based on properties like instance type, stack trace,
+ * message, and cause. It cannot guarantee that the state that led to the exception is the same,
+ * unless it is fully captured in the message, nor can it differentiate between two exceptions
+ * thrown at different times or in different processes. For this reason, this is not suitable for a
+ * general-purpose {@link Throwable} equality comparison. It merely exists to try to avoid false
+ * positives in a mutation check for coders.
+ *
+ * <p>Due to the above, this does not support comparison to a raw {@link Throwable}.
+ */
+public class ComparableThrowable implements Serializable {
+  private Throwable throwable;
+
+  private ComparableThrowable() {
+    // Can't set this to null without adding a pointless @Nullable annotation to the field. It also
+    // needs to be set from the constructor to avoid a checkstyle violation.
+    this.throwable = new Throwable();
+  }
+
+  /** Wraps {@code throwable} and returns the result. */
+  public static ComparableThrowable forThrowable(Throwable throwable) {
+    ComparableThrowable comparable = new ComparableThrowable();
+    comparable.throwable = throwable;
+    return comparable;
+  }
+
+  /** Returns the underlying {@link Throwable}. */
+  public Throwable throwable() {
+    return throwable;
+  }
+
+  @Override
+  public int hashCode() {
+    return throwable.hashCode();
+  }
+
+  @Override
+  public boolean equals(@Nullable Object obj) {
+    if (!(obj instanceof ComparableThrowable)) {
+      return false;
+    }
+    Throwable other = ((ComparableThrowable) obj).throwable;
+
+    boolean currentLevelEqual =
+        throwable.getClass().isInstance(other)

Review comment:
       I think you want to check whether their classes are equal. That will be fine for mutation detection, etc., which does not really use equality except to check that nothing broke.
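The sketch below shows why the class check matters. It compares the quoted `getClass().isInstance(other)` test with `getClass().equals(other.getClass())`, using hypothetical helper names that are not from the PR. `isInstance` also accepts subclasses, so it is asymmetric, and an `equals` built on it would break the symmetry requirement of `Object.equals`.

```java
public class ClassCheckDemo {
  // The check quoted in the diff: is `other` an instance of self's class (or a subclass)?
  static boolean isInstanceCheck(Throwable self, Throwable other) {
    return self.getClass().isInstance(other);
  }

  // The suggested check: do both throwables have exactly the same runtime class?
  static boolean sameClassCheck(Throwable self, Throwable other) {
    return self.getClass().equals(other.getClass());
  }

  public static void main(String[] args) {
    Throwable base = new RuntimeException("bad payload");
    Throwable sub = new IllegalArgumentException("bad payload"); // subclass of RuntimeException
    // isInstance is asymmetric: the result depends on argument order.
    System.out.println(isInstanceCheck(base, sub)); // true
    System.out.println(isInstanceCheck(sub, base)); // false
    // Comparing runtime classes is symmetric.
    System.out.println(sameClassCheck(base, sub)); // false
    System.out.println(sameClassCheck(sub, base)); // false
  }
}
```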

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like
+ * {@link Exception}, does not override {@link Object#equals(Object)}. As a result, t1.equals(t2)
+ * will return false if t2 is a deserialized instance of serialized t1. This makes it appear as if
+ * coders are mutating the value, which can lead to things like log spam. This wrapper overrides
+ * {@link Object#equals(Object)} to get around this issue. The hash code remains the same as the
+ * underlying {@link Throwable}.
+ *
+ * <p>The equality comparison is transitive, meaning that for two {@link Throwable}s t1 and t2, the
+ * causes must be equal. This can occur either because they are both null or because they are both
+ * equal after being wrapped.
+ *
+ * <p>Note that this is simply a best effort based on properties like instance type, stack trace,
+ * message, and cause. It cannot guarantee that the state that led to the exception is the same,
+ * unless it is fully captured in the message, nor can it differentiate between two exceptions
+ * thrown at different times or in different processes. For this reason, this is not suitable for a
+ * general-purpose {@link Throwable} equality comparison. It merely exists to try to avoid false
+ * positives in a mutation check for coders.
+ *
+ * <p>Due to the above, this does not support comparison to a raw {@link Throwable}.
+ */
+public class ComparableThrowable implements Serializable {

Review comment:
       Yeah, I was going to comment on this. It is OK for the name to be clunky; perhaps `ThrowableWithEquals`. The problem, of course, is that inheriting a concrete implementation of `equals` carries risk. Subclasses will most likely not override `equals`, so if they carry additional state, their equality will be incorrect. (This is mitigated when there is an interface like `List` that specifies what equality means, and equality is defined in terms of that interface.)
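The subclassing risk described above can be illustrated with two hypothetical classes that are unrelated to the PR's code. One is a base class with a concrete `equals` over its own field. The other is a subclass that adds a field but does not override `equals`.

```java
import java.util.Objects;

public class InheritedEqualsDemo {
  // Hypothetical base class with a concrete equals defined over its own state only.
  public static class Wrapper {
    final String message;

    public Wrapper(String message) {
      this.message = message;
    }

    @Override
    public boolean equals(Object obj) {
      if (obj == null || obj.getClass() != getClass()) {
        return false;
      }
      return Objects.equals(message, ((Wrapper) obj).message);
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(message);
    }
  }

  // Hypothetical subclass that adds state but inherits equals unchanged.
  public static class WrapperWithCode extends Wrapper {
    final int errorCode;

    public WrapperWithCode(String message, int errorCode) {
      super(message);
      this.errorCode = errorCode;
    }
  }

  public static void main(String[] args) {
    Wrapper a = new WrapperWithCode("parse failed", 400);
    Wrapper b = new WrapperWithCode("parse failed", 500);
    // The inherited equals ignores errorCode, so values that differ compare as equal.
    System.out.println(a.equals(b)); // true
  }
}
```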

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like
+ * {@link Exception}, does not override {@link Object#equals(Object)}. As a result, t1.equals(t2)
+ * will return false if t2 is a deserialized instance of serialized t1. This makes it appear as if
+ * coders are mutating the value, which can lead to things like log spam. This wrapper overrides
+ * {@link Object#equals(Object)} to get around this issue. The hash code remains the same as the
+ * underlying {@link Throwable}.
+ *
+ * <p>The equality comparison is transitive, meaning that for two {@link Throwable}s t1 and t2, the
+ * causes must be equal. This can occur either because they are both null or because they are both
+ * equal after being wrapped.
+ *
+ * <p>Note that this is simply a best effort based on properties like instance type, stack trace,
+ * message, and cause. It cannot guarantee that the state that led to the exception is the same,
+ * unless it is fully captured in the message, nor can it differentiate between two exceptions
+ * thrown at different times or in different processes. For this reason, this is not suitable for a
+ * general-purpose {@link Throwable} equality comparison. It merely exists to try to avoid false
+ * positives in a mutation check for coders.
+ *
+ * <p>Due to the above, this does not support comparison to a raw {@link Throwable}.
+ */
+public class ComparableThrowable implements Serializable {

Review comment:
       Having read the rest of the PR, I think the reason for this is to make it a friendly Beam element, so `SerializableThrowable` might be OK.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
##########
@@ -881,8 +930,60 @@ public String toString() {
               getIdAttribute(),
               getNeedsAttributes(),
               getNeedsMessageId());
-      PCollection<T> read =
-          input.apply(source).apply(MapElements.into(new TypeDescriptor<T>() {}).via(getParseFn()));
+
+      PCollection<T> read;
+      PCollection<PubsubMessage> preParse = input.apply(source);
+      TypeDescriptor<T> typeDescriptor = new TypeDescriptor<T>() {};
+      if (getDeadLetterTopicProvider() == null) {
+        read = preParse.apply(MapElements.into(typeDescriptor).via(getParseFn()));
+      } else {
+        Result<PCollection<T>, KV<PubsubMessage, ComparableThrowable>> result =
+            preParse.apply(
+                "PubsubIO.Read/Map/Parse-Incoming-Messages",
+                MapElements.into(typeDescriptor)
+                    .via(getParseFn())
+                    .exceptionsVia(new WithFailures.ThrowableHandler<PubsubMessage>() {}));
+        read = result.output();
+
+        // Write out failures to the provided dead-letter topic.
+        result
+            .failures()

Review comment:
       OK so seeing this, I _think_ there is no update problem because the throwable is never actually serialized.
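The split that `MapElements.into(...).via(...).exceptionsVia(...)` performs in the hunk above can be approximated in plain Java: successful parses go to one output, and each failing input is kept together with its exception. This is only a JDK sketch of the idea; `ParseWithFailures` and its `Result` type are hypothetical names, and the real transform produces `PCollection`s rather than lists.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical JDK-only analogue of a parse step that routes failures instead of throwing. */
final class ParseWithFailures {
  private ParseWithFailures() {}

  /** Parsed outputs plus (input, error) pairs for inputs that failed to parse. */
  static final class Result<I, O> {
    final List<O> outputs = new ArrayList<>();
    final List<Map.Entry<I, Throwable>> failures = new ArrayList<>();
  }

  static <I, O> Result<I, O> parseAll(List<I> inputs, Function<I, O> parseFn) {
    Result<I, O> result = new Result<>();
    for (I input : inputs) {
      try {
        result.outputs.add(parseFn.apply(input));
      } catch (RuntimeException e) {
        // Keep the original input next to its error, like KV<PubsubMessage, ...> in the diff,
        // instead of rethrowing it.
        result.failures.add(new AbstractMap.SimpleImmutableEntry<I, Throwable>(input, e));
      }
    }
    return result;
  }
}
```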

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithFailures.java
##########
@@ -92,6 +93,24 @@
     }
   }
 
+  /**
+   * A handler that holds onto the {@link Throwable} that led to the exception, returning it along
+   * with the original value as a {@link KV}.
+   *
+   * <p>Extends {@link SimpleFunction} so that full type information is captured. {@link KV} and
+   * {@link ComparableThrowable} coders can be easily inferred by Beam, so coder inference can be
+   * successfully applied if the consuming transform passes type information to the failure
+   * collection's {@link TupleTag}. This may require creating an instance of an anonymous inherited
+   * class rather than of this class directly.
+   */
+  public static class ThrowableHandler<T>
+      extends SimpleFunction<ExceptionElement<T>, KV<T, ComparableThrowable>> {
+    @Override
+    public KV<T, ComparableThrowable> apply(ExceptionElement<T> f) {
+      return KV.of(f.element(), ComparableThrowable.forThrowable(f.exception()));

Review comment:
       Unless I missed something, this output will be inferred to use `KvCoder.of(<T coder>, SerializableCoder)`. So this may create problems for pipeline update.
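On the update concern: `SerializableCoder` encodes values with Java serialization, and a Java serialization stream records a class descriptor that includes the class name and `serialVersionUID`. Renaming the wrapper (for example from `ComparableThrowable` to `EncodableThrowable`) or otherwise changing its serialized form therefore changes what gets encoded. The JDK-only sketch below shows the class name appearing in the encoded bytes; `Wrapper` is a hypothetical stand-in for the PR's class, and the sketch does not exercise Beam's coder itself.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class JavaSerializationBytes {
  private JavaSerializationBytes() {}

  /** Hypothetical stand-in for a Serializable wrapper such as the PR's throwable wrapper. */
  static final class Wrapper implements Serializable {
    private static final long serialVersionUID = 1L;
    final String message;

    Wrapper(String message) {
      this.message = message;
    }
  }

  /** Encodes a value with plain Java serialization. */
  static byte[] serialize(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** True if the (ASCII) class name is embedded in the encoded bytes. */
  static boolean containsClassName(byte[] encoded, Class<?> clazz) {
    // ISO-8859-1 maps each byte to one char, so an ASCII name can be found with contains().
    return new String(encoded, StandardCharsets.ISO_8859_1).contains(clazz.getName());
  }
}
```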

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like

Review comment:
       `man child classes`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] zhoufek commented on pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on pull request #15037:
URL: https://github.com/apache/beam/pull/15037#issuecomment-867748074


   R: @kennknowles


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] zhoufek commented on a change in pull request #15037: [BEAM-12474] Write PubsubIO parsing errors to dead-letter topic

Posted by GitBox <gi...@apache.org>.
zhoufek commented on a change in pull request #15037:
URL: https://github.com/apache/beam/pull/15037#discussion_r666195862



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/ComparableThrowable.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+/**
+ * A wrapper around a {@link Throwable} for use with coders.
+ *
+ * <p>The {@link Throwable} class is serializable. However, it, along with man child classes like
+ * {@link Exception}, does not override {@link Object#equals(Object)}. As a result, t1.equals(t2)
+ * will return false if t2 is a deserialized instance of serialized t1. This makes it appear as if
+ * coders are mutating the value, which can lead to things like log spam. This wrapper overrides
+ * {@link Object#equals(Object)} to get around this issue. The hash code remains the same as the
+ * underlying {@link Throwable}.
+ *
+ * <p>The equality comparison is transitive, meaning that for two {@link Throwable}s t1 and t2, the
+ * causes must be equal. This can occur either because they are both null or because they are both
+ * equal after being wrapped.
+ *
+ * <p>Note that this is simply a best effort based on properties like instance type, stack trace,
+ * message, and cause. It cannot guarantee that the state that led to the exception is the same,
+ * unless it is fully captured in the message, nor can it differentiate between two exceptions
+ * thrown at different times or in different processes. For this reason, this is not suitable for a
+ * general-purpose {@link Throwable} equality comparison. It merely exists to try to avoid false
+ * positives in a mutation check for coders.
+ *
+ * <p>Due to the above, this does not support comparison to a raw {@link Throwable}.
+ */
+public class ComparableThrowable implements Serializable {

Review comment:
       Going with `EncodableThrowable` for now. If you think `SerializableThrowable` is better, please let me know.
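A minimal sketch of the best-effort comparison the Javadoc above describes (instance type, message, stack trace, and cause, compared recursively), assuming only the JDK. `ThrowableEquality.bestEffortEquals` is a hypothetical name, not the PR's actual `equals` implementation, and it assumes the cause chain is acyclic.

```java
import java.util.Arrays;
import java.util.Objects;

/** Hypothetical best-effort Throwable comparison; not the PR's actual equals implementation. */
final class ThrowableEquality {
  private ThrowableEquality() {}

  static boolean bestEffortEquals(Throwable a, Throwable b) {
    if (a == b) {
      return true; // same instance, or both causes are null
    }
    if (a == null || b == null) {
      return false;
    }
    return a.getClass().equals(b.getClass())
        && Objects.equals(a.getMessage(), b.getMessage())
        // StackTraceElement overrides equals, so element-wise comparison works after decoding.
        && Arrays.equals(a.getStackTrace(), b.getStackTrace())
        // Recurse into the cause chain (assumed acyclic) so causes must also match.
        && bestEffortEquals(a.getCause(), b.getCause());
  }
}
```

Comparing these properties instead of identity is what lets a decoded copy match its original, which is all the mutation check needs.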




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org