You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/19 00:52:53 UTC

[GitHub] [beam] reuvenlax commented on a diff in pull request #17382: [BEAM-12356] Close DatasetService leak as local variables

reuvenlax commented on code in PR #17382:
URL: https://github.com/apache/beam/pull/17382#discussion_r852510070


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java:
##########
@@ -96,105 +96,112 @@ public static TableReference executeQuery(
       throws InterruptedException, IOException {
     // Step 1: Find the effective location of the query.
     String effectiveLocation = location;
-    DatasetService tableService = bqServices.getDatasetService(options);
-    if (effectiveLocation == null) {
-      List<TableReference> referencedTables =
-          dryRunQueryIfNeeded(
-                  bqServices,
-                  options,
-                  dryRunJobStats,
-                  query,
-                  flattenResults,
-                  useLegacySql,
-                  location)
-              .getQuery()
-              .getReferencedTables();
-      if (referencedTables != null && !referencedTables.isEmpty()) {
-        TableReference referencedTable = referencedTables.get(0);
-        effectiveLocation =
-            tableService
-                .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId())
-                .getLocation();
+    try (DatasetService tableService = bqServices.getDatasetService(options)) {
+      if (effectiveLocation == null) {
+        List<TableReference> referencedTables =
+            dryRunQueryIfNeeded(
+                    bqServices,
+                    options,
+                    dryRunJobStats,
+                    query,
+                    flattenResults,
+                    useLegacySql,
+                    location)
+                .getQuery()
+                .getReferencedTables();
+        if (referencedTables != null && !referencedTables.isEmpty()) {
+          TableReference referencedTable = referencedTables.get(0);
+          effectiveLocation =
+              tableService
+                  .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId())
+                  .getLocation();
+        }
       }
-    }
 
-    // Step 2: Create a temporary dataset in the query location only if the user has not specified a
-    // temp dataset.
-    String queryJobId =
-        BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.QUERY);
-    Optional<String> queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId);
-    TableReference queryResultTable =
-        createTempTableReference(
-            options.getBigQueryProject() == null
-                ? options.getProject()
-                : options.getBigQueryProject(),
-            queryJobId,
-            queryTempDatasetOpt);
-
-    boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
-    // Create dataset only if it has not been set by the user
-    if (beamToCreateTempDataset) {
-      LOG.info("Creating temporary dataset {} for query results", queryResultTable.getDatasetId());
-
-      tableService.createDataset(
-          queryResultTable.getProjectId(),
-          queryResultTable.getDatasetId(),
-          effectiveLocation,
-          "Temporary tables for query results of job " + options.getJobName(),
-          TimeUnit.DAYS.toMillis(1));
-    } else { // If the user specified a temp dataset, check that the destination table does not
-      // exist
-      Table destTable = tableService.getTable(queryResultTable);
-      checkArgument(
-          destTable == null,
-          "Refusing to write on existing table {} in the specified temp dataset {}",
-          queryResultTable.getTableId(),
-          queryResultTable.getDatasetId());
-    }
+      // Step 2: Create a temporary dataset in the query location only if the user has not specified
+      // a temp dataset.
+      String queryJobId =
+          BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.QUERY);
+      Optional<String> queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId);
+      TableReference queryResultTable =
+          createTempTableReference(
+              options.getBigQueryProject() == null
+                  ? options.getProject()
+                  : options.getBigQueryProject(),
+              queryJobId,
+              queryTempDatasetOpt);
+
+      boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
+      // Create dataset only if it has not been set by the user
+      if (beamToCreateTempDataset) {
+        LOG.info(
+            "Creating temporary dataset {} for query results", queryResultTable.getDatasetId());
+
+        tableService.createDataset(
+            queryResultTable.getProjectId(),
+            queryResultTable.getDatasetId(),
+            effectiveLocation,
+            "Temporary tables for query results of job " + options.getJobName(),
+            TimeUnit.DAYS.toMillis(1));
+      } else { // If the user specified a temp dataset, check that the destination table does not
+        // exist
+        Table destTable = tableService.getTable(queryResultTable);
+        checkArgument(
+            destTable == null,
+            "Refusing to write on existing table {} in the specified temp dataset {}",
+            queryResultTable.getTableId(),
+            queryResultTable.getDatasetId());
+      }
 
-    // Step 3: Execute the query. Generate a transient (random) query job ID, because this code may
-    // be retried after the temporary dataset and table have been deleted by a previous attempt --
-    // in that case, we want to regenerate the temporary dataset and table, and we'll need a fresh
-    // query ID to do that.
-    LOG.info(
-        "Exporting query results into temporary table {} using job {}",
-        queryResultTable,
-        queryJobId);
-
-    JobReference jobReference =
-        new JobReference()
-            .setProjectId(
-                options.getBigQueryProject() == null
-                    ? options.getProject()
-                    : options.getBigQueryProject())
-            .setLocation(effectiveLocation)
-            .setJobId(queryJobId);
-
-    JobConfigurationQuery queryConfiguration =
-        createBasicQueryConfig(query, flattenResults, useLegacySql)
-            .setAllowLargeResults(true)
-            .setDestinationTable(queryResultTable)
-            .setCreateDisposition("CREATE_IF_NEEDED")
-            .setWriteDisposition("WRITE_TRUNCATE")
-            .setPriority(priority.name());
-
-    if (kmsKey != null) {
-      queryConfiguration.setDestinationEncryptionConfiguration(
-          new EncryptionConfiguration().setKmsKeyName(kmsKey));
-    }
+      // Step 3: Execute the query. Generate a transient (random) query job ID, because this code
+      // may be retried after the temporary dataset and table have been deleted by a previous
+      // attempt -- in that case, we want to regenerate the temporary dataset and table, and we'll
+      // need a fresh query ID to do that.
+      LOG.info(
+          "Exporting query results into temporary table {} using job {}",
+          queryResultTable,
+          queryJobId);
+
+      JobReference jobReference =
+          new JobReference()
+              .setProjectId(
+                  options.getBigQueryProject() == null
+                      ? options.getProject()
+                      : options.getBigQueryProject())
+              .setLocation(effectiveLocation)
+              .setJobId(queryJobId);
+
+      JobConfigurationQuery queryConfiguration =
+          createBasicQueryConfig(query, flattenResults, useLegacySql)
+              .setAllowLargeResults(true)
+              .setDestinationTable(queryResultTable)
+              .setCreateDisposition("CREATE_IF_NEEDED")
+              .setWriteDisposition("WRITE_TRUNCATE")
+              .setPriority(priority.name());
+
+      if (kmsKey != null) {
+        queryConfiguration.setDestinationEncryptionConfiguration(
+            new EncryptionConfiguration().setKmsKeyName(kmsKey));
+      }
 
-    JobService jobService = bqServices.getJobService(options);
-    jobService.startQueryJob(jobReference, queryConfiguration);
-    Job job = jobService.pollJob(jobReference, JOB_POLL_MAX_RETRIES);
-    if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
-      throw new IOException(
-          String.format(
-              "Query job %s failed, status: %s",
-              queryJobId, BigQueryHelpers.statusToPrettyString(job.getStatus())));
-    }
+      JobService jobService = bqServices.getJobService(options);
+      jobService.startQueryJob(jobReference, queryConfiguration);
+      Job job = jobService.pollJob(jobReference, JOB_POLL_MAX_RETRIES);
+      if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
+        throw new IOException(
+            String.format(
+                "Query job %s failed, status: %s",
+                queryJobId, BigQueryHelpers.statusToPrettyString(job.getStatus())));
+      }
 
-    LOG.info("Query job {} completed", queryJobId);
-    return queryResultTable;
+      LOG.info("Query job {} completed", queryJobId);
+      return queryResultTable;
+
+    } catch (RuntimeException | IOException | InterruptedException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }

Review Comment:
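   Are the new catch blocks needed (the one that rethrows `RuntimeException | IOException | InterruptedException` and the one that wraps any other `Exception` in a `RuntimeException`)?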



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org