Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/08/10 19:11:23 UTC

[GitHub] [beam] scwhittle commented on a change in pull request #15301: Change renames to handle retries internally

scwhittle commented on a change in pull request #15301:
URL: https://github.com/apache/beam/pull/15301#discussion_r686250554



##########
File path: sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java
##########
@@ -681,21 +734,77 @@ public void onSuccess(RewriteResponse rewriteResponse, HttpHeaders responseHeade
 
     @Override
     public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
-      readyToEnqueue = false;
-      throw new IOException(String.format("Error trying to rewrite %s to %s: %s", from, to, e));
+      if (e.getCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
+        if (move) {
+          // Treat the missing source as a successful move. We don't verify the destination file
+          // exists as it may have subsequently been moved by something else.
+          readyToEnqueue = false;
+          lastError = null;
+        } else {
+          throw new FileNotFoundException(from.toString());
+        }
+      } else {
+        lastError = e;
+        readyToEnqueue = true;
+      }
     }
   }
 
   public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
       throws IOException {
-    LinkedList<RewriteOp> rewrites = makeRewriteOps(srcFilenames, destFilenames);
-    while (rewrites.size() > 0) {
-      executeBatches(makeCopyBatches(rewrites));
+    rewrite(srcFilenames, destFilenames, false);
+  }
+
+  public void rename(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+      throws IOException {
+    rewrite(srcFilenames, destFilenames, true);
+  }
+
+  private void rewrite(Iterable<String> srcFilenames, Iterable<String> destFilenames, boolean move)
+      throws IOException {
+    LinkedList<RewriteOp> rewrites = makeRewriteOps(srcFilenames, destFilenames, move);
+    org.apache.beam.sdk.util.BackOff backoff = BACKOFF_FACTORY.backoff();
+    while (true) {
+      List<BatchRequest> batches = makeCopyBatches(rewrites); // Removes completed rewrite ops.
+      if (batches.isEmpty()) {
+        break;
+      }
+      RewriteOp sampleErrorOp =
+          rewrites.stream().filter(op -> op.getLastError() != null).findFirst().orElse(null);

Review comment:
  We skip 404 errors in the per-op handling above, so any error left here is one we want to retry.

  We are not skipping these errors: we either retry, or, once we hit the backoff limit, we propagate one of them.
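A minimal sketch of the control flow described above, in plain Java with no Beam or GCS dependencies. `Op`, `attempt()`, the scripted status codes, and the fixed `maxRetries` count are hypothetical stand-ins. The real `GcsUtil` code batches requests and draws delays from a `BackOff` created by `BACKOFF_FACTORY`; it does not use a simple counter. The sketch shows the two paths: a 404 is resolved per op (success for a move, `FileNotFoundException` for a copy), and every other error is retried until the limit, after which one sample error is propagated.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of "retry until the backoff limit, then propagate one error". */
final class RewriteRetrySketch {
  static final int OK = 200;
  static final int NOT_FOUND = 404;

  /** Hypothetical stand-in for one rewrite: each attempt returns the next scripted HTTP status. */
  static final class Op {
    final String from;
    final boolean move;
    private final int[] statuses; // status per attempt; the last one repeats
    int attempts = 0;
    boolean done = false;
    Integer lastError = null; // non-null while the op is waiting to be retried

    Op(String from, boolean move, int... statuses) {
      this.from = from;
      this.move = move;
      this.statuses = statuses;
    }

    /** Mirrors the per-op onFailure logic from the diff. */
    void attempt() throws IOException {
      int status = statuses[Math.min(attempts, statuses.length - 1)];
      attempts++;
      if (status == OK || (status == NOT_FOUND && move)) {
        // Success, or a move whose source is already gone: nothing left to do.
        done = true;
        lastError = null;
      } else if (status == NOT_FOUND) {
        // A copy with a missing source is not retryable.
        throw new FileNotFoundException(from);
      } else {
        // Any other error is remembered and the op is retried.
        lastError = status;
      }
    }
  }

  static void rewrite(List<Op> ops, int maxRetries) throws IOException {
    List<Op> pending = new ArrayList<>(ops);
    for (int attempt = 0; ; attempt++) {
      for (Op op : pending) {
        op.attempt(); // may throw FileNotFoundException for a copy
      }
      pending.removeIf(op -> op.done); // drop completed ops, including 404'd moves
      if (pending.isEmpty()) {
        return;
      }
      // Every remaining op failed with a non-404 error, i.e. one we want to retry.
      if (attempt >= maxRetries) {
        Op sample = pending.get(0); // propagate one representative error
        throw new IOException(
            "Error trying to rewrite " + sample.from + ": HTTP " + sample.lastError);
      }
      // Real code would sleep for the next backoff interval here before retrying.
    }
  }
}
```

One plausible reason for treating a 404 on a move as success is that it keeps retries safe: if an earlier attempt already moved the object but its response was lost, the retry finds no source and stops rather than failing the whole rename.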




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
