You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/04 13:36:30 UTC

[GitHub] [beam] mosche opened a new pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

mosche opened a new pull request #15895:
URL: https://github.com/apache/beam/pull/15895


   https://github.com/apache/beam/pull/12583 introduced a mechanism to deduplicate elements per batch based on deduplication keys. Unfortunately the mechanism is not backwards compatible. If upgrading without setting the deduplication keys all elements end up with the same empty deduplication key. As a result only one element per batch is ever written to DynamoDB, everything else is lost.
   
   This change makes sure elements are deduplicated based on element equality if no deduplication keys are provided.
   
   Missing unit tests for reads and writes are added to better catch such bugs in the future.
   Bugs in DynamoDBIOTest (using localstack) are fixed, further improvements of this (proper integration testing) should be done in BEAM-7559.
   
   The same bug exists for aws (SDK v1). I'll follow up with another PR for that.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   `ValidatesRunner` compliance status (on master branch)
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_ULR/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/lastCompletedBuild/badge/icon?subject=V1+Streaming">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon?subject=V1+Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_VR_Dataflow_V2_Streaming/lastCompletedBuild/badge/icon?subject=V2+Streaming">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon?subject=Java+8">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon?subject=Java+11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon?subject=Portable+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Samza/lastCompletedBuild/badge/icon?subject=Portable">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon?subject=Structured+Streaming">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon?subject=ValCont">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon?subject=Portable">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Dataflow/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Examples testing status on various runners
   --------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Lang</th>
         <th>ULR</th>
         <th>Dataflow</th>
         <th>Flink</th>
         <th>Samza</th>
         <th>Spark</th>
         <th>Twister2</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Go</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Java</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/lastCompletedBuild/badge/icon?subject=V1">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Java11_Cron/lastCompletedBuild/badge/icon?subject=V1+Java11">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java_Examples_Dataflow_V2/lastCompletedBuild/badge/icon?subject=V2">
           </a><br>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>Python</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
       <tr>
         <td>XLang</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   Post-Commit SDK/Transform Integration Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>Go</th>
         <th>Java</th>
         <th>Python</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon?subject=3.6">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon?subject=3.7">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon?subject=3.8">
           </a>
         </td>
       </tr>
     </tbody>
   </table>
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   <table>
     <thead>
       <tr>
         <th>---</th>
         <th>Java</th>
         <th>Python</th>
         <th>Go</th>
         <th>Website</th>
         <th>Whitespace</th>
         <th>Typescript</th>
       </tr>
     </thead>
     <tbody>
       <tr>
         <td>Non-portable</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon">
           </a><br>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon?subject=Tests">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon?subject=Lint">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon?subject=Docker">
           </a><br>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon?subject=Docs">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
       </tr>
       <tr>
         <td>Portable</td>
         <td>---</td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>
           <a href="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/">
             <img alt="Build Status" src="https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Cron/lastCompletedBuild/badge/icon">
           </a>
         </td>
         <td>---</td>
         <td>---</td>
         <td>---</td>
       </tr>
     </tbody>
   </table>
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on a change in pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15895:
URL: https://github.com/apache/beam/pull/15895#discussion_r742845572



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -328,6 +337,8 @@ public static Builder builder() {
 
       public abstract Builder setMaxDuration(Duration maxDuration);
 
+      abstract Builder setInitialDuration(Duration initialDuration);

Review comment:
       This is important for testing, otherwise tests involving retries take forever.

##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -504,17 +517,20 @@ public void processElement(ProcessContext context) throws Exception {
       }
 
       private Map<String, AttributeValue> extractDeduplicateKeyValues(WriteRequest request) {
+        List<String> deduplicationKeys = spec.getDeduplicateKeys();
+        Map<String, AttributeValue> attributes = Collections.emptyMap();
+
         if (request.putRequest() != null) {
-          return request.putRequest().item().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+          attributes = request.putRequest().item();
         } else if (request.deleteRequest() != null) {
-          return request.deleteRequest().key().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        } else {
-          return Collections.emptyMap();
+          attributes = request.deleteRequest().key();
+        }
+
+        if (attributes.isEmpty() || deduplicationKeys.isEmpty()) {
+          return attributes;
         }

Review comment:
       The fix is to just return the unchanged attributes if `getDeduplicateKeys()` returns an empty list.
   Everything else here is pretty much improving test coverage.

##########
File path: sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOTest.java
##########
@@ -64,7 +65,10 @@
 /** Test Coverage for the IO. */
 public class DynamoDBIOTest implements Serializable {
   @Rule public final transient TestPipeline pipeline = TestPipeline.create();
-  @Rule public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(DynamoDBIO.class);
+
+  @Rule
+  public final transient ExpectedLogs writeFnLogs =
+      ExpectedLogs.none(DynamoDBIO.Write.WriteFn.class);

Review comment:
       This was previously checking for the wrong logger, tests still succeeded :/




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on pull request #15895:
URL: https://github.com/apache/beam/pull/15895#issuecomment-966903305


   Thanks a lot for the review @dennisylyung ☘️ 
   Could you also have a look at the same fix for sdk 1 pls? 
   https://github.com/apache/beam/pull/15898


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aromanenko-dev merged pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
aromanenko-dev merged pull request #15895:
URL: https://github.com/apache/beam/pull/15895


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on a change in pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15895:
URL: https://github.com/apache/beam/pull/15895#discussion_r742848900



##########
File path: sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOTest.java
##########
@@ -64,7 +65,10 @@
 /** Test Coverage for the IO. */
 public class DynamoDBIOTest implements Serializable {
   @Rule public final transient TestPipeline pipeline = TestPipeline.create();
-  @Rule public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(DynamoDBIO.class);
+
+  @Rule
+  public final transient ExpectedLogs writeFnLogs =
+      ExpectedLogs.none(DynamoDBIO.Write.WriteFn.class);

Review comment:
       This was previously checking for the wrong logger, tests still succeeded :/




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on a change in pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15895:
URL: https://github.com/apache/beam/pull/15895#discussion_r743892138



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -504,17 +517,20 @@ public void processElement(ProcessContext context) throws Exception {
       }
 
       private Map<String, AttributeValue> extractDeduplicateKeyValues(WriteRequest request) {
+        List<String> deduplicationKeys = spec.getDeduplicateKeys();
+        Map<String, AttributeValue> attributes = Collections.emptyMap();
+
         if (request.putRequest() != null) {
-          return request.putRequest().item().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+          attributes = request.putRequest().item();
         } else if (request.deleteRequest() != null) {
-          return request.deleteRequest().key().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        } else {
-          return Collections.emptyMap();
+          attributes = request.deleteRequest().key();
+        }
+
+        if (attributes.isEmpty() || deduplicationKeys.isEmpty()) {
+          return attributes;
         }

Review comment:
       The fix is to just return the unchanged attributes if `getDeduplicateKeys()` returns an empty list.
   Everything else here is pretty much just improving test coverage.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aromanenko-dev commented on pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15895:
URL: https://github.com/apache/beam/pull/15895#issuecomment-961974456


   @dennisylyung As initial author of deduplication change, could you take a look, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on a change in pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15895:
URL: https://github.com/apache/beam/pull/15895#discussion_r742845572



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -328,6 +337,8 @@ public static Builder builder() {
 
       public abstract Builder setMaxDuration(Duration maxDuration);
 
+      abstract Builder setInitialDuration(Duration initialDuration);

Review comment:
       This is important for testing, otherwise tests involving retries take forever.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on a change in pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15895:
URL: https://github.com/apache/beam/pull/15895#discussion_r743892138



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -504,17 +517,20 @@ public void processElement(ProcessContext context) throws Exception {
       }
 
       private Map<String, AttributeValue> extractDeduplicateKeyValues(WriteRequest request) {
+        List<String> deduplicationKeys = spec.getDeduplicateKeys();
+        Map<String, AttributeValue> attributes = Collections.emptyMap();
+
         if (request.putRequest() != null) {
-          return request.putRequest().item().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+          attributes = request.putRequest().item();
         } else if (request.deleteRequest() != null) {
-          return request.deleteRequest().key().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        } else {
-          return Collections.emptyMap();
+          attributes = request.deleteRequest().key();
+        }
+
+        if (attributes.isEmpty() || deduplicationKeys.isEmpty()) {
+          return attributes;
         }

Review comment:
       The fix is to just return the unchanged attributes if `getDeduplicateKeys()` returns an empty list.
   Everything else here is pretty much improving test coverage.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on a change in pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15895:
URL: https://github.com/apache/beam/pull/15895#discussion_r742845572



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -328,6 +337,8 @@ public static Builder builder() {
 
       public abstract Builder setMaxDuration(Duration maxDuration);
 
+      abstract Builder setInitialDuration(Duration initialDuration);

Review comment:
       This is important for testing, otherwise tests involving retries take forever.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on a change in pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15895:
URL: https://github.com/apache/beam/pull/15895#discussion_r742848900



##########
File path: sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOTest.java
##########
@@ -64,7 +65,10 @@
 /** Test Coverage for the IO. */
 public class DynamoDBIOTest implements Serializable {
   @Rule public final transient TestPipeline pipeline = TestPipeline.create();
-  @Rule public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(DynamoDBIO.class);
+
+  @Rule
+  public final transient ExpectedLogs writeFnLogs =
+      ExpectedLogs.none(DynamoDBIO.Write.WriteFn.class);

Review comment:
       This was previously checking for the wrong logger, tests still succeeded :/




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on a change in pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15895:
URL: https://github.com/apache/beam/pull/15895#discussion_r742848175



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -504,17 +517,20 @@ public void processElement(ProcessContext context) throws Exception {
       }
 
       private Map<String, AttributeValue> extractDeduplicateKeyValues(WriteRequest request) {
+        List<String> deduplicationKeys = spec.getDeduplicateKeys();
+        Map<String, AttributeValue> attributes = Collections.emptyMap();
+
         if (request.putRequest() != null) {
-          return request.putRequest().item().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+          attributes = request.putRequest().item();
         } else if (request.deleteRequest() != null) {
-          return request.deleteRequest().key().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        } else {
-          return Collections.emptyMap();
+          attributes = request.deleteRequest().key();
+        }
+
+        if (attributes.isEmpty() || deduplicationKeys.isEmpty()) {
+          return attributes;
         }
+
+        return ImmutableMap.copyOf(filterKeys(attributes, deduplicationKeys::contains));

Review comment:
       Changed the logic to using `Maps.filterKeys` so the intend is rather obvious compared to before:
   ```java
   attributes.entrySet().stream()
     .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on pull request #15895:
URL: https://github.com/apache/beam/pull/15895#issuecomment-961085233


   R: @aromanenko-dev 
   R: @iemejia 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on a change in pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15895:
URL: https://github.com/apache/beam/pull/15895#discussion_r742848175



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -504,17 +517,20 @@ public void processElement(ProcessContext context) throws Exception {
       }
 
       private Map<String, AttributeValue> extractDeduplicateKeyValues(WriteRequest request) {
+        List<String> deduplicationKeys = spec.getDeduplicateKeys();
+        Map<String, AttributeValue> attributes = Collections.emptyMap();
+
         if (request.putRequest() != null) {
-          return request.putRequest().item().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+          attributes = request.putRequest().item();
         } else if (request.deleteRequest() != null) {
-          return request.deleteRequest().key().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        } else {
-          return Collections.emptyMap();
+          attributes = request.deleteRequest().key();
+        }
+
+        if (attributes.isEmpty() || deduplicationKeys.isEmpty()) {
+          return attributes;
         }
+
+        return ImmutableMap.copyOf(filterKeys(attributes, deduplicationKeys::contains));

Review comment:
       Changed the logic to using `Maps.filterKeys` so the intend is rather obvious compared to before:
   ```java
   attributes.entrySet().stream()
     .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aromanenko-dev commented on pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15895:
URL: https://github.com/apache/beam/pull/15895#issuecomment-961974456


   @dennisylyung As initial author of deduplication change, could you take a look, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on pull request #15895:
URL: https://github.com/apache/beam/pull/15895#issuecomment-961017370


   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mosche commented on a change in pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
mosche commented on a change in pull request #15895:
URL: https://github.com/apache/beam/pull/15895#discussion_r742845572



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -328,6 +337,8 @@ public static Builder builder() {
 
       public abstract Builder setMaxDuration(Duration maxDuration);
 
+      abstract Builder setInitialDuration(Duration initialDuration);

Review comment:
       This is important for testing, otherwise tests involving retries take forever.

##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -504,17 +517,20 @@ public void processElement(ProcessContext context) throws Exception {
       }
 
       private Map<String, AttributeValue> extractDeduplicateKeyValues(WriteRequest request) {
+        List<String> deduplicationKeys = spec.getDeduplicateKeys();
+        Map<String, AttributeValue> attributes = Collections.emptyMap();
+
         if (request.putRequest() != null) {
-          return request.putRequest().item().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+          attributes = request.putRequest().item();
         } else if (request.deleteRequest() != null) {
-          return request.deleteRequest().key().entrySet().stream()
-              .filter(entry -> spec.getDeduplicateKeys().contains(entry.getKey()))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        } else {
-          return Collections.emptyMap();
+          attributes = request.deleteRequest().key();
+        }
+
+        if (attributes.isEmpty() || deduplicationKeys.isEmpty()) {
+          return attributes;
         }

Review comment:
       The fix is to just return the unchanged attributes if `getDeduplicateKeys()` returns an empty list.
   Everything else here is pretty much improving test coverage.

##########
File path: sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOTest.java
##########
@@ -64,7 +65,10 @@
 /** Test Coverage for the IO. */
 public class DynamoDBIOTest implements Serializable {
   @Rule public final transient TestPipeline pipeline = TestPipeline.create();
-  @Rule public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(DynamoDBIO.class);
+
+  @Rule
+  public final transient ExpectedLogs writeFnLogs =
+      ExpectedLogs.none(DynamoDBIO.Write.WriteFn.class);

Review comment:
       This was previously checking for the wrong logger, tests still succeeded :/




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] aromanenko-dev commented on pull request #15895: [BEAM-13009] Bugfix: Prevent dataloss in DynamoDBIO write if deduplication keys are not set (aws SDK v2)

Posted by GitBox <gi...@apache.org>.
aromanenko-dev commented on pull request #15895:
URL: https://github.com/apache/beam/pull/15895#issuecomment-961974456


   @dennisylyung As initial author of deduplication change, could you take a look, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org