You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/05/30 10:17:17 UTC

[2/4] incubator-beam git commit: [BEAM-235] use streaming mode on unbounded sources

[BEAM-235] use streaming mode on unbounded sources

This change automatically discovers the execution mode of the Pipeline
during a preliminary "optimization" translation of the pipeline. When
unbounded sources are discovered, the pipeline translation mode is
switched to streaming.

Users may still supply the streaming flag to override this
behavior. Users who forget to supply the flag will automatically use
streaming mode whenever they use unbounded sources.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5632bbf8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5632bbf8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5632bbf8

Branch: refs/heads/master
Commit: 5632bbf8a85a7d53fcaa53535030c4b406d8a09a
Parents: cca2577
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon May 30 09:59:12 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon May 30 12:11:24 2016 +0200

----------------------------------------------------------------------
 runners/flink/README.md                         |   9 +-
 .../FlinkPipelineExecutionEnvironment.java      | 149 +++++++------------
 .../beam/runners/flink/FlinkPipelineRunner.java |  13 +-
 .../FlinkBatchPipelineTranslator.java           |   8 -
 .../translation/FlinkPipelineTranslator.java    |  17 +++
 .../FlinkStreamingPipelineTranslator.java       |  10 --
 .../PipelineTranslationOptimizer.java           |  73 +++++++++
 .../flink/translation/TranslationMode.java      |  31 ++++
 .../main/java/org/apache/beam/sdk/Pipeline.java |   2 +-
 9 files changed, 186 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/README.md
----------------------------------------------------------------------
diff --git a/runners/flink/README.md b/runners/flink/README.md
index 7418f16..457e2a6 100644
--- a/runners/flink/README.md
+++ b/runners/flink/README.md
@@ -27,11 +27,14 @@ and sinks or use the provided support for Apache Kafka.
 
 ### Seamless integration
 
-To execute a Beam program in streaming mode, just enable streaming in the `PipelineOptions`:
+The Flink Runner decides to use batch or streaming execution mode based on whether programs use
+unbounded sources. When unbounded sources are used, it executes in streaming mode, otherwise it
+uses the batch execution mode.
 
-    options.setStreaming(true);
+If you wish to explicitly enable streaming mode, please set the streaming flag in the
+`PipelineOptions`:
 
-That's it. If you prefer batched execution, simply disable streaming mode.
+    options.setStreaming(true);
 
 ## Batch
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index d31d790..4cd8fb3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.flink;
 import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator;
 import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator;
 import org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator;
+import org.apache.beam.runners.flink.translation.PipelineTranslationOptimizer;
+import org.apache.beam.runners.flink.translation.TranslationMode;
 import org.apache.beam.sdk.Pipeline;
 
 import com.google.common.base.Preconditions;
@@ -39,7 +41,7 @@ import java.util.List;
  * Depending on if the job is a Streaming or Batch processing one, it creates
  * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}),
  * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or
- * {@link FlinkStreamingPipelineTranslator})to transform the Beam job into a Flink one, and
+ * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and
  * executes the (translated) job.
  */
 public class FlinkPipelineExecutionEnvironment {
@@ -57,7 +59,6 @@ public class FlinkPipelineExecutionEnvironment {
    */
   private ExecutionEnvironment flinkBatchEnv;
 
-
   /**
    * The Flink Streaming execution environment. This is instantiated to either a
    * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
@@ -67,51 +68,13 @@ public class FlinkPipelineExecutionEnvironment {
   private StreamExecutionEnvironment flinkStreamEnv;
 
   /**
-   * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to
-   * their Flink counterparts. Based on the options provided by the user, if we have a streaming job,
-   * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. In other case, i.e. a batch job,
-   * a {@link FlinkBatchPipelineTranslator} is created.
-   */
-  private FlinkPipelineTranslator flinkPipelineTranslator;
-
-  /**
    * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the
    * provided {@link FlinkPipelineOptions}.
    *
    * @param options the user-defined pipeline options.
    * */
-  public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
+  FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
     this.options = Preconditions.checkNotNull(options);
-    this.createPipelineExecutionEnvironment();
-    this.createPipelineTranslator();
-  }
-
-  /**
-   * Depending on the type of job (Streaming or Batch) and the user-specified options,
-   * this method creates the adequate ExecutionEnvironment.
-   */
-  private void createPipelineExecutionEnvironment() {
-    if (options.isStreaming()) {
-      createStreamExecutionEnvironment();
-    } else {
-      createBatchExecutionEnvironment();
-    }
-  }
-
-  /**
-   * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph
-   * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet},
-   * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}.
-   */
-  private void createPipelineTranslator() {
-    checkInitializationState();
-    if (this.flinkPipelineTranslator != null) {
-      throw new IllegalStateException("FlinkPipelineTranslator already initialized.");
-    }
-
-    this.flinkPipelineTranslator = options.isStreaming() ?
-        new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
-        new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
   }
 
   /**
@@ -122,36 +85,39 @@ public class FlinkPipelineExecutionEnvironment {
    * one.
    * */
   public void translate(Pipeline pipeline) {
-    checkInitializationState();
-    if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
-      createPipelineExecutionEnvironment();
-    }
-    if (this.flinkPipelineTranslator == null) {
-      createPipelineTranslator();
+    this.flinkBatchEnv = null;
+    this.flinkStreamEnv = null;
+
+    PipelineTranslationOptimizer optimizer =
+        new PipelineTranslationOptimizer(TranslationMode.BATCH, options);
+
+    optimizer.translate(pipeline);
+    TranslationMode translationMode = optimizer.getTranslationMode();
+
+    FlinkPipelineTranslator translator;
+    if (translationMode == TranslationMode.STREAMING) {
+      this.flinkStreamEnv = createStreamExecutionEnvironment();
+      translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options);
+    } else {
+      this.flinkBatchEnv = createBatchExecutionEnvironment();
+      translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
     }
-    this.flinkPipelineTranslator.translate(pipeline);
+
+    translator.translate(pipeline);
   }
 
   /**
    * Launches the program execution.
    * */
   public JobExecutionResult executePipeline() throws Exception {
-    if (options.isStreaming()) {
-      if (this.flinkStreamEnv == null) {
-        throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
-      }
-      if (this.flinkPipelineTranslator == null) {
-        throw new RuntimeException("FlinkPipelineTranslator not initialized.");
-      }
-      return this.flinkStreamEnv.execute(options.getJobName());
+    final String jobName = options.getJobName();
+
+    if (flinkBatchEnv != null) {
+      return flinkBatchEnv.execute(jobName);
+    } else if (flinkStreamEnv != null) {
+      return flinkStreamEnv.execute(jobName);
     } else {
-      if (this.flinkBatchEnv == null) {
-        throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
-      }
-      if (this.flinkPipelineTranslator == null) {
-        throw new RuntimeException("FlinkPipelineTranslator not initialized.");
-      }
-      return this.flinkBatchEnv.execute(options.getJobName());
+      throw new IllegalStateException("The Pipeline has not yet been translated.");
     }
   }
 
@@ -160,41 +126,40 @@ public class FlinkPipelineExecutionEnvironment {
    * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
    * on the user-specified options.
    */
-  private void createBatchExecutionEnvironment() {
-    if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
-      throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
-    }
+  private ExecutionEnvironment createBatchExecutionEnvironment() {
 
     LOG.info("Creating the required Batch Execution Environment.");
 
     String masterUrl = options.getFlinkMaster();
-    this.flinkStreamEnv = null;
+    ExecutionEnvironment flinkBatchEnv;
 
     // depending on the master, create the right environment.
     if (masterUrl.equals("[local]")) {
-      this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+      flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
     } else if (masterUrl.equals("[collection]")) {
-      this.flinkBatchEnv = new CollectionEnvironment();
+      flinkBatchEnv = new CollectionEnvironment();
     } else if (masterUrl.equals("[auto]")) {
-      this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
     } else if (masterUrl.matches(".*:\\d*")) {
       String[] parts = masterUrl.split(":");
       List<String> stagingFiles = options.getFilesToStage();
-      this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
+      flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
           Integer.parseInt(parts[1]),
           stagingFiles.toArray(new String[stagingFiles.size()]));
     } else {
       LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
-      this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+      flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
     }
 
     // set the correct parallelism.
-    if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) {
-      this.flinkBatchEnv.setParallelism(options.getParallelism());
+    if (options.getParallelism() != -1 && !(flinkBatchEnv instanceof CollectionEnvironment)) {
+      flinkBatchEnv.setParallelism(options.getParallelism());
     }
 
     // set parallelism in the options (required by some execution code)
     options.setParallelism(flinkBatchEnv.getParallelism());
+
+    return flinkBatchEnv;
   }
 
   /**
@@ -202,51 +167,48 @@ public class FlinkPipelineExecutionEnvironment {
    * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
    * on the user-specified options.
    */
-  private void createStreamExecutionEnvironment() {
-    if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
-      throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
-    }
+  private StreamExecutionEnvironment createStreamExecutionEnvironment() {
 
     LOG.info("Creating the required Streaming Environment.");
 
     String masterUrl = options.getFlinkMaster();
-    this.flinkBatchEnv = null;
+    StreamExecutionEnvironment flinkStreamEnv = null;
 
     // depending on the master, create the right environment.
     if (masterUrl.equals("[local]")) {
-      this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
+      flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment();
     } else if (masterUrl.equals("[auto]")) {
-      this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     } else if (masterUrl.matches(".*:\\d*")) {
       String[] parts = masterUrl.split(":");
       List<String> stagingFiles = options.getFilesToStage();
-      this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+      flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
           Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()]));
     } else {
       LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl);
-      this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+      flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
     }
 
     // set the correct parallelism.
     if (options.getParallelism() != -1) {
-      this.flinkStreamEnv.setParallelism(options.getParallelism());
+      flinkStreamEnv.setParallelism(options.getParallelism());
     }
 
     // set parallelism in the options (required by some execution code)
     options.setParallelism(flinkStreamEnv.getParallelism());
 
     // default to event time
-    this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+    flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
     // for the following 2 parameters, a value of -1 means that Flink will use
     // the default values as specified in the configuration.
     int numRetries = options.getNumberOfExecutionRetries();
     if (numRetries != -1) {
-      this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
+      flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
     }
     long retryDelay = options.getExecutionRetryDelay();
     if (retryDelay != -1) {
-      this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
+      flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
     }
 
     // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
@@ -257,15 +219,10 @@ public class FlinkPipelineExecutionEnvironment {
       if (checkpointInterval < 1) {
         throw new IllegalArgumentException("The checkpoint interval must be positive");
       }
-      this.flinkStreamEnv.enableCheckpointing(checkpointInterval);
+      flinkStreamEnv.enableCheckpointing(checkpointInterval);
     }
-  }
 
-  private void checkInitializationState() {
-    if (options.isStreaming() && this.flinkBatchEnv != null) {
-      throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment.");
-    } else if (!options.isStreaming() && this.flinkStreamEnv != null) {
-      throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment.");
-    }
+    return flinkStreamEnv;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index b5ffced..220e8e8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -55,8 +55,6 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
    */
   private final FlinkPipelineOptions options;
 
-  private final FlinkPipelineExecutionEnvironment flinkJobEnv;
-
   /**
    * Construct a runner from the provided options.
    *
@@ -96,22 +94,21 @@ public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
 
   private FlinkPipelineRunner(FlinkPipelineOptions options) {
     this.options = options;
-    this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options);
   }
 
   @Override
   public FlinkRunnerResult run(Pipeline pipeline) {
     LOG.info("Executing pipeline using FlinkPipelineRunner.");
 
-    LOG.info("Translating pipeline to Flink program.");
-
-    this.flinkJobEnv.translate(pipeline);
+    FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
 
-    LOG.info("Starting execution of Flink program.");
+    LOG.info("Translating pipeline to Flink program.");
+    env.translate(pipeline);
 
     JobExecutionResult result;
     try {
-      result = this.flinkJobEnv.executePipeline();
+      LOG.info("Starting execution of Flink program.");
+      result = env.executePipeline();
     } catch (Exception e) {
       LOG.error("Pipeline execution failed", e);
       throw new RuntimeException("Pipeline execution failed", e);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index 69c02a2..a19f29d 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -149,14 +149,6 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator {
     return translator;
   }
 
-  private static String genSpaces(int n) {
-    String s = "";
-    for (int i = 0; i < n; i++) {
-      s += "|   ";
-    }
-    return s;
-  }
-
   private static String formatNodeName(TransformTreeNode node) {
     return node.toString().split("@")[1] + node.getTransform();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
index 46e5712..4db929b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java
@@ -30,7 +30,24 @@ import org.apache.beam.sdk.Pipeline;
  */
 public abstract class FlinkPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
 
+  /**
+   * Translates the pipeline by passing this class as a visitor.
+   * @param pipeline The pipeline to be translated
+   */
   public void translate(Pipeline pipeline) {
     pipeline.traverseTopologically(this);
   }
+
+  /**
+   * Utility method for building an indentation prefix used when logging the transform tree.
+   * @param n the nesting depth, i.e. the number of "|   " segments to generate
+   * @return a String consisting of "|   " repeated n times
+   */
+  protected static String genSpaces(int n) {
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < n; i++) {
+      builder.append("|   ");
+    }
+    return builder.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index 31b2bee..2e655a3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory;
  * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate the user-provided
  * {@link org.apache.beam.sdk.values.PCollection}-based job into a
  * {@link org.apache.flink.streaming.api.datastream.DataStream} one.
- *
- * This is based on {@link org.apache.beam.runners.dataflow.DataflowPipelineTranslator}
  * */
 public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
@@ -117,14 +115,6 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
     void translateNode(Type transform, FlinkStreamingTranslationContext context);
   }
 
-  private static String genSpaces(int n) {
-    String s = "";
-    for (int i = 0; i < n; i++) {
-      s += "|   ";
-    }
-    return s;
-  }
-
   private static String formatNodeName(TransformTreeNode node) {
     return node.toString().split("@")[1] + node.getTransform();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java
new file mode 100644
index 0000000..97d123c
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation;
+
+import org.apache.beam.runners.flink.FlinkPipelineOptions;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline.
+ */
+public class PipelineTranslationOptimizer extends FlinkPipelineTranslator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationOptimizer.class);
+
+  private TranslationMode translationMode;
+
+  private final FlinkPipelineOptions options;
+
+  public PipelineTranslationOptimizer(TranslationMode defaultMode, FlinkPipelineOptions options) {
+    this.translationMode = defaultMode;
+    this.options = options;
+  }
+
+  public TranslationMode getTranslationMode() {
+
+    // the user-specified streaming flag takes precedence over the detected mode
+    if (options.isStreaming()) {
+      return TranslationMode.STREAMING;
+    }
+
+    return translationMode;
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {}
+
+  @Override
+  public void visitPrimitiveTransform(TransformTreeNode node) {
+    Class<? extends PTransform> transformClass = node.getTransform().getClass();
+    if (transformClass == Read.Unbounded.class) {
+      LOG.info("Found {}. Switching to streaming execution.", transformClass);
+      translationMode = TranslationMode.STREAMING;
+    }
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java
new file mode 100644
index 0000000..71eb655
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation;
+
+/**
+ * The translation mode of the Beam Pipeline.
+ */
+public enum TranslationMode {
+
+  /** Uses the batch mode of Flink */
+  BATCH,
+
+  /** Uses the streaming mode of Flink */
+  STREAMING
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5632bbf8/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 4e7e63f..e264bc6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -275,7 +275,7 @@ public class Pipeline {
 
   /**
    * Invokes the {@link PipelineVisitor PipelineVisitor's}
-   * {@link PipelineVisitor#visitTransform} and
+   * {@link PipelineVisitor#visitPrimitiveTransform} and
    * {@link PipelineVisitor#visitValue} operations on each of this
    * {@link Pipeline Pipeline's} transform and value nodes, in forward
    * topological order.