You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 20:24:34 UTC
[1/2] beam git commit: Eliminate Pipeline.getOptions
Repository: beam
Updated Branches:
refs/heads/master 70e53e7dc -> 1b363ae9f
Eliminate Pipeline.getOptions
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/55351dce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/55351dce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/55351dce
Branch: refs/heads/master
Commit: 55351dcebec8ba9e166c4f90555edca6b90b1b14
Parents: 70e53e7
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 3 03:06:36 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 13:00:15 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 7 +++-
.../examples/complete/game/HourlyTeamScore.java | 6 ++-
.../examples/complete/game/LeaderBoard.java | 26 ++++++++----
.../beam/examples/complete/game/UserScore.java | 20 +++++----
.../complete/game/utils/WriteToBigQuery.java | 32 +++++++-------
.../game/utils/WriteWindowedToBigQuery.java | 8 ++--
.../beam/runners/direct/DirectRunner.java | 1 +
.../runners/direct/DisplayDataValidator.java | 6 +--
.../dataflow/testing/TestDataflowRunner.java | 3 +-
.../testing/TestDataflowRunnerTest.java | 18 +++-----
.../apache/beam/runners/spark/SparkRunner.java | 4 +-
.../beam/runners/spark/SparkRunnerDebugger.java | 4 +-
.../beam/runners/spark/TestSparkRunner.java | 5 +--
.../spark/translation/EvaluationContext.java | 16 +++++--
.../spark/translation/SparkRuntimeContext.java | 4 +-
.../SparkRunnerStreamingContextFactory.java | 2 +-
.../apache/beam/runners/spark/CacheTest.java | 2 +-
.../streaming/TrackStreamingSourcesTest.java | 2 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 14 +------
.../apache/beam/sdk/testing/TestPipeline.java | 26 +++++-------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 44 ++++++++++----------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 1 +
22 files changed, 131 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index a46d3c5..abbb13b 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -24,6 +24,7 @@ import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
@@ -300,6 +301,8 @@ public class GameStats extends LeaderBoard {
// Write the result to BigQuery
.apply("WriteTeamSums",
new WriteWindowedToBigQuery<KV<String, Integer>>(
+ options.as(GcpOptions.class).getProject(),
+ options.getDataset(),
options.getGameStatsTablePrefix() + "_team", configureWindowedWrite()));
@@ -327,7 +330,9 @@ public class GameStats extends LeaderBoard {
// Write this info to a BigQuery table.
.apply("WriteAvgSessionLength",
new WriteWindowedToBigQuery<Double>(
- options.getGameStatsTablePrefix() + "_sessions", configureSessionWindowWrite()));
+ options.as(GcpOptions.class).getProject(),
+ options.getDataset(),
+ options.getGameStatsTablePrefix() + "_sessions", configureSessionWindowWrite()));
// [END DocInclude_Rewindow]
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 3f1ffb0..2928882 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.TimeZone;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -185,7 +186,10 @@ public class HourlyTeamScore extends UserScore {
// Extract and sum teamname/score pairs from the event data.
.apply("ExtractTeamScore", new ExtractAndSumScore("team"))
.apply("WriteTeamScoreSums",
- new WriteWindowedToBigQuery<KV<String, Integer>>(options.getHourlyTeamScoreTableName(),
+ new WriteWindowedToBigQuery<KV<String, Integer>>(
+ options.as(GcpOptions.class).getProject(),
+ options.getDataset(),
+ options.getHourlyTeamScoreTableName(),
configureWindowedTableWrite()));
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 9af34c5..bfad9f6 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -27,6 +27,7 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
@@ -194,14 +195,20 @@ public class LeaderBoard extends HourlyTeamScore {
.withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(options.getTopic()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()));
- gameEvents.apply("CalculateTeamScores",
- new CalculateTeamScores(
- Duration.standardMinutes(options.getTeamWindowDuration()),
- Duration.standardMinutes(options.getAllowedLateness())))
+ gameEvents
+ .apply(
+ "CalculateTeamScores",
+ new CalculateTeamScores(
+ Duration.standardMinutes(options.getTeamWindowDuration()),
+ Duration.standardMinutes(options.getAllowedLateness())))
// Write the results to BigQuery.
- .apply("WriteTeamScoreSums",
- new WriteWindowedToBigQuery<KV<String, Integer>>(
- options.getLeaderBoardTableName() + "_team", configureWindowedTableWrite()));
+ .apply(
+ "WriteTeamScoreSums",
+ new WriteWindowedToBigQuery<KV<String, Integer>>(
+ options.as(GcpOptions.class).getProject(),
+ options.getDataset(),
+ options.getLeaderBoardTableName() + "_team",
+ configureWindowedTableWrite()));
gameEvents
.apply(
"CalculateUserScores",
@@ -210,7 +217,10 @@ public class LeaderBoard extends HourlyTeamScore {
.apply(
"WriteUserScoreSums",
new WriteToBigQuery<KV<String, Integer>>(
- options.getLeaderBoardTableName() + "_user", configureGlobalWindowBigQueryWrite()));
+ options.as(GcpOptions.class).getProject(),
+ options.getDataset(),
+ options.getLeaderBoardTableName() + "_user",
+ configureGlobalWindowBigQueryWrite()));
// Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
// command line.
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index c136c2e..8110146 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -24,6 +24,7 @@ import org.apache.beam.examples.complete.game.utils.WriteToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
@@ -226,13 +227,18 @@ public class UserScore {
Pipeline pipeline = Pipeline.create(options);
// Read events from a text file and parse them.
- pipeline.apply(TextIO.read().from(options.getInput()))
- .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
- // Extract and sum username/score pairs from the event data.
- .apply("ExtractUserScore", new ExtractAndSumScore("user"))
- .apply("WriteUserScoreSums",
- new WriteToBigQuery<KV<String, Integer>>(options.getUserScoreTableName(),
- configureBigQueryWrite()));
+ pipeline
+ .apply(TextIO.read().from(options.getInput()))
+ .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
+ // Extract and sum username/score pairs from the event data.
+ .apply("ExtractUserScore", new ExtractAndSumScore("user"))
+ .apply(
+ "WriteUserScoreSums",
+ new WriteToBigQuery<KV<String, Integer>>(
+ options.as(GcpOptions.class).getProject(),
+ options.getDataset(),
+ options.getUserScoreTableName(),
+ configureBigQueryWrite()));
// Run the batch pipeline.
pipeline.run().waitUntilFinish();
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index f767d21..2ec4e5c 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -25,13 +25,9 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.beam.examples.complete.game.UserScore;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -47,14 +43,21 @@ import org.apache.beam.sdk.values.PDone;
public class WriteToBigQuery<InputT>
extends PTransform<PCollection<InputT>, PDone> {
+ protected String projectId;
+ protected String datasetId;
protected String tableName;
protected Map<String, FieldInfo<InputT>> fieldInfo;
public WriteToBigQuery() {
}
- public WriteToBigQuery(String tableName,
+ public WriteToBigQuery(
+ String projectId,
+ String datasetId,
+ String tableName,
Map<String, FieldInfo<InputT>> fieldInfo) {
+ this.projectId = projectId;
+ this.datasetId = datasetId;
this.tableName = tableName;
this.fieldInfo = fieldInfo;
}
@@ -120,20 +123,21 @@ public class WriteToBigQuery<InputT>
@Override
public PDone expand(PCollection<InputT> teamAndScore) {
teamAndScore
- .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
- .apply(BigQueryIO.writeTableRows().to(getTable(teamAndScore.getPipeline(), tableName))
- .withSchema(getSchema())
- .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+ .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(getTable(projectId, datasetId, tableName))
+ .withSchema(getSchema())
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_APPEND));
return PDone.in(teamAndScore.getPipeline());
}
/** Utility to construct an output table reference. */
- static TableReference getTable(Pipeline pipeline, String tableName) {
- PipelineOptions options = pipeline.getOptions();
+ static TableReference getTable(String projectId, String datasetId, String tableName) {
TableReference table = new TableReference();
- table.setDatasetId(options.as(UserScore.Options.class).getDataset());
- table.setProjectId(options.as(GcpOptions.class).getProject());
+ table.setDatasetId(datasetId);
+ table.setProjectId(projectId);
table.setTableId(tableName);
return table;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index e602258..deb9db2 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -37,9 +37,9 @@ import org.apache.beam.sdk.values.PDone;
public class WriteWindowedToBigQuery<T>
extends WriteToBigQuery<T> {
- public WriteWindowedToBigQuery(String tableName,
- Map<String, FieldInfo<T>> fieldInfo) {
- super(tableName, fieldInfo);
+ public WriteWindowedToBigQuery(
+ String projectId, String datasetId, String tableName, Map<String, FieldInfo<T>> fieldInfo) {
+ super(projectId, datasetId, tableName, fieldInfo);
}
/** Convert each key/score pair into a BigQuery TableRow. */
@@ -62,7 +62,7 @@ public class WriteWindowedToBigQuery<T>
teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
.apply(BigQueryIO.writeTableRows()
- .to(getTable(teamAndScore.getPipeline(), tableName))
+ .to(getTable(projectId, datasetId, tableName))
.withSchema(getSchema())
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index c6168b3e..984598a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -163,6 +163,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
pipeline.traverseTopologically(keyedPValueVisitor);
DisplayDataValidator.validatePipeline(pipeline);
+ DisplayDataValidator.validateOptions(getPipelineOptions());
DirectGraph graph = graphVisitor.getGraph();
EvaluationContext context =
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
index c77cb48..209c801 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -31,12 +32,11 @@ class DisplayDataValidator {
private DisplayDataValidator() {}
static void validatePipeline(Pipeline pipeline) {
- validateOptions(pipeline);
validateTransforms(pipeline);
}
- private static void validateOptions(Pipeline pipeline) {
- evaluateDisplayData(pipeline.getOptions());
+ static void validateOptions(PipelineOptions options) {
+ evaluateDisplayData(options);
}
private static void validateTransforms(Pipeline pipeline) {
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
index ba9d971..c238d80 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
@@ -98,7 +98,7 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
DataflowPipelineJob run(Pipeline pipeline, DataflowRunner runner) {
updatePAssertCount(pipeline);
- TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
+ TestPipelineOptions testPipelineOptions = options.as(TestPipelineOptions.class);
final DataflowPipelineJob job;
job = runner.run(pipeline);
@@ -188,7 +188,6 @@ public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@VisibleForTesting
void updatePAssertCount(Pipeline pipeline) {
- DataflowPipelineOptions options = pipeline.getOptions().as(DataflowPipelineOptions.class);
if (DataflowRunner.hasExperiment(options, "beam_fn_api")) {
// TODO[BEAM-1866]: FnAPI does not support metrics, so expect 0 assertions.
expectedNumberOfAssertions = 0;
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
index 54eb88d..eb068e6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java
@@ -464,8 +464,7 @@ public class TestDataflowRunnerTest {
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- p.getOptions().as(TestPipelineOptions.class)
- .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+ options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
when(mockClient.getJobMetrics(anyString()))
.thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
@@ -488,8 +487,7 @@ public class TestDataflowRunnerTest {
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- p.getOptions().as(TestPipelineOptions.class)
- .setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
+ options.as(TestPipelineOptions.class).setOnCreateMatcher(new TestSuccessMatcher(mockJob, 0));
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.DONE);
@@ -515,8 +513,7 @@ public class TestDataflowRunnerTest {
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- p.getOptions().as(TestPipelineOptions.class)
- .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+ options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
when(mockClient.getJobMetrics(anyString()))
.thenReturn(generateMockMetricResponse(true /* success */, true /* tentative */));
@@ -544,8 +541,7 @@ public class TestDataflowRunnerTest {
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- p.getOptions().as(TestPipelineOptions.class)
- .setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
+ options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestSuccessMatcher(mockJob, 1));
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.DONE);
@@ -570,8 +566,7 @@ public class TestDataflowRunnerTest {
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- p.getOptions().as(TestPipelineOptions.class)
- .setOnSuccessMatcher(new TestFailureMatcher());
+ options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
when(mockClient.getJobMetrics(anyString()))
.thenReturn(generateMockMetricResponse(false /* success */, true /* tentative */));
@@ -606,8 +601,7 @@ public class TestDataflowRunnerTest {
when(mockRunner.run(any(Pipeline.class))).thenReturn(mockJob);
TestDataflowRunner runner = TestDataflowRunner.fromOptionsAndClient(options, mockClient);
- p.getOptions().as(TestPipelineOptions.class)
- .setOnSuccessMatcher(new TestFailureMatcher());
+ options.as(TestPipelineOptions.class).setOnSuccessMatcher(new TestFailureMatcher());
when(mockJob.waitUntilFinish(any(Duration.class), any(JobMessagesHandler.class)))
.thenReturn(State.FAILED);
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 97487f3..1a0c042 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -193,7 +193,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
} else {
// create the evaluation context
final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
- final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline);
+ final EvaluationContext evaluationContext = new EvaluationContext(jsc, pipeline, mOptions);
translator = new TransformTranslator.Translator();
// update the cache candidates
@@ -383,7 +383,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
LOG.info(
"Deferring combine transformation {} for job {}",
transform,
- ctxt.getPipeline().getOptions().getJobName());
+ ctxt.getOptions().getJobName());
return true;
}
// default.
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
index 7f7aefc..8d47e1a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -89,10 +89,10 @@ public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResul
&& ((TestSparkPipelineOptions) options).isForceStreaming()) {
SparkPipelineTranslator streamingTranslator =
new StreamingTransformTranslator.Translator(translator);
- EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
+ EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
visitor = new SparkNativePipelineVisitor(streamingTranslator, ctxt);
} else {
- EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
+ EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
visitor = new SparkNativePipelineVisitor(translator, ctxt);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 1e67813..6808d7b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -83,6 +83,7 @@ import org.slf4j.LoggerFactory;
public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
private static final Logger LOG = LoggerFactory.getLogger(TestSparkRunner.class);
+ private final TestSparkPipelineOptions testSparkPipelineOptions;
private SparkRunner delegate;
private boolean isForceStreaming;
@@ -90,6 +91,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
private TestSparkRunner(TestSparkPipelineOptions options) {
this.delegate = SparkRunner.fromOptions(options);
this.isForceStreaming = options.isForceStreaming();
+ this.testSparkPipelineOptions = options;
}
public static TestSparkRunner fromOptions(PipelineOptions options) {
@@ -101,9 +103,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
@Override
public SparkPipelineResult run(Pipeline pipeline) {
- TestSparkPipelineOptions testSparkPipelineOptions =
- pipeline.getOptions().as(TestSparkPipelineOptions.class);
- //
// if the pipeline forces execution as a streaming pipeline,
// and the source is an adapted unbounded source (as bounded),
// read it as unbounded source via UnboundedReadFromBoundedSource.
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 838c504..5d77e91 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.WindowedValue;
@@ -57,15 +58,18 @@ public class EvaluationContext {
private AppliedPTransform<?, ?, ?> currentTransform;
private final SparkPCollectionView pviews = new SparkPCollectionView();
private final Map<PCollection, Long> cacheCandidates = new HashMap<>();
+ private final PipelineOptions options;
- public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
+ public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options) {
this.jsc = jsc;
this.pipeline = pipeline;
- this.runtime = new SparkRuntimeContext(pipeline);
+ this.options = options;
+ this.runtime = new SparkRuntimeContext(pipeline, options);
}
- public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline, JavaStreamingContext jssc) {
- this(jsc, pipeline);
+ public EvaluationContext(
+ JavaSparkContext jsc, Pipeline pipeline, PipelineOptions options, JavaStreamingContext jssc) {
+ this(jsc, pipeline, options);
this.jssc = jssc;
}
@@ -81,6 +85,10 @@ public class EvaluationContext {
return pipeline;
}
+ public PipelineOptions getOptions() {
+ return options;
+ }
+
public SparkRuntimeContext getRuntimeContext() {
return runtime;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index e006143..3db1ab5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -35,8 +35,8 @@ public class SparkRuntimeContext implements Serializable {
private final String serializedPipelineOptions;
private transient CoderRegistry coderRegistry;
- SparkRuntimeContext(Pipeline pipeline) {
- this.serializedPipelineOptions = serializePipelineOptions(pipeline.getOptions());
+ SparkRuntimeContext(Pipeline pipeline, PipelineOptions options) {
+ this.serializedPipelineOptions = serializePipelineOptions(options);
}
private String serializePipelineOptions(PipelineOptions pipelineOptions) {
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 2dd18f3..6a153ff 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -82,7 +82,7 @@ public class SparkRunnerStreamingContextFactory implements Function0<JavaStreami
// We must first init accumulators since translators expect them to be instantiated.
SparkRunner.initAccumulators(options, jsc);
- EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
+ EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options, jssc);
// update cache candidates
SparkRunner.updateCacheCandidates(pipeline, translator, ctxt);
pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
index 24b2e7b..d3d0823 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/CacheTest.java
@@ -51,7 +51,7 @@ public class CacheTest {
pCollection.apply(Count.<String>globally());
JavaSparkContext jsc = SparkContextFactory.getSparkContext(options);
- EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
+ EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, options);
SparkRunner.CacheVisitor cacheVisitor =
new SparkRunner.CacheVisitor(new TransformTranslator.Translator(), ctxt);
pipeline.traverseTopologically(cacheVisitor);
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
index 41ccd08..3dcab26 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
@@ -135,7 +135,7 @@ public class TrackStreamingSourcesTest {
Pipeline pipeline,
Class<? extends PTransform> transformClassToAssert,
Integer... expected) {
- this.ctxt = new EvaluationContext(jssc.sparkContext(), pipeline, jssc);
+ this.ctxt = new EvaluationContext(jssc.sparkContext(), pipeline, options, jssc);
this.evaluator = new SparkRunner.Evaluator(
new StreamingTransformTranslator.Translator(new TransformTranslator.Translator()), ctxt);
this.transformClassToAssert = transformClassToAssert;
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 6fa7a5a..f7c3f24 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -460,7 +460,7 @@ public class Pipeline {
private Set<String> usedFullNames = new HashSet<>();
private CoderRegistry coderRegistry;
private final List<String> unstableNames = new ArrayList<>();
- protected final PipelineOptions defaultOptions;
+ private final PipelineOptions defaultOptions;
protected Pipeline(PipelineOptions options) {
this.defaultOptions = options;
@@ -472,18 +472,6 @@ public class Pipeline {
}
/**
- * Returns the default {@link PipelineOptions} provided to {@link #create(PipelineOptions)}.
- *
- * @deprecated see BEAM-818 Remove Pipeline.getPipelineOptions. Configuration should be explicitly
- * provided to a transform if it is required.
- */
- @Deprecated
- public PipelineOptions getOptions() {
- return defaultOptions;
- }
-
-
- /**
* Applies a {@link PTransform} to the given {@link PInput}.
*
* @see Pipeline#apply
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 2d34b22..96cae51 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -26,7 +26,6 @@ import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
@@ -104,6 +103,8 @@ import org.junit.runners.model.Statement;
*/
public class TestPipeline extends Pipeline implements TestRule {
+ private final PipelineOptions options;
+
private static class PipelineRunEnforcement {
@SuppressWarnings("WeakerAccess")
@@ -183,10 +184,7 @@ public class TestPipeline extends Pipeline implements TestRule {
private void verifyPipelineExecution() {
if (!isEmptyPipeline(pipeline)) {
if (!runAttempted && !enableAutoRunIfMissing) {
- throw new PipelineRunMissingException(
- "The pipeline has not been run (runner: "
- + pipeline.getOptions().getRunner().getSimpleName()
- + ")");
+ throw new PipelineRunMissingException("The pipeline has not been run.");
} else {
final List<TransformHierarchy.Node> pipelineNodes = recordPipelineNodes(pipeline);
@@ -272,6 +270,11 @@ public class TestPipeline extends Pipeline implements TestRule {
private TestPipeline(final PipelineOptions options) {
super(options);
+ this.options = options;
+ }
+
+ public PipelineOptions getOptions() {
+ return this.options;
}
@Override
@@ -288,7 +291,7 @@ public class TestPipeline extends Pipeline implements TestRule {
.anyMatch(Annotations.Predicates.isCategoryOf(NeedsRunner.class, true));
final boolean crashingRunner =
- CrashingRunner.class.isAssignableFrom(getOptions().getRunner());
+ CrashingRunner.class.isAssignableFrom(options.getRunner());
checkState(
!(annotatedWithNeedsRunner && crashingRunner),
@@ -381,18 +384,9 @@ public class TestPipeline extends Pipeline implements TestRule {
return this;
}
- @VisibleForTesting
- @Override
- /**
- * Get this pipeline's options.
- */
- public PipelineOptions getOptions() {
- return defaultOptions;
- }
-
@Override
public String toString() {
- return "TestPipeline#" + getOptions().as(ApplicationNameOptions.class).getAppName();
+ return "TestPipeline#" + options.as(ApplicationNameOptions.class).getAppName();
}
/** Creates {@link PipelineOptions} for testing. */
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 0e36393..fbbf862 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -888,6 +888,26 @@ public class BigQueryIO {
public void validate(PipelineOptions pipelineOptions) {
BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class);
+ // The user specified a table.
+ if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {
+ TableReference table = getTableWithDefaultProject(options).get();
+ DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+ // Check for destination table presence and emptiness for early failure notification.
+ // Note that a presence check can fail when the table or dataset is created by an earlier
+ // stage of the pipeline. For these cases the #withoutValidation method can be used to
+ // disable the check.
+ BigQueryHelpers.verifyDatasetPresence(datasetService, table);
+ if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
+ BigQueryHelpers.verifyTablePresence(datasetService, table);
+ }
+ if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
+ BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
+ }
+ }
+ }
+
+ @Override
+ public WriteResult expand(PCollection<T> input) {
// We must have a destination to write to!
checkState(
getTableFunction() != null || getJsonTableRef() != null
@@ -916,29 +936,7 @@ public class BigQueryIO {
checkArgument(2
> Iterables.size(Iterables.filter(allSchemaArgs, Predicates.notNull())),
"No more than one of jsonSchema, schemaFromView, or dynamicDestinations may "
- + "be set");
-
- // The user specified a table.
- if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {
- TableReference table = getTableWithDefaultProject(options).get();
- DatasetService datasetService = getBigQueryServices().getDatasetService(options);
- // Check for destination table presence and emptiness for early failure notification.
- // Note that a presence check can fail when the table or dataset is created by an earlier
- // stage of the pipeline. For these cases the #withoutValidation method can be used to
- // disable the check.
- BigQueryHelpers.verifyDatasetPresence(datasetService, table);
- if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
- BigQueryHelpers.verifyTablePresence(datasetService, table);
- }
- if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
- BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
- }
- }
- }
-
- @Override
- public WriteResult expand(PCollection<T> input) {
- validate(input.getPipeline().getOptions());
+ + "be set");
DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
if (dynamicDestinations == null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/55351dce/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index aabae3e..b893ad5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1175,6 +1175,7 @@ public class BigQueryIOTest implements Serializable {
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withSchema(new TableSchema())
.withTestServices(fakeBqServices));
+ p.run();
}
@Test
[2/2] beam git commit: This closed #2857: Eliminate Pipeline.getOptions
Posted by ke...@apache.org.
This closed #2857: Eliminate Pipeline.getOptions
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1b363ae9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1b363ae9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1b363ae9
Branch: refs/heads/master
Commit: 1b363ae9f4e3ec1479befef66c5edd785d34ed01
Parents: 70e53e7 55351dc
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 4 13:01:32 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 13:01:32 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/game/GameStats.java | 7 +++-
.../examples/complete/game/HourlyTeamScore.java | 6 ++-
.../examples/complete/game/LeaderBoard.java | 26 ++++++++----
.../beam/examples/complete/game/UserScore.java | 20 +++++----
.../complete/game/utils/WriteToBigQuery.java | 32 +++++++-------
.../game/utils/WriteWindowedToBigQuery.java | 8 ++--
.../beam/runners/direct/DirectRunner.java | 1 +
.../runners/direct/DisplayDataValidator.java | 6 +--
.../dataflow/testing/TestDataflowRunner.java | 3 +-
.../testing/TestDataflowRunnerTest.java | 18 +++-----
.../apache/beam/runners/spark/SparkRunner.java | 4 +-
.../beam/runners/spark/SparkRunnerDebugger.java | 4 +-
.../beam/runners/spark/TestSparkRunner.java | 5 +--
.../spark/translation/EvaluationContext.java | 16 +++++--
.../spark/translation/SparkRuntimeContext.java | 4 +-
.../SparkRunnerStreamingContextFactory.java | 2 +-
.../apache/beam/runners/spark/CacheTest.java | 2 +-
.../streaming/TrackStreamingSourcesTest.java | 2 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 14 +------
.../apache/beam/sdk/testing/TestPipeline.java | 26 +++++-------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 44 ++++++++++----------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 1 +
22 files changed, 131 insertions(+), 120 deletions(-)
----------------------------------------------------------------------