You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/08 17:28:26 UTC

[2/3] incubator-beam git commit: [BEAM-98] [flink] disable stdout logging

[BEAM-98] [flink] disable stdout logging

In addition, enable Maven batch mode (-B), which suppresses output
intended for interactive use (e.g. the download progress indicator).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ed74f10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ed74f10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ed74f10

Branch: refs/heads/master
Commit: 9ed74f1096dc028306e8d2122585a90c01834a70
Parents: 5a817d1
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Mar 8 14:53:22 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Mar 8 17:25:53 2016 +0100

----------------------------------------------------------------------
 .travis.yml                                        |  4 ++--
 runners/flink/pom.xml                              | 17 +++++++++--------
 .../translation/FlinkBatchPipelineTranslator.java  | 14 +++++++++-----
 .../FlinkStreamingPipelineTranslator.java          | 15 ++++++++++-----
 4 files changed, 30 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 096e5ad..f1d9e3b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,8 +29,8 @@ before_install:
   - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
 
 install:
-  - travis_retry mvn install clean -U -DskipTests=true
+  - travis_retry mvn -B install clean -U -DskipTests=true
 
 script:
-  - travis_retry mvn $MAVEN_OVERRIDE install -U
+  - travis_retry mvn -B $MAVEN_OVERRIDE install -U
   - travis_retry travis/test_wordcount.sh

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 2110c2c..252c5cc 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -171,7 +171,7 @@
             <!-- Integration Tests -->
             <plugin>
                 <artifactId>maven-failsafe-plugin</artifactId>
-                <version>2.17</version><!--$NO-MVN-MAN-VER$-->
+                <version>2.17</version>
                 <executions>
                     <execution>
                         <goals>
@@ -180,19 +180,20 @@
                         </goals>
                     </execution>
                 </executions>
-				<configuration>
-					<argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
-				</configuration>
+                <configuration>
+                    <forkCount>1</forkCount>
+                    <argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
+                </configuration>
             </plugin>
 
             <!-- Unit Tests -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.17</version><!--$NO-MVN-MAN-VER$-->
-				<configuration>
-					<argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
-				</configuration>
+                <version>2.17</version>
+                <configuration>
+                    <argLine>-Dlog4j.configuration=log4j-test.properties  -XX:-UseGCOverheadLimit</argLine>
+                </configuration>
             </plugin>
 
             <!-- Eclipse Integration -->

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 9b47a08..28a10b7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -24,6 +24,8 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * FlinkBatchPipelineTranslator knows how to translate Pipeline objects into Flink Jobs.
@@ -31,6 +33,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  */
 public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
 
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkBatchPipelineTranslator.class);
+
   /**
    * The necessary context in the case of a batch job.
    */
@@ -53,7 +57,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
 
   @Override
   public void enterCompositeTransform(TransformTreeNode node) {
-    System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
 
     PTransform<?, ?> transform = node.getTransform();
     if (transform != null && currentCompositeTransform == null) {
@@ -77,7 +81,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
 
       BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
       if (translator != null) {
-        System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+        LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
         applyBatchTransform(transform, node, translator);
         currentCompositeTransform = null;
       } else {
@@ -86,12 +90,12 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
       }
     }
     this.depth--;
-    System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
   }
 
   @Override
   public void visitTransform(TransformTreeNode node) {
-    System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
     if (currentCompositeTransform != null) {
       // ignore it
       return;
@@ -103,7 +107,7 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
     PTransform<?, ?> transform = node.getTransform();
     BatchTransformTranslator<?> translator = FlinkBatchTransformTranslators.getTranslator(transform);
     if (translator == null) {
-      System.out.println(node.getTransform().getClass());
+      LOG.info(node.getTransform().getClass().toString());
       throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
     }
     applyBatchTransform(transform, node, translator);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9ed74f10/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index 60fba0f..ac96807 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -22,7 +22,10 @@ import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.values.PValue;
+import org.apache.beam.runners.flink.FlinkPipelineRunner;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate the user-provided
@@ -33,6 +36,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  * */
 public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
+
   /** The necessary context in the case of a straming job. */
   private final FlinkStreamingTranslationContext streamingContext;
 
@@ -51,7 +56,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
   @Override
   public void enterCompositeTransform(TransformTreeNode node) {
-    System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node));
 
     PTransform<?, ?> transform = node.getTransform();
     if (transform != null && currentCompositeTransform == null) {
@@ -71,7 +76,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
       StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
       if (translator != null) {
-        System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+        LOG.info(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
         applyStreamingTransform(transform, node, translator);
         currentCompositeTransform = null;
       } else {
@@ -80,12 +85,12 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
       }
     }
     this.depth--;
-    System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node));
   }
 
   @Override
   public void visitTransform(TransformTreeNode node) {
-    System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
+    LOG.info(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node));
     if (currentCompositeTransform != null) {
       // ignore it
       return;
@@ -97,7 +102,7 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     PTransform<?, ?> transform = node.getTransform();
     StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
     if (translator == null) {
-      System.out.println(node.getTransform().getClass());
+      LOG.info(node.getTransform().getClass().toString());
       throw new UnsupportedOperationException("The transform " + transform + " is currently not supported.");
     }
     applyStreamingTransform(transform, node, translator);