You are viewing a plain-text version of this content. (The canonical link, originally attached to the word "here", was lost in the plain-text conversion.)
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/03 22:15:47 UTC
[1/2] beam git commit: Remove job name usages from BigQueryIO at pipeline construction time
Repository: beam
Updated Branches:
refs/heads/master 57f449c4c -> 17f0843eb
Remove job name usages from BigQueryIO at pipeline construction time
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ddf8d49
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ddf8d49
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ddf8d49
Branch: refs/heads/master
Commit: 0ddf8d49d94288e693494ac0685b0c6df78dcd3b
Parents: 57f449c
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Tue May 2 13:55:32 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 3 15:15:02 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/bigquery/BigQueryHelpers.java | 61 ++++-------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 32 +++------
.../io/gcp/bigquery/BigQueryQuerySource.java | 40 +++++-------
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 24 +++----
.../io/gcp/bigquery/BigQueryTableSource.java | 15 ++---
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 69 +++++++++++---------
.../sdk/io/gcp/bigquery/FakeJobService.java | 5 +-
7 files changed, 94 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index e04361c..3850cbd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -256,15 +256,6 @@ public class BigQueryHelpers {
}
}
- @VisibleForTesting
- static class BeamJobUuidToBigQueryJobUuid
- implements SerializableFunction<String, String> {
- @Override
- public String apply(String from) {
- return "beam_job_" + from;
- }
- }
-
static class TableSchemaToJsonSchema
implements SerializableFunction<TableSchema, String> {
@Override
@@ -297,14 +288,6 @@ public class BigQueryHelpers {
}
}
- static class TableRefToProjectId
- implements SerializableFunction<TableReference, String> {
- @Override
- public String apply(TableReference from) {
- return from.getProjectId();
- }
- }
-
@VisibleForTesting
static class TableSpecToTableRef
implements SerializableFunction<String, TableReference> {
@@ -314,39 +297,21 @@ public class BigQueryHelpers {
}
}
- @VisibleForTesting
- static class CreatePerBeamJobUuid
- implements SerializableFunction<String, String> {
- private final String stepUuid;
-
- CreatePerBeamJobUuid(String stepUuid) {
- this.stepUuid = stepUuid;
- }
-
- @Override
- public String apply(String jobUuid) {
- return stepUuid + "_" + jobUuid.replaceAll("-", "");
- }
+ static String createJobIdToken(String jobName, String stepUuid) {
+ return String.format("beam_job_%s_%s", stepUuid, jobName.replaceAll("-", ""));
}
- @VisibleForTesting
- static class CreateJsonTableRefFromUuid
- implements SerializableFunction<String, TableReference> {
- private final String executingProject;
-
- CreateJsonTableRefFromUuid(String executingProject) {
- this.executingProject = executingProject;
- }
+ static String getExtractJobId(String jobIdToken) {
+ return String.format("%s-extract", jobIdToken);
+ }
- @Override
- public TableReference apply(String jobUuid) {
- String queryTempDatasetId = "temp_dataset_" + jobUuid;
- String queryTempTableId = "temp_table_" + jobUuid;
- TableReference queryTempTableRef = new TableReference()
- .setProjectId(executingProject)
- .setDatasetId(queryTempDatasetId)
- .setTableId(queryTempTableId);
- return queryTempTableRef;
- }
+ static TableReference createTempTableReference(String projectId, String jobUuid) {
+ String queryTempDatasetId = "temp_dataset_" + jobUuid;
+ String queryTempTableId = "temp_table_" + jobUuid;
+ TableReference queryTempTableRef = new TableReference()
+ .setProjectId(projectId)
+ .setDatasetId(queryTempDatasetId)
+ .setTableId(queryTempTableId);
+ return queryTempTableRef;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index ea97906..2ff5cd7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.Job;
@@ -44,9 +46,6 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.BeamJobUuidToBigQueryJobUuid;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreateJsonTableRefFromUuid;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreatePerBeamJobUuid;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
@@ -468,15 +467,9 @@ public class BigQueryIO {
@Override
public PCollection<TableRow> expand(PBegin input) {
- String stepUuid = BigQueryHelpers.randomUUIDString();
+ final String stepUuid = BigQueryHelpers.randomUUIDString();
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
- ValueProvider<String> jobUuid = NestedValueProvider.of(
- StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
- final ValueProvider<String> jobIdToken = NestedValueProvider.of(
- jobUuid, new BeamJobUuidToBigQueryJobUuid());
-
BoundedSource<TableRow> source;
-
final String extractDestinationDir;
String tempLocation = bqOptions.getTempLocation();
try {
@@ -487,15 +480,12 @@ public class BigQueryIO {
String.format("Failed to resolve extract destination directory in %s", tempLocation));
}
- final String executingProject = bqOptions.getProject();
if (getQuery() != null
&& (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) {
source =
BigQueryQuerySource.create(
- jobIdToken,
+ stepUuid,
getQuery(),
- NestedValueProvider.of(
- jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
getFlattenResults(),
getUseLegacySql(),
extractDestinationDir,
@@ -503,11 +493,10 @@ public class BigQueryIO {
} else {
source =
BigQueryTableSource.create(
- jobIdToken,
+ stepUuid,
getTableProvider(),
extractDestinationDir,
- getBigQueryServices(),
- StaticValueProvider.of(executingProject));
+ getBigQueryServices());
}
PassThroughThenCleanup.CleanupOperation cleanupOperation =
new PassThroughThenCleanup.CleanupOperation() {
@@ -517,8 +506,9 @@ public class BigQueryIO {
JobReference jobRef =
new JobReference()
- .setProjectId(executingProject)
- .setJobId(getExtractJobId(jobIdToken));
+ .setProjectId(bqOptions.getProject())
+ .setJobId(
+ getExtractJobId(createJobIdToken(bqOptions.getJobName(), stepUuid)));
Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);
@@ -583,10 +573,6 @@ public class BigQueryIO {
}
}
- static String getExtractJobId(ValueProvider<String> jobIdToken) {
- return jobIdToken.get() + "-extract";
- }
-
static String getExtractDestinationUri(String extractDestinationDir) {
return String.format("%s/%s", extractDestinationDir, "*.avro");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 49da030..205f9cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -19,7 +19,8 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
@@ -34,13 +35,10 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToProjectId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -51,17 +49,15 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
class BigQueryQuerySource extends BigQuerySourceBase {
static BigQueryQuerySource create(
- ValueProvider<String> jobIdToken,
+ String stepUuid,
ValueProvider<String> query,
- ValueProvider<TableReference> queryTempTableRef,
Boolean flattenResults,
Boolean useLegacySql,
String extractDestinationDir,
BigQueryServices bqServices) {
return new BigQueryQuerySource(
- jobIdToken,
+ stepUuid,
query,
- queryTempTableRef,
flattenResults,
useLegacySql,
extractDestinationDir,
@@ -69,25 +65,19 @@ class BigQueryQuerySource extends BigQuerySourceBase {
}
private final ValueProvider<String> query;
- private final ValueProvider<String> jsonQueryTempTable;
private final Boolean flattenResults;
private final Boolean useLegacySql;
private transient AtomicReference<JobStatistics> dryRunJobStats;
private BigQueryQuerySource(
- ValueProvider<String> jobIdToken,
+ String stepUuid,
ValueProvider<String> query,
- ValueProvider<TableReference> queryTempTableRef,
Boolean flattenResults,
Boolean useLegacySql,
String extractDestinationDir,
BigQueryServices bqServices) {
- super(jobIdToken, extractDestinationDir, bqServices,
- NestedValueProvider.of(
- checkNotNull(queryTempTableRef, "queryTempTableRef"), new TableRefToProjectId()));
+ super(stepUuid, extractDestinationDir, bqServices);
this.query = checkNotNull(query, "query");
- this.jsonQueryTempTable = NestedValueProvider.of(
- queryTempTableRef, new TableRefToJson());
this.flattenResults = checkNotNull(flattenResults, "flattenResults");
this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
this.dryRunJobStats = new AtomicReference<>();
@@ -103,7 +93,7 @@ class BigQueryQuerySource extends BigQuerySourceBase {
public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
return new BigQueryReader(this, bqServices.getReaderFromQuery(
- bqOptions, executingProject.get(), createBasicQueryConfig()));
+ bqOptions, bqOptions.getProject(), createBasicQueryConfig()));
}
@Override
@@ -120,8 +110,9 @@ class BigQueryQuerySource extends BigQuerySourceBase {
}
// 2. Create the temporary dataset in the query location.
- TableReference tableToExtract =
- BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
+ TableReference tableToExtract = createTempTableReference(
+ bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
+
tableService.createDataset(
tableToExtract.getProjectId(),
tableToExtract.getDatasetId(),
@@ -129,9 +120,9 @@ class BigQueryQuerySource extends BigQuerySourceBase {
"Dataset for BigQuery query job temporary table");
// 3. Execute the query.
- String queryJobId = jobIdToken.get() + "-query";
+ String queryJobId = createJobIdToken(bqOptions.getJobName(), stepUuid) + "-query";
executeQuery(
- executingProject.get(),
+ bqOptions.getProject(),
queryJobId,
tableToExtract,
bqServices.getJobService(bqOptions));
@@ -140,9 +131,8 @@ class BigQueryQuerySource extends BigQuerySourceBase {
@Override
protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
- checkState(jsonQueryTempTable.isAccessible());
- TableReference tableToRemove =
- BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
+ TableReference tableToRemove = createTempTableReference(
+ bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
DatasetService tableService = bqServices.getDatasetService(bqOptions);
tableService.deleteTable(tableToRemove);
@@ -159,7 +149,7 @@ class BigQueryQuerySource extends BigQuerySourceBase {
throws InterruptedException, IOException {
if (dryRunJobStats.get() == null) {
JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(
- executingProject.get(), createBasicQueryConfig());
+ bqOptions.getProject(), createBasicQueryConfig());
dryRunJobStats.compareAndSet(null, jobStats);
}
return dryRunJobStats.get();
http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index c7a6cca..0171046 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -19,6 +19,8 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -38,7 +40,6 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,22 +63,16 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
// The maximum number of retries to poll a BigQuery job.
protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
- protected final ValueProvider<String> jobIdToken;
+ protected final String stepUuid;
protected final String extractDestinationDir;
protected final BigQueryServices bqServices;
- protected final ValueProvider<String> executingProject;
private transient List<BoundedSource<TableRow>> cachedSplitResult;
- BigQuerySourceBase(
- ValueProvider<String> jobIdToken,
- String extractDestinationDir,
- BigQueryServices bqServices,
- ValueProvider<String> executingProject) {
- this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
+ BigQuerySourceBase(String stepUuid, String extractDestinationDir, BigQueryServices bqServices) {
+ this.stepUuid = checkNotNull(stepUuid, "stepUuid");
this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
this.bqServices = checkNotNull(bqServices, "bqServices");
- this.executingProject = checkNotNull(executingProject, "executingProject");
}
@Override
@@ -91,8 +86,9 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
TableReference tableToExtract = getTableToExtract(bqOptions);
JobService jobService = bqServices.getJobService(bqOptions);
- String extractJobId = BigQueryIO.getExtractJobId(jobIdToken);
- List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
+ String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
+ List<String> tempFiles = executeExtract(
+ extractJobId, tableToExtract, jobService, bqOptions.getProject());
TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
.getTable(tableToExtract).getSchema();
@@ -118,10 +114,10 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
}
private List<String> executeExtract(
- String jobId, TableReference table, JobService jobService)
+ String jobId, TableReference table, JobService jobService, String executingProject)
throws InterruptedException, IOException {
JobReference jobRef = new JobReference()
- .setProjectId(executingProject.get())
+ .setProjectId(executingProject)
.setJobId(jobId);
String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir);
http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 5ec8b57..e754bd2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -43,25 +43,22 @@ class BigQueryTableSource extends BigQuerySourceBase {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSource.class);
static BigQueryTableSource create(
- ValueProvider<String> jobIdToken,
+ String stepUuid,
ValueProvider<TableReference> table,
String extractDestinationDir,
- BigQueryServices bqServices,
- ValueProvider<String> executingProject) {
- return new BigQueryTableSource(
- jobIdToken, table, extractDestinationDir, bqServices, executingProject);
+ BigQueryServices bqServices) {
+ return new BigQueryTableSource(stepUuid, table, extractDestinationDir, bqServices);
}
private final ValueProvider<String> jsonTable;
private final AtomicReference<Long> tableSizeBytes;
private BigQueryTableSource(
- ValueProvider<String> jobIdToken,
+ String stepUuid,
ValueProvider<TableReference> table,
String extractDestinationDir,
- BigQueryServices bqServices,
- ValueProvider<String> executingProject) {
- super(jobIdToken, extractDestinationDir, bqServices, executingProject);
+ BigQueryServices bqServices) {
+ super(stepUuid, extractDestinationDir, bqServices);
this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
this.tableSizeBytes = new AtomicReference<>();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index baa5621..ef3419e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -1213,11 +1215,9 @@ public class BigQueryIOTest implements Serializable {
datasetService.insertAll(table, expected, null);
Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceThroughJsonAPI");
- String jobIdToken = "testJobIdToken";
+ String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
- StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
- baseDir.toString(), fakeBqServices,
- StaticValueProvider.of("project"));
+ stepUuid, StaticValueProvider.of(table), baseDir.toString(), fakeBqServices);
PipelineOptions options = PipelineOptionsFactory.create();
Assert.assertThat(
@@ -1255,15 +1255,15 @@ public class BigQueryIOTest implements Serializable {
Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceInitSplit");
- String jobIdToken = "testJobIdToken";
+ String stepUuid = "testStepUuid";
String extractDestinationDir = baseDir.toString();
BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
- StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
- extractDestinationDir, fakeBqServices, StaticValueProvider.of("project"));
-
+ stepUuid, StaticValueProvider.of(table), extractDestinationDir, fakeBqServices);
PipelineOptions options = PipelineOptionsFactory.create();
options.setTempLocation(baseDir.toString());
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ bqOptions.setProject("project");
List<TableRow> read = SourceTestUtils.readFromSource(bqSource, options);
assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
@@ -1316,10 +1316,17 @@ public class BigQueryIOTest implements Serializable {
new TableRow().set("name", "e").set("number", 5L),
new TableRow().set("name", "f").set("number", 6L));
- TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
- fakeDatasetService.createDataset("project", "data_set", "", "");
+ PipelineOptions options = PipelineOptionsFactory.create();
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ bqOptions.setProject("project");
+ String stepUuid = "testStepUuid";
+
+ TableReference tempTableReference = createTempTableReference(
+ bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
+ fakeDatasetService.createDataset(
+ bqOptions.getProject(), tempTableReference.getDatasetId(), "", "");
fakeDatasetService.createTable(new Table()
- .setTableReference(destinationTable)
+ .setTableReference(tempTableReference)
.setSchema(new TableSchema()
.setFields(
ImmutableList.of(
@@ -1327,24 +1334,21 @@ public class BigQueryIOTest implements Serializable {
new TableFieldSchema().setName("number").setType("INTEGER")))));
Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryQuerySourceInitSplit");
- String jobIdToken = "testJobIdToken";
+
String query = FakeBigQueryServices.encodeQuery(expected);
String extractDestinationDir = baseDir.toString();
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
- StaticValueProvider.of(jobIdToken), StaticValueProvider.of(query),
- StaticValueProvider.of(destinationTable),
+ stepUuid, StaticValueProvider.of(query),
true /* flattenResults */, true /* useLegacySql */,
extractDestinationDir, fakeBqServices);
-
- PipelineOptions options = PipelineOptionsFactory.create();
options.setTempLocation(extractDestinationDir);
TableReference queryTable = new TableReference()
- .setProjectId("project")
- .setDatasetId("data_set")
- .setTableId("table_name");
+ .setProjectId(bqOptions.getProject())
+ .setDatasetId(tempTableReference.getDatasetId())
+ .setTableId(tempTableReference.getTableId());
- fakeJobService.expectDryRunQuery("project", query,
+ fakeJobService.expectDryRunQuery(bqOptions.getProject(), query,
new JobStatistics().setQuery(
new JobStatistics2()
.setTotalBytesProcessed(100L)
@@ -1387,7 +1391,13 @@ public class BigQueryIOTest implements Serializable {
.withJobService(jobService)
.withDatasetService(datasetService);
- TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+ PipelineOptions options = PipelineOptionsFactory.create();
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ bqOptions.setProject("project");
+ String stepUuid = "testStepUuid";
+
+ TableReference tempTableReference = createTempTableReference(
+ bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
List<TableRow> expected = ImmutableList.of(
new TableRow().set("name", "a").set("number", 1L),
new TableRow().set("name", "b").set("number", 2L),
@@ -1395,10 +1405,10 @@ public class BigQueryIOTest implements Serializable {
new TableRow().set("name", "d").set("number", 4L),
new TableRow().set("name", "e").set("number", 5L),
new TableRow().set("name", "f").set("number", 6L));
- datasetService.createDataset(destinationTable.getProjectId(), destinationTable.getDatasetId(),
- "", "");
+ datasetService.createDataset(
+ tempTableReference.getProjectId(), tempTableReference.getDatasetId(), "", "");
Table table = new Table()
- .setTableReference(destinationTable)
+ .setTableReference(tempTableReference)
.setSchema(new TableSchema()
.setFields(
ImmutableList.of(
@@ -1413,18 +1423,15 @@ public class BigQueryIOTest implements Serializable {
.setTotalBytesProcessed(100L)
.setReferencedTables(ImmutableList.of(table.getTableReference()))));
- Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryNoTableQuerySourceInitSplit");
- String jobIdToken = "testJobIdToken";
+ Path baseDir = Files.createTempDirectory(
+ tempFolder, "testBigQueryNoTableQuerySourceInitSplit");
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
- StaticValueProvider.of(jobIdToken),
+ stepUuid,
StaticValueProvider.of(query),
- StaticValueProvider.of(destinationTable),
true /* flattenResults */, true /* useLegacySql */, baseDir.toString(), fakeBqServices);
-
-
- PipelineOptions options = PipelineOptionsFactory.create();
options.setTempLocation(baseDir.toString());
+
List<TableRow> read = convertBigDecimaslToLong(
SourceTestUtils.readFromSource(bqSource, options));
assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index bef9a26..13d345e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -76,7 +76,6 @@ import org.joda.time.Duration;
*/
class FakeJobService implements JobService, Serializable {
static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
-
// Whenever a job is started, the first 5 calls to GetJob will report the job as pending,
// the next 5 will return the job as running, and only then will the job report as done.
private static final int GET_JOBS_TRANSITION_INTERVAL = 5;
@@ -240,7 +239,9 @@ class FakeJobService implements JobService, Serializable {
job.job.setStatus(runJob(job.job));
}
} catch (Exception e) {
- job.job.getStatus().setState("FAILED").setErrorResult(new ErrorProto());
+ job.job.getStatus().setState("FAILED").setErrorResult(
+ new ErrorProto().setMessage(
+ String.format("Job %s failed: %s", job.job.getConfiguration(), e.toString())));
}
return JSON_FACTORY.fromString(JSON_FACTORY.toString(job.job), Job.class);
}
[2/2] beam git commit: This closes #2846: BigQueryIO: Remove PipelineOptions.getJobName usages at pipeline construction time
Posted by ke...@apache.org.
This closes #2846: BigQueryIO: Remove PipelineOptions.getJobName usages at pipeline construction time
Remove job name usages from BigQueryIO at pipeline construction time
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/17f0843e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/17f0843e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/17f0843e
Branch: refs/heads/master
Commit: 17f0843eba34d5b9adbba523477464d0b0651a3d
Parents: 57f449c 0ddf8d4
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 3 15:15:13 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 3 15:15:13 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/bigquery/BigQueryHelpers.java | 61 ++++-------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 32 +++------
.../io/gcp/bigquery/BigQueryQuerySource.java | 40 +++++-------
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 24 +++----
.../io/gcp/bigquery/BigQueryTableSource.java | 15 ++---
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 69 +++++++++++---------
.../sdk/io/gcp/bigquery/FakeJobService.java | 5 +-
7 files changed, 94 insertions(+), 152 deletions(-)
----------------------------------------------------------------------