Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/03 22:15:47 UTC

[1/2] beam git commit: Remove job name usages from BigQueryIO at pipeline construction time

Repository: beam
Updated Branches:
  refs/heads/master 57f449c4c -> 17f0843eb


Remove job name usages from BigQueryIO at pipeline construction time


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ddf8d49
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ddf8d49
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ddf8d49

Branch: refs/heads/master
Commit: 0ddf8d49d94288e693494ac0685b0c6df78dcd3b
Parents: 57f449c
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Tue May 2 13:55:32 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 3 15:15:02 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    | 61 ++++-------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 32 +++------
 .../io/gcp/bigquery/BigQueryQuerySource.java    | 40 +++++-------
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 24 +++----
 .../io/gcp/bigquery/BigQueryTableSource.java    | 15 ++---
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 69 +++++++++++---------
 .../sdk/io/gcp/bigquery/FakeJobService.java     |  5 +-
 7 files changed, 94 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
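
In short, the read path no longer bakes bqOptions.getJobName() into the sources through ValueProviders at graph construction time; it records only a random per-step UUID and derives the BigQuery job id token from whatever options are in effect when the source actually runs. A minimal sketch of the pattern (the helper calls appear in the hunks below; the options variable is illustrative):

    // Construction time: capture only a random per-step UUID.
    final String stepUuid = BigQueryHelpers.randomUUIDString();

    // Execution time: derive the token from the runner-resolved job name.
    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
    String jobIdToken = BigQueryHelpers.createJobIdToken(bqOptions.getJobName(), stepUuid);
    String extractJobId = BigQueryHelpers.getExtractJobId(jobIdToken);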


http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index e04361c..3850cbd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -256,15 +256,6 @@ public class BigQueryHelpers {
     }
   }
 
-  @VisibleForTesting
-  static class BeamJobUuidToBigQueryJobUuid
-      implements SerializableFunction<String, String> {
-    @Override
-    public String apply(String from) {
-      return "beam_job_" + from;
-    }
-  }
-
   static class TableSchemaToJsonSchema
       implements SerializableFunction<TableSchema, String> {
     @Override
@@ -297,14 +288,6 @@ public class BigQueryHelpers {
     }
   }
 
-  static class TableRefToProjectId
-      implements SerializableFunction<TableReference, String> {
-    @Override
-    public String apply(TableReference from) {
-      return from.getProjectId();
-    }
-  }
-
   @VisibleForTesting
   static class TableSpecToTableRef
       implements SerializableFunction<String, TableReference> {
@@ -314,39 +297,21 @@ public class BigQueryHelpers {
     }
   }
 
-  @VisibleForTesting
-  static class CreatePerBeamJobUuid
-      implements SerializableFunction<String, String> {
-    private final String stepUuid;
-
-    CreatePerBeamJobUuid(String stepUuid) {
-      this.stepUuid = stepUuid;
-    }
-
-    @Override
-    public String apply(String jobUuid) {
-      return stepUuid + "_" + jobUuid.replaceAll("-", "");
-    }
+  static String createJobIdToken(String jobName, String stepUuid) {
+    return String.format("beam_job_%s_%s", stepUuid, jobName.replaceAll("-", ""));
   }
 
-  @VisibleForTesting
-  static class CreateJsonTableRefFromUuid
-      implements SerializableFunction<String, TableReference> {
-    private final String executingProject;
-
-    CreateJsonTableRefFromUuid(String executingProject) {
-      this.executingProject = executingProject;
-    }
+  static String getExtractJobId(String jobIdToken) {
+    return String.format("%s-extract", jobIdToken);
+  }
 
-    @Override
-    public TableReference apply(String jobUuid) {
-      String queryTempDatasetId = "temp_dataset_" + jobUuid;
-      String queryTempTableId = "temp_table_" + jobUuid;
-      TableReference queryTempTableRef = new TableReference()
-          .setProjectId(executingProject)
-          .setDatasetId(queryTempDatasetId)
-          .setTableId(queryTempTableId);
-      return queryTempTableRef;
-    }
+  static TableReference createTempTableReference(String projectId, String jobUuid) {
+    String queryTempDatasetId = "temp_dataset_" + jobUuid;
+    String queryTempTableId = "temp_table_" + jobUuid;
+    TableReference queryTempTableRef = new TableReference()
+        .setProjectId(projectId)
+        .setDatasetId(queryTempDatasetId)
+        .setTableId(queryTempTableId);
+    return queryTempTableRef;
   }
 }
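
Taken together, these three static helpers replace the removed SerializableFunction classes with straightforward string and TableReference construction. A rough worked example with made-up inputs ("my-job", "abc123", and "my-project" are purely illustrative):

    String jobIdToken = createJobIdToken("my-job", "abc123");
    // -> "beam_job_abc123_myjob" (dashes in the job name are stripped)

    String extractJobId = getExtractJobId(jobIdToken);
    // -> "beam_job_abc123_myjob-extract"

    TableReference tempTable = createTempTableReference("my-project", jobIdToken);
    // -> my-project:temp_dataset_beam_job_abc123_myjob.temp_table_beam_job_abc123_myjob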

http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index ea97906..2ff5cd7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
 
 import com.google.api.client.json.JsonFactory;
 import com.google.api.services.bigquery.model.Job;
@@ -44,9 +46,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.BeamJobUuidToBigQueryJobUuid;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreateJsonTableRefFromUuid;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreatePerBeamJobUuid;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
@@ -468,15 +467,9 @@ public class BigQueryIO {
 
     @Override
     public PCollection<TableRow> expand(PBegin input) {
-      String stepUuid = BigQueryHelpers.randomUUIDString();
+      final String stepUuid = BigQueryHelpers.randomUUIDString();
       BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-      ValueProvider<String> jobUuid = NestedValueProvider.of(
-         StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
-      final ValueProvider<String> jobIdToken = NestedValueProvider.of(
-          jobUuid, new BeamJobUuidToBigQueryJobUuid());
-
       BoundedSource<TableRow> source;
-
       final String extractDestinationDir;
       String tempLocation = bqOptions.getTempLocation();
       try {
@@ -487,15 +480,12 @@ public class BigQueryIO {
             String.format("Failed to resolve extract destination directory in %s", tempLocation));
       }
 
-      final String executingProject = bqOptions.getProject();
       if (getQuery() != null
           && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) {
         source =
             BigQueryQuerySource.create(
-                jobIdToken,
+                stepUuid,
                 getQuery(),
-                NestedValueProvider.of(
-                    jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
                 getFlattenResults(),
                 getUseLegacySql(),
                 extractDestinationDir,
@@ -503,11 +493,10 @@ public class BigQueryIO {
       } else {
         source =
             BigQueryTableSource.create(
-                jobIdToken,
+                stepUuid,
                 getTableProvider(),
                 extractDestinationDir,
-                getBigQueryServices(),
-                StaticValueProvider.of(executingProject));
+                getBigQueryServices());
       }
       PassThroughThenCleanup.CleanupOperation cleanupOperation =
           new PassThroughThenCleanup.CleanupOperation() {
@@ -517,8 +506,9 @@ public class BigQueryIO {
 
               JobReference jobRef =
                   new JobReference()
-                      .setProjectId(executingProject)
-                      .setJobId(getExtractJobId(jobIdToken));
+                      .setProjectId(bqOptions.getProject())
+                      .setJobId(
+                          getExtractJobId(createJobIdToken(bqOptions.getJobName(), stepUuid)));
 
               Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);
 
@@ -583,10 +573,6 @@ public class BigQueryIO {
     }
   }
 
-  static String getExtractJobId(ValueProvider<String> jobIdToken) {
-    return jobIdToken.get() + "-extract";
-  }
-
   static String getExtractDestinationUri(String extractDestinationDir) {
     return String.format("%s/%s", extractDestinationDir, "*.avro");
   }
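
The cleanup step runs at execution time, so it can rebuild the extract job reference from the resolved BigQueryOptions instead of reading it out of captured ValueProviders; stepUuid becomes a final local precisely so the anonymous CleanupOperation can close over it. The relevant lines from the hunk above, gathered in one place (bqOptions here is assumed to come from the PipelineOptions handed to the cleanup callback):

    JobReference jobRef = new JobReference()
        .setProjectId(bqOptions.getProject())
        .setJobId(getExtractJobId(createJobIdToken(bqOptions.getJobName(), stepUuid)));
    Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);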

http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 49da030..205f9cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -19,7 +19,8 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;
 
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
@@ -34,13 +35,10 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToProjectId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
 
@@ -51,17 +49,15 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 class BigQueryQuerySource extends BigQuerySourceBase {
 
   static BigQueryQuerySource create(
-      ValueProvider<String> jobIdToken,
+      String stepUuid,
       ValueProvider<String> query,
-      ValueProvider<TableReference> queryTempTableRef,
       Boolean flattenResults,
       Boolean useLegacySql,
       String extractDestinationDir,
       BigQueryServices bqServices) {
     return new BigQueryQuerySource(
-        jobIdToken,
+        stepUuid,
         query,
-        queryTempTableRef,
         flattenResults,
         useLegacySql,
         extractDestinationDir,
@@ -69,25 +65,19 @@ class BigQueryQuerySource extends BigQuerySourceBase {
   }
 
   private final ValueProvider<String> query;
-  private final ValueProvider<String> jsonQueryTempTable;
   private final Boolean flattenResults;
   private final Boolean useLegacySql;
   private transient AtomicReference<JobStatistics> dryRunJobStats;
 
   private BigQueryQuerySource(
-      ValueProvider<String> jobIdToken,
+      String stepUuid,
       ValueProvider<String> query,
-      ValueProvider<TableReference> queryTempTableRef,
       Boolean flattenResults,
       Boolean useLegacySql,
       String extractDestinationDir,
       BigQueryServices bqServices) {
-    super(jobIdToken, extractDestinationDir, bqServices,
-        NestedValueProvider.of(
-            checkNotNull(queryTempTableRef, "queryTempTableRef"), new TableRefToProjectId()));
+    super(stepUuid, extractDestinationDir, bqServices);
     this.query = checkNotNull(query, "query");
-    this.jsonQueryTempTable = NestedValueProvider.of(
-        queryTempTableRef, new TableRefToJson());
     this.flattenResults = checkNotNull(flattenResults, "flattenResults");
     this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
     this.dryRunJobStats = new AtomicReference<>();
@@ -103,7 +93,7 @@ class BigQueryQuerySource extends BigQuerySourceBase {
   public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     return new BigQueryReader(this, bqServices.getReaderFromQuery(
-        bqOptions, executingProject.get(), createBasicQueryConfig()));
+        bqOptions, bqOptions.getProject(), createBasicQueryConfig()));
   }
 
   @Override
@@ -120,8 +110,9 @@ class BigQueryQuerySource extends BigQuerySourceBase {
     }
 
     // 2. Create the temporary dataset in the query location.
-    TableReference tableToExtract =
-        BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
+    TableReference tableToExtract = createTempTableReference(
+        bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
+
     tableService.createDataset(
         tableToExtract.getProjectId(),
         tableToExtract.getDatasetId(),
@@ -129,9 +120,9 @@ class BigQueryQuerySource extends BigQuerySourceBase {
         "Dataset for BigQuery query job temporary table");
 
     // 3. Execute the query.
-    String queryJobId = jobIdToken.get() + "-query";
+    String queryJobId = createJobIdToken(bqOptions.getJobName(), stepUuid) + "-query";
     executeQuery(
-        executingProject.get(),
+        bqOptions.getProject(),
         queryJobId,
         tableToExtract,
         bqServices.getJobService(bqOptions));
@@ -140,9 +131,8 @@ class BigQueryQuerySource extends BigQuerySourceBase {
 
   @Override
   protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
-    checkState(jsonQueryTempTable.isAccessible());
-    TableReference tableToRemove =
-        BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
+    TableReference tableToRemove = createTempTableReference(
+        bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
 
     DatasetService tableService = bqServices.getDatasetService(bqOptions);
     tableService.deleteTable(tableToRemove);
@@ -159,7 +149,7 @@ class BigQueryQuerySource extends BigQuerySourceBase {
       throws InterruptedException, IOException {
     if (dryRunJobStats.get() == null) {
       JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(
-          executingProject.get(), createBasicQueryConfig());
+          bqOptions.getProject(), createBasicQueryConfig());
       dryRunJobStats.compareAndSet(null, jobStats);
     }
     return dryRunJobStats.get();
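
Because the temporary table is now a pure function of (project, job name, stepUuid), the query source no longer needs to carry a serialized TableReference: it recomputes the same reference when it creates the temp dataset and again when it cleans up. A condensed sketch of that execution-time path (service calls follow the signatures used in the hunks above; local variable names are illustrative):

    String jobIdToken = createJobIdToken(bqOptions.getJobName(), stepUuid);
    TableReference tableToExtract =
        createTempTableReference(bqOptions.getProject(), jobIdToken);

    // Run the query job into the temp table...
    String queryJobId = jobIdToken + "-query";

    // ...and later, the identical derivation locates the table to delete.
    bqServices.getDatasetService(bqOptions).deleteTable(tableToExtract);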

http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index c7a6cca..0171046 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -19,6 +19,8 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
 
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
@@ -38,7 +40,6 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,22 +63,16 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   // The maximum number of retries to poll a BigQuery job.
   protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
 
-  protected final ValueProvider<String> jobIdToken;
+  protected final String stepUuid;
   protected final String extractDestinationDir;
   protected final BigQueryServices bqServices;
-  protected final ValueProvider<String> executingProject;
 
   private transient List<BoundedSource<TableRow>> cachedSplitResult;
 
-  BigQuerySourceBase(
-      ValueProvider<String> jobIdToken,
-      String extractDestinationDir,
-      BigQueryServices bqServices,
-      ValueProvider<String> executingProject) {
-    this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
+  BigQuerySourceBase(String stepUuid, String extractDestinationDir, BigQueryServices bqServices) {
+    this.stepUuid = checkNotNull(stepUuid, "stepUuid");
     this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
     this.bqServices = checkNotNull(bqServices, "bqServices");
-    this.executingProject = checkNotNull(executingProject, "executingProject");
   }
 
   @Override
@@ -91,8 +86,9 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
       BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       TableReference tableToExtract = getTableToExtract(bqOptions);
       JobService jobService = bqServices.getJobService(bqOptions);
-      String extractJobId = BigQueryIO.getExtractJobId(jobIdToken);
-      List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
+      String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
+      List<String> tempFiles = executeExtract(
+          extractJobId, tableToExtract, jobService, bqOptions.getProject());
 
       TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
           .getTable(tableToExtract).getSchema();
@@ -118,10 +114,10 @@ abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
   }
 
   private List<String> executeExtract(
-      String jobId, TableReference table, JobService jobService)
+      String jobId, TableReference table, JobService jobService, String executingProject)
           throws InterruptedException, IOException {
     JobReference jobRef = new JobReference()
-        .setProjectId(executingProject.get())
+        .setProjectId(executingProject)
         .setJobId(jobId);
 
     String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir);
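
In the shared base class the same derivation happens in split(), and the executing project is read from options and handed to executeExtract explicitly rather than being stored as a ValueProvider field. A minimal sketch of the split-time sequence (method names match the hunk above; locals are illustrative):

    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
    String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
    List<String> tempFiles =
        executeExtract(extractJobId, tableToExtract, jobService, bqOptions.getProject());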

http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 5ec8b57..e754bd2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -43,25 +43,22 @@ class BigQueryTableSource extends BigQuerySourceBase {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSource.class);
 
   static BigQueryTableSource create(
-      ValueProvider<String> jobIdToken,
+      String stepUuid,
       ValueProvider<TableReference> table,
       String extractDestinationDir,
-      BigQueryServices bqServices,
-      ValueProvider<String> executingProject) {
-    return new BigQueryTableSource(
-        jobIdToken, table, extractDestinationDir, bqServices, executingProject);
+      BigQueryServices bqServices) {
+    return new BigQueryTableSource(stepUuid, table, extractDestinationDir, bqServices);
   }
 
   private final ValueProvider<String> jsonTable;
   private final AtomicReference<Long> tableSizeBytes;
 
   private BigQueryTableSource(
-      ValueProvider<String> jobIdToken,
+      String stepUuid,
       ValueProvider<TableReference> table,
       String extractDestinationDir,
-      BigQueryServices bqServices,
-      ValueProvider<String> executingProject) {
-    super(jobIdToken, extractDestinationDir, bqServices, executingProject);
+      BigQueryServices bqServices) {
+    super(stepUuid, extractDestinationDir, bqServices);
     this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
     this.tableSizeBytes = new AtomicReference<>();
   }
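
For the table source the change is mechanical: the factory and constructor take the stepUuid string in place of the jobIdToken/executingProject ValueProvider pair. The new call shape, as exercised by the tests below (variable names taken from the test code):

    BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
        stepUuid, StaticValueProvider.of(table), extractDestinationDir, fakeBqServices);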

http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index baa5621..ef3419e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -1213,11 +1215,9 @@ public class BigQueryIOTest implements Serializable {
     datasetService.insertAll(table, expected, null);
 
     Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceThroughJsonAPI");
-    String jobIdToken = "testJobIdToken";
+    String stepUuid = "testStepUuid";
     BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
-        StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
-        baseDir.toString(), fakeBqServices,
-        StaticValueProvider.of("project"));
+        stepUuid, StaticValueProvider.of(table), baseDir.toString(), fakeBqServices);
 
     PipelineOptions options = PipelineOptionsFactory.create();
     Assert.assertThat(
@@ -1255,15 +1255,15 @@ public class BigQueryIOTest implements Serializable {
 
     Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryTableSourceInitSplit");
 
-    String jobIdToken = "testJobIdToken";
+    String stepUuid = "testStepUuid";
     String extractDestinationDir = baseDir.toString();
     BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
-        StaticValueProvider.of(jobIdToken), StaticValueProvider.of(table),
-        extractDestinationDir, fakeBqServices, StaticValueProvider.of("project"));
-
+        stepUuid, StaticValueProvider.of(table), extractDestinationDir, fakeBqServices);
 
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation(baseDir.toString());
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    bqOptions.setProject("project");
 
     List<TableRow> read = SourceTestUtils.readFromSource(bqSource, options);
     assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
@@ -1316,10 +1316,17 @@ public class BigQueryIOTest implements Serializable {
         new TableRow().set("name", "e").set("number", 5L),
         new TableRow().set("name", "f").set("number", 6L));
 
-    TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
-    fakeDatasetService.createDataset("project", "data_set", "", "");
+    PipelineOptions options = PipelineOptionsFactory.create();
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    bqOptions.setProject("project");
+    String stepUuid = "testStepUuid";
+
+    TableReference tempTableReference = createTempTableReference(
+        bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
+    fakeDatasetService.createDataset(
+        bqOptions.getProject(), tempTableReference.getDatasetId(), "", "");
     fakeDatasetService.createTable(new Table()
-        .setTableReference(destinationTable)
+        .setTableReference(tempTableReference)
         .setSchema(new TableSchema()
             .setFields(
                 ImmutableList.of(
@@ -1327,24 +1334,21 @@ public class BigQueryIOTest implements Serializable {
                     new TableFieldSchema().setName("number").setType("INTEGER")))));
     Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryQuerySourceInitSplit");
 
-    String jobIdToken = "testJobIdToken";
+
     String query = FakeBigQueryServices.encodeQuery(expected);
     String extractDestinationDir = baseDir.toString();
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
-        StaticValueProvider.of(jobIdToken), StaticValueProvider.of(query),
-        StaticValueProvider.of(destinationTable),
+        stepUuid, StaticValueProvider.of(query),
         true /* flattenResults */, true /* useLegacySql */,
         extractDestinationDir, fakeBqServices);
-
-    PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation(extractDestinationDir);
 
     TableReference queryTable = new TableReference()
-        .setProjectId("project")
-        .setDatasetId("data_set")
-        .setTableId("table_name");
+        .setProjectId(bqOptions.getProject())
+        .setDatasetId(tempTableReference.getDatasetId())
+        .setTableId(tempTableReference.getTableId());
 
-    fakeJobService.expectDryRunQuery("project", query,
+    fakeJobService.expectDryRunQuery(bqOptions.getProject(), query,
         new JobStatistics().setQuery(
             new JobStatistics2()
                 .setTotalBytesProcessed(100L)
@@ -1387,7 +1391,13 @@ public class BigQueryIOTest implements Serializable {
         .withJobService(jobService)
         .withDatasetService(datasetService);
 
-    TableReference destinationTable = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
+    PipelineOptions options = PipelineOptionsFactory.create();
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    bqOptions.setProject("project");
+    String stepUuid = "testStepUuid";
+
+    TableReference tempTableReference = createTempTableReference(
+        bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
     List<TableRow> expected = ImmutableList.of(
         new TableRow().set("name", "a").set("number", 1L),
         new TableRow().set("name", "b").set("number", 2L),
@@ -1395,10 +1405,10 @@ public class BigQueryIOTest implements Serializable {
         new TableRow().set("name", "d").set("number", 4L),
         new TableRow().set("name", "e").set("number", 5L),
         new TableRow().set("name", "f").set("number", 6L));
-    datasetService.createDataset(destinationTable.getProjectId(), destinationTable.getDatasetId(),
-        "", "");
+    datasetService.createDataset(
+        tempTableReference.getProjectId(), tempTableReference.getDatasetId(), "", "");
     Table table = new Table()
-        .setTableReference(destinationTable)
+        .setTableReference(tempTableReference)
         .setSchema(new TableSchema()
                 .setFields(
                     ImmutableList.of(
@@ -1413,18 +1423,15 @@ public class BigQueryIOTest implements Serializable {
                 .setTotalBytesProcessed(100L)
                 .setReferencedTables(ImmutableList.of(table.getTableReference()))));
 
-    Path baseDir = Files.createTempDirectory(tempFolder, "testBigQueryNoTableQuerySourceInitSplit");
-    String jobIdToken = "testJobIdToken";
+    Path baseDir = Files.createTempDirectory(
+        tempFolder, "testBigQueryNoTableQuerySourceInitSplit");
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
-        StaticValueProvider.of(jobIdToken),
+        stepUuid,
         StaticValueProvider.of(query),
-        StaticValueProvider.of(destinationTable),
         true /* flattenResults */, true /* useLegacySql */, baseDir.toString(), fakeBqServices);
 
-
-
-    PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation(baseDir.toString());
+
     List<TableRow> read = convertBigDecimaslToLong(
         SourceTestUtils.readFromSource(bqSource, options));
     assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));

http://git-wip-us.apache.org/repos/asf/beam/blob/0ddf8d49/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index bef9a26..13d345e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -76,7 +76,6 @@ import org.joda.time.Duration;
  */
 class FakeJobService implements JobService, Serializable {
   static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
-
   // Whenever a job is started, the first 5 calls to GetJob will report the job as pending,
   // the next 5 will return the job as running, and only then will the job report as done.
   private static final int GET_JOBS_TRANSITION_INTERVAL = 5;
@@ -240,7 +239,9 @@ class FakeJobService implements JobService, Serializable {
             job.job.setStatus(runJob(job.job));
           }
         } catch (Exception e) {
-          job.job.getStatus().setState("FAILED").setErrorResult(new ErrorProto());
+          job.job.getStatus().setState("FAILED").setErrorResult(
+              new ErrorProto().setMessage(
+                  String.format("Job %s failed: %s", job.job.getConfiguration(), e.toString())));
         }
         return JSON_FACTORY.fromString(JSON_FACTORY.toString(job.job), Job.class);
       }


[2/2] beam git commit: This closes #2846: BigQueryIO: Remove PipelineOptions.getJobName usages at pipeline construction time

Posted by ke...@apache.org.
This closes #2846: BigQueryIO: Remove PipelineOptions.getJobName usages at pipeline construction time

  Remove job name usages from BigQueryIO at pipeline construction time


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/17f0843e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/17f0843e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/17f0843e

Branch: refs/heads/master
Commit: 17f0843eba34d5b9adbba523477464d0b0651a3d
Parents: 57f449c 0ddf8d4
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 3 15:15:13 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 3 15:15:13 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    | 61 ++++-------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 32 +++------
 .../io/gcp/bigquery/BigQueryQuerySource.java    | 40 +++++-------
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 24 +++----
 .../io/gcp/bigquery/BigQueryTableSource.java    | 15 ++---
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 69 +++++++++++---------
 .../sdk/io/gcp/bigquery/FakeJobService.java     |  5 +-
 7 files changed, 94 insertions(+), 152 deletions(-)
----------------------------------------------------------------------