You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/04/19 02:50:35 UTC
[beam] branch master updated: Merge pull request #17382: [BEAM-12356] Close DatasetService leak as local variables
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4b709d5456b Merge pull request #17382: [BEAM-12356] Close DatasetService leak as local variables
4b709d5456b is described below
commit 4b709d5456b105ffcc251da7a0a4a0b560491b1c
Author: Minbo Bae <49...@users.noreply.github.com>
AuthorDate: Tue Apr 19 11:50:24 2022 +0900
Merge pull request #17382: [BEAM-12356] Close DatasetService leak as local variables
---
.../sdk/io/gcp/bigquery/BigQueryQueryHelper.java | 195 +++++++++++----------
.../io/gcp/bigquery/BigQueryQuerySourceDef.java | 17 +-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 55 +++---
3 files changed, 138 insertions(+), 129 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
index b78afc514ae..a42eea20052 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
@@ -96,105 +96,112 @@ class BigQueryQueryHelper {
throws InterruptedException, IOException {
// Step 1: Find the effective location of the query.
String effectiveLocation = location;
- DatasetService tableService = bqServices.getDatasetService(options);
- if (effectiveLocation == null) {
- List<TableReference> referencedTables =
- dryRunQueryIfNeeded(
- bqServices,
- options,
- dryRunJobStats,
- query,
- flattenResults,
- useLegacySql,
- location)
- .getQuery()
- .getReferencedTables();
- if (referencedTables != null && !referencedTables.isEmpty()) {
- TableReference referencedTable = referencedTables.get(0);
- effectiveLocation =
- tableService
- .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId())
- .getLocation();
+ try (DatasetService tableService = bqServices.getDatasetService(options)) {
+ if (effectiveLocation == null) {
+ List<TableReference> referencedTables =
+ dryRunQueryIfNeeded(
+ bqServices,
+ options,
+ dryRunJobStats,
+ query,
+ flattenResults,
+ useLegacySql,
+ location)
+ .getQuery()
+ .getReferencedTables();
+ if (referencedTables != null && !referencedTables.isEmpty()) {
+ TableReference referencedTable = referencedTables.get(0);
+ effectiveLocation =
+ tableService
+ .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId())
+ .getLocation();
+ }
}
- }
- // Step 2: Create a temporary dataset in the query location only if the user has not specified a
- // temp dataset.
- String queryJobId =
- BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.QUERY);
- Optional<String> queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId);
- TableReference queryResultTable =
- createTempTableReference(
- options.getBigQueryProject() == null
- ? options.getProject()
- : options.getBigQueryProject(),
- queryJobId,
- queryTempDatasetOpt);
-
- boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
- // Create dataset only if it has not been set by the user
- if (beamToCreateTempDataset) {
- LOG.info("Creating temporary dataset {} for query results", queryResultTable.getDatasetId());
-
- tableService.createDataset(
- queryResultTable.getProjectId(),
- queryResultTable.getDatasetId(),
- effectiveLocation,
- "Temporary tables for query results of job " + options.getJobName(),
- TimeUnit.DAYS.toMillis(1));
- } else { // If the user specified a temp dataset, check that the destination table does not
- // exist
- Table destTable = tableService.getTable(queryResultTable);
- checkArgument(
- destTable == null,
- "Refusing to write on existing table {} in the specified temp dataset {}",
- queryResultTable.getTableId(),
- queryResultTable.getDatasetId());
- }
+ // Step 2: Create a temporary dataset in the query location only if the user has not specified
+ // a temp dataset.
+ String queryJobId =
+ BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.QUERY);
+ Optional<String> queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId);
+ TableReference queryResultTable =
+ createTempTableReference(
+ options.getBigQueryProject() == null
+ ? options.getProject()
+ : options.getBigQueryProject(),
+ queryJobId,
+ queryTempDatasetOpt);
+
+ boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
+ // Create dataset only if it has not been set by the user
+ if (beamToCreateTempDataset) {
+ LOG.info(
+ "Creating temporary dataset {} for query results", queryResultTable.getDatasetId());
+
+ tableService.createDataset(
+ queryResultTable.getProjectId(),
+ queryResultTable.getDatasetId(),
+ effectiveLocation,
+ "Temporary tables for query results of job " + options.getJobName(),
+ TimeUnit.DAYS.toMillis(1));
+ } else { // If the user specified a temp dataset, check that the destination table does not
+ // exist
+ Table destTable = tableService.getTable(queryResultTable);
+ checkArgument(
+ destTable == null,
+ "Refusing to write on existing table {} in the specified temp dataset {}",
+ queryResultTable.getTableId(),
+ queryResultTable.getDatasetId());
+ }
- // Step 3: Execute the query. Generate a transient (random) query job ID, because this code may
- // be retried after the temporary dataset and table have been deleted by a previous attempt --
- // in that case, we want to regenerate the temporary dataset and table, and we'll need a fresh
- // query ID to do that.
- LOG.info(
- "Exporting query results into temporary table {} using job {}",
- queryResultTable,
- queryJobId);
-
- JobReference jobReference =
- new JobReference()
- .setProjectId(
- options.getBigQueryProject() == null
- ? options.getProject()
- : options.getBigQueryProject())
- .setLocation(effectiveLocation)
- .setJobId(queryJobId);
-
- JobConfigurationQuery queryConfiguration =
- createBasicQueryConfig(query, flattenResults, useLegacySql)
- .setAllowLargeResults(true)
- .setDestinationTable(queryResultTable)
- .setCreateDisposition("CREATE_IF_NEEDED")
- .setWriteDisposition("WRITE_TRUNCATE")
- .setPriority(priority.name());
-
- if (kmsKey != null) {
- queryConfiguration.setDestinationEncryptionConfiguration(
- new EncryptionConfiguration().setKmsKeyName(kmsKey));
- }
+ // Step 3: Execute the query. Generate a transient (random) query job ID, because this code
+ // may be retried after the temporary dataset and table have been deleted by a previous
+ // attempt -- in that case, we want to regenerate the temporary dataset and table, and we'll
+ // need a fresh query ID to do that.
+ LOG.info(
+ "Exporting query results into temporary table {} using job {}",
+ queryResultTable,
+ queryJobId);
+
+ JobReference jobReference =
+ new JobReference()
+ .setProjectId(
+ options.getBigQueryProject() == null
+ ? options.getProject()
+ : options.getBigQueryProject())
+ .setLocation(effectiveLocation)
+ .setJobId(queryJobId);
+
+ JobConfigurationQuery queryConfiguration =
+ createBasicQueryConfig(query, flattenResults, useLegacySql)
+ .setAllowLargeResults(true)
+ .setDestinationTable(queryResultTable)
+ .setCreateDisposition("CREATE_IF_NEEDED")
+ .setWriteDisposition("WRITE_TRUNCATE")
+ .setPriority(priority.name());
+
+ if (kmsKey != null) {
+ queryConfiguration.setDestinationEncryptionConfiguration(
+ new EncryptionConfiguration().setKmsKeyName(kmsKey));
+ }
- JobService jobService = bqServices.getJobService(options);
- jobService.startQueryJob(jobReference, queryConfiguration);
- Job job = jobService.pollJob(jobReference, JOB_POLL_MAX_RETRIES);
- if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
- throw new IOException(
- String.format(
- "Query job %s failed, status: %s",
- queryJobId, BigQueryHelpers.statusToPrettyString(job.getStatus())));
- }
+ JobService jobService = bqServices.getJobService(options);
+ jobService.startQueryJob(jobReference, queryConfiguration);
+ Job job = jobService.pollJob(jobReference, JOB_POLL_MAX_RETRIES);
+ if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
+ throw new IOException(
+ String.format(
+ "Query job %s failed, status: %s",
+ queryJobId, BigQueryHelpers.statusToPrettyString(job.getStatus())));
+ }
- LOG.info("Query job {} completed", queryJobId);
- return queryResultTable;
+ LOG.info("Query job {} completed", queryJobId);
+ return queryResultTable;
+
+ } catch (RuntimeException | IOException | InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
private static JobConfigurationQuery createBasicQueryConfig(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
index 2da260eabad..41c9caf2556 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
@@ -135,14 +135,15 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
bqOptions.getJobName(), stepUuid, JobType.QUERY),
queryTempDatasetOpt);
- BigQueryServices.DatasetService tableService = bqServices.getDatasetService(bqOptions);
- LOG.info("Deleting temporary table with query results {}", tableToRemove);
- tableService.deleteTable(tableToRemove);
- boolean datasetCreatedByBeam = !queryTempDatasetOpt.isPresent();
- if (datasetCreatedByBeam) {
- // Remove temporary dataset only if it was created by Beam
- LOG.info("Deleting temporary dataset with query results {}", tableToRemove.getDatasetId());
- tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
+ try (BigQueryServices.DatasetService tableService = bqServices.getDatasetService(bqOptions)) {
+ LOG.info("Deleting temporary table with query results {}", tableToRemove);
+ tableService.deleteTable(tableToRemove);
+ boolean datasetCreatedByBeam = !queryTempDatasetOpt.isPresent();
+ if (datasetCreatedByBeam) {
+ // Remove temporary dataset only if it was created by Beam
+ LOG.info("Deleting temporary dataset with query results {}", tableToRemove.getDatasetId());
+ tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
+ }
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 49d421fde49..ec5c7176a19 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -114,34 +114,35 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
protected ExtractResult extractFiles(PipelineOptions options) throws Exception {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
TableReference tableToExtract = getTableToExtract(bqOptions);
- BigQueryServices.DatasetService datasetService = bqServices.getDatasetService(bqOptions);
- Table table = datasetService.getTable(tableToExtract);
- if (table == null) {
- throw new IOException(
- String.format(
- "Cannot start an export job since table %s does not exist",
- BigQueryHelpers.toTableSpec(tableToExtract)));
- }
+ try (BigQueryServices.DatasetService datasetService = bqServices.getDatasetService(bqOptions)) {
+ Table table = datasetService.getTable(tableToExtract);
+ if (table == null) {
+ throw new IOException(
+ String.format(
+ "Cannot start an export job since table %s does not exist",
+ BigQueryHelpers.toTableSpec(tableToExtract)));
+ }
- TableSchema schema = table.getSchema();
- JobService jobService = bqServices.getJobService(bqOptions);
- String extractJobId =
- BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.EXPORT);
- final String extractDestinationDir =
- resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
- String bqLocation =
- BigQueryHelpers.getDatasetLocation(
- datasetService, tableToExtract.getProjectId(), tableToExtract.getDatasetId());
- List<ResourceId> tempFiles =
- executeExtract(
- extractJobId,
- tableToExtract,
- jobService,
- bqOptions.getProject(),
- extractDestinationDir,
- bqLocation,
- useAvroLogicalTypes);
- return new ExtractResult(schema, tempFiles);
+ TableSchema schema = table.getSchema();
+ JobService jobService = bqServices.getJobService(bqOptions);
+ String extractJobId =
+ BigQueryResourceNaming.createJobIdPrefix(options.getJobName(), stepUuid, JobType.EXPORT);
+ final String extractDestinationDir =
+ resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, tableToExtract.getProjectId(), tableToExtract.getDatasetId());
+ List<ResourceId> tempFiles =
+ executeExtract(
+ extractJobId,
+ tableToExtract,
+ jobService,
+ bqOptions.getProject(),
+ extractDestinationDir,
+ bqLocation,
+ useAvroLogicalTypes);
+ return new ExtractResult(schema, tempFiles);
+ }
}
@Override