You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/20 23:38:28 UTC

[2/2] incubator-beam git commit: [BEAM-48] Add BigQueryTornadoes integration test

[BEAM-48] Add BigQueryTornadoes integration test


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89d20a2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89d20a2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89d20a2d

Branch: refs/heads/master
Commit: 89d20a2d66319269082cdead70eb3cf10309b9e8
Parents: d795295
Author: Pei He <pe...@google.com>
Authored: Wed May 18 17:46:24 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 20 16:37:39 2016 -0700

----------------------------------------------------------------------
 .../examples/cookbook/BigQueryTornadoes.java    |  2 +-
 .../examples/cookbook/BigQueryTornadoesIT.java  | 52 ++++++++++++
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 87 ++++++++++++--------
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 17 ++--
 4 files changed, 112 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 80a2f25..4c69efb 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -143,7 +143,7 @@ public class BigQueryTornadoes {
    *
    * <p>Inherits standard configuration options.
    */
-  private static interface Options extends PipelineOptions {
+  static interface Options extends PipelineOptions {
     @Description("Table to read from, specified as "
         + "<project_id>:<dataset_id>.<table_id>")
     @Default.String(WEATHER_SAMPLES_TABLE)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
new file mode 100644
index 0000000..fbd775c
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples.cookbook;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * End-to-end integration test that runs the {@link BigQueryTornadoes} example pipeline.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryTornadoesIT {
+
+  /**
+   * Options for this test: {@code TestPipelineOptions} plus {@code BigQueryTornadoes.Options}.
+   */
+  public interface BigQueryTornadoesITOptions
+      extends TestPipelineOptions, BigQueryTornadoes.Options {
+  }
+
+  @Test
+  public void testE2EBigQueryTornadoes() throws Exception {
+    PipelineOptionsFactory.register(BigQueryTornadoesITOptions.class);
+    BigQueryTornadoesITOptions options =
+        TestPipeline.testingPipelineOptions().as(BigQueryTornadoesITOptions.class);
+    options.setOutput(String.format("%s.%s",
+        "BigQueryTornadoesIT", "monthly_tornadoes_" + System.currentTimeMillis()));
+
+    BigQueryTornadoes.main(TestPipeline.convertToArgs(options));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index e4a306a..030dde0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -559,34 +559,23 @@ public class BigQueryIO {
               String.format("Failed to resolve extract destination directory in %s", tempLocation));
         }
 
+        final String executingProject = bqOptions.getProject();
         if (!Strings.isNullOrEmpty(query)) {
-          String projectId = bqOptions.getProject();
           String queryTempDatasetId = "temp_dataset_" + uuid;
           String queryTempTableId = "temp_table_" + uuid;
 
           TableReference queryTempTableRef = new TableReference()
-              .setProjectId(projectId)
+              .setProjectId(executingProject)
               .setDatasetId(queryTempDatasetId)
               .setTableId(queryTempTableId);
 
-          String jsonQueryTempTable;
-          try {
-            jsonQueryTempTable = JSON_FACTORY.toString(queryTempTableRef);
-          } catch (IOException e) {
-            throw new RuntimeException("Cannot initialize table to JSON strings.", e);
-          }
           source = BigQueryQuerySource.create(
-              jobIdToken, query, jsonQueryTempTable, flattenResults,
+              jobIdToken, query, queryTempTableRef, flattenResults,
               extractDestinationDir, bqServices);
         } else {
-          String jsonTable;
-          try {
-            jsonTable = JSON_FACTORY.toString(getTableWithDefaultProject(bqOptions));
-          } catch (IOException e) {
-            throw new RuntimeException("Cannot initialize table to JSON strings.", e);
-          }
+          TableReference inputTable = getTableWithDefaultProject(bqOptions);
           source = BigQueryTableSource.create(
-              jobIdToken, jsonTable, extractDestinationDir, bqServices);
+              jobIdToken, inputTable, extractDestinationDir, bqServices, executingProject);
         }
         PassThroughThenCleanup.CleanupOperation cleanupOperation =
             new PassThroughThenCleanup.CleanupOperation() {
@@ -595,7 +584,7 @@ public class BigQueryIO {
                 BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
 
                 JobReference jobRef = new JobReference()
-                    .setProjectId(bqOptions.getProject())
+                    .setProjectId(executingProject)
                     .setJobId(getExtractJobId(jobIdToken));
                 Job extractJob = bqServices.getJobService(bqOptions).pollJob(
                     jobRef, CLEANUP_JOB_POLL_MAX_RETRIES);
@@ -759,10 +748,12 @@ public class BigQueryIO {
 
     static BigQueryTableSource create(
         String jobIdToken,
-        String jsonTable,
+        TableReference table,
         String extractDestinationDir,
-        BigQueryServices bqServices) {
-      return new BigQueryTableSource(jobIdToken, jsonTable, extractDestinationDir, bqServices);
+        BigQueryServices bqServices,
+        String executingProject) {
+      return new BigQueryTableSource(
+          jobIdToken, table, extractDestinationDir, bqServices, executingProject);
     }
 
     private final String jsonTable;
@@ -770,11 +761,17 @@ public class BigQueryIO {
 
     private BigQueryTableSource(
         String jobIdToken,
-        String jsonTable,
+        TableReference table,
         String extractDestinationDir,
-        BigQueryServices bqServices) {
-      super(jobIdToken, extractDestinationDir, bqServices);
-      this.jsonTable = checkNotNull(jsonTable, "jsonTable");
+        BigQueryServices bqServices,
+        String executingProject) {
+      super(jobIdToken, extractDestinationDir, bqServices, executingProject);
+      checkNotNull(table, "table");
+      try {
+        this.jsonTable = JSON_FACTORY.toString(table);
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot initialize table to JSON strings.", e);
+      }
       this.tableSizeBytes = new AtomicReference<>();
     }
 
@@ -824,12 +821,17 @@ public class BigQueryIO {
     static BigQueryQuerySource create(
         String jobIdToken,
         String query,
-        String jsonQueryTempTable,
+        TableReference queryTempTableRef,
         Boolean flattenResults,
         String extractDestinationDir,
         BigQueryServices bqServices) {
       return new BigQueryQuerySource(
-          jobIdToken, query, jsonQueryTempTable, flattenResults, extractDestinationDir, bqServices);
+          jobIdToken,
+          query,
+          queryTempTableRef,
+          flattenResults,
+          extractDestinationDir,
+          bqServices);
     }
 
     private final String query;
@@ -840,13 +842,18 @@ public class BigQueryIO {
     private BigQueryQuerySource(
         String jobIdToken,
         String query,
-        String jsonQueryTempTable,
+        TableReference queryTempTableRef,
         Boolean flattenResults,
         String extractDestinationDir,
         BigQueryServices bqServices) {
-      super(jobIdToken, extractDestinationDir, bqServices);
+      super(jobIdToken, extractDestinationDir, bqServices,
+          checkNotNull(queryTempTableRef, "queryTempTableRef").getProjectId());
       this.query = checkNotNull(query, "query");
-      this.jsonQueryTempTable = checkNotNull(jsonQueryTempTable, "jsonQueryTempTable");
+      try {
+        this.jsonQueryTempTable = JSON_FACTORY.toString(queryTempTableRef);
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot initialize table to JSON strings.", e);
+      }
       this.flattenResults = checkNotNull(flattenResults, "flattenResults");
       this.dryRunJobStats = new AtomicReference<>();
     }
@@ -861,7 +868,7 @@ public class BigQueryIO {
     public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
       BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
       return new BigQueryReader(this, bqServices.getReaderFromQuery(
-          bqOptions, query, bqOptions.getProject(), flattenResults));
+          bqOptions, query, executingProject, flattenResults));
     }
 
     @Override
@@ -887,7 +894,12 @@ public class BigQueryIO {
       // 3. Execute the query.
       String queryJobId = jobIdToken + "-query";
       executeQuery(
-          queryJobId, query, tableToExtract, flattenResults, bqServices.getJobService(bqOptions));
+          executingProject,
+          queryJobId,
+          query,
+          tableToExtract,
+          flattenResults,
+          bqServices.getJobService(bqOptions));
       return tableToExtract;
     }
 
@@ -912,22 +924,22 @@ public class BigQueryIO {
     private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
         throws InterruptedException, IOException {
       if (dryRunJobStats.get() == null) {
-        String projectId = bqOptions.getProject();
         JobStatistics jobStats =
-            bqServices.getJobService(bqOptions).dryRunQuery(projectId, query);
+            bqServices.getJobService(bqOptions).dryRunQuery(executingProject, query);
         dryRunJobStats.compareAndSet(null, jobStats);
       }
       return dryRunJobStats.get();
     }
 
     private static void executeQuery(
+        String executingProject,
         String jobId,
         String query,
         TableReference destinationTable,
         boolean flattenResults,
         JobService jobService) throws IOException, InterruptedException {
       JobReference jobRef = new JobReference()
-          .setProjectId(destinationTable.getProjectId())
+          .setProjectId(executingProject)
           .setJobId(jobId);
       JobConfigurationQuery queryConfig = new JobConfigurationQuery();
       queryConfig
@@ -978,14 +990,17 @@ public class BigQueryIO {
     protected final String jobIdToken;
     protected final String extractDestinationDir;
     protected final BigQueryServices bqServices;
+    protected final String executingProject;
 
     private BigQuerySourceBase(
         String jobIdToken,
         String extractDestinationDir,
-        BigQueryServices bqServices) {
+        BigQueryServices bqServices,
+        String executingProject) {
       this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
       this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
       this.bqServices = checkNotNull(bqServices, "bqServices");
+      this.executingProject = checkNotNull(executingProject, "executingProject");
     }
 
     @Override
@@ -1029,7 +1044,7 @@ public class BigQueryIO {
         String jobId, TableReference table, JobService jobService)
             throws InterruptedException, IOException {
       JobReference jobRef = new JobReference()
-          .setProjectId(table.getProjectId())
+          .setProjectId(executingProject)
           .setJobId(jobId);
 
       String destinationUri = getExtractDestinationUri(extractDestinationDir);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 6849018..2d1b550 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -870,10 +870,10 @@ public class BigQueryIOTest implements Serializable {
             toJsonString(new TableRow().set("name", "c").set("number", "3")));
 
     String jobIdToken = "testJobIdToken";
-    String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name"));
+    TableReference table = BigQueryIO.parseTableSpec("project.data_set.table_name");
     String extractDestinationDir = "mock://tempLocation";
-    BoundedSource<TableRow> bqSource =
-        BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices);
+    BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
+        jobIdToken, table, extractDestinationDir, fakeBqServices, "project");
 
     List<TableRow> expected = ImmutableList.of(
         new TableRow().set("name", "a").set("number", "1"),
@@ -907,10 +907,10 @@ public class BigQueryIOTest implements Serializable {
             toJsonString(new TableRow().set("name", "c").set("number", "3")));
 
     String jobIdToken = "testJobIdToken";
-    String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name"));
+    TableReference table = BigQueryIO.parseTableSpec("project:data_set.table_name");
     String extractDestinationDir = "mock://tempLocation";
-    BoundedSource<TableRow> bqSource =
-        BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices);
+    BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
+        jobIdToken, table, extractDestinationDir, fakeBqServices, "project");
 
     List<TableRow> expected = ImmutableList.of(
         new TableRow().set("name", "a").set("number", "1"),
@@ -973,10 +973,9 @@ public class BigQueryIOTest implements Serializable {
 
     String jobIdToken = "testJobIdToken";
     String extractDestinationDir = "mock://tempLocation";
-    TableReference destinationTable = BigQueryIO.parseTableSpec("project.data_set.table_name");
-    String jsonDestinationTable = toJsonString(destinationTable);
+    TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
     BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
-        jobIdToken, "query", jsonDestinationTable, true /* flattenResults */,
+        jobIdToken, "query", destinationTable, true /* flattenResults */,
         extractDestinationDir, fakeBqServices);
 
     List<TableRow> expected = ImmutableList.of(