You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/08 17:28:25 UTC
[1/3] incubator-beam git commit: [BEAM-100] [flink] update to Flink
1.0.0
Repository: incubator-beam
Updated Branches:
refs/heads/master b864ce82a -> 1f9d02c77
[BEAM-100] [flink] update to Flink 1.0.0
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1f9d02c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1f9d02c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1f9d02c7
Branch: refs/heads/master
Commit: 1f9d02c775825f8f780c214f27baa19a9a21aa4b
Parents: 9ed74f1
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Mar 8 15:08:28 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Mar 8 17:25:53 2016 +0100
----------------------------------------------------------------------
runners/flink/pom.xml | 9 ++-----
.../apache/beam/runners/flink/AvroITCase.java | 28 +++++++++++++++++++-
2 files changed, 29 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f9d02c7/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 252c5cc..9bd705c 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -47,7 +47,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <flink.version>1.0-SNAPSHOT</flink.version>
+ <flink.version>1.0.0</flink.version>
<beam.version>1.5.0-SNAPSHOT</beam.version>
<!-- Default parameters for mvn exec:exec -->
<clazz>org.apache.beam.runners.flink.examples.WordCount</clazz>
@@ -95,11 +95,6 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-avro_2.10</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
@@ -116,7 +111,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
+ <artifactId>flink-avro_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1f9d02c7/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
index 5b32d54..3536f87 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java
@@ -26,7 +26,6 @@ import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.common.base.Joiner;
-import org.apache.flink.api.io.avro.example.User;
import org.apache.flink.test.util.JavaProgramTestBase;
@@ -97,5 +96,32 @@ public class AvroITCase extends JavaProgramTestBase {
p.run();
}
+ private static class User {
+
+ private String name;
+ private int favoriteNumber;
+ private String favoriteColor;
+
+ public User() {}
+
+ public User(String name, int favoriteNumber, String favoriteColor) {
+ this.name = name;
+ this.favoriteNumber = favoriteNumber;
+ this.favoriteColor = favoriteColor;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getFavoriteColor() {
+ return favoriteColor;
+ }
+
+ public int getFavoriteNumber() {
+ return favoriteNumber;
+ }
+ }
+
}
[2/3] incubator-beam git commit: [BEAM-98] [flink] disable stdout
logging
Posted by mx...@apache.org.
[BEAM-98] [flink] disable stdout logging
In addition, enable Maven batch mode, which disables output intended for
interactive use (e.g. the download progress indicator).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ed74f10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ed74f10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ed74f10
Branch: refs/heads/master
Commit: 9ed74f1096dc028306e8d2122585a90c01834a70
Parents: 5a817d1
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Mar 8 14:53:22 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Mar 8 17:25:53 2016 +0100
----------------------------------------------------------------------
.travis.yml | 4 ++--
runners/flink/pom.xml | 17 +++++++++--------
.../translation/FlinkBatchPipelineTranslator.java | 14 +++++++++-----
.../FlinkStreamingPipelineTranslator.java | 15 ++++++++++-----
4 files changed, 30 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 096e5ad..f1d9e3b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,8 +29,8 @@ before_install:
- if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
install:
- - travis_retry mvn install clean -U -DskipTests=true
+ - travis_retry mvn -B install clean -U -DskipTests=true
script:
- - travis_retry mvn $MAVEN_OVERRIDE install -U
+ - travis_retry mvn -B $MAVEN_OVERRIDE install -U
- travis_retry travis/test_wordcount.sh
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 2110c2c..252c5cc 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -171,7 +171,7 @@
<!-- Integration Tests -->
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
- <version>2.17</version><!--$NO-MVN-MAN-VER$-->
+ <version>2.17</version>
<executions>
<execution>
<goals>
@@ -180,19 +180,20 @@
</goals>
</execution>
</executions>
- <configuration>
- <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
- </configuration>
+ <configuration>
+ <forkCount>1</forkCount>
+ <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
+ </configuration>
</plugin>
<!-- Unit Tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.17</version><!--$NO-MVN-MAN-VER$-->
- <configuration>
- <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
- </configuration>
+ <version>2.17</version>
+ <configuration>
+ <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
+ </configuration>
</plugin>
<!-- Eclipse Integration -->
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 9b47a08..28a10b7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -24,6 +24,8 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
import com.google.cloud.dataflow.sdk.values.PValue;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
@@ -31,6 +33,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
*/
public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
+
/**
* The necessary context in the case of a batch job.
*/
@@ -53,7 +57,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
@Override
public void enterCompositeTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
PTransform<?, ?> transform = node.getTransform();
if (transform != null && currentCompositeTransform == null) {
@@ -77,7 +81,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
if (translator != null) {
- System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
applyBatchTransform(transform, node, translator);
currentCompositeTransform = null;
} else {
@@ -86,12 +90,12 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
}
}
this.depth--;
- System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
}
@Override
public void visitTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
if (currentCompositeTransform != null) {
// ignore it
return;
@@ -103,7 +107,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
PTransform<?, ?> transform = node.getTransform();
BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
if (translator == null) {
- System.out.println(node.getTransform().getClass());
+ LOG.info(node.getTransform().getClass().toString());
throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
}
applyBatchTransform(transform, node, translator);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index 60fba0f..ac96807 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -22,7 +22,10 @@ import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.values.PValue;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate the user-provided
@@ -33,6 +36,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* */
public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
+
/** The necessary context in the case of a streaming job. */
private final FlinkStreamingTranslationContext streamingContext;
@@ -51,7 +56,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
@Override
public void enterCompositeTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
PTransform<?, ?> transform = node.getTransform();
if (transform != null && currentCompositeTransform == null) {
@@ -71,7 +76,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
if (translator != null) {
- System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
applyStreamingTransform(transform, node, translator);
currentCompositeTransform = null;
} else {
@@ -80,12 +85,12 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
}
}
this.depth--;
- System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
}
@Override
public void visitTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
if (currentCompositeTransform != null) {
// ignore it
return;
@@ -97,7 +102,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
PTransform<?, ?> transform = node.getTransform();
StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
if (translator == null) {
- System.out.println(node.getTransform().getClass());
+ LOG.info(node.getTransform().getClass().toString());
throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
}
applyStreamingTransform(transform, node, translator);
[3/3] incubator-beam git commit: [docs] update README Runner section
Posted by mx...@apache.org.
[docs] update README Runner section
This closes #29.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a817d11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a817d11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a817d11
Branch: refs/heads/master
Commit: 5a817d1123d14d8756e552027347082de015b713
Parents: b864ce8
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Mar 7 11:54:28 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Mar 8 17:25:53 2016 +0100
----------------------------------------------------------------------
README.md | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a817d11/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index db4a13f..6dd4bcc 100644
--- a/README.md
+++ b/README.md
@@ -44,11 +44,14 @@ Have ideas for new SDKs or DSLs? See the [Jira](https://issues.apache.org/jira/b
### Runners
-Beam supports executing programs on multiple distributed processing backends. After the Beam project's initial bootstrapping completes, it will include:
- 1. The `DirectPipelineRunner` runs the pipeline on your local machine.
- 2. The `DataflowPipelineRunner` submits the pipeline to the [Google Cloud Dataflow](http://cloud.google.com/dataflow/).
- 3. The `SparkPipelineRunner` runs the pipeline on an Apache Spark cluster. See the code that will be donated at [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow).
- 4. The `FlinkPipelineRunner` runs the pipeline on an Apache Flink cluster. See the code that will be donated at [dataArtisans/flink-dataflow](https://github.com/dataArtisans/flink-dataflow).
+Beam supports executing programs on multiple distributed processing backends through PipelineRunners. Currently, the following PipelineRunners are available:
+
+- The `DirectPipelineRunner` runs the pipeline on your local machine.
+- The `DataflowPipelineRunner` submits the pipeline to the [Google Cloud Dataflow](http://cloud.google.com/dataflow/).
+- The `FlinkPipelineRunner` runs the pipeline on an Apache Flink cluster. The code has been donated from [dataArtisans/flink-dataflow](https://github.com/dataArtisans/flink-dataflow) and is now part of Beam.
+
+After the Beam project's initial bootstrapping completes, it will include:
+- The `SparkPipelineRunner` runs the pipeline on an Apache Spark cluster. See the code that will be donated at [cloudera/spark-dataflow](https://github.com/cloudera/spark-dataflow).
Have ideas for new Runners? See the [Jira](https://issues.apache.org/jira/browse/BEAM/component/12328916/).
@@ -57,6 +60,10 @@ Have ideas for new Runners? See the [Jira](https://issues.apache.org/jira/browse
_Coming soon!_
+### Flink Runner
+
+See the Flink Runner [README](https://github.com/apache/incubator-beam/tree/master/runners/flink).
+
## Contact Us