You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/08 17:28:26 UTC
[2/3] incubator-beam git commit: [BEAM-98] [flink] disable stdout
logging
[BEAM-98] [flink] disable stdout logging
In addition, enable Maven batch mode (-B), which disables output intended for
interactive use (e.g. the download progress indicator).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ed74f10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ed74f10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ed74f10
Branch: refs/heads/master
Commit: 9ed74f1096dc028306e8d2122585a90c01834a70
Parents: 5a817d1
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Mar 8 14:53:22 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Mar 8 17:25:53 2016 +0100
----------------------------------------------------------------------
.travis.yml | 4 ++--
runners/flink/pom.xml | 17 +++++++++--------
.../translation/FlinkBatchPipelineTranslator.java | 14 +++++++++-----
.../FlinkStreamingPipelineTranslator.java | 15 ++++++++++-----
4 files changed, 30 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 096e5ad..f1d9e3b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,8 +29,8 @@ before_install:
- if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
install:
- - travis_retry mvn install clean -U -DskipTests=true
+ - travis_retry mvn -B install clean -U -DskipTests=true
script:
- - travis_retry mvn $MAVEN_OVERRIDE install -U
+ - travis_retry mvn -B $MAVEN_OVERRIDE install -U
- travis_retry travis/test_wordcount.sh
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 2110c2c..252c5cc 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -171,7 +171,7 @@
<!-- Integration Tests -->
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
- <version>2.17</version><!--$NO-MVN-MAN-VER$-->
+ <version>2.17</version>
<executions>
<execution>
<goals>
@@ -180,19 +180,20 @@
</goals>
</execution>
</executions>
- <configuration>
- <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
- </configuration>
+ <configuration>
+ <forkCount>1</forkCount>
+ <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
+ </configuration>
</plugin>
<!-- Unit Tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.17</version><!--$NO-MVN-MAN-VER$-->
- <configuration>
- <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
- </configuration>
+ <version>2.17</version>
+ <configuration>
+ <argLine>-Dlog4j.configuration=log4j-test.properties -XX:-UseGCOverheadLimit</argLine>
+ </configuration>
</plugin>
<!-- Eclipse Integration -->
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 9b47a08..28a10b7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -24,6 +24,8 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
import com.google.cloud.dataflow.sdk.values.PValue;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
@@ -31,6 +33,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
*/
public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
+
/**
* The necessary context in the case of a batch job.
*/
@@ -53,7 +57,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
@Override
public void enterCompositeTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
PTransform<?, ?> transform = node.getTransform();
if (transform != null && currentCompositeTransform == null) {
@@ -77,7 +81,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
if (translator != null) {
- System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
applyBatchTransform(transform, node, translator);
currentCompositeTransform = null;
} else {
@@ -86,12 +90,12 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
}
}
this.depth--;
- System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
}
@Override
public void visitTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
if (currentCompositeTransform != null) {
// ignore it
return;
@@ -103,7 +107,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
PTransform<?, ?> transform = node.getTransform();
BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
if (translator == null) {
- System.out.println(node.getTransform().getClass());
+ LOG.info(node.getTransform().getClass().toString());
throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
}
applyBatchTransform(transform, node, translator);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index 60fba0f..ac96807 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -22,7 +22,10 @@ import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.values.PValue;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate the user-provided
@@ -33,6 +36,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
* */
public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
+
/** The necessary context in the case of a straming job. */
private final FlinkStreamingTranslationContext streamingContext;
@@ -51,7 +56,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
@Override
public void enterCompositeTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
PTransform<?, ?> transform = node.getTransform();
if (transform != null && currentCompositeTransform == null) {
@@ -71,7 +76,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
if (translator != null) {
- System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
applyStreamingTransform(transform, node, translator);
currentCompositeTransform = null;
} else {
@@ -80,12 +85,12 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
}
}
this.depth--;
- System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
}
@Override
public void visitTransform(TransformTreeNode node) {
- System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+ LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
if (currentCompositeTransform != null) {
// ignore it
return;
@@ -97,7 +102,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
PTransform<?, ?> transform = node.getTransform();
StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
if (translator == null) {
- System.out.println(node.getTransform().getClass());
+ LOG.info(node.getTransform().getClass().toString());
throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
}
applyStreamingTransform(transform, node, translator);