Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/24 21:27:01 UTC

[GitHub] [beam] egalpin opened a new pull request #15381: [BEAM-10990] Elasticsearch response filtering

egalpin opened a new pull request #15381:
URL: https://github.com/apache/beam/pull/15381


   Adds the ability to prevent infinite retries on non-transient Elasticsearch write failures by providing a user-configurable setting, `withThrowWriteFailures` (for now; I'm not certain the name is perfect), which defaults to `true` to maintain backward compatibility.
   
   If `withThrowWriteFailures` is set to true, the response from the Elasticsearch Bulk API will be used to capture the result of persisting (or deleting) each document. The items in the Bulk API response are guaranteed to be in the same order as the corresponding entries in the Bulk API request[1], so we can stitch together the write result for each input document.
   
   This PR introduces a new class, `WriteSummary`, which collects the context of a document as it passes through the IO: the raw input document (i.e. an element of the PCollection sent to ElasticsearchIO.Write/ElasticsearchIO.DocToBulk), the bulk directive produced from the transform settings and the input document (e.g. delete directive, upsert, scripted upsert, etc.), and whether the document resulted in an error from ES. By maintaining all of the context/ancestry of a document, users can get at, for example, the input document together with its resulting error. Without maintaining the ancestry, we could only return the bulk directive, which would be less helpful in many cases.
   
   [1] https://discuss.elastic.co/t/ordering-of-responses-in-the-bulk-api/13264
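   
   To illustrate, here is a hedged usage sketch of consuming the new output. Only the `PCollectionTuple` return type, the `withThrowWriteErrors(...)` setter, and the `WriteSummary` accessors come from this PR; the `FAILED_WRITES` tag name and the surrounding scaffolding are hypothetical placeholders:
   
   ```java
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
   import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.WriteSummary;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.transforms.MapElements;
   import org.apache.beam.sdk.values.PCollection;
   import org.apache.beam.sdk.values.PCollectionTuple;
   import org.apache.beam.sdk.values.TypeDescriptors;

   public class EsWriteResultsSketch {
     public static void main(String[] args) {
       Pipeline pipeline = Pipeline.create();

       ElasticsearchIO.ConnectionConfiguration conn =
           ElasticsearchIO.ConnectionConfiguration.create(
               new String[] {"http://localhost:9200"}, "my-index", "_doc");

       PCollectionTuple results =
           pipeline
               .apply(Create.of("{\"id\": 1}", "{\"id\": 2}"))
               .apply(
                   ElasticsearchIO.write()
                       .withConnectionConfiguration(conn)
                       // Capture per-document results instead of throwing on write failures.
                       .withThrowWriteErrors(false));

       // FAILED_WRITES is a hypothetical tag name used here for illustration only;
       // the real tag constants are defined by the PR itself.
       PCollection<WriteSummary> failures = results.get(ElasticsearchIO.Write.FAILED_WRITES);

       // Each WriteSummary carries the original input doc plus the Bulk API response item.
       failures.apply(
           MapElements.into(TypeDescriptors.strings())
               .via((WriteSummary ws) -> ws.getInputDoc() + " failed: " + ws.getResponseItemJson()));

       pipeline.run().waitUntilFinish();
     }
   }
   ```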
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make the review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   





[GitHub] [beam] echauchot edited a comment on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot edited a comment on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-938449865


   > @echauchot It's highly possible that I have a fundamental misunderstanding about how to handle window data. I'm going to try to outline my goals and challenges, and take the proposed implementation out of focus.
   > 
   > Goals:
   > 
   > * Change BulkIO/Write to output PCollectionTuple rather than PDone, in order to support reporting the status of indexing each input document
   > * Leave windows/timestamps/etc. of input data entirely unaltered
   > 
   > Challenges:
   > 
   > * BulkIOBaseFn relies on buffering inputs, either using bundles or Stateful specs
   > * BulkIOBaseFn#finishBundle() must be called to ensure that any buffered inputs are sent to ES, and as of this PR, output to the PCollectionTuple
   > * DoFn.FinishBundle methods can accept a [FinishBundleContext](https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/transforms/DoFn.FinishBundleContext.html) in order to output elements
   > * FinishBundleContext output methods all require explicit specification of a BoundedWindow instance
   > 
   > I got a bit stuck on that last point. My impression was that in order to ensure buffered docs' results were output, and in order to leave those elements' windows unaltered, I needed to keep track of the windows to which those elements belong so that they could be explicitly passed to FinishBundleContext#output.
   > 
   > I'd definitely be keen to learn more about how to handle windows and challenge my assumptions here. Thanks for your time @echauchot in reviewing and teaching.
   
   Just to write here what we discussed privately yesterday (Apache way: what did not happen publicly did not happen at all):
   - My bad, I did not know about the `DoFn#finishBundle()` signature change. It now forces specifying a window in the output. I first thought that dealing with windows was not needed and brought unnecessary complexity, but it seems it is mandatory :smile: 
   but I hope it is only temporary until `OutputReceiver` is fleshed out.
   - I took a look at the existing IOs that output data to PCollections: 
     - `FhirIO`: outputs to the last seen window. Seems incorrect.
     - `HadoopFormatIO` and `BigqueryIO` store to a map keyed by window and then output per window, similarly to what you do (see the sketch below).
     => So I guess the general window maintenance looks good. I need to look at the code in more detail to give LGTM.
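   
   For concreteness, a minimal sketch of the map-keyed-by-window pattern mentioned above (in the spirit of `HadoopFormatIO`/`BigqueryIO`). The class and field names are illustrative only, not the PR's actual implementation:
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
   import org.apache.beam.sdk.values.KV;
   import org.joda.time.Instant;

   // Buffers elements per window so finishBundle can output them into the window
   // (and with the timestamp) they originally arrived in.
   class WindowPreservingBufferFn extends DoFn<String, String> {
     private transient Map<BoundedWindow, List<KV<Instant, String>>> buffer;

     @StartBundle
     public void startBundle() {
       buffer = new HashMap<>();
     }

     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
       // A real implementation would also flush to Elasticsearch once a batch is full.
       buffer
           .computeIfAbsent(window, w -> new ArrayList<>())
           .add(KV.of(c.timestamp(), c.element()));
     }

     @FinishBundle
     public void finishBundle(FinishBundleContext context) {
       for (Map.Entry<BoundedWindow, List<KV<Instant, String>>> entry : buffer.entrySet()) {
         for (KV<Instant, String> timestamped : entry.getValue()) {
           // FinishBundleContext requires an explicit window, hence the per-window buffer.
           context.output(timestamped.getValue(), timestamped.getKey(), entry.getKey());
         }
       }
       buffer.clear();
     }
   }
   ```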
   
   





[GitHub] [beam] timrobertson100 edited a comment on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
timrobertson100 edited a comment on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-910498732


   I'm sorry I haven't got time to complete a review, but I have read through the changes once, and it all looks well structured and has good attention to detail on comments, tests etc.
   
   The area I can't comment on immediately is the adapter and windowing behavior - if someone could confirm that section looks reasonable I think it looks good to merge.





[GitHub] [beam] lukecwik commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering and [BEAM-5172] Tries to reduce ES UTest flakiness

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-1060915619


   I believe the flushing logic has a bug where we are outputting to the wrong window: https://issues.apache.org/jira/browse/BEAM-14064





[GitHub] [beam] echauchot commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-930949400


   > Thanks for the feedback @echauchot! I wasn't sure if re-windowing within an IO was acceptable but it sounds like that's the path to go. It's way less complex, and I definitely like that. I'll make those changes
   
   @egalpin I mean: you should not change the windows of the elements. In fact, you should not deal with the windows at all. Your problem is to join the input elements with the status (JSON and error) of the write. You could either:
    - join by doc id, but that would not be possible when the id is not provided in the input doc (autogeneration), or
    - maintain the same order between input docs and WriteSummary objects, so you could simply join index 1 with index 1, index 2 with index 2, etc. (roughly as sketched below)
    I don't get why you bothered with windows in the first place, but maybe there is something I missed.
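   
   For illustration, that positional join could look roughly like the following. This is a hedged sketch: it assumes the `with*` setters on `WriteSummary` shown in the diff are accessible from here, and it is not code from the PR.
   
   ```java
   import com.fasterxml.jackson.databind.JsonNode;
   import java.util.ArrayList;
   import java.util.List;
   import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.WriteSummary;

   // Pairs each buffered input with the Bulk API response item at the same index,
   // relying on the Bulk API's guarantee that items come back in request order.
   class PositionalJoinSketch {
     static List<WriteSummary> pairByIndex(List<WriteSummary> bufferedInputs, JsonNode responseItems) {
       List<WriteSummary> joined = new ArrayList<>();
       // Assumes one response item per request entry, as the Bulk API guarantees.
       for (int i = 0; i < bufferedInputs.size(); i++) {
         JsonNode item = responseItems.get(i);
         joined.add(
             bufferedInputs
                 .get(i)
                 .withResponseItemJson(item.toString())
                 .withHasError(item.findValue("error") != null));
       }
       return joined;
     }
   }
   ```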





[GitHub] [beam] egalpin commented on a change in pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on a change in pull request #15381:
URL: https://github.com/apache/beam/pull/15381#discussion_r731102223



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1636,11 +1806,15 @@ public void processElement(ProcessContext c) throws IOException {
    * cluster. This class is effectively a thin proxy for DocToBulk->BulkIO all-in-one for
    * convenience and backward compatibility.
    */
-  public static class Write extends PTransform<PCollection<String>, PDone> {
+  public static class Write extends PTransform<PCollection<String>, PCollectionTuple> {

Review comment:
       This actually raised a question for me. In the case where a user selects that errors should be raised rather than returned via PCollectionTuple, and where there are no errors, is the PDone implicit? Is there an issue with no longer explicitly returning `PDone.in(...)`?
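       
       (For context: `PDone` is only a marker `POutput` with no values, so nothing implicit is needed; callers that previously ended the pipeline at `Write` can keep doing so by ignoring the returned tuple. A hedged sketch, where `docs` and `conn` are placeholders:)
       
       ```java
       // Hedged sketch, not code from this PR; `docs` and `conn` are placeholders.
       PCollectionTuple ignored =
           docs.apply(
               ElasticsearchIO.write()
                   .withConnectionConfiguration(conn)
                   // Default: write failures still throw, matching the pre-PR behavior.
                   .withThrowWriteErrors(true));
       ```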







[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering and [BEAM-5172] Tries to reduce ES UTest flakiness

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-947666532


   Thanks for your review efforts and insights  @echauchot, it was a pleasure as always 🎉





[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-904988980


   R: @jbonofre @timrobertson100 @echauchot
   
   





[GitHub] [beam] echauchot commented on a change in pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot commented on a change in pull request #15381:
URL: https://github.com/apache/beam/pull/15381#discussion_r716701847



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1623,11 +1651,153 @@ public void setup() throws IOException {
 
       @ProcessElement
       public void processElement(ProcessContext c) throws IOException {
-        c.output(createBulkApiEntity(spec, c.element(), backendVersion));
+        String inputDoc = c.element();
+        String bulkDirective = createBulkApiEntity(spec, inputDoc, backendVersion);
+        c.output(
+            WriteSummary.create()
+                .withInputDoc(inputDoc)
+                .withBulkDirective(bulkDirective)
+                // N.B. Saving the element timestamp for later use allows for exactly emulating
+                // c.output(...) because c.output is equivalent to
+                // c.outputWithTimestamp(..., c.timestamp())
+                .withTimestamp(c.timestamp()));
       }
     }
   }
 
+  public static class WriteSummaryCoder extends AtomicCoder<WriteSummary> implements Serializable {
+    private static final WriteSummaryCoder INSTANCE = new WriteSummaryCoder();
+
+    private WriteSummaryCoder() {}
+
+    public static WriteSummaryCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(WriteSummary value, OutputStream outStream) throws IOException {
+      NullableCoder.of(StringUtf8Coder.of()).encode(value.getInputDoc(), outStream);
+      NullableCoder.of(StringUtf8Coder.of()).encode(value.getBulkDirective(), outStream);
+      BooleanCoder.of().encode(value.getHasError(), outStream);
+      NullableCoder.of(StringUtf8Coder.of()).encode(value.getResponseItemJson(), outStream);
+      NullableCoder.of(InstantCoder.of()).encode(value.getTimestamp(), outStream);
+    }
+
+    @Override
+    public WriteSummary decode(InputStream inStream) throws IOException {
+      String inputDoc = NullableCoder.of(StringUtf8Coder.of()).decode(inStream);
+      String bulkDirective = NullableCoder.of(StringUtf8Coder.of()).decode(inStream);
+      boolean hasError = BooleanCoder.of().decode(inStream);
+      String responseItemJson = NullableCoder.of(StringUtf8Coder.of()).decode(inStream);
+      Instant timestamp = NullableCoder.of(InstantCoder.of()).decode(inStream);
+
+      return WriteSummary.create()
+          .withInputDoc(inputDoc)
+          .withBulkDirective(bulkDirective)
+          .withHasError(hasError)
+          .withResponseItemJson(responseItemJson)
+          .withTimestamp(timestamp);
+    }
+  }
+
+  // Immutable POJO for maintaining various states of documents and their bulk representation, plus
+  // response from ES for the given document and the timestamp of the data
+  @DefaultCoder(WriteSummaryCoder.class)
+  @AutoValue
+  public abstract static class WriteSummary implements Serializable {

Review comment:
       As this class is instantiated before any write operation (in the DocToBulk transform), I'd call it simply `Document`, which would echo the existing `DocumentMetaData` class. WDYT?

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -237,49 +250,64 @@ static JsonNode parseResponse(HttpEntity responseEntity) throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes)
+  static List<WriteSummary> checkForErrors(
+      HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes, boolean throwWriteErrors)
       throws IOException {
 
+    List<WriteSummary> responses = new ArrayList<>();
+    int numErrors = 0;
     JsonNode searchResult = parseResponse(responseEntity);
-    boolean errors = searchResult.path("errors").asBoolean();
-    if (errors) {
-      int numErrors = 0;
-
-      StringBuilder errorMessages =
-          new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
-      JsonNode items = searchResult.path("items");
-      if (items.isMissingNode() || items.size() == 0) {
-        errorMessages.append(searchResult.toString());
-      }
-      // some items present in bulk might have errors, concatenate error messages
-      for (JsonNode item : items) {
-        JsonNode error = item.findValue("error");
-        if (error != null) {
-          // N.B. An empty-string within the allowedErrorTypes Set implies all errors are allowed.
-          String type = error.path("type").asText();
-          String reason = error.path("reason").asText();
-          String docId = item.findValue("_id").asText();
-          JsonNode causedBy = error.path("caused_by"); // May not be present
-          String cbReason = causedBy.path("reason").asText();
-          String cbType = causedBy.path("type").asText();
-
-          if (allowedErrorTypes == null
-              || (!allowedErrorTypes.contains(type) && !allowedErrorTypes.contains(cbType))) {
-            // 'error' and 'causedBy` fields are not null, and the error is not being ignored.
-            numErrors++;
-
-            errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
-
-            if (!causedBy.isMissingNode()) {
-              errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
-            }
+    StringBuilder errorMessages =
+        new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
+    JsonNode items = searchResult.path("items");
+
+    if (items.isMissingNode() || items.size() == 0) {
+      // This would only be expected in cases like connectivity issues or similar
+      errorMessages.append(searchResult.toString());
+      throw new RuntimeException(
+          String.format(
+              "'items' missing from Elasticsearch response: %s", errorMessages.toString()));
+    }
+
+    // some items present in bulk might have errors, concatenate error messages and record
+    // which items had errors
+    for (JsonNode item : items) {
+      WriteSummary result = WriteSummary.create().withResponseItemJson(item.toString());
+
+      JsonNode error = item.findValue("error");
+      if (error != null) {
+        // N.B. An empty-string within the allowedErrorTypes Set implies all errors are allowed.
+        String type = error.path("type").asText();
+        String reason = error.path("reason").asText();
+        String docId = item.findValue("_id").asText();
+        JsonNode causedBy = error.path("caused_by"); // May not be present
+        String cbReason = causedBy.path("reason").asText();
+        String cbType = causedBy.path("type").asText();
+
+        if (allowedErrorTypes == null
+            || (!allowedErrorTypes.contains(type) && !allowedErrorTypes.contains(cbType))) {
+          // 'error' and 'causedBy` fields are not null, and the error is not being ignored.
+          result = result.withHasError(true);
+          numErrors++;
+
+          errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
+
+          if (!causedBy.isMissingNode()) {
+            errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
           }
         }
       }
-      if (numErrors > 0) {
+      responses.add(result);
+    }
+
+    if (numErrors > 0) {
+      LOG.error(errorMessages.toString());
+      if (throwWriteErrors) {

Review comment:
       I like that this behavior is configurable!

##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
##########
@@ -141,6 +141,18 @@ public void testWriteWithAllowableErrors() throws Exception {
     elasticsearchIOTestCommon.testWriteWithAllowedErrors();
   }
 
+  @Test

Review comment:
       No need to test this in an ITest; it is more of a UTest. ITests are for high load.

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -237,49 +250,64 @@ static JsonNode parseResponse(HttpEntity responseEntity) throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes)
+  static List<WriteSummary> checkForErrors(
+      HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes, boolean throwWriteErrors)
       throws IOException {
 
+    List<WriteSummary> responses = new ArrayList<>();
+    int numErrors = 0;
     JsonNode searchResult = parseResponse(responseEntity);
-    boolean errors = searchResult.path("errors").asBoolean();
-    if (errors) {
-      int numErrors = 0;
-
-      StringBuilder errorMessages =
-          new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
-      JsonNode items = searchResult.path("items");
-      if (items.isMissingNode() || items.size() == 0) {
-        errorMessages.append(searchResult.toString());
-      }
-      // some items present in bulk might have errors, concatenate error messages
-      for (JsonNode item : items) {
-        JsonNode error = item.findValue("error");
-        if (error != null) {
-          // N.B. An empty-string within the allowedErrorTypes Set implies all errors are allowed.
-          String type = error.path("type").asText();
-          String reason = error.path("reason").asText();
-          String docId = item.findValue("_id").asText();
-          JsonNode causedBy = error.path("caused_by"); // May not be present
-          String cbReason = causedBy.path("reason").asText();
-          String cbType = causedBy.path("type").asText();
-
-          if (allowedErrorTypes == null
-              || (!allowedErrorTypes.contains(type) && !allowedErrorTypes.contains(cbType))) {
-            // 'error' and 'causedBy` fields are not null, and the error is not being ignored.
-            numErrors++;
-
-            errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
-
-            if (!causedBy.isMissingNode()) {
-              errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
-            }
+    StringBuilder errorMessages =
+        new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
+    JsonNode items = searchResult.path("items");
+
+    if (items.isMissingNode() || items.size() == 0) {
+      // This would only be expected in cases like connectivity issues or similar
+      errorMessages.append(searchResult.toString());
+      throw new RuntimeException(

Review comment:
       Fail the pipeline in that case?

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1789,8 +1964,14 @@ public Write withAllowableResponseErrors(@Nullable Set<String> allowableResponse
       return this;
     }
 
+    /** Refer to {@link BulkIO#withThrowWriteErrors}. */
+    public Write withThrowWriteErrors(boolean throwWriteErrors) {

Review comment:
       Please add a hint telling users that this parameter can be useful when the pipeline runs in streaming mode, to avoid infinite retries.
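       
       One possible shape for that hint (wording is illustrative only, not the PR's actual Javadoc):
       
       ```java
       // Illustrative Javadoc wording only -- not taken from the PR.
       /**
        * Whether to throw an exception when per-document write errors are reported by the
        * Bulk API. Defaults to {@code true}, preserving the previous behavior.
        *
        * <p>Setting this to {@code false} is especially useful for streaming pipelines,
        * where throwing on a non-transient write failure would otherwise cause the bundle
        * to be retried indefinitely; failed documents are instead reported in the returned
        * {@link PCollectionTuple}.
        */
       ```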

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1636,11 +1806,15 @@ public void processElement(ProcessContext c) throws IOException {
    * cluster. This class is effectively a thin proxy for DocToBulk->BulkIO all-in-one for
    * convenience and backward compatibility.
    */
-  public static class Write extends PTransform<PCollection<String>, PDone> {
+  public static class Write extends PTransform<PCollection<String>, PCollectionTuple> {

Review comment:
        I see the write tests do not change, so there is no breaking change. I guess the user can choose whether or not to process the PCollectionTuple.

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -237,49 +250,64 @@ static JsonNode parseResponse(HttpEntity responseEntity) throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes)
+  static List<WriteSummary> checkForErrors(

Review comment:
       I would rename this method. Indeed, it can also report items written successfully, so I'd call it something like `createWriteReport`.







[GitHub] [beam] echauchot commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-920079004


   @egalpin sorry for the late review, taking a look now





[GitHub] [beam] timrobertson100 commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
timrobertson100 commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-910498732


   I'm sorry I haven't got time to complete a review, but I have read through the changes once, and it all looks well structured and has good attention to detail on comments etc.
   
   The area I can't comment on immediately is the adapter and windowing behavior - if someone could confirm that section looks reasonable I think it looks good to merge.





[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-930485572


   Thanks for the feedback @echauchot! I wasn't sure if re-windowing within an IO was acceptable but it sounds like that's the path to go.  It's way less complex, and I definitely like that.  I'll make those changes





[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-928377730


   Oh, I had also forgotten about an alternative solution that would have less complexity, but I'm not sure whether it adheres to best practices for a sink in the system.
   
   We could instead combine inputs globally and forget about maintaining windows altogether. I believe this is what the BQ Write method does, and I think that not maintaining windows in a sink is generally sane 🤷‍♂️ (see the rough sketch below).
   
   Thoughts?
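   
   One way to read "combine inputs globally" is to re-window into the global window ahead of the buffering write. A hedged sketch of that alternative (not what this PR ultimately does); `docs` and `conn` are placeholders:
   
   ```java
   import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
   import org.apache.beam.sdk.transforms.windowing.Window;

   // Drop the incoming windowing so the buffering write only ever sees the global window.
   docs.apply(Window.<String>into(new GlobalWindows()))
       .apply(
           ElasticsearchIO.write()
               .withConnectionConfiguration(conn)
               .withThrowWriteErrors(false));
   ```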





[GitHub] [beam] echauchot edited a comment on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot edited a comment on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-930949400


   > Thanks for the feedback @echauchot! I wasn't sure if re-windowing within an IO was acceptable but it sounds like that's the path to go. It's way less complex, and I definitely like that. I'll make those changes
   
   @egalpin my pleasure!
   
   I mean: you should not change the windows of the elements. In fact, you should not deal with the windows at all. Your problem is to join the input elements with the status (JSON and error) of the write. You could either:
    - join by doc id, but that would not be possible when the id is not provided in the input doc (autogeneration), or
    - maintain the same order between input docs and WriteSummary objects, so you could simply join index 1 with index 1, index 2 with index 2, etc.
    I don't get why you bothered with windows in the first place, but maybe there is something I missed.





[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-910380980


   Friendly bump. Anyone able to review?





[GitHub] [beam] egalpin commented on a change in pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on a change in pull request #15381:
URL: https://github.com/apache/beam/pull/15381#discussion_r731017896



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -237,49 +250,64 @@ static JsonNode parseResponse(HttpEntity responseEntity) throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes)
+  static List<WriteSummary> checkForErrors(
+      HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes, boolean throwWriteErrors)
       throws IOException {
 
+    List<WriteSummary> responses = new ArrayList<>();
+    int numErrors = 0;
     JsonNode searchResult = parseResponse(responseEntity);
-    boolean errors = searchResult.path("errors").asBoolean();
-    if (errors) {
-      int numErrors = 0;
-
-      StringBuilder errorMessages =
-          new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
-      JsonNode items = searchResult.path("items");
-      if (items.isMissingNode() || items.size() == 0) {
-        errorMessages.append(searchResult.toString());
-      }
-      // some items present in bulk might have errors, concatenate error messages
-      for (JsonNode item : items) {
-        JsonNode error = item.findValue("error");
-        if (error != null) {
-          // N.B. An empty-string within the allowedErrorTypes Set implies all errors are allowed.
-          String type = error.path("type").asText();
-          String reason = error.path("reason").asText();
-          String docId = item.findValue("_id").asText();
-          JsonNode causedBy = error.path("caused_by"); // May not be present
-          String cbReason = causedBy.path("reason").asText();
-          String cbType = causedBy.path("type").asText();
-
-          if (allowedErrorTypes == null
-              || (!allowedErrorTypes.contains(type) && !allowedErrorTypes.contains(cbType))) {
-            // 'error' and 'causedBy` fields are not null, and the error is not being ignored.
-            numErrors++;
-
-            errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
-
-            if (!causedBy.isMissingNode()) {
-              errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
-            }
+    StringBuilder errorMessages =
+        new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
+    JsonNode items = searchResult.path("items");
+
+    if (items.isMissingNode() || items.size() == 0) {
+      // This would only be expected in cases like connectivity issues or similar
+      errorMessages.append(searchResult.toString());
+      throw new RuntimeException(

Review comment:
       Good call, this is overzealous. I'll reduce to a log warning







[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-946037958


   Note that https://github.com/apache/beam/pull/15381/commits/e01efcd40e8e22edcf03ae30b0f53df9a31a2851 aims to also address [BEAM-5172](https://issues.apache.org/jira/browse/BEAM-5172)





+        String cbType = causedBy.path("type").asText();
+
+        if (allowedErrorTypes == null
+            || (!allowedErrorTypes.contains(type) && !allowedErrorTypes.contains(cbType))) {
+          // 'error' and 'causedBy` fields are not null, and the error is not being ignored.
+          result = result.withHasError(true);
+          numErrors++;
+
+          errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
+
+          if (!causedBy.isMissingNode()) {
+            errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
           }
         }
       }
-      if (numErrors > 0) {
+      responses.add(result);
+    }
+
+    if (numErrors > 0) {
+      LOG.error(errorMessages.toString());
+      if (throwWriteErrors) {

Review comment:
       I like that this behavior is configurable!

##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
##########
@@ -141,6 +141,18 @@ public void testWriteWithAllowableErrors() throws Exception {
     elasticsearchIOTestCommon.testWriteWithAllowedErrors();
   }
 
+  @Test

Review comment:
       No need to test this in an ITest; it is more of a UTest. ITests are for high load.

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -237,49 +250,64 @@ static JsonNode parseResponse(HttpEntity responseEntity) throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes)
+  static List<WriteSummary> checkForErrors(
+      HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes, boolean throwWriteErrors)
       throws IOException {
 
+    List<WriteSummary> responses = new ArrayList<>();
+    int numErrors = 0;
     JsonNode searchResult = parseResponse(responseEntity);
-    boolean errors = searchResult.path("errors").asBoolean();
-    if (errors) {
-      int numErrors = 0;
-
-      StringBuilder errorMessages =
-          new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
-      JsonNode items = searchResult.path("items");
-      if (items.isMissingNode() || items.size() == 0) {
-        errorMessages.append(searchResult.toString());
-      }
-      // some items present in bulk might have errors, concatenate error messages
-      for (JsonNode item : items) {
-        JsonNode error = item.findValue("error");
-        if (error != null) {
-          // N.B. An empty-string within the allowedErrorTypes Set implies all errors are allowed.
-          String type = error.path("type").asText();
-          String reason = error.path("reason").asText();
-          String docId = item.findValue("_id").asText();
-          JsonNode causedBy = error.path("caused_by"); // May not be present
-          String cbReason = causedBy.path("reason").asText();
-          String cbType = causedBy.path("type").asText();
-
-          if (allowedErrorTypes == null
-              || (!allowedErrorTypes.contains(type) && !allowedErrorTypes.contains(cbType))) {
-            // 'error' and 'causedBy` fields are not null, and the error is not being ignored.
-            numErrors++;
-
-            errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
-
-            if (!causedBy.isMissingNode()) {
-              errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
-            }
+    StringBuilder errorMessages =
+        new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
+    JsonNode items = searchResult.path("items");
+
+    if (items.isMissingNode() || items.size() == 0) {
+      // This would only be expected in cases like connectivity issues or similar
+      errorMessages.append(searchResult.toString());
+      throw new RuntimeException(

Review comment:
       Fail the pipeline in that case?

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1789,8 +1964,14 @@ public Write withAllowableResponseErrors(@Nullable Set<String> allowableResponse
       return this;
     }
 
+    /** Refer to {@link BulkIO#withThrowWriteErrors}. */
+    public Write withThrowWriteErrors(boolean throwWriteErrors) {

Review comment:
       Please add a hint telling users that this parameter can be useful when the pipeline runs in streaming mode, to avoid infinite retries.
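       For context, a minimal sketch of the streaming scenario such a hint would describe, assuming the `withThrowWriteErrors` setter added in this PR; the addresses, index, type, and input document are placeholders, not the IO's documented usage:

       import org.apache.beam.sdk.Pipeline;
       import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
       import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
       import org.apache.beam.sdk.transforms.Create;
       import org.apache.beam.sdk.values.PCollection;

       public class ThrowWriteErrorsSketch {
         public static void main(String[] args) {
           Pipeline p = Pipeline.create();
           // Placeholder input; in the motivating case this would be an unbounded source.
           PCollection<String> docs = p.apply(Create.of("{\"id\":1,\"scientist\":\"Einstein\"}"));

           docs.apply(
               ElasticsearchIO.write()
                   .withConnectionConfiguration(
                       ConnectionConfiguration.create(
                           new String[] {"http://localhost:9200"}, "my-index", "_doc"))
                   // In streaming mode, leaving this at the default (true) means a
                   // non-transient write failure throws, the bundle is retried, and the
                   // same failure repeats forever; setting it to false surfaces the
                   // failure in the output instead.
                   .withThrowWriteErrors(false));

           p.run().waitUntilFinish();
         }
       }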

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1636,11 +1806,15 @@ public void processElement(ProcessContext c) throws IOException {
    * cluster. This class is effectively a thin proxy for DocToBulk->BulkIO all-in-one for
    * convenience and backward compatibility.
    */
-  public static class Write extends PTransform<PCollection<String>, PDone> {
+  public static class Write extends PTransform<PCollection<String>, PCollectionTuple> {

Review comment:
        I see the write tests do not change, so there is no breaking change. I guess the user can choose whether or not to process the PCollectionTuple.
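        A hedged illustration of that choice; the tuple tag below is hypothetical, since the real tag constants are defined by the IO and are not shown in this hunk:

        import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
        import org.apache.beam.sdk.values.PCollection;
        import org.apache.beam.sdk.values.PCollectionTuple;
        import org.apache.beam.sdk.values.TupleTag;

        class WriteResultChoice {
          // Hypothetical tag, for illustration only.
          static final TupleTag<ElasticsearchIO.WriteSummary> FAILED_WRITES =
              new TupleTag<ElasticsearchIO.WriteSummary>() {};

          static void writeIgnoringResult(PCollection<String> docs, ElasticsearchIO.Write esWrite) {
            // Backward-compatible: callers written against the old PDone signature can
            // keep ignoring the result.
            docs.apply("WriteIgnoringResult", esWrite);
          }

          static PCollection<ElasticsearchIO.WriteSummary> writeKeepingFailures(
              PCollection<String> docs, ElasticsearchIO.Write esWrite) {
            // Opt-in: inspect per-document results, e.g. to build a dead-letter path.
            PCollectionTuple results = docs.apply("WriteKeepingResult", esWrite);
            return results.get(FAILED_WRITES);
          }
        }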

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -237,49 +250,64 @@ static JsonNode parseResponse(HttpEntity responseEntity) throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes)
+  static List<WriteSummary> checkForErrors(

Review comment:
       I would rename this method. Indeed, it can also report items written with success, so I'd call it something like `createWriteReport`.







[GitHub] [beam] egalpin commented on a change in pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on a change in pull request #15381:
URL: https://github.com/apache/beam/pull/15381#discussion_r700377417



##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -111,7 +124,7 @@ static String getEsIndex() {
   }
 
   static final String ES_TYPE = "test";
-  static final long NUM_DOCS_UTESTS = 100L;
+  static final long NUM_DOCS_UTESTS = 40L;

Review comment:
       This was a bit of grasping at straws to be honest. My thought was that fewer docs would hopefully lower the flakiness of the uTests given that it would be less arduous on the state to write fewer docs. But granted, 100 is not so different from 40.







[GitHub] [beam] echauchot commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-926382993


   @aaltay @egalpin Sorry guys, I had very reduced availability these days because of ApacheCon. Resuming review.





[GitHub] [beam] echauchot commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-932037237


   @egalpin I don't get it. Neither BulkIOBaseFn#finishBundle() nor ElasticsearchIO#flushBatch() expects windowing information.





[GitHub] [beam] echauchot edited a comment on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot edited a comment on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-930949400


   > Thanks for the feedback @echauchot! I wasn't sure if re-windowing within an IO was acceptable but it sounds like that's the path to go. It's way less complex, and I definitely like that. I'll make those changes
   
   @egalpin my pleasure ! 
   
   I mean: you should not change the windows of the elements. In fact, you should not deal with the windows at all. Your problem is to join the input elements with the status (JSON and error) of the write. You could either:
   - join by doc id, but that would not be possible when the id is not provided in the input doc (auto-generation), so it is not the correct way to go; or
   - maintain the same order between input docs and WriteSummary objects, so you can simply join index 1 with index 1, index 2 with index 2, etc.
   I don't get why you bothered with windows in the first place, but maybe there is something I missed.
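   A minimal sketch of that positional join, detached from the DoFn and window bookkeeping; it assumes the `WriteSummary` withers and accessors shown elsewhere in this thread are accessible:

   import java.util.ArrayList;
   import java.util.List;
   import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.WriteSummary;

   class PositionalJoin {
     // The Bulk API keeps response items in request order, so the i-th response
     // describes the i-th submitted document; join the two lists by index.
     static List<WriteSummary> join(List<WriteSummary> inputs, List<WriteSummary> responses) {
       if (inputs.size() != responses.size()) {
         throw new IllegalArgumentException("inputs and responses must be the same size");
       }
       List<WriteSummary> merged = new ArrayList<>();
       for (int i = 0; i < inputs.size(); i++) {
         merged.add(
             inputs.get(i)
                 .withHasError(responses.get(i).getHasError())
                 .withResponseItemJson(responses.get(i).getResponseItemJson()));
       }
       return merged;
     }
   }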





[GitHub] [beam] echauchot commented on a change in pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot commented on a change in pull request #15381:
URL: https://github.com/apache/beam/pull/15381#discussion_r731695328



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1636,11 +1806,15 @@ public void processElement(ProcessContext c) throws IOException {
    * cluster. This class is effectively a thin proxy for DocToBulk->BulkIO all-in-one for
    * convenience and backward compatibility.
    */
-  public static class Write extends PTransform<PCollection<String>, PDone> {
+  public static class Write extends PTransform<PCollection<String>, PCollectionTuple> {

Review comment:
       PDone is more of a tagging class: it does no configuration and has no PValue (the superclass of PCollection) in expand(). It should be OK, but it would be best to add a test to ensure that the pipeline actually finishes in that particular case. All the other combinations of error/isThrowErrors either throw a RuntimeException or output a PCollectionTuple.
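       A hedged sketch of what such a test method could look like, assuming the usual `TestPipeline` rule and a `connectionConfiguration` prepared by the surrounding test class (both are placeholders here, and imports are omitted):

       @Rule public final transient TestPipeline pipeline = TestPipeline.create();

       @Test
       public void testPipelineDoneWhenWriteResultIsIgnored() {
         PCollection<String> docs =
             pipeline.apply(Create.of("{\"id\":1,\"scientist\":\"Einstein\"}"));

         // Deliberately ignore the returned PCollectionTuple, mimicking callers that
         // were written against the old PDone-returning signature.
         docs.apply(
             ElasticsearchIO.write()
                 .withConnectionConfiguration(connectionConfiguration) // placeholder
                 .withThrowWriteErrors(true));

         // The assertion is simply that the pipeline terminates.
         pipeline.run().waitUntilFinish();
       }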







[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-946897054









[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-928377730


   Oh I had also forgotten about an alternative solution that would have less complexity, but I’m not sure how it might or might not adhere to best practices as a sink in the system. 
   
   We could instead combine inputs globally and forget about maintaining windows altogether. I believe this is what the BQ Write method does, and I think that not maintaining windows in a sink is generally sane 🤷‍♂️ 
   
   Thoughts?





[GitHub] [beam] echauchot commented on a change in pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot commented on a change in pull request #15381:
URL: https://github.com/apache/beam/pull/15381#discussion_r725913649



##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -2214,7 +2502,40 @@ private void flushBatch() throws IOException, InterruptedException {
           }
           responseEntity = handleRetry("POST", endPoint, Collections.emptyMap(), requestBody);
         }
-        checkForErrors(responseEntity, spec.getAllowedResponseErrors());
+
+        List<WriteSummary> responses =
+            checkForErrors(
+                responseEntity, spec.getAllowedResponseErrors(), spec.getThrowWriteErrors());
+
+        return mergeInputsAndResponses(inputEntries, responses);
+      }
+
+      private static Multimap<BoundedWindow, WriteSummary> mergeInputsAndResponses(
+          List<Entry<BoundedWindow, WriteSummary>> inputs, List<WriteSummary> responses) {
+
+        checkArgument(
+            inputs.size() == responses.size(), "inputs and responses must be of same size");
+
+        Multimap<BoundedWindow, WriteSummary> results = ArrayListMultimap.create();
+
+        // N.B. the order of responses must always match the order of inputs
+        for (int i = 0; i < inputs.size(); i++) {
+          BoundedWindow outputWindow = inputs.get(i).getKey();
+
+          // Contains raw input document and Bulk directive counterpart only
+          WriteSummary inputDoc = inputs.get(i).getValue();
+
+          // Contains stringified JSON response from Elasticsearch and error status only
+          WriteSummary outputDoc = responses.get(i);
+
+          WriteSummary merged =

Review comment:
       Please add a comment: contains all the `WriteSummary` fields, set from the matching inputDoc and write response.

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -237,49 +250,64 @@ static JsonNode parseResponse(HttpEntity responseEntity) throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes)
+  static List<WriteSummary> checkForErrors(
+      HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes, boolean throwWriteErrors)
       throws IOException {
 
+    List<WriteSummary> responses = new ArrayList<>();
+    int numErrors = 0;
     JsonNode searchResult = parseResponse(responseEntity);
-    boolean errors = searchResult.path("errors").asBoolean();
-    if (errors) {
-      int numErrors = 0;
-
-      StringBuilder errorMessages =
-          new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
-      JsonNode items = searchResult.path("items");
-      if (items.isMissingNode() || items.size() == 0) {
-        errorMessages.append(searchResult.toString());
-      }
-      // some items present in bulk might have errors, concatenate error messages
-      for (JsonNode item : items) {
-        JsonNode error = item.findValue("error");
-        if (error != null) {
-          // N.B. An empty-string within the allowedErrorTypes Set implies all errors are allowed.
-          String type = error.path("type").asText();
-          String reason = error.path("reason").asText();
-          String docId = item.findValue("_id").asText();
-          JsonNode causedBy = error.path("caused_by"); // May not be present
-          String cbReason = causedBy.path("reason").asText();
-          String cbType = causedBy.path("type").asText();
-
-          if (allowedErrorTypes == null
-              || (!allowedErrorTypes.contains(type) && !allowedErrorTypes.contains(cbType))) {
-            // 'error' and 'causedBy` fields are not null, and the error is not being ignored.
-            numErrors++;
-
-            errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, reason, type));
-
-            if (!causedBy.isMissingNode()) {
-              errorMessages.append(String.format("%nCaused by: %s (%s)", cbReason, cbType));
-            }
+    StringBuilder errorMessages =
+        new StringBuilder("Error writing to Elasticsearch, some elements could not be inserted:");
+    JsonNode items = searchResult.path("items");
+
+    if (items.isMissingNode() || items.size() == 0) {
+      // This would only be expected in cases like connectivity issues or similar
+      errorMessages.append(searchResult.toString());
+      throw new RuntimeException(

Review comment:
       ping ?

##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
##########
@@ -141,6 +141,18 @@ public void testWriteWithAllowableErrors() throws Exception {
     elasticsearchIOTestCommon.testWriteWithAllowedErrors();
   }
 
+  @Test

Review comment:
       ping ?

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -237,49 +250,64 @@ static JsonNode parseResponse(HttpEntity responseEntity) throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, @Nullable Set<String> allowedErrorTypes)
+  static List<WriteSummary> checkForErrors(

Review comment:
       ping ?

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1623,11 +1651,153 @@ public void setup() throws IOException {
 
       @ProcessElement
       public void processElement(ProcessContext c) throws IOException {
-        c.output(createBulkApiEntity(spec, c.element(), backendVersion));
+        String inputDoc = c.element();
+        String bulkDirective = createBulkApiEntity(spec, inputDoc, backendVersion);
+        c.output(
+            WriteSummary.create()
+                .withInputDoc(inputDoc)
+                .withBulkDirective(bulkDirective)
+                // N.B. Saving the element timestamp for later use allows for exactly emulating
+                // c.output(...) because c.output is equivalent to
+                // c.outputWithTimestamp(..., c.timestamp())
+                .withTimestamp(c.timestamp()));
       }
     }
   }
 
+  public static class WriteSummaryCoder extends AtomicCoder<WriteSummary> implements Serializable {
+    private static final WriteSummaryCoder INSTANCE = new WriteSummaryCoder();
+
+    private WriteSummaryCoder() {}
+
+    public static WriteSummaryCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(WriteSummary value, OutputStream outStream) throws IOException {
+      NullableCoder.of(StringUtf8Coder.of()).encode(value.getInputDoc(), outStream);
+      NullableCoder.of(StringUtf8Coder.of()).encode(value.getBulkDirective(), outStream);
+      BooleanCoder.of().encode(value.getHasError(), outStream);
+      NullableCoder.of(StringUtf8Coder.of()).encode(value.getResponseItemJson(), outStream);
+      NullableCoder.of(InstantCoder.of()).encode(value.getTimestamp(), outStream);
+    }
+
+    @Override
+    public WriteSummary decode(InputStream inStream) throws IOException {
+      String inputDoc = NullableCoder.of(StringUtf8Coder.of()).decode(inStream);
+      String bulkDirective = NullableCoder.of(StringUtf8Coder.of()).decode(inStream);
+      boolean hasError = BooleanCoder.of().decode(inStream);
+      String responseItemJson = NullableCoder.of(StringUtf8Coder.of()).decode(inStream);
+      Instant timestamp = NullableCoder.of(InstantCoder.of()).decode(inStream);
+
+      return WriteSummary.create()
+          .withInputDoc(inputDoc)
+          .withBulkDirective(bulkDirective)
+          .withHasError(hasError)
+          .withResponseItemJson(responseItemJson)
+          .withTimestamp(timestamp);
+    }
+  }
+
+  // Immutable POJO for maintaining various states of documents and their bulk representation, plus
+  // response from ES for the given document and the timestamp of the data
+  @DefaultCoder(WriteSummaryCoder.class)
+  @AutoValue
+  public abstract static class WriteSummary implements Serializable {

Review comment:
       ping ?

##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1789,8 +1964,14 @@ public Write withAllowableResponseErrors(@Nullable Set<String> allowableResponse
       return this;
     }
 
+    /** Refer to {@link BulkIO#withThrowWriteErrors}. */
+    public Write withThrowWriteErrors(boolean throwWriteErrors) {

Review comment:
       ping ?







[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-932279563


   @echauchot It's highly possible that I have a fundamental misunderstanding about how to handle window data.  I'm going to try to outline my goals and challenges, and take the proposed implementation out of focus.
   
   Goals:
   
   - Change BulkIO/Write to output PCollectionTuple rather than PDone, in order to support reporting the status of indexing each input document
   - Leave windows/timestamps/etc. of input data entirely unaltered
   
   Challenges:
   
   - BulkIOBaseFn relies on buffering inputs, either using bundles or Stateful specs
   - BulkIOBaseFn#finishBundle() must be called to ensure that any buffered inputs are sent to ES, and as of this PR, output to the PCollectionTuple
   - DoFn.FinishBundle methods can accept a [FinishBundleContext](https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/transforms/DoFn.FinishBundleContext.html) in order to output elements
   - FinishBundleContext output methods all require explicit specification of a BoundedWindow instance
   
   I got a bit stuck on that last point.  My impression was that in order to ensure buffered docs' results were output, and in order to leave those elements' windows unaltered, I needed to keep track of the windows to which those elements belong so that they could be explicitly passed to FinishBundleContext#output.
   
   I'd definitely be keen to learn more about how to handle windows and challenge my assumptions here.  Thanks for your time @echauchot in reviewing and teaching.
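   For readers following the thread, here is a stripped-down sketch of the pattern under discussion: buffer elements per window so `@FinishBundle` can re-emit them through `FinishBundleContext.output(...)`, which requires an explicit timestamp and `BoundedWindow`. The element type and flush logic are simplified placeholders, and the PR preserves each element's original timestamp rather than using the window's max timestamp as done here:

   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

   class BufferingBulkFn extends DoFn<String, String> {
     // Buffered elements, keyed by the window they arrived in.
     private transient Map<BoundedWindow, List<String>> buffer;

     @StartBundle
     public void startBundle() {
       buffer = new HashMap<>();
     }

     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
       buffer.computeIfAbsent(window, w -> new ArrayList<>()).add(c.element());
       // ... flush a batch to Elasticsearch here once a size threshold is reached ...
     }

     @FinishBundle
     public void finishBundle(FinishBundleContext ctx) {
       // FinishBundleContext.output needs an explicit timestamp and window, which is
       // why each buffered element's window has to be tracked above.
       for (Map.Entry<BoundedWindow, List<String>> entry : buffer.entrySet()) {
         BoundedWindow window = entry.getKey();
         for (String element : entry.getValue()) {
           ctx.output(element, window.maxTimestamp(), window);
         }
       }
       buffer.clear();
     }
   }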





[GitHub] [beam] timrobertson100 commented on a change in pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
timrobertson100 commented on a change in pull request #15381:
URL: https://github.com/apache/beam/pull/15381#discussion_r700312684



##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -111,7 +124,7 @@ static String getEsIndex() {
   }
 
   static final String ES_TYPE = "test";
-  static final long NUM_DOCS_UTESTS = 100L;
+  static final long NUM_DOCS_UTESTS = 40L;

Review comment:
       Just out of interest - why, please?







[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-911700074


   Thanks @timrobertson100 for having a look! 
   
   @echauchot I’m definitely keen to understand if the strategy in the PR will work and its windowing implications, and would be happy to learn of alternative approaches!





[GitHub] [beam] echauchot commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-911613389


   @timrobertson100 sure ! thanks for taking a look !
   @egalpin thanks for your work !





[GitHub] [beam] egalpin edited a comment on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin edited a comment on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-931264027


   Thanks for clarifying, that makes sense 👍 My original intent was to leave windows alone entirely and just output without modification. It’s worth noting that the window each element belongs to _should_ be left unmodified, and that’s why the complexity is present. 
   
   The challenge arose from the use of FinishBundle, where the output method requires explicit use of a BoundedWindow. So all of the window collection/multimap/adapter complexity is there so that we can output any buffered elements when FinishBundle is called. If FinishBundle could accept a MultiOutput or OutputReceiver, I believe this would be solved neatly.
   
   I just didn’t want to gate this change on FinishBundle changes, but maybe that’s the right path?





[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-905093496


   Run Java PreCommit





[GitHub] [beam] timrobertson100 commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
timrobertson100 commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-911234729


   Thanks @egalpin 
   
   @echauchot - if you have time, could you please take a look at the section of changes in ElasticsearchIO from lines 2346 onwards? I'm just not familiar enough with the windowing to verify. The rest of the changes seem very reasonable.
   





[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-910655360


   Ya I agree that maintaining a record of timestamps and using the context adapter is a lot of complexity.  The concept was borrowed heavily from this RedisIO PR:  https://github.com/apache/beam/pull/5841
   
   I'm interested in investigating how much work would be involved in completing [BEAM-1287](https://issues.apache.org/jira/browse/BEAM-1287) to allow `FinishBundle` methods to accept OutputReceiver/MultiOutputReceiver as that would make this type of pattern (which seems to be used in at least a few places) much easier to work with.





[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-920496317


   Thanks for having a look @echauchot! And no problem, I’ve been swamped with work and unable to address the flaky tests so I completely understand!





[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-930485572


   Thanks for the feedback @echauchot! I wasn't sure if re-windowing within an IO was acceptable but it sounds like that's the path to go.  It's way less complex, and I definitely like that.  I'll make those changes





[GitHub] [beam] echauchot merged pull request #15381: [BEAM-10990] Elasticsearch response filtering and [BEAM-5172] Tries to reduce ES UTest flakiness

Posted by GitBox <gi...@apache.org>.
echauchot merged pull request #15381:
URL: https://github.com/apache/beam/pull/15381


   





[GitHub] [beam] echauchot commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-946537530


   > [BEAM-5172](https://issues.apache.org/jira/browse/BEAM-5172)
   
   Nice!
   But please rename the commit with a [BEAM-5172] header, as it refers to a different ticket; I guess the uTest docCount decrease commit refers to the same ticket as well. Also, please squash the [BEAM-10990] commits together and the [BEAM-5172] commits together.





[GitHub] [beam] egalpin commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
egalpin commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-931264027


   Thanks for clarifying, that makes sense 👍 My original intent was to leave windows alone entirely and just output without modification. 
   
   The challenge arose from the use of FinishBundle, where the output method requires explicit use of a BoundedWindow. So all of the window collection/multimap/adapter complexity is there so that we can output any buffered elements when FinishBundle is called. If FinishBundle could accept a MultiOutput or OutputReceiver, I believe this would be solved neatly.
   
   I just didn’t want to gate this change on FinishBundle changes, but maybe that’s the right path?





[GitHub] [beam] echauchot commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-938449865


   > @echauchot It's highly possible that I have a fundamental misunderstanding about how to handle window data. I'm going to try to outline my goals and challenges, and take the proposed implementation out of focus.
   > 
   > Goals:
   > 
   > * Change BulkIO/Write to output PCollectionTuple rather than PDone, in order to support reporting the status of indexing each input document
   > * Leave windows/timestamps/etc. of input data entirely unaltered
   > 
   > Challenges:
   > 
   > * BulkIOBaseFn relies on buffering inputs, either using bundles or Stateful specs
   > * BulkIOBaseFn#finishBundle() must be called to ensure that any buffered inputs are sent to ES, and as of this PR, output to the PCollectionTuple
   > * DoFn.FinishBundle methods can accept a [FinishBundleContext](https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/transforms/DoFn.FinishBundleContext.html) in order to output elements
   > * FinishBundleContext output methods all require explicit specification of a BoundedWindow instance
   > 
   > I got a bit stuck on that last point. My impression was that in order to ensure buffered docs' results were output, and in order to leave those elements' windows unaltered, I needed to keep track of the windows to which those elements belong so that they could be explicitly passed to FinishBundleContext#output.
   > 
   > I'd definitely be keen to learn more about how to handle windows and challenge my assumptions here. Thanks for your time @echauchot in reviewing and teaching.
   
   Just to write here what we discussed privately yesterday (Apache way: what did not happen publicly did not happen at all):
   - My bad, I did not know about the `DoFn#finishBundle()` signature change. It now forces specifying a window in the output. I first thought that dealing with windows was not needed and brought unnecessary complexity, but it seems it is mandatory :smile:
   but I hope it is only temporary until `OutputReceiver` is fleshed out.
   - I took a look at the existing IOs that output data to PCollections:
     - `FhirIO`: outputs to the last seen window. Seems incorrect.
     - `HadoopFormatIO` and `BigQueryIO` store elements in a map keyed by window and then output per window, similarly to what you do.
     => So I guess the general approach of maintaining windows looks good. I need to look at the code in more detail before giving an LGTM.
   
   





[GitHub] [beam] echauchot commented on pull request #15381: [BEAM-10990] Elasticsearch response filtering and [BEAM-5172] Tries to reduce ES UTest flakiness

Posted by GitBox <gi...@apache.org>.
echauchot commented on pull request #15381:
URL: https://github.com/apache/beam/pull/15381#issuecomment-1062976356


   Should be fixed by [this PR](https://github.com/apache/beam/pull/16744/)

