You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/17 10:16:24 UTC

[4/8] flink git commit: [FLINK-1671] [core] Add execution mode to execution config

[FLINK-1671] [core] Add execution mode to execution config


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f8e1566
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f8e1566
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f8e1566

Branch: refs/heads/master
Commit: 7f8e156617ae68d8339249ee276bcc36552a49da
Parents: 4782770
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 10 20:25:01 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 17 10:10:08 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java |  78 +++++-------
 .../flink/compiler/dag/PactConnection.java      |  51 ++++++--
 .../flink/api/common/ExecutionConfig.java       | 127 ++++++++++++++-----
 .../apache/flink/api/common/ExecutionMode.java  |  83 ++++++++++++
 4 files changed, 251 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f8e1566/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index a82bb74..d5cbf68 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -353,25 +353,18 @@ public class PactCompiler {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new compiler instance. The compiler has no access to statistics about the
+	 * Creates a new optimizer instance. The optimizer has no access to statistics about the
 	 * inputs and can hence not determine any properties. It will perform all optimization with
-	 * unknown sizes and default to the most robust execution strategies. The
-	 * compiler also uses conservative default estimates for the operator costs, since
-	 * it has no access to another cost estimator.
-	 * <p>
-	 * The address of the job manager (to obtain system characteristics) is determined via the global configuration.
+	 * unknown sizes and hence use only the heuristic cost functions, which result in the selection
+	 * of the most robust execution strategies.
 	 */
 	public PactCompiler() {
 		this(null, new DefaultCostEstimator());
 	}
 
 	/**
-	 * Creates a new compiler instance that uses the statistics object to determine properties about the input.
-	 * Given those statistics, the compiler can make better choices for the execution strategies.
-	 * as if no filesystem was given. The compiler uses conservative default estimates for the operator costs, since
-	 * it has no access to another cost estimator.
-	 * <p>
-	 * The address of the job manager (to obtain system characteristics) is determined via the global configuration.
+	 * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
+	 * Given those statistics, the optimizer can make better choices for the execution strategies.
 	 * 
 	 * @param stats
 	 *        The statistics to be used to determine the input properties.
@@ -381,27 +374,24 @@ public class PactCompiler {
 	}
 
 	/**
-	 * Creates a new compiler instance. The compiler has no access to statistics about the
+	 * Creates a new optimizer instance. The optimizer has no access to statistics about the
 	 * inputs and can hence not determine any properties. It will perform all optimization with
-	 * unknown sizes and default to the most robust execution strategies. It uses
-	 * however the given cost estimator to compute the costs of the individual operations.
-	 * <p>
-	 * The address of the job manager (to obtain system characteristics) is determined via the global configuration.
+	 * unknown sizes and hence use only the heuristic cost functions, which result in the selection
+	 * of the most robust execution strategies.
+	 *
+	 * The optimizer uses the given cost estimator to compute the costs of the individual operations.
 	 * 
-	 * @param estimator
-	 *        The <tt>CostEstimator</tt> to use to cost the individual operations.
+	 * @param estimator The cost estimator to use to cost the individual operations.
 	 */
 	public PactCompiler(CostEstimator estimator) {
 		this(null, estimator);
 	}
 
 	/**
-	 * Creates a new compiler instance that uses the statistics object to determine properties about the input.
-	 * Given those statistics, the compiler can make better choices for the execution strategies.
-	 * as if no filesystem was given. It uses the given cost estimator to compute the costs of the individual
-	 * operations.
-	 * <p>
-	 * The address of the job manager (to obtain system characteristics) is determined via the global configuration.
+	 * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
+	 * Given those statistics, the optimizer can make better choices for the execution strategies.
+	 *
+	 * The optimizer uses the given cost estimator to compute the costs of the individual operations.
 	 * 
 	 * @param stats
 	 *        The statistics to be used to determine the input properties.
@@ -412,9 +402,10 @@ public class PactCompiler {
 		this.statistics = stats;
 		this.costEstimator = estimator;
 
-		// determine the default parallelization degree
-		this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
-			ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
+		// determine the default parallelism
+		this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(
+				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+				ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
 		
 		if (defaultDegreeOfParallelism < 1) {
 			LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
@@ -444,36 +435,35 @@ public class PactCompiler {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Translates the given plan in to an OptimizedPlan, where all nodes have their local strategy assigned
-	 * and all channels have a shipping strategy assigned. The compiler connects to the job manager to obtain information
-	 * about the available instances and their memory and then chooses an instance type to schedule the execution on.
-	 * <p>
-	 * The compilation process itself goes through several phases:
-	 * <ol>
-	 * <li>Create an optimizer data flow representation of the program, assign parallelism and compute size estimates.</li>
-	 * <li>Compute interesting properties and auxiliary structures.</li>
-	 * <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting property computation (as
-	 * opposed to the Database approaches), because we support plans that are not trees.</li>
-	 * </ol>
+	 * Translates the given program to an OptimizedPlan, where all nodes have their local strategy assigned
+	 * and all channels have a shipping strategy assigned.
+	 *
+	 * For more details on the optimization phase, see the comments for
+	 * {@link #compile(org.apache.flink.api.common.Plan, org.apache.flink.compiler.postpass.OptimizerPostPass)}.
 	 * 
 	 * @param program The program to be translated.
 	 * @return The optimized plan.
+	 *
 	 * @throws CompilerException
 	 *         Thrown, if the plan is invalid or the optimizer encountered an inconsistent
 	 *         situation during the compilation process.
 	 */
 	public OptimizedPlan compile(Plan program) throws CompilerException {
-		// -------------------- try to get the connection to the job manager ----------------------
-		// --------------------------to obtain instance information --------------------------------
 		final OptimizerPostPass postPasser = getPostPassFromPlan(program);
 		return compile(program, postPasser);
 	}
 
 	/**
-	 * Translates the given pact plan in to an OptimizedPlan, where all nodes have their local strategy assigned
-	 * and all channels have a shipping strategy assigned. The process goes through several phases:
+	 * Translates the given program to an OptimizedPlan. The optimized plan describes for each operator
+	 * which strategy to use (such as hash join versus sort-merge join), what data exchange method to use
+	 * (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined, batch),
+	 * where to cache intermediate results, etc.
+	 *
+	 * The optimization happens in multiple phases:
 	 * <ol>
-	 * <li>Create <tt>OptimizerNode</tt> representations of the PACTs, assign parallelism and compute size estimates.</li>
+	 *     <li>Create the optimizer DAG representation of the program:
+	 *
+	 *     create <tt>OptimizerNode</tt> representations of the PACTs, assign parallelism and compute size estimates.</li>
 	 * <li>Compute interesting properties and auxiliary structures.</li>
 	 * <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting property computation (as
 	 * opposed to the Database approaches), because we support plans that are not trees.</li>

http://git-wip-us.apache.org/repos/asf/flink/blob/7f8e1566/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PactConnection.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PactConnection.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PactConnection.java
index c5ba395..41369c9 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PactConnection.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PactConnection.java
@@ -16,20 +16,24 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
+import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.compiler.dataproperties.InterestingProperties;
 import org.apache.flink.compiler.plandump.DumpableConnection;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 
 /**
- * A connection between to PACTs. Represents a channel together with a data shipping
- * strategy. The data shipping strategy of a channel may be fixed though compiler hints. In that
- * case, it is not modifiable.
- * <p>
- * The connections are also used by the optimization algorithm to propagate interesting properties from the sinks in the
- * direction of the sources.
+ * A connection between two operators. Represents an intermediate result
+ * and a data exchange between the two operators.
+ *
+ * The data exchange has a mode in which it performs (batch / pipelined).
+ *
+ * The data exchange strategy may be set on this connection, in which case
+ * it is fixed and will not be determined during candidate plan enumeration.
+ *
+ * During the enumeration of interesting properties, this connection also holds
+ * all interesting properties generated by the successor operator.
  */
 public class PactConnection implements EstimateProvider, DumpableConnection<OptimizerNode> {
 	
@@ -37,11 +41,13 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
 
 	private final OptimizerNode target; // The target node of the connection.
 
+	private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange
+
 	private InterestingProperties interestingProps; // local properties that succeeding nodes are interested in
 
-	private ShipStrategyType shipStrategy; // The data distribution strategy, if preset
+	private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined.
 	
-	private TempMode materializationMode = TempMode.NONE;
+	private TempMode materializationMode = TempMode.NONE; // the materialization mode
 	
 	private int maxDepth = -1;
 
@@ -54,8 +60,8 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
 	 * @param target
 	 *        The target node.
 	 */
-	public PactConnection(OptimizerNode source, OptimizerNode target) {
-		this(source, target, null);
+	public PactConnection(OptimizerNode source, OptimizerNode target, ExecutionMode exchangeMode) {
+		this(source, target, null, exchangeMode);
 	}
 
 	/**
@@ -67,18 +73,24 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
 	 *        The target node.
 	 * @param shipStrategy
 	 *        The shipping strategy.
+	 * @param exchangeMode
+	 *        The data exchange mode (pipelined / batch)
 	 */
-	public PactConnection(OptimizerNode source, OptimizerNode target, ShipStrategyType shipStrategy) {
+	public PactConnection(OptimizerNode source, OptimizerNode target,
+						  ShipStrategyType shipStrategy, ExecutionMode exchangeMode)
+	{
 		if (source == null || target == null) {
 			throw new NullPointerException("Source and target must not be null.");
 		}
 		this.source = source;
 		this.target = target;
 		this.shipStrategy = shipStrategy;
+		this.dataExchangeMode = exchangeMode;
 	}
 	
 	/**
-	 * Creates a new Connection between two nodes.
+	 * Constructor to create a result from an operator that is not
+	 * consumed by another operator.
 	 * 
 	 * @param source
 	 *        The source node.
@@ -90,6 +102,7 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
 		this.source = source;
 		this.target = null;
 		this.shipStrategy = ShipStrategyType.NONE;
+		this.dataExchangeMode = null;
 	}
 
 	/**
@@ -130,6 +143,18 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
 	}
 
 	/**
+	 * Gets the data exchange mode to use for this connection.
+	 *
+	 * @return The data exchange mode to use for this connection.
+	 */
+	public ExecutionMode getDataExchangeMode() {
+		if (dataExchangeMode == null) {
+			throw new IllegalStateException("This connection does not have a data exchange");
+		}
+		return dataExchangeMode;
+	}
+
+	/**
 	 * Gets the interesting properties object for this pact connection.
 	 * If the interesting properties for this connections have not yet been set,
 	 * this method returns null.

http://git-wip-us.apache.org/repos/asf/flink/blob/7f8e1566/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index d315440..48a9c76 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -25,8 +25,24 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * A configuration config for configuring behavior of the system, such as whether to use
- * the closure cleaner, object-reuse mode...
+ * A config to define the behavior of the program execution. It allows to define (among other
+ * options) the following settings:
+ *
+ * <ul>
+ *     <li>The default parallelism of the program, i.e., how many parallel tasks to use for
+ *         all functions that do not define a specific value directly.</li>
+ *     <li>The number of retries in the case of failed executions.</li>
+ *     <li>The {@link ExecutionMode} of the program: Batch or Pipelined.
+ *         The default execution mode is {@link ExecutionMode#PIPELINED}</li>
+ *     <li>Enabling or disabling the "closure cleaner". The closure cleaner pre-processes
+ *         the implementations of functions. In case they are (anonymous) inner classes,
+ *         it removes unused references to the enclosing class to fix certain serialization-related
+ *         problems and to reduce the size of the closure.</li>
+ *     <li>The config allows to register types and serializers to increase the efficiency of
+ *         handling <i>generic types</i> and <i>POJOs</i>. This is usually only needed
+ *         when the functions return not only the types declared in their signature, but
+ *         also subclasses of those types.</li>
+ * </ul>
  */
 public class ExecutionConfig implements Serializable {
 	
@@ -41,9 +57,17 @@ public class ExecutionConfig implements Serializable {
 	 */
 	public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
 
+	// --------------------------------------------------------------------------------------------
+
+	/** Defines how data exchange happens - batch or pipelined */
+	private ExecutionMode executionMode = ExecutionMode.PIPELINED;
+
 	private boolean useClosureCleaner = true;
-	private int degreeOfParallelism = -1;
+
+	private int parallelism = -1;
+
 	private int numberOfExecutionRetries = -1;
+
 	private boolean forceKryo = false;
 
 	private boolean objectReuse = false;
@@ -52,17 +76,27 @@ public class ExecutionConfig implements Serializable {
 
 	private boolean serializeGenericTypesWithAvro = false;
 
-
-
 	// Serializers and types registered with Kryo and the PojoSerializer
 	// we store them in lists to ensure they are registered in order in all kryo instances.
-	private final List<Entry<Class<?>, Serializer<?>>> registeredTypesWithKryoSerializers = new ArrayList<Entry<Class<?>, Serializer<?>>>();
-	private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses = new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
-	private final List<Entry<Class<?>, Serializer<?>>> defaultKryoSerializers = new ArrayList<Entry<Class<?>, Serializer<?>>>();
-	private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses = new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
+
+	private final List<Entry<Class<?>, Serializer<?>>> registeredTypesWithKryoSerializers =
+			new ArrayList<Entry<Class<?>, Serializer<?>>>();
+
+	private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses =
+			new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
+
+	private final List<Entry<Class<?>, Serializer<?>>> defaultKryoSerializers =
+			new ArrayList<Entry<Class<?>, Serializer<?>>>();
+
+	private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses =
+			new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
+
 	private final List<Class<?>> registeredKryoTypes = new ArrayList<Class<?>>();
+
 	private final List<Class<?>> registeredPojoTypes = new ArrayList<Class<?>>();
 
+	// --------------------------------------------------------------------------------------------
+
 	/**
 	 * Enables the ClosureCleaner. This analyzes user code functions and sets fields to null
 	 * that are not used. This will in most cases make closures or anonymous inner classes
@@ -75,7 +109,9 @@ public class ExecutionConfig implements Serializable {
 	}
 
 	/**
-	 * Disables the ClosureCleaner. @see #enableClosureCleaner()
+	 * Disables the ClosureCleaner.
+	 *
+	 * @see #enableClosureCleaner()
 	 */
 	public ExecutionConfig disableClosureCleaner() {
 		useClosureCleaner = false;
@@ -83,31 +119,32 @@ public class ExecutionConfig implements Serializable {
 	}
 
 	/**
-	 * Returns whether the ClosureCleaner is enabled. @see #enableClosureCleaner()
+	 * Returns whether the ClosureCleaner is enabled.
+	 *
+	 * @see #enableClosureCleaner()
 	 */
 	public boolean isClosureCleanerEnabled() {
 		return useClosureCleaner;
 	}
 
 	/**
-	 * Gets the degree of parallelism with which operation are executed by default. Operations can
-	 * individually override this value to use a specific degree of parallelism.
-	 * Other operations may need to run with a different
-	 * degree of parallelism - for example calling
-	 * a reduce operation over the entire
-	 * set will insert eventually an operation that runs non-parallel (degree of parallelism of one).
+	 * Gets the parallelism with which operations are executed by default. Operations can
+	 * individually override this value to use a specific parallelism.
 	 *
-	 * @return The degree of parallelism used by operations, unless they override that value. This method
-	 *         returns {@code -1}, if the environments default parallelism should be used.
+	 * Other operations may need to run with a different parallelism - for example calling
+	 * a reduce operation over the entire data set will involve an operation that runs
+	 * with a parallelism of one (the final reduce to the single result value).
+	 *
+	 * @return The parallelism used by operations, unless they override that value. This method
+	 *         returns {@code -1}, if the environment's default parallelism should be used.
 	 */
-
 	public int getDegreeOfParallelism() {
-		return degreeOfParallelism;
+		return parallelism;
 	}
 
 	/**
-	 * Sets the degree of parallelism (DOP) for operations executed through this environment.
-	 * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
+	 * Sets the parallelism for operations executed through this environment.
+	 * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
 	 * x parallel instances.
 	 * <p>
 	 * This method overrides the default parallelism for this environment.
@@ -115,14 +152,14 @@ public class ExecutionConfig implements Serializable {
 	 * contexts (CPU cores / threads). When executing the program via the command line client
 	 * from a JAR file, the default degree of parallelism is the one configured for that setup.
 	 *
-	 * @param degreeOfParallelism The degree of parallelism
+	 * @param parallelism The parallelism to use
 	 */
-
-	public ExecutionConfig setDegreeOfParallelism(int degreeOfParallelism) {
-		if (degreeOfParallelism < 1) {
-			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
+	public ExecutionConfig setDegreeOfParallelism(int parallelism) {
+		if (parallelism < 1 && parallelism != -1) {
+			throw new IllegalArgumentException(
+					"Degree of parallelism must be at least one, or -1 (use system default).");
 		}
-		this.degreeOfParallelism = degreeOfParallelism;
+		this.parallelism = parallelism;
 		return this;
 	}
 
@@ -146,13 +183,38 @@ public class ExecutionConfig implements Serializable {
 	 */
 	public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) {
 		if (numberOfExecutionRetries < -1) {
-			throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+			throw new IllegalArgumentException(
+					"The number of execution retries must be non-negative, or -1 (use system default)");
 		}
 		this.numberOfExecutionRetries = numberOfExecutionRetries;
 		return this;
 	}
 
 	/**
+	 * Sets the execution mode to execute the program. The execution mode defines whether
+	 * data exchanges are performed in a batch or in a pipelined manner.
+	 *
+	 * The default execution mode is {@link ExecutionMode#PIPELINED}.
+	 *
+	 * @param executionMode The execution mode to use.
+	 */
+	public void setExecutionMode(ExecutionMode executionMode) {
+		this.executionMode = executionMode;
+	}
+
+	/**
+	 * Gets the execution mode used to execute the program. The execution mode defines whether
+	 * data exchanges are performed in a batch or in a pipelined manner.
+	 *
+	 * The default execution mode is {@link ExecutionMode#PIPELINED}.
+	 *
+	 * @return The execution mode for the program.
+	 */
+	public ExecutionMode getExecutionMode() {
+		return executionMode;
+	}
+
+	/**
 	 * Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
 	 * In some cases this might be preferable. For example, when using interfaces
 	 * with subclasses that cannot be analyzed as POJO.
@@ -161,7 +223,6 @@ public class ExecutionConfig implements Serializable {
 		forceKryo = true;
 	}
 
-
 	/**
 	 * Disable use of Kryo serializer for all POJOs.
 	 */
@@ -403,8 +464,12 @@ public class ExecutionConfig implements Serializable {
 
 
 	public static class Entry<K, V> implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
 		private final K k;
 		private final V v;
+
 		public Entry(K k, V v) {
 			this.k = k;
 			this.v = v;

http://git-wip-us.apache.org/repos/asf/flink/blob/7f8e1566/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java
new file mode 100644
index 0000000..f3e958e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * The execution mode specifies how a batch program is executed in terms
+ * of data exchange: pipelined or batched.
+ */
+public enum ExecutionMode {
+
+	/**
+	 * Executes the program in a pipelined fashion (including shuffles and broadcasts),
+	 * except for data exchanges that are susceptible to deadlocks when pipelining.
+	 * These data exchanges are performed in a batch manner.
+	 *
+	 * An example of a situation that is susceptible to deadlocks (when executed in a
+	 * pipelined manner) is a data flow that branches (one data set consumed by multiple
+	 * operations) and re-joins later:
+	 * <pre>{@code
+	 *    DataSet data = ...;
+	 *    DataSet mapped1 = data.map(new MyMapper());
+	 *    DataSet mapped2 = data.map(new AnotherMapper());
+	 *    mapped1.join(mapped2).where(...).equalTo(...);
+	 * }</pre>
+	 */
+	PIPELINED,
+
+	/**
+	 * Executes the program in a pipelined fashion (including shuffles and broadcasts),
+	 * <strong>including</strong> data exchanges that are susceptible to deadlocks when
+	 * executed via pipelining.
+	 *
+	 * Usually, {@link #PIPELINED} is the preferable option, which pipelines most
+	 * data exchanges and only uses batch data exchanges in situations that are
+	 * susceptible to deadlocks.
+	 *
+	 * This option should only be used with care and only in situations where the
+	 * programmer is sure that the program is safe for full pipelining and that
+	 * Flink was too conservative when choosing the batch exchange at a certain
+	 * point.
+	 */
+	PIPELINED_FORCED,
+
+//	This is for later, we are missing a bit of infrastructure for this.
+//	/**
+//	 * The execution mode starts executing the program in a pipelined fashion
+//	 * (except for deadlock prone situations), similar to the {@link #PIPELINED}
+//	 * option. In the case of a task failure, re-execution happens in a batched
+//	 * mode, as defined for the {@link #BATCH} option.
+//	 */
+//	PIPELINED_WITH_BATCH_FALLBACK,
+
+	/**
+	 * This mode executes all shuffles and broadcasts in a batch fashion, while
+	 * pipelining data between operations that exchange data only locally
+	 * between one producer and one consumer.
+	 */
+	BATCH,
+
+	/**
+	 * This mode executes the program in a strict batch way, including all points
+	 * where data is forwarded locally from one producer to one consumer. This mode
+	 * is typically more expensive to execute than the {@link #BATCH} mode. It does
+	 * guarantee that no successive operations are ever executed concurrently.
+	 */
+	BATCH_FORCED
+}