You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/04/19 02:50:35 UTC

[beam] branch master updated: Merge pull request #17382: [BEAM-12356] Close DatasetService leak as local variables

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b709d5456b Merge pull request #17382: [BEAM-12356] Close DatasetService leak as local variables
4b709d5456b is described below

commit 4b709d5456b105ffcc251da7a0a4a0b560491b1c
Author: Minbo Bae <49...@users.noreply.github.com>
AuthorDate: Tue Apr 19 11:50:24 2022 +0900

    Merge pull request #17382: [BEAM-12356] Close DatasetService leak as local variables
---
 .../sdk/io/gcp/bigquery/BigQueryQueryHelper.java   | 195 +++++++++++----------
 .../io/gcp/bigquery/BigQueryQuerySourceDef.java    |  17 +-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java    |  55 +++---
 3 files changed, 138 insertions(+), 129 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
index b78afc514ae..a42eea20052 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
@@ -96,105 +96,112 @@ class BigQueryQueryHelper {
       throws InterruptedException, IOException {
     // Step 1: Find the effective location of the query.
     String effectiveLocation = location;
-    DatasetService tableService = bqServices.getDatasetService(options);
-    if (effectiveLocation == null) {
-      List<TableReference> referencedTables =
-          dryRunQueryIfNeeded(
-                  bqServices,
-                  options,
-                  dryRunJobStats,
-                  query,
-                  flattenResults,
-                  useLegacySql,
-                  location)
-              .getQuery()
-              .getReferencedTables();
-      if (referencedTables != null && !referencedTables.isEmpty()) {
-        TableReference referencedTable = referencedTables.get(0);
-        effectiveLocation =
-            tableService
-                .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId())
-                .getLocation();
+    try (DatasetService tableService = bqServices.getDatasetService(options)) {
+      if (effectiveLocation == null) {
+        List<TableReference> referencedTables =
+            dryRunQueryIfNeeded(
+                    bqServices,
+                    options,
+                    dryRunJobStats,
+                    query,
+                    flattenResults,
+                    useLegacySql,
+                    location)
+                .getQuery()
+                .getReferencedTables();
+        if (referencedTables != null && !referencedTables.isEmpty()) {
+          TableReference referencedTable = referencedTables.get(0);
+          effectiveLocation =
+              tableService
+                  .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId())
+                  .getLocation();
+        }
       }
-    }
 
-    // Step 2: Create a temporary dataset in the query location only if the user has not specified a
-    // temp dataset.
-    String queryJobId =
-        BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.QUERY);
-    Optional<String> queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId);
-    TableReference queryResultTable =
-        createTempTableReference(
-            options.getBigQueryProject() == null
-                ? options.getProject()
-                : options.getBigQueryProject(),
-            queryJobId,
-            queryTempDatasetOpt);
-
-    boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
-    // Create dataset only if it has not been set by the user
-    if (beamToCreateTempDataset) {
-      LOG.info("Creating temporary dataset {} for query results", queryResultTable.getDatasetId());
-
-      tableService.createDataset(
-          queryResultTable.getProjectId(),
-          queryResultTable.getDatasetId(),
-          effectiveLocation,
-          "Temporary tables for query results of job " + options.getJobName(),
-          TimeUnit.DAYS.toMillis(1));
-    } else { // If the user specified a temp dataset, check that the destination table does not
-      // exist
-      Table destTable = tableService.getTable(queryResultTable);
-      checkArgument(
-          destTable == null,
-          "Refusing to write on existing table {} in the specified temp dataset {}",
-          queryResultTable.getTableId(),
-          queryResultTable.getDatasetId());
-    }
+      // Step 2: Create a temporary dataset in the query location only if the user has not specified
+      // a temp dataset.
+      String queryJobId =
+          BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.QUERY);
+      Optional<String> queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId);
+      TableReference queryResultTable =
+          createTempTableReference(
+              options.getBigQueryProject() == null
+                  ? options.getProject()
+                  : options.getBigQueryProject(),
+              queryJobId,
+              queryTempDatasetOpt);
+
+      boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
+      // Create dataset only if it has not been set by the user
+      if (beamToCreateTempDataset) {
+        LOG.info(
+            "Creating temporary dataset {} for query results", queryResultTable.getDatasetId());
+
+        tableService.createDataset(
+            queryResultTable.getProjectId(),
+            queryResultTable.getDatasetId(),
+            effectiveLocation,
+            "Temporary tables for query results of job " + options.getJobName(),
+            TimeUnit.DAYS.toMillis(1));
+      } else { // If the user specified a temp dataset, check that the destination table does not
+        // exist
+        Table destTable = tableService.getTable(queryResultTable);
+        checkArgument(
+            destTable == null,
+            "Refusing to write on existing table {} in the specified temp dataset {}",
+            queryResultTable.getTableId(),
+            queryResultTable.getDatasetId());
+      }
 
-    // Step 3: Execute the query. Generate a transient (random) query job ID, because this code may
-    // be retried after the temporary dataset and table have been deleted by a previous attempt --
-    // in that case, we want to regenerate the temporary dataset and table, and we'll need a fresh
-    // query ID to do that.
-    LOG.info(
-        "Exporting query results into temporary table {} using job {}",
-        queryResultTable,
-        queryJobId);
-
-    JobReference jobReference =
-        new JobReference()
-            .setProjectId(
-                options.getBigQueryProject() == null
-                    ? options.getProject()
-                    : options.getBigQueryProject())
-            .setLocation(effectiveLocation)
-            .setJobId(queryJobId);
-
-    JobConfigurationQuery queryConfiguration =
-        createBasicQueryConfig(query, flattenResults, useLegacySql)
-            .setAllowLargeResults(true)
-            .setDestinationTable(queryResultTable)
-            .setCreateDisposition("CREATE_IF_NEEDED")
-            .setWriteDisposition("WRITE_TRUNCATE")
-            .setPriority(priority.name());
-
-    if (kmsKey != null) {
-      queryConfiguration.setDestinationEncryptionConfiguration(
-          new EncryptionConfiguration().setKmsKeyName(kmsKey));
-    }
+      // Step 3: Execute the query. Generate a transient (random) query job ID, because this code
+      // may be retried after the temporary dataset and table have been deleted by a previous
+      // attempt -- in that case, we want to regenerate the temporary dataset and table, and we'll
+      // need a fresh query ID to do that.
+      LOG.info(
+          "Exporting query results into temporary table {} using job {}",
+          queryResultTable,
+          queryJobId);
+
+      JobReference jobReference =
+          new JobReference()
+              .setProjectId(
+                  options.getBigQueryProject() == null
+                      ? options.getProject()
+                      : options.getBigQueryProject())
+              .setLocation(effectiveLocation)
+              .setJobId(queryJobId);
+
+      JobConfigurationQuery queryConfiguration =
+          createBasicQueryConfig(query, flattenResults, useLegacySql)
+              .setAllowLargeResults(true)
+              .setDestinationTable(queryResultTable)
+              .setCreateDisposition("CREATE_IF_NEEDED")
+              .setWriteDisposition("WRITE_TRUNCATE")
+              .setPriority(priority.name());
+
+      if (kmsKey != null) {
+        queryConfiguration.setDestinationEncryptionConfiguration(
+            new EncryptionConfiguration().setKmsKeyName(kmsKey));
+      }
 
-    JobService jobService = bqServices.getJobService(options);
-    jobService.startQueryJob(jobReference, queryConfiguration);
-    Job job = jobService.pollJob(jobReference, JOB_POLL_MAX_RETRIES);
-    if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
-      throw new IOException(
-          String.format(
-              "Query job %s failed, status: %s",
-              queryJobId, BigQueryHelpers.statusToPrettyString(job.getStatus())));
-    }
+      JobService jobService = bqServices.getJobService(options);
+      jobService.startQueryJob(jobReference, queryConfiguration);
+      Job job = jobService.pollJob(jobReference, JOB_POLL_MAX_RETRIES);
+      if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
+        throw new IOException(
+            String.format(
+                "Query job %s failed, status: %s",
+                queryJobId, BigQueryHelpers.statusToPrettyString(job.getStatus())));
+      }
 
-    LOG.info("Query job {} completed", queryJobId);
-    return queryResultTable;
+      LOG.info("Query job {} completed", queryJobId);
+      return queryResultTable;
+
+    } catch (RuntimeException | IOException | InterruptedException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   private static JobConfigurationQuery createBasicQueryConfig(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
index 2da260eabad..41c9caf2556 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
@@ -135,14 +135,15 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
                 bqOptions.getJobName(), stepUuid, JobType.QUERY),
             queryTempDatasetOpt);
 
-    BigQueryServices.DatasetService tableService = bqServices.getDatasetService(bqOptions);
-    LOG.info("Deleting temporary table with query results {}", tableToRemove);
-    tableService.deleteTable(tableToRemove);
-    boolean datasetCreatedByBeam = !queryTempDatasetOpt.isPresent();
-    if (datasetCreatedByBeam) {
-      // Remove temporary dataset only if it was created by Beam
-      LOG.info("Deleting temporary dataset with query results {}", tableToRemove.getDatasetId());
-      tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
+    try (BigQueryServices.DatasetService tableService = bqServices.getDatasetService(bqOptions)) {
+      LOG.info("Deleting temporary table with query results {}", tableToRemove);
+      tableService.deleteTable(tableToRemove);
+      boolean datasetCreatedByBeam = !queryTempDatasetOpt.isPresent();
+      if (datasetCreatedByBeam) {
+        // Remove temporary dataset only if it was created by Beam
+        LOG.info("Deleting temporary dataset with query results {}", tableToRemove.getDatasetId());
+        tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
+      }
     }
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 49d421fde49..ec5c7176a19 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -114,34 +114,35 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
   protected ExtractResult extractFiles(PipelineOptions options) throws Exception {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     TableReference tableToExtract = getTableToExtract(bqOptions);
-    BigQueryServices.DatasetService datasetService = bqServices.getDatasetService(bqOptions);
-    Table table = datasetService.getTable(tableToExtract);
-    if (table == null) {
-      throw new IOException(
-          String.format(
-              "Cannot start an export job since table %s does not exist",
-              BigQueryHelpers.toTableSpec(tableToExtract)));
-    }
+    try (BigQueryServices.DatasetService datasetService = bqServices.getDatasetService(bqOptions)) {
+      Table table = datasetService.getTable(tableToExtract);
+      if (table == null) {
+        throw new IOException(
+            String.format(
+                "Cannot start an export job since table %s does not exist",
+                BigQueryHelpers.toTableSpec(tableToExtract)));
+      }
 
-    TableSchema schema = table.getSchema();
-    JobService jobService = bqServices.getJobService(bqOptions);
-    String extractJobId =
-        BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.EXPORT);
-    final String extractDestinationDir =
-        resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
-    String bqLocation =
-        BigQueryHelpers.getDatasetLocation(
-            datasetService, tableToExtract.getProjectId(), tableToExtract.getDatasetId());
-    List<ResourceId> tempFiles =
-        executeExtract(
-            extractJobId,
-            tableToExtract,
-            jobService,
-            bqOptions.getProject(),
-            extractDestinationDir,
-            bqLocation,
-            useAvroLogicalTypes);
-    return new ExtractResult(schema, tempFiles);
+      TableSchema schema = table.getSchema();
+      JobService jobService = bqServices.getJobService(bqOptions);
+      String extractJobId =
+          BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.EXPORT);
+      final String extractDestinationDir =
+          resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
+      String bqLocation =
+          BigQueryHelpers.getDatasetLocation(
+              datasetService, tableToExtract.getProjectId(), tableToExtract.getDatasetId());
+      List<ResourceId> tempFiles =
+          executeExtract(
+              extractJobId,
+              tableToExtract,
+              jobService,
+              bqOptions.getProject(),
+              extractDestinationDir,
+              bqLocation,
+              useAvroLogicalTypes);
+      return new ExtractResult(schema, tempFiles);
+    }
   }
 
   @Override