Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/08 17:28:25 UTC

[1/3] incubator-beam git commit: [BEAM-100] [flink] update to Flink 1.0.0

Repository: incubator-beam
Updated Branches:
  refs/heads/master b864ce82a -> 1f9d02c77


[BEAM-100] [flink] update to Flink 1.0.0


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1f9d02c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1f9d02c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1f9d02c7

Branch: refs/heads/master
Commit: 1f9d02c775825f8f780c214f27baa19a9a21aa4b
Parents: 9ed74f1
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Mar 8 15:08:28 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Mar 8 17:25:53 2016 +0100

----------------------------------------------------------------------
 runners/flink/pom.xml                           |  9 ++-----
 .../apache/beam/runners/flink/AvroITCase.java   | 28 +++++++++++++++++++-
 2 files changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f9d02c7/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 252c5cc..9bd705c 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -47,7 +47,7 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-        <flink.version>1.0-SNAPSHOT</flink.version>
+        <flink.version>1.0.0</flink.version>
         <beam.version>1.5.0-SNAPSHOT</beam.version>
         <!-- Default parameters for mvn exec:exec -->
         <clazz>org.apache.beam.runners.flink.examples.WordCount</clazz>
@@ -95,11 +95,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro_2.10</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
             <artifactId>flink-clients_2.10</artifactId>
             <version>${flink.version}</version>
         </dependency>
@@ -116,7 +111,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro</artifactId>
+            <artifactId>flink-avro_2.10</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f9d02c7/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
index 5b32d54..3536f87 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
@@ -26,7 +26,6 @@ import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.common.base.Joiner;
-import org.apache.flink.api.io.avro.example.User;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
 
@@ -97,5 +96,32 @@ public class AvroITCase extends JavaProgramTestBase {
     p.run();
   }
 
+  private static class User {
+
+    private String name;
+    private int favoriteNumber;
+    private String favoriteColor;
+
+    public User() {}
+
+    public User(String name, int favoriteNumber, String favoriteColor) {
+      this.name = name;
+      this.favoriteNumber = favoriteNumber;
+      this.favoriteColor = favoriteColor;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public String getFavoriteColor() {
+      return favoriteColor;
+    }
+
+    public int getFavoriteNumber() {
+      return favoriteNumber;
+    }
+  }
+
 }
 

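The commit drops the import of Flink's bundled `org.apache.flink.api.io.avro.example.User` and defines a private `User` POJO inside the test instead, removing the test's dependency on Flink's Avro example classes. As a standalone sketch, the same class lifted out of the test and made public — the `toString`-free `main` driver is an illustrative addition, not part of the commit:

```java
// Self-contained copy of the User POJO introduced by this commit.
// Avro's reflection-based serialization needs exactly this shape:
// a public no-arg constructor plus getters for each field.
public class User {

    private String name;
    private int favoriteNumber;
    private String favoriteColor;

    // Required by reflection-based instantiation.
    public User() {}

    public User(String name, int favoriteNumber, String favoriteColor) {
        this.name = name;
        this.favoriteNumber = favoriteNumber;
        this.favoriteColor = favoriteColor;
    }

    public String getName() { return name; }
    public String getFavoriteColor() { return favoriteColor; }
    public int getFavoriteNumber() { return favoriteNumber; }

    // Illustrative driver only; not part of the commit.
    public static void main(String[] args) {
        User u = new User("Joe", 3, "red");
        System.out.println(u.getName() + " likes " + u.getFavoriteColor());
    }
}
```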

[2/3] incubator-beam git commit: [BEAM-98] [flink] disable stdout logging

[BEAM-98] [flink] disable stdout logging

In addition, enable Maven batch mode, which suppresses output meant for
interactive use (e.g. the download progress indicator).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ed74f10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ed74f10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ed74f10

Branch: refs/heads/master
Commit: 9ed74f1096dc028306e8d2122585a90c01834a70
Parents: 5a817d1
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Mar 8 14:53:22 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Mar 8 17:25:53 2016 +0100

----------------------------------------------------------------------
 .travis.yml                                        |  4 ++--
 runners/flink/pom.xml                              | 17 +++++++++--------
 .../translation/FlinkBatchPipelineTranslator.java  | 14 +++++++++-----
 .../FlinkStreamingPipelineTranslator.java          | 15 ++++++++++-----
 4 files changed, 30 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 096e5ad..f1d9e3b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,8 +29,8 @@ before_install:
   - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
 
 install:
-  - travis_retry mvn install clean -U -DskipTests=true
+  - travis_retry mvn -B install clean -U -DskipTests=true
 
 script:
-  - travis_retry mvn $MAVEN_OVERRIDE install -U
+  - travis_retry mvn -B $MAVEN_OVERRIDE install -U
   - travis_retry travis/test_wordcount.sh

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 2110c2c..252c5cc 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -171,7 +171,7 @@
             <!-- Integration Tests -->
             <plugin>
                 <artifactId>maven-failsafe-plugin</artifactId>
-                <version>2.17</version><!--$NO-MVN-MAN-VER$-->
+                <version>2.17</version>
                 <executions>
                     <execution>
                         <goals>
@@ -180,19 +180,20 @@
                         </goals>
                     </execution>
                 </executions>
-				<configuration>
-					<argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
-				</configuration>
+                <configuration>
+                    <forkCount>1</forkCount>
+                    <argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
+                </configuration>
             </plugin>
 
             <!-- Unit Tests -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.17</version><!--$NO-MVN-MAN-VER$-->
-				<configuration>
-					<argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
-				</configuration>
+                <version>2.17</version>
+                <configuration>
+                    <argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
+                </configuration>
             </plugin>
 
             <!-- Eclipse Integration -->

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 9b47a08..28a10b7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -24,6 +24,8 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
@@ -31,6 +33,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  */
 public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
 
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
+
   /**
    * The necessary context in the case of a batch job.
    */
@@ -53,7 +57,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
 
   @Override
   public void enterCompositeTransform(TransformTreeNode node) {
-    System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
 
     PTransform<?, ?> transform = node.getTransform();
     if (transform != null && currentCompositeTransform == null) {
@@ -77,7 +81,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
 
       BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
       if (translator != null) {
-        System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+        LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
         applyBatchTransform(transform, node, translator);
         currentCompositeTransform = null;
       } else {
@@ -86,12 +90,12 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
       }
     }
     this.depth--;
-    System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
   }
 
   @Override
   public void visitTransform(TransformTreeNode node) {
-    System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
     if (currentCompositeTransform != null) {
       // ignore it
       return;
@@ -103,7 +107,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
     PTransform<?, ?> transform = node.getTransform();
     BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
     if (translator == null) {
-      System.out.println(node.getTransform().getClass());
+      LOG.info(node.getTransform().getClass().toString());
       throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
     }
     applyBatchTransform(transform, node, translator);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index 60fba0f..ac96807 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -22,7 +22,10 @@ import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.values.PValue;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate the user-provided
@@ -33,6 +36,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  * */
 public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
+
+  /** The necessary context in the case of a streaming job. */
   private final FlinkStreamingTranslationContext streamingContext;
 
@@ -51,7 +56,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
   @Override
   public void enterCompositeTransform(TransformTreeNode node) {
-    System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
 
     PTransform<?, ?> transform = node.getTransform();
     if (transform != null && currentCompositeTransform == null) {
@@ -71,7 +76,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
       StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
       if (translator != null) {
-        System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+        LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
         applyStreamingTransform(transform, node, translator);
         currentCompositeTransform = null;
       } else {
@@ -80,12 +85,12 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
       }
     }
     this.depth--;
-    System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
   }
 
   @Override
   public void visitTransform(TransformTreeNode node) {
-    System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
     if (currentCompositeTransform != null) {
       // ignore it
       return;
@@ -97,7 +102,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     PTransform<?, ?> transform = node.getTransform();
     StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
     if (translator == null) {
-      System.out.println(node.getTransform().getClass());
+      LOG.info(node.getTransform().getClass().toString());
       throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
     }
     applyStreamingTransform(transform, node, translator);

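Both translators get the same treatment: each `System.out.println` is replaced by a static SLF4J `Logger`, keeping the depth-based indentation, so `log4j-test.properties` can silence the output during tests. A dependency-free sketch of that pattern, using `java.util.logging` in place of SLF4J so it compiles without extra jars, and with an assumed implementation of `genSpaces` (the real helper lives in `FlinkPipelineTranslator` and is not shown in this diff):

```java
import java.util.logging.Logger;

// Sketch of the logging pattern this commit applies to both pipeline
// translators. java.util.logging stands in for SLF4J here; genSpaces
// is an assumed implementation, since the real helper is not in this
// diff.
public class TranslatorLoggingSketch {

    private static final Logger LOG =
        Logger.getLogger(TranslatorLoggingSketch.class.getName());

    private int depth = 0;

    // Assumed: indent log lines two spaces per nesting level.
    static String genSpaces(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append("  ");
        }
        return sb.toString();
    }

    public void enterCompositeTransform(String nodeName) {
        // Was System.out.println(...); now routed through the logger.
        LOG.info(genSpaces(depth) + "enterCompositeTransform- " + nodeName);
        depth++;
    }

    public void leaveCompositeTransform(String nodeName) {
        depth--;
        LOG.info(genSpaces(depth) + "leaveCompositeTransform- " + nodeName);
    }

    public static void main(String[] args) {
        TranslatorLoggingSketch t = new TranslatorLoggingSketch();
        t.enterCompositeTransform("WordCount");
        t.leaveCompositeTransform("WordCount");
    }
}
```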

[3/3] incubator-beam git commit: [docs] update README Runner section

[docs] update README Runner section

This closes #29.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a817d11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a817d11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a817d11

Branch: refs/heads/master
Commit: 5a817d1123d14d8756e552027347082de015b713
Parents: b864ce8
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Mar 7 11:54:28 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Mar 8 17:25:53 2016 +0100

----------------------------------------------------------------------
 README.md | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a817d11/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index db4a13f..6dd4bcc 100644
--- a/README.md
+++ b/README.md
@@ -44,11 +44,14 @@ Have ideas for new SDKs or DSLs? See the [Jira](https://issues.apache.org/jira/b
 
 ### Runners
 
-Beam supports executing programs on multiple distributed processing backends. After the Beam project's initial bootstrapping completes, it will include:
-  1. The `DirectPipelineRunner` runs the pipeline on your local machine.
-  2. The `DataflowPipelineRunner` submits the pipeline to the [Google Cloud Dataflow](http://cloud.google.com/dataflow/).
-  3. The `SparkPipelineRunner` runs the pipeline on an Apache Spark cluster. See the code that will be donated at [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow).
-  4. The `FlinkPipelineRunner` runs the pipeline on an Apache Flink cluster. See the code that will be donated at [dataArtisans/flink-dataflow](https://github.com/dataArtisans/flink-dataflow).
+Beam supports executing programs on multiple distributed processing backends through PipelineRunners. Currently, the following PipelineRunners are available:
+
+- The `DirectPipelineRunner` runs the pipeline on your local machine.
+- The `DataflowPipelineRunner` submits the pipeline to the [Google Cloud Dataflow](http://cloud.google.com/dataflow/) service.
+- The `FlinkPipelineRunner` runs the pipeline on an Apache Flink cluster. The code has been donated from [dataArtisans/flink-dataflow](https://github.com/dataArtisans/flink-dataflow) and is now part of Beam.
+
+After the Beam project's initial bootstrapping completes, it will include:
+- The `SparkPipelineRunner` runs the pipeline on an Apache Spark cluster. See the code that will be donated at [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow).
 
 Have ideas for new Runners? See the [Jira](https://issues.apache.org/jira/browse/BEAM/component/12328916/).
 
@@ -57,6 +60,10 @@ Have ideas for new Runners? See the [Jira](https://issues.apache.org/jira/browse
 
 _Coming soon!_
 
+### Flink Runner
+
+See the Flink Runner [README](https://github.com/apache/incubator-beam/runners/flink).
+
 
 ## Contact Us