You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/04/22 17:14:44 UTC

[beam] branch master updated: Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread doesn't exit suddenly, as this leads to pipeline stuckness

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 00320aa7793 Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread doesn't exit suddenly, as this leads to pipeline stuckness
00320aa7793 is described below

commit 00320aa7793ed4322a3fd029864dd300ac8bec00
Author: Reuven Lax <re...@google.com>
AuthorDate: Fri Apr 22 10:14:34 2022 -0700

    Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread doesn't exit suddenly, as this leads to pipeline stuckness
---
 .../StorageApiDynamicDestinationsTableRow.java     | 10 +++++-
 .../beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 42 ++++++++++++++--------
 2 files changed, 36 insertions(+), 16 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index 281a7adb529..c48dbe0dedb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.SchemaTooNarrowException;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,10 +75,10 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
 
       {
         tableSchema = getSchema(destination);
+        TableReference tableReference = getTable(destination).getTableReference();
         if (tableSchema == null) {
           // If the table already exists, then try and fetch the schema from the existing
           // table.
-          TableReference tableReference = getTable(destination).getTableReference();
           tableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
           if (tableSchema == null) {
             if (createDisposition == CreateDisposition.CREATE_NEVER) {
@@ -95,7 +96,14 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
                       + "using a create disposition of CREATE_IF_NEEDED.");
             }
           }
+        } else {
+          // Make sure we register this schema with the cache, unless there's already a more
+          // up-to-date schema.
+          tableSchema =
+              MoreObjects.firstNonNull(
+                  SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), tableSchema);
         }
+
         descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
         descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
       }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
index 83246b26de2..e1775af2289 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
@@ -178,6 +178,21 @@ public class TableSchemaCache {
     return schemaHolder.map(SchemaHolder::getTableSchema).orElse(null);
   }
 
+  /**
+   * Registers {@code tableSchema} for {@code tableReference} unless the cache already holds an
+   * entry for that table. Returns the previously cached schema if one existed, otherwise null.
+   */
+  @Nullable
+  public TableSchema putSchemaIfAbsent(TableReference tableReference, TableSchema tableSchema) {
+    final String key = tableKey(tableReference);
+    Optional<SchemaHolder> existing =
+        runUnderMonitor(
+            () ->
+                Optional.ofNullable(
+                    this.cachedSchemas.putIfAbsent(key, SchemaHolder.of(tableSchema, 0))));
+    return existing.map(SchemaHolder::getTableSchema).orElse(null);
+  }
+
   public void refreshSchema(TableReference tableReference, DatasetService datasetService) {
     int targetVersion =
         runUnderMonitor(
@@ -187,13 +202,11 @@ public class TableSchemaCache {
                     "Cannot call refreshSchema after the object has been stopped!");
               }
               String key = tableKey(tableReference);
-              SchemaHolder schemaHolder = cachedSchemas.get(key);
-              if (schemaHolder == null) {
-                throw new RuntimeException("Can't refresh unknown schema!");
-              }
-              tablesToRefresh.put(key, Refresh.of(datasetService, schemaHolder.getVersion() + 1));
+              @Nullable SchemaHolder schemaHolder = cachedSchemas.get(key);
+              int nextVersion = schemaHolder != null ? schemaHolder.getVersion() + 1 : 0;
+              tablesToRefresh.put(key, Refresh.of(datasetService, nextVersion));
               // Wait at least until the next version.
-              return schemaHolder.getVersion() + 1;
+              return nextVersion;
             });
     waitForRefresh(tableReference, targetVersion);
   }
@@ -237,11 +250,9 @@ public class TableSchemaCache {
               .entrySet()
               .removeIf(
                   entry -> {
-                    SchemaHolder schemaHolder = cachedSchemas.get(entry.getKey());
-                    if (schemaHolder == null) {
-                      throw new RuntimeException("Unexpected null schema for " + entry.getKey());
-                    }
-                    return schemaHolder.getVersion() >= entry.getValue().getTargetVersion();
+                    @Nullable SchemaHolder schemaHolder = cachedSchemas.get(entry.getKey());
+                    return schemaHolder != null
+                        && schemaHolder.getVersion() >= entry.getValue().getTargetVersion();
                   });
         } finally {
           tableUpdateMonitor.leave();
@@ -272,10 +283,11 @@ public class TableSchemaCache {
       if (timeRemaining.getMillis() > 0) {
         Thread.sleep(timeRemaining.getMillis());
       }
-    } catch (InterruptedException e) {
-      runUnderMonitor(() -> this.stopped = true);
-      return;
-    } catch (IOException e) {
+    } catch (Exception e) {
+      // This runs on a daemon thread, so it must not exit until it is explicitly shut down;
+      // exiting early can cause the pipeline to stall. Log the failure and fall through so that
+      // the next refresh is still submitted below.
+      LOG.error("Caught exception in BigQuery's table schema cache refresh thread: " + e);
     }
     this.refreshExecutor.submit(this::refreshThread);
   }