You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/04/23 02:31:25 UTC

[1/2] incubator-beam git commit: Add failsafe plugin for integration tests.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 983bef0cc -> 039d71328


Add failsafe plugin for integration tests.

Signed-off-by: Jason Kuster <ja...@google.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0eacebed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0eacebed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0eacebed

Branch: refs/heads/master
Commit: 0eacebedea9e9818a1022f222a8129ff276f651c
Parents: 983bef0
Author: Jason Kuster <ja...@google.com>
Authored: Wed Mar 30 16:24:52 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Apr 22 17:17:39 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           | 26 ++++++++
 .../org/apache/beam/examples/WordCountIT.java   | 64 ++++++++++++++++++++
 pom.xml                                         |  6 ++
 .../testing/TestDataflowPipelineOptions.java    |  3 +-
 .../sdk/testing/TestDataflowPipelineRunner.java | 14 +++++
 .../testing/TestDataflowPipelineRunnerTest.java |  1 +
 .../apache/beam/sdk/testing/TestPipeline.java   | 20 +++++-
 .../beam/sdk/testing/TestPipelineOptions.java   | 30 +++++++++
 .../beam/sdk/testing/TestPipelineTest.java      | 16 +++++
 9 files changed, 178 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index f0ed060..9626bf3 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -36,6 +36,10 @@
 
   <packaging>jar</packaging>
 
+  <properties>
+    <skipITs>true</skipITs>
+  </properties>
+
   <build>
     <plugins>
       <plugin>
@@ -221,6 +225,28 @@
         <groupId>org.jacoco</groupId>
         <artifactId>jacoco-maven-plugin</artifactId>
       </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <configuration>
+          <useManifestOnlyJar>false</useManifestOnlyJar>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+            <configuration>
+              <systemPropertyVariables>
+                <beamTestPipelineOptions>${integrationTestPipelineOptions}</beamTestPipelineOptions>
+              </systemPropertyVariables>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
new file mode 100644
index 0000000..a5ad707
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.beam.examples.WordCount.WordCountOptions;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestDataflowPipelineRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import com.google.common.base.Joiner;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * End-to-end tests of WordCount.
+ */
+@RunWith(JUnit4.class)
+public class WordCountIT {
+
+  /**
+   * Options for the WordCount Integration Test.
+   */
+  public static interface WordCountITOptions extends TestPipelineOptions,
+         WordCountOptions, DataflowPipelineOptions {
+  }
+
+  @Test
+  public void testE2EWordCount() throws Exception {
+    PipelineOptionsFactory.register(WordCountITOptions.class);
+    WordCountITOptions options = TestPipeline.testingPipelineOptions().as(WordCountITOptions.class);
+    options.setOutput(Joiner.on("/").join(new String[]{options.getTempRoot(),
+        options.getJobName(), "output", "results"}));
+
+    WordCount.main(TestPipeline.convertToArgs(options));
+    PipelineResult result =
+        TestDataflowPipelineRunner.getPipelineResultByJobName(options.getJobName());
+
+    assertNotNull("Result was null.", result);
+    assertEquals("Pipeline state was not done.", PipelineResult.State.DONE, result.getState());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 75eb873..03f0fb2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -308,6 +308,12 @@
           </dependencies>
         </plugin>
 
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-failsafe-plugin</artifactId>
+          <version>2.19.1</version>
+        </plugin>
+
         <!-- This plugin's configuration tells the m2e plugin how to import this
              Maven project into the Eclipse environment. -->
         <plugin>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineOptions.java
index 171231c..f8913af 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineOptions.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.options.BlockingDataflowPipelineOptions;
 /**
  * A set of options used to configure the {@link TestPipeline}.
  */
-public interface TestDataflowPipelineOptions extends BlockingDataflowPipelineOptions {
+public interface TestDataflowPipelineOptions extends TestPipelineOptions,
+       BlockingDataflowPipelineOptions {
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
index ef3333d..d647b0d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunner.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.testing;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.DataflowJobExecutionException;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.values.POutput;
 import com.google.api.services.dataflow.model.JobMessage;
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricUpdate;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 
@@ -42,7 +44,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -56,6 +60,8 @@ import java.util.concurrent.TimeUnit;
 public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> {
   private static final String TENTATIVE_COUNTER = "tentative";
   private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
+  private static final Map<String, PipelineResult> EXECUTION_RESULTS =
+      new ConcurrentHashMap<String, PipelineResult>();
 
   private final TestDataflowPipelineOptions options;
   private final DataflowPipelineRunner runner;
@@ -72,10 +78,17 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
   public static TestDataflowPipelineRunner fromOptions(
       PipelineOptions options) {
     TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
+    dataflowOptions.setStagingLocation(Joiner.on("/").join(
+        new String[]{dataflowOptions.getTempRoot(),
+          dataflowOptions.getJobName(), "output", "results"}));
 
     return new TestDataflowPipelineRunner(dataflowOptions);
   }
 
+  public static PipelineResult getPipelineResultByJobName(String jobName) {
+    return EXECUTION_RESULTS.get(jobName);
+  }
+
   @Override
   public DataflowPipelineJob run(Pipeline pipeline) {
     return run(pipeline, runner);
@@ -146,6 +159,7 @@ public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJ
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+    EXECUTION_RESULTS.put(options.getJobName(), job);
     return job;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
index b480c0b..185ab51 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/sdk/testing/TestDataflowPipelineRunnerTest.java
@@ -98,6 +98,7 @@ public class TestDataflowPipelineRunnerTest {
     options.setAppName("TestAppName");
     options.setProject("test-project");
     options.setTempLocation("gs://test/temp/location");
+    options.setTempRoot("gs://test");
     options.setGcpCredential(new TestCredential());
     options.setDataflowClient(service);
     options.setRunner(TestDataflowPipelineRunner.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 72c8cbc..cdd3b84 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -37,7 +37,9 @@ import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Map;
 
 import javax.annotation.Nullable;
 
@@ -136,7 +138,7 @@ public class TestPipeline extends Pipeline {
               : PipelineOptionsFactory.fromArgs(
                       MAPPER.readValue(
                           System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS), String[].class))
-                  .as(PipelineOptions.class);
+                  .as(TestPipelineOptions.class);
 
       options.as(ApplicationNameOptions.class).setAppName(getAppName());
       // If no options were specified, use a test credential object on all pipelines.
@@ -152,6 +154,22 @@ public class TestPipeline extends Pipeline {
     }
   }
 
+
+  public static String[] convertToArgs(PipelineOptions options) {
+    try {
+      Map<String, Object> stringOpts = (Map<String, Object>) MAPPER.readValue(
+          MAPPER.writeValueAsBytes(options), Map.class).get("options");
+
+      ArrayList<String> optArrayList = new ArrayList<>();
+      for (Map.Entry<String, Object> entry : stringOpts.entrySet()) {
+        optArrayList.add("--" + entry.getKey() + "=" + entry.getValue());
+      }
+      return optArrayList.toArray(new String[optArrayList.size()]);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
   /** Returns the class + method name of the test, or a default name. */
   private static String getAppName() {
     Optional<StackTraceElement> stackTraceElement = findCallersStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
new file mode 100644
index 0000000..2599ae2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * {@link TestPipelineOptions} is a set of options for test pipelines.
+ *
+ * @see TestPipeline
+ */
+public interface TestPipelineOptions extends PipelineOptions {
+  String getTempRoot();
+  void setTempRoot(String value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0eacebed/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index fec0b48..9460e13 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.testing;
 
 import static org.hamcrest.CoreMatchers.startsWith;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -25,6 +26,7 @@ import static org.junit.Assert.assertThat;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.DirectPipelineRunner;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,6 +37,9 @@ import org.junit.rules.TestRule;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+import java.util.Arrays;
+import java.util.List;
+
 /** Tests for {@link TestPipeline}. */
 @RunWith(JUnit4.class)
 public class TestPipelineTest {
@@ -86,6 +91,17 @@ public class TestPipelineTest {
   }
 
   @Test
+  public void testConvertToArgs() {
+    String[] args = new String[]{"--tempLocation=Test_Location"};
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
+    String[] arr = TestPipeline.convertToArgs(options);
+    List<String> lst = Arrays.asList(arr);
+    assertEquals(lst.size(), 2);
+    assertThat(lst, containsInAnyOrder("--tempLocation=Test_Location",
+          "--appName=TestPipelineTest"));
+  }
+
+  @Test
   public void testToStringNestedClassMethod() {
     TestPipeline p = new NestedTester().p();
 


[2/2] incubator-beam git commit: This closes #101

Posted by da...@apache.org.
This closes #101


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/039d7132
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/039d7132
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/039d7132

Branch: refs/heads/master
Commit: 039d7132867a6d810f4025b151267f3861697aab
Parents: 983bef0 0eacebe
Author: Davor Bonaci <da...@google.com>
Authored: Fri Apr 22 17:17:59 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Apr 22 17:17:59 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           | 26 ++++++++
 .../org/apache/beam/examples/WordCountIT.java   | 64 ++++++++++++++++++++
 pom.xml                                         |  6 ++
 .../testing/TestDataflowPipelineOptions.java    |  3 +-
 .../sdk/testing/TestDataflowPipelineRunner.java | 14 +++++
 .../testing/TestDataflowPipelineRunnerTest.java |  1 +
 .../apache/beam/sdk/testing/TestPipeline.java   | 20 +++++-
 .../beam/sdk/testing/TestPipelineOptions.java   | 30 +++++++++
 .../beam/sdk/testing/TestPipelineTest.java      | 16 +++++
 9 files changed, 178 insertions(+), 2 deletions(-)
----------------------------------------------------------------------