Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/31 08:36:11 UTC

[GitHub] [beam] iemejia commented on a change in pull request #12583: [BEAM-10706] Fix duplicate key error in DynamoDBIO.Write

iemejia commented on a change in pull request #12583:
URL: https://github.com/apache/beam/pull/12583#discussion_r479974236



##########
File path: sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java
##########
@@ -186,6 +192,10 @@ public void testRetries() throws Throwable {
 
     final List<WriteRequest> writeRequests = DynamoDBIOTestHelper.generateWriteRequests(numOfItems);
 
+    final List<String> overwriteByPKeys = new ArrayList<>();

Review comment:
       nit: `Arrays.asList` will make the code more concise.
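
A minimal sketch of the more concise form, assuming the test only needs a small fixed list of key names. The class name and the key names `hashKey1` and `rangeKey2` are hypothetical placeholders, not taken from the PR.

```java
import java.util.Arrays;
import java.util.List;

final class OverwriteKeysExample {
  // Hypothetical key names, for illustration only.
  static List<String> overwriteByPKeys() {
    // One expression instead of `new ArrayList<>()` followed by add() calls.
    return Arrays.asList("hashKey1", "rangeKey2");
  }
}
```

Note that `Arrays.asList` returns a fixed-size list, which is fine as long as the test does not add or remove keys afterwards.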

##########
File path: sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
##########
@@ -342,6 +343,8 @@ public boolean test(Throwable throwable) {
 
     abstract @Nullable SerializableFunction<T, KV<String, WriteRequest>> getWriteItemMapperFn();
 
+    abstract @Nullable List<String> getOverwriteByPKeys();

Review comment:
       Can you please remove the `@Nullable` here and set an empty collection as the default value in the `build()` method? This will simplify the validations in the rest of the code.
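
A rough sketch of the idea, written as a plain hand-rolled builder so it stands alone. `WriteSpecSketch` and its `Builder` are hypothetical stand-ins for the real `Write` spec and are not the actual generated code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the Write spec, for illustration only.
final class WriteSpecSketch {
  private final List<String> overwriteByPKeys;

  private WriteSpecSketch(List<String> overwriteByPKeys) {
    this.overwriteByPKeys = overwriteByPKeys;
  }

  // Never returns null, so callers can use isEmpty() instead of a null check.
  List<String> getOverwriteByPKeys() {
    return overwriteByPKeys;
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    // Default value: an empty, immutable list.
    private List<String> overwriteByPKeys = Collections.emptyList();

    Builder setOverwriteByPKeys(List<String> keys) {
      // Defensive copy so later changes to the caller's list do not leak in.
      this.overwriteByPKeys = Collections.unmodifiableList(new ArrayList<>(keys));
      return this;
    }

    WriteSpecSketch build() {
      return new WriteSpecSketch(overwriteByPKeys);
    }
  }
}
```

With a default like this, a check such as `spec.getOverwriteByPKeys() != null` can become `!spec.getOverwriteByPKeys().isEmpty()`.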

##########
File path: sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java
##########
@@ -199,7 +209,8 @@ public void testRetries() throws Throwable {
                         writeRequest -> KV.of(tableName, writeRequest))
                 .withRetryConfiguration(
                     DynamoDBIO.RetryConfiguration.create(4, Duration.standardSeconds(10)))
-                .withAwsClientsProvider(AwsClientsProviderMock.of(amazonDynamoDBMock)));
+                .withAwsClientsProvider(AwsClientsProviderMock.of(amazonDynamoDBMock))
+                .withOverwriteByPKeys(overwriteByPKeys));

Review comment:
       Can we do this in an independent test, e.g. `testDeduplicateWriteItems`, so that the original write test stays unchanged? The new test should produce some duplicate items and check that duplicate keys are removed correctly. I am assuming that for the batch case the direct runner will batch them together, but I will leave a further note once I have confirmed this 100%.

##########
File path: sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
##########
@@ -450,12 +459,35 @@ public void startBundle(StartBundleContext context) {
       public void processElement(ProcessContext context) throws Exception {
         final KV<String, WriteRequest> writeRequest =
             (KV<String, WriteRequest>) spec.getWriteItemMapperFn().apply(context.element());
+        if (spec.getOverwriteByPKeys() != null) {
+          removeDupPKeysRequestsIfAny(writeRequest.getValue());
+        }
         batch.add(writeRequest);
         if (batch.size() >= BATCH_SIZE) {
           flushBatch();
         }
       }
 
+      private void removeDupPKeysRequestsIfAny(WriteRequest request) {

Review comment:
       Can we restructure this method so that, instead of removing from the batch every time, we just skip adding the value if it is already part of the batch? It could make the code simpler.
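
One possible shape for this, sketched in plain Java with `Map<String, String>` standing in for the item attributes of a `WriteRequest`. The class `SkipDuplicateBatchSketch` and its methods are hypothetical and only illustrate the skip-if-present idea.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: items are plain maps instead of WriteRequest objects.
final class SkipDuplicateBatchSketch {
  private final List<Map<String, String>> batch = new ArrayList<>();
  private final Set<Map<String, String>> seenKeys = new HashSet<>();
  private final List<String> keyNames;

  SkipDuplicateBatchSketch(List<String> keyNames) {
    this.keyNames = keyNames;
  }

  // Projects an item onto the configured primary key attributes.
  private Map<String, String> extractKey(Map<String, String> item) {
    Map<String, String> key = new HashMap<>();
    for (String name : keyNames) {
      key.put(name, item.get(name));
    }
    return key;
  }

  // Adds the item only if no item with the same primary key is in the batch.
  boolean addIfAbsent(Map<String, String> item) {
    if (!seenKeys.add(extractKey(item))) {
      return false; // duplicate key: skip instead of removing the old entry
    }
    batch.add(item);
    return true;
  }

  int size() {
    return batch.size();
  }

  // Returns the current batch contents and resets the state for the next batch.
  List<Map<String, String>> flush() {
    List<Map<String, String>> out = new ArrayList<>(batch);
    batch.clear();
    seenKeys.clear();
    return out;
  }
}
```

One difference to keep in mind: skipping keeps the first request seen for a key, whereas the `removeIf` followed by `batch.add` in the diff keeps the last one. If the latest value should win, a map keyed by the extracted primary key and updated with `put` would be an alternative.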

##########
File path: sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
##########
@@ -450,12 +459,35 @@ public void startBundle(StartBundleContext context) {
       public void processElement(ProcessContext context) throws Exception {
         final KV<String, WriteRequest> writeRequest =
             (KV<String, WriteRequest>) spec.getWriteItemMapperFn().apply(context.element());
+        if (spec.getOverwriteByPKeys() != null) {
+          removeDupPKeysRequestsIfAny(writeRequest.getValue());
+        }
         batch.add(writeRequest);
         if (batch.size() >= BATCH_SIZE) {
           flushBatch();
         }
       }
 
+      private void removeDupPKeysRequestsIfAny(WriteRequest request) {
+        Map<String, AttributeValue> pKeyValueNew = extractPkeyValues(request);
+        batch.removeIf(item -> extractPkeyValues(item.getValue()).equals(pKeyValueNew));

Review comment:
       I am wondering whether this will remove the existing key too. It may be worth validating this in the tests.
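
A small stand-alone sketch of what such a check could look like. It uses `Map.Entry<String, String>` as a stand-in for `KV<String, WriteRequest>`, with the entry key playing the role of the extracted primary key values; `RemoveIfBatchSketch` and `addWithOverwrite` are hypothetical names.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: entries are (primaryKey, payload) pairs.
final class RemoveIfBatchSketch {
  // Mirrors the order of operations in the diff: removeIf first, then add.
  static List<Map.Entry<String, String>> addWithOverwrite(
      List<Map.Entry<String, String>> batch, String pKey, String payload) {
    // Drops every entry already in the batch that has the same primary key.
    batch.removeIf(item -> item.getKey().equals(pKey));
    // The new entry is added afterwards, so removeIf cannot touch it.
    batch.add(new SimpleEntry<>(pKey, payload));
    return batch;
  }
}
```

In this simplified model, adding `("1", "old")`, `("2", "other")` and then `("1", "new")` leaves two entries, with the old value for key `"1"` replaced by the new one. A test against the real `WriteRequest` type would still be needed to confirm that `extractPkeyValues` and `equals` behave the same way.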



