You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/20 23:38:28 UTC
[2/2] incubator-beam git commit: [BEAM-48] Add BigQueryTornadoes integration test
[BEAM-48] Add BigQueryTornadoes integration test
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/89d20a2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/89d20a2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/89d20a2d
Branch: refs/heads/master
Commit: 89d20a2d66319269082cdead70eb3cf10309b9e8
Parents: d795295
Author: Pei He <pe...@google.com>
Authored: Wed May 18 17:46:24 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 20 16:37:39 2016 -0700
----------------------------------------------------------------------
.../examples/cookbook/BigQueryTornadoes.java | 2 +-
.../examples/cookbook/BigQueryTornadoesIT.java | 52 ++++++++++++
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 87 ++++++++++++--------
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 17 ++--
4 files changed, 112 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 80a2f25..4c69efb 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -143,7 +143,7 @@ public class BigQueryTornadoes {
*
* <p>Inherits standard configuration options.
*/
- private static interface Options extends PipelineOptions {
+ static interface Options extends PipelineOptions {
@Description("Table to read from, specified as "
+ "<project_id>:<dataset_id>.<table_id>")
@Default.String(WEATHER_SAMPLES_TABLE)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
new file mode 100644
index 0000000..fbd775c
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesIT.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples.cookbook;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * End-to-end tests of BigQueryTornadoes.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryTornadoesIT {
+
+ /**
+ * Options for the BigQueryTornadoes Integration Test.
+ */
+ public interface BigQueryTornadoesITOptions
+ extends TestPipelineOptions, BigQueryTornadoes.Options {
+ }
+
+ @Test
+ public void testE2EBigQueryTornadoes() throws Exception {
+ PipelineOptionsFactory.register(BigQueryTornadoesITOptions.class);
+ BigQueryTornadoesITOptions options =
+ TestPipeline.testingPipelineOptions().as(BigQueryTornadoesITOptions.class);
+ options.setOutput(String.format("%s.%s",
+ "BigQueryTornadoesIT", "monthly_tornadoes_" + System.currentTimeMillis()));
+
+ BigQueryTornadoes.main(TestPipeline.convertToArgs(options));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index e4a306a..030dde0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -559,34 +559,23 @@ public class BigQueryIO {
String.format("Failed to resolve extract destination directory in %s", tempLocation));
}
+ final String executingProject = bqOptions.getProject();
if (!Strings.isNullOrEmpty(query)) {
- String projectId = bqOptions.getProject();
String queryTempDatasetId = "temp_dataset_" + uuid;
String queryTempTableId = "temp_table_" + uuid;
TableReference queryTempTableRef = new TableReference()
- .setProjectId(projectId)
+ .setProjectId(executingProject)
.setDatasetId(queryTempDatasetId)
.setTableId(queryTempTableId);
- String jsonQueryTempTable;
- try {
- jsonQueryTempTable = JSON_FACTORY.toString(queryTempTableRef);
- } catch (IOException e) {
- throw new RuntimeException("Cannot initialize table to JSON strings.", e);
- }
source = BigQueryQuerySource.create(
- jobIdToken, query, jsonQueryTempTable, flattenResults,
+ jobIdToken, query, queryTempTableRef, flattenResults,
extractDestinationDir, bqServices);
} else {
- String jsonTable;
- try {
- jsonTable = JSON_FACTORY.toString(getTableWithDefaultProject(bqOptions));
- } catch (IOException e) {
- throw new RuntimeException("Cannot initialize table to JSON strings.", e);
- }
+ TableReference inputTable = getTableWithDefaultProject(bqOptions);
source = BigQueryTableSource.create(
- jobIdToken, jsonTable, extractDestinationDir, bqServices);
+ jobIdToken, inputTable, extractDestinationDir, bqServices, executingProject);
}
PassThroughThenCleanup.CleanupOperation cleanupOperation =
new PassThroughThenCleanup.CleanupOperation() {
@@ -595,7 +584,7 @@ public class BigQueryIO {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
JobReference jobRef = new JobReference()
- .setProjectId(bqOptions.getProject())
+ .setProjectId(executingProject)
.setJobId(getExtractJobId(jobIdToken));
Job extractJob = bqServices.getJobService(bqOptions).pollJob(
jobRef, CLEANUP_JOB_POLL_MAX_RETRIES);
@@ -759,10 +748,12 @@ public class BigQueryIO {
static BigQueryTableSource create(
String jobIdToken,
- String jsonTable,
+ TableReference table,
String extractDestinationDir,
- BigQueryServices bqServices) {
- return new BigQueryTableSource(jobIdToken, jsonTable, extractDestinationDir, bqServices);
+ BigQueryServices bqServices,
+ String executingProject) {
+ return new BigQueryTableSource(
+ jobIdToken, table, extractDestinationDir, bqServices, executingProject);
}
private final String jsonTable;
@@ -770,11 +761,17 @@ public class BigQueryIO {
private BigQueryTableSource(
String jobIdToken,
- String jsonTable,
+ TableReference table,
String extractDestinationDir,
- BigQueryServices bqServices) {
- super(jobIdToken, extractDestinationDir, bqServices);
- this.jsonTable = checkNotNull(jsonTable, "jsonTable");
+ BigQueryServices bqServices,
+ String executingProject) {
+ super(jobIdToken, extractDestinationDir, bqServices, executingProject);
+ checkNotNull(table, "table");
+ try {
+ this.jsonTable = JSON_FACTORY.toString(table);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot initialize table to JSON strings.", e);
+ }
this.tableSizeBytes = new AtomicReference<>();
}
@@ -824,12 +821,17 @@ public class BigQueryIO {
static BigQueryQuerySource create(
String jobIdToken,
String query,
- String jsonQueryTempTable,
+ TableReference queryTempTableRef,
Boolean flattenResults,
String extractDestinationDir,
BigQueryServices bqServices) {
return new BigQueryQuerySource(
- jobIdToken, query, jsonQueryTempTable, flattenResults, extractDestinationDir, bqServices);
+ jobIdToken,
+ query,
+ queryTempTableRef,
+ flattenResults,
+ extractDestinationDir,
+ bqServices);
}
private final String query;
@@ -840,13 +842,18 @@ public class BigQueryIO {
private BigQueryQuerySource(
String jobIdToken,
String query,
- String jsonQueryTempTable,
+ TableReference queryTempTableRef,
Boolean flattenResults,
String extractDestinationDir,
BigQueryServices bqServices) {
- super(jobIdToken, extractDestinationDir, bqServices);
+ super(jobIdToken, extractDestinationDir, bqServices,
+ checkNotNull(queryTempTableRef, "queryTempTableRef").getProjectId());
this.query = checkNotNull(query, "query");
- this.jsonQueryTempTable = checkNotNull(jsonQueryTempTable, "jsonQueryTempTable");
+ try {
+ this.jsonQueryTempTable = JSON_FACTORY.toString(queryTempTableRef);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot initialize table to JSON strings.", e);
+ }
this.flattenResults = checkNotNull(flattenResults, "flattenResults");
this.dryRunJobStats = new AtomicReference<>();
}
@@ -861,7 +868,7 @@ public class BigQueryIO {
public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
return new BigQueryReader(this, bqServices.getReaderFromQuery(
- bqOptions, query, bqOptions.getProject(), flattenResults));
+ bqOptions, query, executingProject, flattenResults));
}
@Override
@@ -887,7 +894,12 @@ public class BigQueryIO {
// 3. Execute the query.
String queryJobId = jobIdToken + "-query";
executeQuery(
- queryJobId, query, tableToExtract, flattenResults, bqServices.getJobService(bqOptions));
+ executingProject,
+ queryJobId,
+ query,
+ tableToExtract,
+ flattenResults,
+ bqServices.getJobService(bqOptions));
return tableToExtract;
}
@@ -912,22 +924,22 @@ public class BigQueryIO {
private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
throws InterruptedException, IOException {
if (dryRunJobStats.get() == null) {
- String projectId = bqOptions.getProject();
JobStatistics jobStats =
- bqServices.getJobService(bqOptions).dryRunQuery(projectId, query);
+ bqServices.getJobService(bqOptions).dryRunQuery(executingProject, query);
dryRunJobStats.compareAndSet(null, jobStats);
}
return dryRunJobStats.get();
}
private static void executeQuery(
+ String executingProject,
String jobId,
String query,
TableReference destinationTable,
boolean flattenResults,
JobService jobService) throws IOException, InterruptedException {
JobReference jobRef = new JobReference()
- .setProjectId(destinationTable.getProjectId())
+ .setProjectId(executingProject)
.setJobId(jobId);
JobConfigurationQuery queryConfig = new JobConfigurationQuery();
queryConfig
@@ -978,14 +990,17 @@ public class BigQueryIO {
protected final String jobIdToken;
protected final String extractDestinationDir;
protected final BigQueryServices bqServices;
+ protected final String executingProject;
private BigQuerySourceBase(
String jobIdToken,
String extractDestinationDir,
- BigQueryServices bqServices) {
+ BigQueryServices bqServices,
+ String executingProject) {
this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
this.bqServices = checkNotNull(bqServices, "bqServices");
+ this.executingProject = checkNotNull(executingProject, "executingProject");
}
@Override
@@ -1029,7 +1044,7 @@ public class BigQueryIO {
String jobId, TableReference table, JobService jobService)
throws InterruptedException, IOException {
JobReference jobRef = new JobReference()
- .setProjectId(table.getProjectId())
+ .setProjectId(executingProject)
.setJobId(jobId);
String destinationUri = getExtractDestinationUri(extractDestinationDir);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/89d20a2d/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 6849018..2d1b550 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -870,10 +870,10 @@ public class BigQueryIOTest implements Serializable {
toJsonString(new TableRow().set("name", "c").set("number", "3")));
String jobIdToken = "testJobIdToken";
- String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name"));
+ TableReference table = BigQueryIO.parseTableSpec("project.data_set.table_name");
String extractDestinationDir = "mock://tempLocation";
- BoundedSource<TableRow> bqSource =
- BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices);
+ BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
+ jobIdToken, table, extractDestinationDir, fakeBqServices, "project");
List<TableRow> expected = ImmutableList.of(
new TableRow().set("name", "a").set("number", "1"),
@@ -907,10 +907,10 @@ public class BigQueryIOTest implements Serializable {
toJsonString(new TableRow().set("name", "c").set("number", "3")));
String jobIdToken = "testJobIdToken";
- String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name"));
+ TableReference table = BigQueryIO.parseTableSpec("project:data_set.table_name");
String extractDestinationDir = "mock://tempLocation";
- BoundedSource<TableRow> bqSource =
- BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices);
+ BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
+ jobIdToken, table, extractDestinationDir, fakeBqServices, "project");
List<TableRow> expected = ImmutableList.of(
new TableRow().set("name", "a").set("number", "1"),
@@ -973,10 +973,9 @@ public class BigQueryIOTest implements Serializable {
String jobIdToken = "testJobIdToken";
String extractDestinationDir = "mock://tempLocation";
- TableReference destinationTable = BigQueryIO.parseTableSpec("project.data_set.table_name");
- String jsonDestinationTable = toJsonString(destinationTable);
+ TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
- jobIdToken, "query", jsonDestinationTable, true /* flattenResults */,
+ jobIdToken, "query", destinationTable, true /* flattenResults */,
extractDestinationDir, fakeBqServices);
List<TableRow> expected = ImmutableList.of(