You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/12/01 23:13:07 UTC

[beam] branch master updated: Capture full response context to provide complete error information (#24416)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b9f2055897b Capture full response context to provide complete error information (#24416)
b9f2055897b is described below

commit b9f2055897b6dffd472125ece50609ecf70f5db4
Author: pablo rodriguez defino <pr...@gmail.com>
AuthorDate: Thu Dec 1 15:13:00 2022 -0800

    Capture full response context to provide complete error information (#24416)
    
    * capturing the full context failures to provide full picture of what has failed in the insert.
    
    * fix checks
---
 .../gcp/bigquery/StorageApiWriteUnshardedRecords.java | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 99317a3fb23..4b9a80d2ab2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -451,8 +452,6 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                             DynamicMessage.parseFrom(
                                 Preconditions.checkStateNotNull(appendClientInfo).descriptor,
                                 protoBytes));
-                    new BigQueryStorageApiInsertError(
-                        failedRow, error.getRowIndexToErrorMessage().get(failedIndex));
                     failedRowsReceiver.output(
                         new BigQueryStorageApiInsertError(
                             failedRow, error.getRowIndexToErrorMessage().get(failedIndex)));
@@ -488,7 +487,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                   "Append to stream {} by client #{} failed with error, operations will be retried. Details: {}",
                   streamName,
                   clientNumber,
-                  retrieveErrorDetails(failedContext));
+                  retrieveErrorDetails(contexts));
               invalidateWriteStream();
               appendFailures.inc();
               return RetryType.RETRY_ALL_OPERATIONS;
@@ -501,13 +500,13 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
         return inserts.getSerializedRowsCount();
       }
 
-      String retrieveErrorDetails(AppendRowsContext failedContext) {
-        return (failedContext.getError() != null)
-            ? Arrays.stream(
-                    Preconditions.checkStateNotNull(failedContext.getError()).getStackTrace())
-                .map(StackTraceElement::toString)
-                .collect(Collectors.joining("\n"))
-            : "no execption";
+      String retrieveErrorDetails(Iterable<AppendRowsContext> failedContext) {
+        return StreamSupport.stream(failedContext.spliterator(), false)
+            .<@Nullable Throwable>map(AppendRowsContext::getError)
+            .filter(err -> err != null)
+            .flatMap(thrw -> Arrays.stream(Preconditions.checkStateNotNull(thrw).getStackTrace()))
+            .map(StackTraceElement::toString)
+            .collect(Collectors.joining("\n"));
       }
     }