You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/05/10 00:07:16 UTC
[2/3] beam git commit: Register TestSparkPipelineOptions only in src/test to avoid hard hamcrest dep
Register TestSparkPipelineOptions only in src/test to avoid hard hamcrest dep
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95d33c52
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95d33c52
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95d33c52
Branch: refs/heads/master
Commit: 95d33c521788ce8046201a6baf22e46950560cf1
Parents: c4adbd3
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 9 12:44:19 2017 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Tue May 9 17:06:57 2017 -0700
----------------------------------------------------------------------
.../runners/spark/SparkRunnerRegistrar.java | 4 +--
.../beam/runners/spark/TestSparkRunner.java | 29 ++++++++--------
.../runners/spark/SparkRunnerRegistrarTest.java | 2 +-
.../TestSparkPipelineOptionsRegistrar.java | 36 ++++++++++++++++++++
4 files changed, 52 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
index e2e5ceb..325c86d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerRegistrar.java
@@ -54,9 +54,7 @@ public final class SparkRunnerRegistrar {
public static class Options implements PipelineOptionsRegistrar {
@Override
public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>of(
- SparkPipelineOptions.class,
- TestSparkPipelineOptions.class);
+ return ImmutableList.<Class<? extends PipelineOptions>>of(SparkPipelineOptions.class);
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 6d10b75..eccee57 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -83,26 +83,25 @@ import org.slf4j.LoggerFactory;
public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
private static final Logger LOG = LoggerFactory.getLogger(TestSparkRunner.class);
- private final TestSparkPipelineOptions testSparkPipelineOptions;
-
+ private final PipelineOptions options;
private SparkRunner delegate;
- private boolean isForceStreaming;
- private TestSparkRunner(TestSparkPipelineOptions options) {
+ private TestSparkRunner(PipelineOptions options) {
this.delegate = SparkRunner.fromOptions(options);
- this.isForceStreaming = options.isForceStreaming();
- this.testSparkPipelineOptions = options;
+ this.options = options;
}
public static TestSparkRunner fromOptions(PipelineOptions options) {
- // Default options suffice to set it up as a test runner
- TestSparkPipelineOptions sparkOptions =
- PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options);
- return new TestSparkRunner(sparkOptions);
+ return new TestSparkRunner(options);
}
@Override
public SparkPipelineResult run(Pipeline pipeline) {
+ // Default options suffice to set it up as a test runner
+ TestSparkPipelineOptions testSparkOptions =
+ PipelineOptionsValidator.validate(TestSparkPipelineOptions.class, options);
+
+ boolean isForceStreaming = testSparkOptions.isForceStreaming();
// if the pipeline forces execution as a streaming pipeline,
// and the source is an adapted unbounded source (as bounded),
// read it as unbounded source via UnboundedReadFromBoundedSource.
@@ -116,13 +115,13 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
MetricsAccumulator.clear();
GlobalWatermarkHolder.clear();
- LOG.info("About to run test pipeline " + testSparkPipelineOptions.getJobName());
+ LOG.info("About to run test pipeline " + options.getJobName());
// if the pipeline was executed in streaming mode, validate aggregators.
if (isForceStreaming) {
try {
result = delegate.run(pipeline);
- awaitWatermarksOrTimeout(testSparkPipelineOptions, result);
+ awaitWatermarksOrTimeout(testSparkOptions, result);
result.stop();
PipelineResult.State finishState = result.getState();
// assert finish state.
@@ -133,7 +132,7 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
} finally {
try {
// cleanup checkpoint dir.
- FileUtils.deleteDirectory(new File(testSparkPipelineOptions.getCheckpointDir()));
+ FileUtils.deleteDirectory(new File(testSparkOptions.getCheckpointDir()));
} catch (IOException e) {
throw new RuntimeException("Failed to clear checkpoint tmp dir.", e);
}
@@ -150,8 +149,8 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
finishState,
is(PipelineResult.State.DONE));
// assert via matchers.
- assertThat(result, testSparkPipelineOptions.getOnCreateMatcher());
- assertThat(result, testSparkPipelineOptions.getOnSuccessMatcher());
+ assertThat(result, testSparkOptions.getOnCreateMatcher());
+ assertThat(result, testSparkOptions.getOnSuccessMatcher());
}
return result;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
index 75899f9..4e1fd7c 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerRegistrarTest.java
@@ -38,7 +38,7 @@ public class SparkRunnerRegistrarTest {
@Test
public void testOptions() {
assertEquals(
- ImmutableList.of(SparkPipelineOptions.class, TestSparkPipelineOptions.class),
+ ImmutableList.of(SparkPipelineOptions.class),
new SparkRunnerRegistrar.Options().getPipelineOptions());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/95d33c52/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..e71880b
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TestSparkPipelineOptionsRegistrar.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/**
+ * A registrar for {@link TestSparkPipelineOptions} to temporarily work around some complexities in
+ * {@link PipelineOptions} parsing.
+ */
+@AutoService(PipelineOptionsRegistrar.class)
+public final class TestSparkPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(TestSparkPipelineOptions.class);
+ }
+}