Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/12/14 16:38:49 UTC

[GitHub] [beam] aromanenko-dev commented on a change in pull request #15964: [BEAM-13209] Fix DynamoDBIO.write to properly handle partial success (SDK v2)

aromanenko-dev commented on a change in pull request #15964:
URL: https://github.com/apache/beam/pull/15964#discussion_r768827317



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -542,54 +562,64 @@ private void flushBatch() throws IOException, InterruptedException {
         if (batch.isEmpty()) {
           return;
         }
-
         try {
-          // Since each element is a KV<tableName, writeRequest> in the batch, we need to group them
-          // by tableName
-          Map<String, List<WriteRequest>> mapTableRequest =
+          // Group values KV<tableName, writeRequest> by tableName
+          // Note: The original order of arrival is lost reading the map entries.
+          Map<String, List<WriteRequest>> writesPerTable =
               batch.values().stream()
-                  .collect(
-                      Collectors.groupingBy(
-                          KV::getKey, Collectors.mapping(KV::getValue, Collectors.toList())));
-
-          BatchWriteItemRequest batchRequest =
-              BatchWriteItemRequest.builder().requestItems(mapTableRequest).build();
-
-          Sleeper sleeper = Sleeper.DEFAULT;
-          BackOff backoff = retryBackoff.backoff();
-          int attempt = 0;
-          while (true) {
-            attempt++;
-            try {
-              client.batchWriteItem(batchRequest);
-              break;
-            } catch (Exception ex) {
-              // Fail right away if there is no retry configuration
-              if (spec.getRetryConfiguration() == null
-                  || !spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
-                DYNAMO_DB_WRITE_FAILURES.inc();
-                LOG.info(
-                    "Unable to write batch items {}.", batchRequest.requestItems().entrySet(), ex);
-                throw new IOException("Error writing to DynamoDB (no attempt made to retry)", ex);
-              }
-
-              if (!BackOffUtils.next(sleeper, backoff)) {
-                throw new IOException(
-                    String.format(
-                        "Error writing to DynamoDB after %d attempt(s). No more attempts allowed",
-                        attempt),
-                    ex);
-              } else {
-                // Note: this used in test cases to verify behavior
-                LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
-              }
-            }
+                  .collect(groupingBy(KV::getKey, mapping(KV::getValue, toList())));
+
+          // Backoff used to resume from partial failures
+          BackOff resume = resumeBackoff.backoff();
+          do {
+            BatchWriteItemRequest batchRequest =
+                BatchWriteItemRequest.builder().requestItems(writesPerTable).build();
+            // If unprocessed items remain, we have to resume the operation (with backoff)
+            writesPerTable = writeWithRetries(batchRequest).unprocessedItems();
+          } while (!writesPerTable.isEmpty() && BackOffUtils.next(Sleeper.DEFAULT, resume));
+
+          if (!writesPerTable.isEmpty()) {
+            DYNAMO_DB_WRITE_FAILURES.inc();
+            LOG.warn(RESUME_ERROR_LOG, writesPerTable);

Review comment:
       Should this be logged at `error` level rather than `warn`?
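
       To make the question concrete, here is a minimal, self-contained sketch of the resume loop with the final failure logged at error severity. It is not the PR's code: `ResumeLoopSketch`, `writeAll`, the `writer` function (a stand-in for `writeWithRetries(...).unprocessedItems()`) and `maxResumes` (a stand-in for the resume backoff budget) are all hypothetical names, `java.util.logging` replaces the project's logger so the example needs no dependencies, sleeping between resumes is omitted, and throwing an `IOException` once the budget is exhausted is an assumption, since the hunk above ends before that point.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the resume loop in flushBatch(); not Beam's actual code. */
final class ResumeLoopSketch {
  private static final Logger LOG = Logger.getLogger(ResumeLoopSketch.class.getName());

  private ResumeLoopSketch() {}

  /**
   * Writes all items, resubmitting whatever the writer reports as unprocessed.
   *
   * @param writer hypothetical stand-in for writeWithRetries(...).unprocessedItems()
   * @param maxResumes hypothetical stand-in for the resume backoff budget (no sleeping here)
   */
  static <T> void writeAll(List<T> items, Function<List<T>, List<T>> writer, int maxResumes)
      throws IOException {
    List<T> pending = new ArrayList<>(items);
    int resumes = 0;
    do {
      // Each pass submits only what is still pending from the previous pass.
      pending = writer.apply(pending);
    } while (!pending.isEmpty() && resumes++ < maxResumes);

    if (!pending.isEmpty()) {
      // Assumption: items left over here are a terminal failure for this batch,
      // so the sketch logs at error severity (SEVERE in java.util.logging) and throws.
      LOG.log(
          Level.SEVERE,
          "Unable to write {0} item(s) after {1} resume(s)",
          new Object[] {pending.size(), maxResumes});
      throw new IOException("Unprocessed items remain after exhausting resume attempts");
    }
  }
}
```

       If leftover items really do fail the batch, as assumed in the sketch, `error` seems the better fit; if the caller can still recover, `warn` may be defensible.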



