You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/04/22 17:14:44 UTC
[beam] branch master updated: Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread doesn't exit suddenly, as this leads to pipeline stuckness
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 00320aa7793 Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread doesn't exit suddenly, as this leads to pipeline stuckness
00320aa7793 is described below
commit 00320aa7793ed4322a3fd029864dd300ac8bec00
Author: Reuven Lax <re...@google.com>
AuthorDate: Fri Apr 22 10:14:34 2022 -0700
Merge pull request #17428: [BEAM-14326] Make sure BigQuery daemon thread doesn't exit suddenly, as this leads to pipeline stuckness
---
.../StorageApiDynamicDestinationsTableRow.java | 10 +++++-
.../beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 42 ++++++++++++++--------
2 files changed, 36 insertions(+), 16 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index 281a7adb529..c48dbe0dedb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowToStorageApiProto.SchemaTooNarrowException;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,10 +75,10 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
{
tableSchema = getSchema(destination);
+ TableReference tableReference = getTable(destination).getTableReference();
if (tableSchema == null) {
// If the table already exists, then try and fetch the schema from the existing
// table.
- TableReference tableReference = getTable(destination).getTableReference();
tableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
if (tableSchema == null) {
if (createDisposition == CreateDisposition.CREATE_NEVER) {
@@ -95,7 +96,14 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT>
+ "using a create disposition of CREATE_IF_NEEDED.");
}
}
+ } else {
+ // Make sure we register this schema with the cache, unless there's already a more
+ // up-to-date schema.
+ tableSchema =
+ MoreObjects.firstNonNull(
+ SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), tableSchema);
}
+
descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
index 83246b26de2..e1775af2289 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
@@ -178,6 +178,21 @@ public class TableSchemaCache {
return schemaHolder.map(SchemaHolder::getTableSchema).orElse(null);
}
+ /**
+ * Registers schema for a table if one is not already present. If a schema is already in the
+ * cache, returns the existing schema, otherwise returns null.
+ */
+ @Nullable
+ public TableSchema putSchemaIfAbsent(TableReference tableReference, TableSchema tableSchema) {
+ final String key = tableKey(tableReference);
+ Optional<SchemaHolder> existing =
+ runUnderMonitor(
+ () ->
+ Optional.ofNullable(
+ this.cachedSchemas.putIfAbsent(key, SchemaHolder.of(tableSchema, 0))));
+ return existing.map(SchemaHolder::getTableSchema).orElse(null);
+ }
+
public void refreshSchema(TableReference tableReference, DatasetService datasetService) {
int targetVersion =
runUnderMonitor(
@@ -187,13 +202,11 @@ public class TableSchemaCache {
"Cannot call refreshSchema after the object has been stopped!");
}
String key = tableKey(tableReference);
- SchemaHolder schemaHolder = cachedSchemas.get(key);
- if (schemaHolder == null) {
- throw new RuntimeException("Can't refresh unknown schema!");
- }
- tablesToRefresh.put(key, Refresh.of(datasetService, schemaHolder.getVersion() + 1));
+ @Nullable SchemaHolder schemaHolder = cachedSchemas.get(key);
+ int nextVersion = schemaHolder != null ? schemaHolder.getVersion() + 1 : 0;
+ tablesToRefresh.put(key, Refresh.of(datasetService, nextVersion));
// Wait at least until the next version.
- return schemaHolder.getVersion() + 1;
+ return nextVersion;
});
waitForRefresh(tableReference, targetVersion);
}
@@ -237,11 +250,9 @@ public class TableSchemaCache {
.entrySet()
.removeIf(
entry -> {
- SchemaHolder schemaHolder = cachedSchemas.get(entry.getKey());
- if (schemaHolder == null) {
- throw new RuntimeException("Unexpected null schema for " + entry.getKey());
- }
- return schemaHolder.getVersion() >= entry.getValue().getTargetVersion();
+ @Nullable SchemaHolder schemaHolder = cachedSchemas.get(entry.getKey());
+ return schemaHolder != null
+ && schemaHolder.getVersion() >= entry.getValue().getTargetVersion();
});
} finally {
tableUpdateMonitor.leave();
@@ -272,10 +283,11 @@ public class TableSchemaCache {
if (timeRemaining.getMillis() > 0) {
Thread.sleep(timeRemaining.getMillis());
}
- } catch (InterruptedException e) {
- runUnderMonitor(() -> this.stopped = true);
- return;
- } catch (IOException e) {
+ } catch (Exception e) {
+ // Since this is a daemon thread, don't exit until it is explicitly shut down.
+ // Exiting early can cause the pipeline to stall, so log the exception and
+ // keep the refresh loop running.
+ LOG.error("Caught exception in BigQuery's table schema cache refresh thread: " + e);
}
this.refreshExecutor.submit(this::refreshThread);
}