You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/05/16 19:32:25 UTC

[beam] branch master updated: Merge pull request #17520 from BEAM-12356 Close DatasetService leaked with getTable

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e84bd611225 Merge pull request #17520 from BEAM-12356 Close DatasetService leaked with getTable
e84bd611225 is described below

commit e84bd61122576469bf6d673fe6cab0862d727c66
Author: Minbo Bae <49...@users.noreply.github.com>
AuthorDate: Mon May 16 12:32:17 2022 -0700

    Merge pull request #17520 from BEAM-12356 Close DatasetService leaked with getTable
---
 .../sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java |  5 ++++-
 .../sdk/io/gcp/bigquery/BigQueryStorageTableSource.java |  7 +++++--
 .../beam/sdk/io/gcp/bigquery/BigQueryTableSource.java   | 17 ++++++++++-------
 .../sdk/io/gcp/bigquery/BigQueryTableSourceDef.java     | 12 +++++++-----
 4 files changed, 26 insertions(+), 15 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index 120adc107fa..ee7f02ef668 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -28,6 +28,7 @@ import java.io.ObjectInputStream;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -173,7 +174,9 @@ class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
             location,
             queryTempDataset,
             kmsKey);
-    return bqServices.getDatasetService(options).getTable(queryResultTable);
+    try (DatasetService datasetService = bqServices.getDatasetService(options)) {
+      return datasetService.getTable(queryResultTable);
+    }
   }
 
   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
index c53cab3d2c1..dcb3dfdbe1e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
@@ -28,6 +28,7 @@ import java.io.ObjectInputStream;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -183,8 +184,10 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>
                 ? options.getProject()
                 : options.getBigQueryProject());
       }
-      Table table = bqServices.getDatasetService(options).getTable(tableReference);
-      cachedTable.compareAndSet(null, table);
+      try (DatasetService datasetService = bqServices.getDatasetService(options)) {
+        Table table = datasetService.getTable(tableReference);
+        cachedTable.compareAndSet(null, table);
+      }
     }
 
     return cachedTable.get();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 86aa87ed937..7f16baf5c1a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -70,14 +71,16 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
     if (tableSizeBytes.get() == null) {
       BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       TableReference tableRef = tableDef.getTableReference(bqOptions);
-      Table table = bqServices.getDatasetService(bqOptions).getTable(tableRef);
-      Long numBytes = table.getNumBytes();
-      if (table.getStreamingBuffer() != null
-          && table.getStreamingBuffer().getEstimatedBytes() != null) {
-        numBytes += table.getStreamingBuffer().getEstimatedBytes().longValue();
-      }
+      try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) {
+        Table table = datasetService.getTable(tableRef);
+        Long numBytes = table.getNumBytes();
+        if (table.getStreamingBuffer() != null
+            && table.getStreamingBuffer().getEstimatedBytes() != null) {
+          numBytes += table.getStreamingBuffer().getEstimatedBytes().longValue();
+        }
 
-      tableSizeBytes.compareAndSet(null, numBytes);
+        tableSizeBytes.compareAndSet(null, numBytes);
+      }
     }
     return tableSizeBytes.get();
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
index 435876f2597..9d86afb95ca 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -106,11 +107,12 @@ class BigQueryTableSourceDef implements BigQuerySourceDef {
   @Override
   public Schema getBeamSchema(BigQueryOptions bqOptions) {
     try {
-      TableReference tableRef = getTableReference(bqOptions);
-      TableSchema tableSchema =
-          bqServices.getDatasetService(bqOptions).getTable(tableRef).getSchema();
-      return BigQueryUtils.fromTableSchema(tableSchema);
-    } catch (IOException | InterruptedException | NullPointerException e) {
+      try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) {
+        TableReference tableRef = getTableReference(bqOptions);
+        TableSchema tableSchema = datasetService.getTable(tableRef).getSchema();
+        return BigQueryUtils.fromTableSchema(tableSchema);
+      }
+    } catch (Exception e) {
       throw new BigQuerySchemaRetrievalException("Exception while trying to retrieve schema", e);
     }
   }