You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/16 19:32:25 UTC
[beam] branch master updated: Merge pull request #17520 from BEAM-12356 Close DatasetService leaked with getTable
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e84bd611225 Merge pull request #17520 from BEAM-12356 Close DatasetService leaked with getTable
e84bd611225 is described below
commit e84bd61122576469bf6d673fe6cab0862d727c66
Author: Minbo Bae <49...@users.noreply.github.com>
AuthorDate: Mon May 16 12:32:17 2022 -0700
Merge pull request #17520 from BEAM-12356 Close DatasetService leaked with getTable
---
.../sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java | 5 ++++-
.../sdk/io/gcp/bigquery/BigQueryStorageTableSource.java | 7 +++++--
.../beam/sdk/io/gcp/bigquery/BigQueryTableSource.java | 17 ++++++++++-------
.../sdk/io/gcp/bigquery/BigQueryTableSourceDef.java | 12 +++++++-----
4 files changed, 26 insertions(+), 15 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index 120adc107fa..ee7f02ef668 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -28,6 +28,7 @@ import java.io.ObjectInputStream;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -173,7 +174,9 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
location,
queryTempDataset,
kmsKey);
- return bqServices.getDatasetService(options).getTable(queryResultTable);
+ try (DatasetService datasetService = bqServices.getDatasetService(options)) {
+ return datasetService.getTable(queryResultTable);
+ }
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
index c53cab3d2c1..dcb3dfdbe1e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
@@ -28,6 +28,7 @@ import java.io.ObjectInputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -183,8 +184,10 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
? options.getProject()
: options.getBigQueryProject());
}
- Table table = bqServices.getDatasetService(options).getTable(tableReference);
- cachedTable.compareAndSet(null, table);
+ try (DatasetService datasetService = bqServices.getDatasetService(options)) {
+ Table table = datasetService.getTable(tableReference);
+ cachedTable.compareAndSet(null, table);
+ }
}
return cachedTable.get();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 86aa87ed937..7f16baf5c1a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.TableReference;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -70,14 +71,16 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
if (tableSizeBytes.get() == null) {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
TableReference tableRef = tableDef.getTableReference(bqOptions);
- Table table = bqServices.getDatasetService(bqOptions).getTable(tableRef);
- Long numBytes = table.getNumBytes();
- if (table.getStreamingBuffer() != null
- && table.getStreamingBuffer().getEstimatedBytes() != null) {
- numBytes += table.getStreamingBuffer().getEstimatedBytes().longValue();
- }
+ try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) {
+ Table table = datasetService.getTable(tableRef);
+ Long numBytes = table.getNumBytes();
+ if (table.getStreamingBuffer() != null
+ && table.getStreamingBuffer().getEstimatedBytes() != null) {
+ numBytes += table.getStreamingBuffer().getEstimatedBytes().longValue();
+ }
- tableSizeBytes.compareAndSet(null, numBytes);
+ tableSizeBytes.compareAndSet(null, numBytes);
+ }
}
return tableSizeBytes.get();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
index 435876f2597..9d86afb95ca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -106,11 +107,12 @@ class BigQueryTableSourceDef implements BigQuerySourceDef {
@Override
public Schema getBeamSchema(BigQueryOptions bqOptions) {
try {
- TableReference tableRef = getTableReference(bqOptions);
- TableSchema tableSchema =
- bqServices.getDatasetService(bqOptions).getTable(tableRef).getSchema();
- return BigQueryUtils.fromTableSchema(tableSchema);
- } catch (IOException | InterruptedException | NullPointerException e) {
+ try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) {
+ TableReference tableRef = getTableReference(bqOptions);
+ TableSchema tableSchema = datasetService.getTable(tableRef).getSchema();
+ return BigQueryUtils.fromTableSchema(tableSchema);
+ }
+ } catch (Exception e) {
throw new BigQuerySchemaRetrievalException("Exception while trying to retrieve schema", e);
}
}