Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/12/14 16:39:00 UTC

[jira] [Work logged] (BEAM-13209) DynamoDBIO silently drops unprocessed items

     [ https://issues.apache.org/jira/browse/BEAM-13209?focusedWorklogId=695932&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-695932 ]

ASF GitHub Bot logged work on BEAM-13209:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Dec/21 16:38
            Start Date: 14/Dec/21 16:38
    Worklog Time Spent: 10m 
      Work Description: aromanenko-dev commented on a change in pull request #15964:
URL: https://github.com/apache/beam/pull/15964#discussion_r768827317



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -542,54 +562,64 @@ private void flushBatch() throws IOException, InterruptedException {
         if (batch.isEmpty()) {
           return;
         }
-
         try {
-          // Since each element is a KV<tableName, writeRequest> in the batch, we need to group them
-          // by tableName
-          Map<String, List<WriteRequest>> mapTableRequest =
+          // Group values KV<tableName, writeRequest> by tableName
+          // Note: The original order of arrival is lost reading the map entries.
+          Map<String, List<WriteRequest>> writesPerTable =
               batch.values().stream()
-                  .collect(
-                      Collectors.groupingBy(
-                          KV::getKey, Collectors.mapping(KV::getValue, Collectors.toList())));
-
-          BatchWriteItemRequest batchRequest =
-              BatchWriteItemRequest.builder().requestItems(mapTableRequest).build();
-
-          Sleeper sleeper = Sleeper.DEFAULT;
-          BackOff backoff = retryBackoff.backoff();
-          int attempt = 0;
-          while (true) {
-            attempt++;
-            try {
-              client.batchWriteItem(batchRequest);
-              break;
-            } catch (Exception ex) {
-              // Fail right away if there is no retry configuration
-              if (spec.getRetryConfiguration() == null
-                  || !spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
-                DYNAMO_DB_WRITE_FAILURES.inc();
-                LOG.info(
-                    "Unable to write batch items {}.", batchRequest.requestItems().entrySet(), ex);
-                throw new IOException("Error writing to DynamoDB (no attempt made to retry)", ex);
-              }
-
-              if (!BackOffUtils.next(sleeper, backoff)) {
-                throw new IOException(
-                    String.format(
-                        "Error writing to DynamoDB after %d attempt(s). No more attempts allowed",
-                        attempt),
-                    ex);
-              } else {
-                // Note: this used in test cases to verify behavior
-                LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
-              }
-            }
+                  .collect(groupingBy(KV::getKey, mapping(KV::getValue, toList())));
+
+          // Backoff used to resume from partial failures
+          BackOff resume = resumeBackoff.backoff();
+          do {
+            BatchWriteItemRequest batchRequest =
+                BatchWriteItemRequest.builder().requestItems(writesPerTable).build();
+            // If unprocessed items remain, we have to resume the operation (with backoff)
+            writesPerTable = writeWithRetries(batchRequest).unprocessedItems();
+          } while (!writesPerTable.isEmpty() && BackOffUtils.next(Sleeper.DEFAULT, resume));
+
+          if (!writesPerTable.isEmpty()) {
+            DYNAMO_DB_WRITE_FAILURES.inc();
+            LOG.warn(RESUME_ERROR_LOG, writesPerTable);

Review comment:
       Should it be `error` log level?
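
The resume logic in the new `flushBatch` can be sketched outside of Beam and the AWS SDK. This is a minimal sketch, not the PR's code. It rests on several assumptions: `BatchWriter` is a hypothetical stand-in for `writeWithRetries(batchRequest).unprocessedItems()`, plain strings stand in for `WriteRequest`, `Map.Entry` stands in for Beam's `KV`, and a fixed attempt limit with doubling sleeps stands in for `resumeBackoff` / `BackOffUtils.next`.

```java
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;

import java.util.List;
import java.util.Map;

public class ResumeSketch {

  /** Hypothetical stand-in for a batch write call: returns the items it did NOT process. */
  @FunctionalInterface
  public interface BatchWriter {
    Map<String, List<String>> write(Map<String, List<String>> writesPerTable);
  }

  /**
   * Groups (tableName, request) pairs by table, then resubmits whatever comes back unprocessed,
   * sleeping initialSleepMs, 2x, 4x, ... between calls, until nothing remains or maxAttempts
   * calls have been made. Returns the still-unprocessed items (empty on success).
   */
  public static Map<String, List<String>> flush(
      List<Map.Entry<String, String>> batch,
      BatchWriter writer,
      int maxAttempts,
      long initialSleepMs) {
    // Same collector chain as in the diff; arrival order across tables is not preserved.
    Map<String, List<String>> writesPerTable =
        batch.stream()
            .collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toList())));

    int attempt = 0;
    do {
      // Only the items left unprocessed by the previous call are sent again.
      writesPerTable = writer.write(writesPerTable);
      attempt++;
    } while (!writesPerTable.isEmpty()
        && attempt < maxAttempts
        && sleep(initialSleepMs << (attempt - 1)));
    return writesPerTable;
  }

  /** Sleeps before resuming; returns false (stop resuming) if the thread is interrupted. */
  private static boolean sleep(long millis) {
    try {
      Thread.sleep(millis);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
      return false;
    }
  }
}
```

In the diff, a non-empty map after the loop increments `DYNAMO_DB_WRITE_FAILURES` and is logged with `LOG.warn(RESUME_ERROR_LOG, writesPerTable)`, which is the log level the review comment questions. Each call there is also wrapped in `writeWithRetries`, whose body is not shown in this excerpt.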




-- 
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 695932)
    Time Spent: 0.5h  (was: 20m)

> DynamoDBIO silently drops unprocessed items
> -------------------------------------------
>
>                 Key: BEAM-13209
>                 URL: https://issues.apache.org/jira/browse/BEAM-13209
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P1
>              Labels: aws, data-loss, dynamodb
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> `[batchWriteItem|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html]` can fail partially and return unprocessed items. Such partial failures are not handled and result in data loss.
> When writing to DynamoDB at scale, running into this is rather likely.
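
A minimal sketch of the failure mode, assuming a hypothetical `cappedWrite` helper. It accepts only a fixed number of requests per call and hands the rest back, much as `BatchWriteItem` returns requests that failed (for example because provisioned throughput was exceeded) in `UnprocessedItems` instead of failing the whole call. Strings stand in for `WriteRequest`s. The pre-fix code called `client.batchWriteItem(batchRequest)` and discarded the response, which corresponds to `writeIgnoringResult` here: no exception is raised, yet some requests are never written.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartialFailureSketch {

  /**
   * Hypothetical writer: "stores" at most {@code capacity} requests per call and returns the
   * rest, analogous to the UnprocessedItems map of a BatchWriteItem response.
   */
  public static Map<String, List<String>> cappedWrite(
      Map<String, List<String>> requestItems, int capacity, List<String> stored) {
    Map<String, List<String>> unprocessed = new HashMap<>();
    int budget = capacity;
    for (Map.Entry<String, List<String>> table : requestItems.entrySet()) {
      for (String item : table.getValue()) {
        if (budget > 0) {
          budget--;
          stored.add(item);
        } else {
          // The call still "succeeds"; the overflow is only reported back to the caller.
          unprocessed.computeIfAbsent(table.getKey(), k -> new ArrayList<>()).add(item);
        }
      }
    }
    return unprocessed;
  }

  /** Pre-fix pattern: one call whose result is ignored. Returns how many requests were stored. */
  public static int writeIgnoringResult(Map<String, List<String>> requestItems, int capacity) {
    List<String> stored = new ArrayList<>();
    cappedWrite(requestItems, capacity, stored); // unprocessed items are dropped here, silently
    return stored.size();
  }
}
```

With five requests and a capacity of two, `writeIgnoringResult` stores two requests and silently loses the other three.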



--
This message was sent by Atlassian Jira
(v8.20.1#820001)