Posted to commits@beam.apache.org by jr...@apache.org on 2023/05/04 17:35:48 UTC

[beam] branch release-2.47.0 updated: Merge pull request #26503: fix dataloss bug in batch Storage API sink. (#26512)

This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch release-2.47.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.47.0 by this push:
     new e80e46a00d0 Merge pull request #26503: fix dataloss bug in batch Storage API sink. (#26512)
e80e46a00d0 is described below

commit e80e46a00d0de562672d501afa2e11c4a77e625a
Author: Bruno Volpato <bv...@google.com>
AuthorDate: Thu May 4 13:35:39 2023 -0400

    Merge pull request #26503: fix dataloss bug in batch Storage API sink. (#26512)
    
    Co-authored-by: reuvenlax <re...@google.com>
---
 .../bigquery/StorageApiWriteUnshardedRecords.java  | 67 ++++++++++++++++++----
 1 file changed, 55 insertions(+), 12 deletions(-)

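The gist of the change, visible in the diff below: the PENDING write stream is created lazily in getOrCreateStreamName(), the per-stream offset is reset at stream-creation time instead of at append-client creation, and the flush path forces stream creation before it hands out offsets. A minimal standalone Java sketch of that ordering, using hypothetical names (LazyStreamSketch, createWriteStream, nextOffset) rather than Beam's actual classes:

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Standalone illustration of the lazy stream-creation ordering used in the patch below.
 * All names here are hypothetical; only the shape of the logic is taken from the diff.
 */
public class LazyStreamSketch {
  private String streamName;   // null until the stream is actually created
  private long currentOffset;  // only meaningful once streamName is set
  private final AtomicInteger streamsCreated = new AtomicInteger();

  /** Stand-in for the RPC that would create a PENDING write stream. */
  private String createWriteStream() {
    return "projects/p/datasets/d/tables/t/streams/s-" + streamsCreated.incrementAndGet();
  }

  /** Create the stream on first use and reset the offset at creation time. */
  String getOrCreateStreamName() {
    if (streamName == null || streamName.isEmpty()) {
      streamName = createWriteStream();
      currentOffset = 0; // reset here, not when the append client is (re)built
    }
    return streamName;
  }

  /** Assign offsets only after the stream is guaranteed to exist. */
  long nextOffset(int rowCount) {
    getOrCreateStreamName(); // force creation of the stream before handing out offsets
    long offset = currentOffset;
    currentOffset += rowCount;
    return offset;
  }

  public static void main(String[] args) {
    LazyStreamSketch sketch = new LazyStreamSketch();
    System.out.println(sketch.nextOffset(3)); // 0
    System.out.println(sketch.nextOffset(2)); // 3
  }
}
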
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index f0015cddc38..3c082752449 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -31,11 +31,13 @@ import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.DynamicMessage;
 import com.google.protobuf.InvalidProtocolBufferException;
+import io.grpc.Status;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -224,11 +226,14 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       ProtoRows protoRows;
       List<org.joda.time.Instant> timestamps;
 
+      int failureCount;
+
       public AppendRowsContext(
           long offset, ProtoRows protoRows, List<org.joda.time.Instant> timestamps) {
         this.offset = offset;
         this.protoRows = protoRows;
         this.timestamps = timestamps;
+        this.failureCount = 0;
       }
     }
 
@@ -301,17 +306,20 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       }
 
       String getOrCreateStreamName() {
-        try {
-          if (!useDefaultStream) {
-            this.streamName =
-                Preconditions.checkStateNotNull(maybeDatasetService)
-                    .createWriteStream(tableUrn, Type.PENDING)
-                    .getName();
-          } else {
-            this.streamName = getDefaultStreamName();
+        if (Strings.isNullOrEmpty(this.streamName)) {
+          try {
+            if (!useDefaultStream) {
+              this.streamName =
+                  Preconditions.checkStateNotNull(maybeDatasetService)
+                      .createWriteStream(tableUrn, Type.PENDING)
+                      .getName();
+              this.currentOffset = 0;
+            } else {
+              this.streamName = getDefaultStreamName();
+            }
+          } catch (Exception e) {
+            throw new RuntimeException(e);
           }
-        } catch (Exception e) {
-          throw new RuntimeException(e);
         }
         return this.streamName;
       }
@@ -376,7 +384,6 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
               // This pin is "owned" by the current DoFn.
               Preconditions.checkStateNotNull(newAppendClientInfo.getStreamAppendClient()).pin();
             }
-            this.currentOffset = 0;
             nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
             this.appendClientInfo = newAppendClientInfo;
           }
@@ -507,6 +514,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
 
         long offset = -1;
         if (!this.useDefaultStream) {
+          getOrCreateStreamName(); // Force creation of the stream before we get offsets.
           offset = this.currentOffset;
           this.currentOffset += inserts.getSerializedRowsCount();
         }
@@ -598,7 +606,42 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                   streamName,
                   clientNumber,
                   retrieveErrorDetails(contexts));
+              failedContext.failureCount += 1;
+
+              // Maximum number of times we retry before we fail the work item.
+              if (failedContext.failureCount > 5) {
+                throw new RuntimeException("More than 5 attempts to call AppendRows failed.");
+              }
+
+              // The following errors are known to be persistent, so always fail the work item in
+              // this case.
+              Throwable error = Preconditions.checkStateNotNull(failedContext.getError());
+              Status.Code statusCode = Status.fromThrowable(error).getCode();
+              if (statusCode.equals(Status.Code.OUT_OF_RANGE)
+                  || statusCode.equals(Status.Code.ALREADY_EXISTS)) {
+                throw new RuntimeException(
+                    "Append to stream "
+                        + this.streamName
+                        + " failed with invalid "
+                        + "offset of "
+                        + failedContext.offset);
+              }
+
+              boolean streamDoesNotExist =
+                  failedContext.getError() instanceof Exceptions.StreamFinalizedException
+                      || statusCode.equals(Status.Code.INVALID_ARGUMENT)
+                      || statusCode.equals(Status.Code.NOT_FOUND)
+                      || statusCode.equals(Status.Code.FAILED_PRECONDITION);
+              if (streamDoesNotExist) {
+                throw new RuntimeException(
+                    "Append to stream "
+                        + this.streamName
+                        + " failed with stream "
+                        + "doesn't exist");
+              }
+
               invalidateWriteStream();
+
               appendFailures.inc();
               return RetryType.RETRY_ALL_OPERATIONS;
             },
@@ -629,7 +672,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
       String retrieveErrorDetails(Iterable<AppendRowsContext> failedContext) {
         return StreamSupport.stream(failedContext.spliterator(), false)
             .<@Nullable Throwable>map(AppendRowsContext::getError)
-            .filter(err -> err != null)
+            .filter(Objects::nonNull)
             .map(
                 thrw ->
                     Preconditions.checkStateNotNull(thrw).toString()
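
The retry callback in the @@ -598,7 +606,42 @@ hunk above now classifies append failures before deciding whether to retry. Below is a standalone sketch of that classification, assuming only io.grpc on the classpath and using a hypothetical Decision enum in place of Beam's RetryType; the real code additionally treats Exceptions.StreamFinalizedException from the BigQuery Storage client as a stream-not-found condition.

import io.grpc.Status;
import io.grpc.StatusRuntimeException;

/** Hypothetical standalone version of the retry classification added in the patch. */
public class AppendRetryClassifier {

  enum Decision { RETRY_ALL_OPERATIONS, FAIL_WORK_ITEM }

  /** Maximum number of retries before the work item fails (5 in the patch). */
  static final int MAX_FAILURES = 5;

  static Decision classify(Throwable error, int failureCount) {
    if (failureCount > MAX_FAILURES) {
      throw new RuntimeException(
          "More than " + MAX_FAILURES + " attempts to call AppendRows failed.");
    }

    Status.Code code = Status.fromThrowable(error).getCode();

    // Invalid offset: retrying the same offset cannot succeed, so fail the work item.
    if (code == Status.Code.OUT_OF_RANGE || code == Status.Code.ALREADY_EXISTS) {
      return Decision.FAIL_WORK_ITEM;
    }

    // Stream gone (finalized or never created): also a permanent failure for this work item.
    if (code == Status.Code.INVALID_ARGUMENT
        || code == Status.Code.NOT_FOUND
        || code == Status.Code.FAILED_PRECONDITION) {
      return Decision.FAIL_WORK_ITEM;
    }

    // Anything else is treated as transient: invalidate the write stream and retry everything.
    return Decision.RETRY_ALL_OPERATIONS;
  }

  public static void main(String[] args) {
    Throwable transientError =
        new StatusRuntimeException(Status.UNAVAILABLE.withDescription("backend restarting"));
    System.out.println(classify(transientError, 1)); // RETRY_ALL_OPERATIONS

    Throwable badOffset = new StatusRuntimeException(Status.OUT_OF_RANGE);
    System.out.println(classify(badOffset, 1)); // FAIL_WORK_ITEM
  }
}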