You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/23 02:31:25 UTC
[1/2] incubator-beam git commit: Add failsafe plugin for integration tests.
Repository: incubator-beam
Updated Branches:
refs/heads/master 983bef0cc -> 039d71328
Add failsafe plugin for integration tests.
Signed-off-by: Jason Kuster <ja...@google.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0eacebed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0eacebed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0eacebed
Branch: refs/heads/master
Commit: 0eacebedea9e9818a1022f222a8129ff276f651c
Parents: 983bef0
Author: Jason Kuster <ja...@google.com>
Authored: Wed Mar 30 16:24:52 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Apr 22 17:17:39 2016 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 26 ++++++++
.../org/apache/beam/examples/WordCountIT.java | 64 ++++++++++++++++++++
pom.xml | 6 ++
.../testing/TestDataflowPipelineOptions.java | 3 +-
.../sdk/testing/TestDataflowPipelineRunner.java | 14 +++++
.../testing/TestDataflowPipelineRunnerTest.java | 1 +
.../apache/beam/sdk/testing/TestPipeline.java | 20 +++++-
.../beam/sdk/testing/TestPipelineOptions.java | 30 +++++++++
.../beam/sdk/testing/TestPipelineTest.java | 16 +++++
9 files changed, 178 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index f0ed060..9626bf3 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -36,6 +36,10 @@
<packaging>jar</packaging>
+ <properties>
+ <skipITs>true</skipITs>
+ </properties>
+
<build>
<plugins>
<plugin>
@@ -221,6 +225,28 @@
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <useManifestOnlyJar>false</useManifestOnlyJar>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>${integrationTestPipelineOptions}</beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
new file mode 100644
index 0000000..a5ad707
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.beam.examples.WordCount.WordCountOptions;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestDataflowPipelineRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import com.google.common.base.Joiner;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * End-to-end tests of WordCount.
+ */
+@RunWith(JUnit4.class)
+public class WordCountIT {
+
+ /**
+ * Options for the WordCount Integration Test.
+ */
+ public static interface WordCountITOptions extends TestPipelineOptions,
+ WordCountOptions, DataflowPipelineOptions {
+ }
+
+ @Test
+ public void testE2EWordCount() throws Exception {
+ PipelineOptionsFactory.register(WordCountITOptions.class);
+ WordCountITOptions options = TestPipeline.testingPipelineOptions().as(WordCountITOptions.class);
+ options.setOutput(Joiner.on("/").join(new String[]{options.getTempRoot(),
+ options.getJobName(), "output", "results"}));
+
+ WordCount.main(TestPipeline.convertToArgs(options));
+ PipelineResult result =
+ TestDataflowPipelineRunner.getPipelineResultByJobName(options.getJobName());
+
+ assertNotNull("Result was null.", result);
+ assertEquals("Pipeline state was not done.", PipelineResult.State.DONE, result.getState());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 75eb873..03f0fb2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -308,6 +308,12 @@
</dependencies>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.19.1</version>
+ </plugin>
+
<!-- This plugin's configuration tells the m2e plugin how to import this
Maven project into the Eclipse environment. -->
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineOptions.java
index 171231c..f8913af 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineOptions.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.options.BlockingDataflowPipelineOptions;
/**
* A set of options used to configure the {@link TestPipeline}.
*/
-public interface TestDataflowPipelineOptions extends BlockingDataflowPipelineOptions {
+public interface TestDataflowPipelineOptions extends TestPipelineOptions,
+ BlockingDataflowPipelineOptions {
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
index ef3333d..d647b0d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.testing;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.DataflowJobExecutionException;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.values.POutput;
import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
@@ -42,7 +44,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -56,6 +60,8 @@ import java.util.concurrent.TimeUnit;
public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
private static final String TENTATIVE_COUNTER = "tentative";
private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
+ private static final Map<String, PipelineResult> EXECUTION_RESULTS =
+ new ConcurrentHashMap<String, PipelineResult>();
private final TestDataflowPipelineOptions options;
private final DataflowPipelineRunner runner;
@@ -72,10 +78,17 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
public static TestDataflowPipelineRunner fromOptions(
PipelineOptions options) {
TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
+ dataflowOptions.setStagingLocation(Joiner.on("/").join(
+ new String[]{dataflowOptions.getTempRoot(),
+ dataflowOptions.getJobName(), "output", "results"}));
return new TestDataflowPipelineRunner(dataflowOptions);
}
+ public static PipelineResult getPipelineResultByJobName(String jobName) {
+ return EXECUTION_RESULTS.get(jobName);
+ }
+
@Override
public DataflowPipelineJob run(Pipeline pipeline) {
return run(pipeline, runner);
@@ -146,6 +159,7 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
} catch (IOException e) {
throw new RuntimeException(e);
}
+ EXECUTION_RESULTS.put(options.getJobName(), job);
return job;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
index b480c0b..185ab51 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
@@ -98,6 +98,7 @@ public class TestDataflowPipelineRunnerTest {
options.setAppName("TestAppName");
options.setProject("test-project");
options.setTempLocation("gs://test/temp/location");
+ options.setTempRoot("gs://test");
options.setGcpCredential(new TestCredential());
options.setDataflowClient(service);
options.setRunner(TestDataflowPipelineRunner.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 72c8cbc..cdd3b84 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -37,7 +37,9 @@ import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Map;
import javax.annotation.Nullable;
@@ -136,7 +138,7 @@ public class TestPipeline extends Pipeline {
: PipelineOptionsFactory.fromArgs(
MAPPER.readValue(
System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS), String[].class))
- .as(PipelineOptions.class);
+ .as(TestPipelineOptions.class);
options.as(ApplicationNameOptions.class).setAppName(getAppName());
// If no options were specified, use a test credential object on all pipelines.
@@ -152,6 +154,22 @@ public class TestPipeline extends Pipeline {
}
}
+
+ public static String[] convertToArgs(PipelineOptions options) {
+ try {
+ Map<String, Object> stringOpts = (Map<String, Object>) MAPPER.readValue(
+ MAPPER.writeValueAsBytes(options), Map.class).get("options");
+
+ ArrayList<String> optArrayList = new ArrayList<>();
+ for (Map.Entry<String, Object> entry : stringOpts.entrySet()) {
+ optArrayList.add("--" + entry.getKey() + "=" + entry.getValue());
+ }
+ return optArrayList.toArray(new String[optArrayList.size()]);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
/** Returns the class + method name of the test, or a default name. */
private static String getAppName() {
Optional<StackTraceElement> stackTraceElement = findCallersStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
new file mode 100644
index 0000000..2599ae2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link TestPipelineOptions} is a set of options for test pipelines.
+ *
+ * @see TestPipeline
+ */
+public interface TestPipelineOptions extends PipelineOptions {
+ String getTempRoot();
+ void setTempRoot(String value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index fec0b48..9460e13 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.testing;
import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -25,6 +26,7 @@ import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.DirectPipelineRunner;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,6 +37,9 @@ import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.util.Arrays;
+import java.util.List;
+
/** Tests for {@link TestPipeline}. */
@RunWith(JUnit4.class)
public class TestPipelineTest {
@@ -86,6 +91,17 @@ public class TestPipelineTest {
}
@Test
+ public void testConvertToArgs() {
+ String[] args = new String[]{"--tempLocation=Test_Location"};
+ PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
+ String[] arr = TestPipeline.convertToArgs(options);
+ List<String> lst = Arrays.asList(arr);
+ assertEquals(lst.size(), 2);
+ assertThat(lst, containsInAnyOrder("--tempLocation=Test_Location",
+ "--appName=TestPipelineTest"));
+ }
+
+ @Test
public void testToStringNestedClassMethod() {
TestPipeline p = new NestedTester().p();
[2/2] incubator-beam git commit: This closes #101
Posted by da...@apache.org.
This closes #101
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/039d7132
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/039d7132
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/039d7132
Branch: refs/heads/master
Commit: 039d7132867a6d810f4025b151267f3861697aab
Parents: 983bef0 0eacebe
Author: Davor Bonaci <da...@google.com>
Authored: Fri Apr 22 17:17:59 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Apr 22 17:17:59 2016 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 26 ++++++++
.../org/apache/beam/examples/WordCountIT.java | 64 ++++++++++++++++++++
pom.xml | 6 ++
.../testing/TestDataflowPipelineOptions.java | 3 +-
.../sdk/testing/TestDataflowPipelineRunner.java | 14 +++++
.../testing/TestDataflowPipelineRunnerTest.java | 1 +
.../apache/beam/sdk/testing/TestPipeline.java | 20 +++++-
.../beam/sdk/testing/TestPipelineOptions.java | 30 +++++++++
.../beam/sdk/testing/TestPipelineTest.java | 16 +++++
9 files changed, 178 insertions(+), 2 deletions(-)
----------------------------------------------------------------------