You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by jr...@apache.org on 2023/05/04 17:35:48 UTC
[beam] branch release-2.47.0 updated: Merge pull request #26503: fix dataloss bug in batch Storage API sink. (#26512)
This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch release-2.47.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.47.0 by this push:
new e80e46a00d0 Merge pull request #26503: fix dataloss bug in batch Storage API sink. (#26512)
e80e46a00d0 is described below
commit e80e46a00d0de562672d501afa2e11c4a77e625a
Author: Bruno Volpato <bv...@google.com>
AuthorDate: Thu May 4 13:35:39 2023 -0400
Merge pull request #26503: fix dataloss bug in batch Storage API sink. (#26512)
Co-authored-by: reuvenlax <re...@google.com>
---
.../bigquery/StorageApiWriteUnshardedRecords.java | 67 ++++++++++++++++++----
1 file changed, 55 insertions(+), 12 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index f0015cddc38..3c082752449 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -31,11 +31,13 @@ import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
+import io.grpc.Status;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -224,11 +226,14 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
ProtoRows protoRows;
List<org.joda.time.Instant> timestamps;
+ int failureCount;
+
public AppendRowsContext(
long offset, ProtoRows protoRows, List<org.joda.time.Instant> timestamps) {
this.offset = offset;
this.protoRows = protoRows;
this.timestamps = timestamps;
+ this.failureCount = 0;
}
}
@@ -301,17 +306,20 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
}
String getOrCreateStreamName() {
- try {
- if (!useDefaultStream) {
- this.streamName =
- Preconditions.checkStateNotNull(maybeDatasetService)
- .createWriteStream(tableUrn, Type.PENDING)
- .getName();
- } else {
- this.streamName = getDefaultStreamName();
+ if (Strings.isNullOrEmpty(this.streamName)) {
+ try {
+ if (!useDefaultStream) {
+ this.streamName =
+ Preconditions.checkStateNotNull(maybeDatasetService)
+ .createWriteStream(tableUrn, Type.PENDING)
+ .getName();
+ this.currentOffset = 0;
+ } else {
+ this.streamName = getDefaultStreamName();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- } catch (Exception e) {
- throw new RuntimeException(e);
}
return this.streamName;
}
@@ -376,7 +384,6 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
// This pin is "owned" by the current DoFn.
Preconditions.checkStateNotNull(newAppendClientInfo.getStreamAppendClient()).pin();
}
- this.currentOffset = 0;
nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
this.appendClientInfo = newAppendClientInfo;
}
@@ -507,6 +514,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
long offset = -1;
if (!this.useDefaultStream) {
+ getOrCreateStreamName(); // Force creation of the stream before we get offsets.
offset = this.currentOffset;
this.currentOffset += inserts.getSerializedRowsCount();
}
@@ -598,7 +606,42 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
streamName,
clientNumber,
retrieveErrorDetails(contexts));
+ failedContext.failureCount += 1;
+
+ // Maximum number of times we retry before we fail the work item.
+ if (failedContext.failureCount > 5) {
+ throw new RuntimeException("More than 5 attempts to call AppendRows failed.");
+ }
+
+ // The following errors are known to be persistent, so always fail the work item in
+ // this case.
+ Throwable error = Preconditions.checkStateNotNull(failedContext.getError());
+ Status.Code statusCode = Status.fromThrowable(error).getCode();
+ if (statusCode.equals(Status.Code.OUT_OF_RANGE)
+ || statusCode.equals(Status.Code.ALREADY_EXISTS)) {
+ throw new RuntimeException(
+ "Append to stream "
+ + this.streamName
+ + " failed with invalid "
+ + "offset of "
+ + failedContext.offset);
+ }
+
+ boolean streamDoesNotExist =
+ failedContext.getError() instanceof Exceptions.StreamFinalizedException
+ || statusCode.equals(Status.Code.INVALID_ARGUMENT)
+ || statusCode.equals(Status.Code.NOT_FOUND)
+ || statusCode.equals(Status.Code.FAILED_PRECONDITION);
+ if (streamDoesNotExist) {
+ throw new RuntimeException(
+ "Append to stream "
+ + this.streamName
+ + " failed with stream "
+ + "doesn't exist");
+ }
+
invalidateWriteStream();
+
appendFailures.inc();
return RetryType.RETRY_ALL_OPERATIONS;
},
@@ -629,7 +672,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
String retrieveErrorDetails(Iterable<AppendRowsContext> failedContext) {
return StreamSupport.stream(failedContext.spliterator(), false)
.<@Nullable Throwable>map(AppendRowsContext::getError)
- .filter(err -> err != null)
+ .filter(Objects::nonNull)
.map(
thrw ->
Preconditions.checkStateNotNull(thrw).toString()