You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/12/01 23:13:07 UTC
[beam] branch master updated: Capture full response context to provide complete error information (#24416)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b9f2055897b Capture full response context to provide complete error information (#24416)
b9f2055897b is described below
commit b9f2055897b6dffd472125ece50609ecf70f5db4
Author: pablo rodriguez defino <pr...@gmail.com>
AuthorDate: Thu Dec 1 15:13:00 2022 -0800
Capture full response context to provide complete error information (#24416)
* capturing the full context failures to provide full picture of what has failed in the insert.
* fix checks
---
.../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 19 +++++++++----------
1 file changed, 9 insertions(+), 10 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 99317a3fb23..4b9a80d2ab2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -451,8 +452,6 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
DynamicMessage.parseFrom(
Preconditions.checkStateNotNull(appendClientInfo).descriptor,
protoBytes));
- new BigQueryStorageApiInsertError(
- failedRow, error.getRowIndexToErrorMessage().get(failedIndex));
failedRowsReceiver.output(
new BigQueryStorageApiInsertError(
failedRow, error.getRowIndexToErrorMessage().get(failedIndex)));
@@ -488,7 +487,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
"Append to stream {} by client #{} failed with error, operations will be retried. Details: {}",
streamName,
clientNumber,
- retrieveErrorDetails(failedContext));
+ retrieveErrorDetails(contexts));
invalidateWriteStream();
appendFailures.inc();
return RetryType.RETRY_ALL_OPERATIONS;
@@ -501,13 +500,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
return inserts.getSerializedRowsCount();
}
- String retrieveErrorDetails(AppendRowsContext failedContext) {
- return (failedContext.getError() != null)
- ? Arrays.stream(
- Preconditions.checkStateNotNull(failedContext.getError()).getStackTrace())
- .map(StackTraceElement::toString)
- .collect(Collectors.joining("\n"))
- : "no execption";
+ String retrieveErrorDetails(Iterable<AppendRowsContext> failedContext) {
+ return StreamSupport.stream(failedContext.spliterator(), false)
+ .<@Nullable Throwable>map(AppendRowsContext::getError)
+ .filter(err -> err != null)
+ .flatMap(thrw -> Arrays.stream(Preconditions.checkStateNotNull(thrw).getStackTrace()))
+ .map(StackTraceElement::toString)
+ .collect(Collectors.joining("\n"));
}
}