Posted to commits@flink.apache.org by se...@apache.org on 2015/03/17 10:16:24 UTC
[4/8] flink git commit: [FLINK-1671] [core] Add execution mode to execution config
[FLINK-1671] [core] Add execution mode to execution config
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f8e1566
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f8e1566
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f8e1566
Branch: refs/heads/master
Commit: 7f8e156617ae68d8339249ee276bcc36552a49da
Parents: 4782770
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Mar 10 20:25:01 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 17 10:10:08 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/compiler/PactCompiler.java | 78 +++++-------
.../flink/compiler/dag/PactConnection.java | 51 ++++++--
.../flink/api/common/ExecutionConfig.java | 127 ++++++++++++++-----
.../apache/flink/api/common/ExecutionMode.java | 83 ++++++++++++
4 files changed, 251 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7f8e1566/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index a82bb74..d5cbf68 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -353,25 +353,18 @@ public class PactCompiler {
// ------------------------------------------------------------------------
/**
- * Creates a new compiler instance. The compiler has no access to statistics about the
+ * Creates a new optimizer instance. The optimizer has no access to statistics about the
* inputs and can hence not determine any properties. It will perform all optimization with
- * unknown sizes and default to the most robust execution strategies. The
- * compiler also uses conservative default estimates for the operator costs, since
- * it has no access to another cost estimator.
- * <p>
- * The address of the job manager (to obtain system characteristics) is determined via the global configuration.
+ * unknown sizes and hence use only the heuristic cost functions, which result in the selection
+ * of the most robust execution strategies.
*/
public PactCompiler() {
this(null, new DefaultCostEstimator());
}
/**
- * Creates a new compiler instance that uses the statistics object to determine properties about the input.
- * Given those statistics, the compiler can make better choices for the execution strategies.
- * as if no filesystem was given. The compiler uses conservative default estimates for the operator costs, since
- * it has no access to another cost estimator.
- * <p>
- * The address of the job manager (to obtain system characteristics) is determined via the global configuration.
+ * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
+ * Given those statistics, the optimizer can make better choices for the execution strategies.
*
* @param stats
* The statistics to be used to determine the input properties.
@@ -381,27 +374,24 @@ public class PactCompiler {
}
/**
- * Creates a new compiler instance. The compiler has no access to statistics about the
+ * Creates a new optimizer instance. The optimizer has no access to statistics about the
* inputs and can hence not determine any properties. It will perform all optimization with
- * unknown sizes and default to the most robust execution strategies. It uses
- * however the given cost estimator to compute the costs of the individual operations.
- * <p>
- * The address of the job manager (to obtain system characteristics) is determined via the global configuration.
+ * unknown sizes and hence use only the heuristic cost functions, which result in the selection
+ * of the most robust execution strategies.
+ *
+ * The optimizer uses the given cost estimator to compute the costs of the individual operations.
*
- * @param estimator
- * The <tt>CostEstimator</tt> to use to cost the individual operations.
+ * @param estimator The cost estimator used to compute the costs of the individual operations.
*/
public PactCompiler(CostEstimator estimator) {
this(null, estimator);
}
/**
- * Creates a new compiler instance that uses the statistics object to determine properties about the input.
- * Given those statistics, the compiler can make better choices for the execution strategies.
- * as if no filesystem was given. It uses the given cost estimator to compute the costs of the individual
- * operations.
- * <p>
- * The address of the job manager (to obtain system characteristics) is determined via the global configuration.
+ * Creates a new optimizer instance that uses the statistics object to determine properties about the input.
+ * Given those statistics, the optimizer can make better choices for the execution strategies.
+ *
+ * The optimizer uses the given cost estimator to compute the costs of the individual operations.
*
* @param stats
* The statistics to be used to determine the input properties.
@@ -412,9 +402,10 @@ public class PactCompiler {
this.statistics = stats;
this.costEstimator = estimator;
- // determine the default parallelization degree
- this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
- ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
+ // determine the default parallelism
+ this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(
+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
if (defaultDegreeOfParallelism < 1) {
LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
@@ -444,36 +435,35 @@ public class PactCompiler {
// ------------------------------------------------------------------------
/**
- * Translates the given plan in to an OptimizedPlan, where all nodes have their local strategy assigned
- * and all channels have a shipping strategy assigned. The compiler connects to the job manager to obtain information
- * about the available instances and their memory and then chooses an instance type to schedule the execution on.
- * <p>
- * The compilation process itself goes through several phases:
- * <ol>
- * <li>Create an optimizer data flow representation of the program, assign parallelism and compute size estimates.</li>
- * <li>Compute interesting properties and auxiliary structures.</li>
- * <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting property computation (as
- * opposed to the Database approaches), because we support plans that are not trees.</li>
- * </ol>
+ * Translates the given program to an OptimizedPlan, where all nodes have their local strategy assigned
+ * and all channels have a shipping strategy assigned.
+ *
+ * For more details on the optimization phase, see the comments for
+ * {@link #compile(org.apache.flink.api.common.Plan, org.apache.flink.compiler.postpass.OptimizerPostPass)}.
*
* @param program The program to be translated.
* @return The optimized plan.
+ *
* @throws CompilerException
* Thrown, if the plan is invalid or the optimizer encountered an inconsistent
* situation during the compilation process.
*/
public OptimizedPlan compile(Plan program) throws CompilerException {
- // -------------------- try to get the connection to the job manager ----------------------
- // --------------------------to obtain instance information --------------------------------
final OptimizerPostPass postPasser = getPostPassFromPlan(program);
return compile(program, postPasser);
}
/**
- * Translates the given pact plan in to an OptimizedPlan, where all nodes have their local strategy assigned
- * and all channels have a shipping strategy assigned. The process goes through several phases:
+ * Translates the given program to an OptimizedPlan. The optimized plan describes for each operator
+ * which strategy to use (such as hash join versus sort-merge join), what data exchange method to use
+ * (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined, batch),
+ * where to cache intermediate results, etc.
+ *
+ * The optimization happens in multiple phases:
* <ol>
- * <li>Create <tt>OptimizerNode</tt> representations of the PACTs, assign parallelism and compute size estimates.</li>
+ * <li>Create the optimizer DAG representation of the program: build the <tt>OptimizerNode</tt>
+ * representations of the operators, assign parallelism and compute size estimates.</li>
* <li>Compute interesting properties and auxiliary structures.</li>
* <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting property computation (as
* opposed to the Database approaches), because we support plans that are not trees.</li>
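The PactCompiler constructor above reads the default parallelism from the global configuration and logs a warning when the value is below 1. The hunk is cut off right after that warning, so the stand-alone sketch below assumes that an invalid value is replaced by 1. GlobalConfiguration is replaced by a plain Map, and the helper names and the configuration key used here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

public class DefaultParallelismSketch {

    private static final Logger LOG = Logger.getLogger(DefaultParallelismSketch.class.getName());

    // Hypothetical stand-in for GlobalConfiguration.getInteger(key, defaultValue):
    // returns defaultValue if the key is absent or its value is not a valid integer.
    static int getInteger(Map<String, String> config, String key, int defaultValue) {
        String raw = config.get(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    // Reads the default parallelism and replaces invalid (< 1) values.
    // Assumption: the fallback value is 1; the diff truncates after the warning.
    static int resolveDefaultParallelism(Map<String, String> config, String key, int defaultValue) {
        int parallelism = getInteger(config, key, defaultValue);
        if (parallelism < 1) {
            LOG.warning("Config value " + parallelism + " for option " + key
                    + " is invalid. Using a parallelism of 1 instead.");
            parallelism = 1;
        }
        return parallelism;
    }

    public static void main(String[] args) {
        String key = "hypothetical.default.parallelism"; // illustrative key, not Flink's
        Map<String, String> config = new HashMap<>();
        System.out.println(resolveDefaultParallelism(config, key, 1)); // key absent: default
        config.put(key, "8");
        System.out.println(resolveDefaultParallelism(config, key, 1)); // valid value
        config.put(key, "0");
        System.out.println(resolveDefaultParallelism(config, key, 1)); // invalid: warning, then 1
    }
}
```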
http://git-wip-us.apache.org/repos/asf/flink/blob/7f8e1566/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PactConnection.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PactConnection.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PactConnection.java
index c5ba395..41369c9 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PactConnection.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PactConnection.java
@@ -16,20 +16,24 @@
* limitations under the License.
*/
-
package org.apache.flink.compiler.dag;
+import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.compiler.dataproperties.InterestingProperties;
import org.apache.flink.compiler.plandump.DumpableConnection;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
/**
- * A connection between to PACTs. Represents a channel together with a data shipping
- * strategy. The data shipping strategy of a channel may be fixed though compiler hints. In that
- * case, it is not modifiable.
- * <p>
- * The connections are also used by the optimization algorithm to propagate interesting properties from the sinks in the
- * direction of the sources.
+ * A connection between two operators. Represents an intermediate result
+ * and a data exchange between the two operators.
+ *
+ * The data exchange is performed in one of two modes: batch or pipelined.
+ *
+ * The data exchange strategy may be set on this connection, in which case
+ * it is fixed and will not be determined during candidate plan enumeration.
+ *
+ * During the enumeration of interesting properties, this connection also holds
+ * all interesting properties generated by the successor operator.
*/
public class PactConnection implements EstimateProvider, DumpableConnection<OptimizerNode> {
@@ -37,11 +41,13 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
private final OptimizerNode target; // The target node of the connection.
+ private final ExecutionMode dataExchangeMode; // defines whether to use batch or pipelined data exchange
+
private InterestingProperties interestingProps; // local properties that succeeding nodes are interested in
- private ShipStrategyType shipStrategy; // The data distribution strategy, if preset
+ private ShipStrategyType shipStrategy; // The data shipping strategy, if predefined.
- private TempMode materializationMode = TempMode.NONE;
+ private TempMode materializationMode = TempMode.NONE; // the materialization mode
private int maxDepth = -1;
@@ -54,8 +60,8 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
* @param target
* The target node.
*/
- public PactConnection(OptimizerNode source, OptimizerNode target) {
- this(source, target, null);
+ public PactConnection(OptimizerNode source, OptimizerNode target, ExecutionMode exchangeMode) {
+ this(source, target, null, exchangeMode);
}
/**
@@ -67,18 +73,24 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
* The target node.
* @param shipStrategy
* The shipping strategy.
+ * @param exchangeMode
+ * The data exchange mode (pipelined / batch)
*/
- public PactConnection(OptimizerNode source, OptimizerNode target, ShipStrategyType shipStrategy) {
+ public PactConnection(OptimizerNode source, OptimizerNode target,
+ ShipStrategyType shipStrategy, ExecutionMode exchangeMode)
+ {
if (source == null || target == null) {
throw new NullPointerException("Source and target must not be null.");
}
this.source = source;
this.target = target;
this.shipStrategy = shipStrategy;
+ this.dataExchangeMode = exchangeMode;
}
/**
- * Creates a new Connection between two nodes.
+ * Creates a connection that represents the result of an operator that is not
+ * consumed by any other operator.
*
* @param source
* The source node.
@@ -90,6 +102,7 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
this.source = source;
this.target = null;
this.shipStrategy = ShipStrategyType.NONE;
+ this.dataExchangeMode = null;
}
/**
@@ -130,6 +143,18 @@ public class PactConnection implements EstimateProvider, DumpableConnection<Opti
}
/**
+ * Gets the data exchange mode to use for this connection.
+ *
+ * @return The data exchange mode to use for this connection.
+ */
+ public ExecutionMode getDataExchangeMode() {
+ if (dataExchangeMode == null) {
+ throw new IllegalStateException("This connection does not have a data exchange");
+ }
+ return dataExchangeMode;
+ }
+
+ /**
* Gets the interesting properties object for this pact connection.
* If the interesting properties for this connections have not yet been set,
* this method returns null.
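The PactConnection changes tie the exchange mode to the presence of a consumer: the two-operator constructors take an ExecutionMode, the source-only constructor stores null, and getDataExchangeMode() throws for such a connection. A minimal sketch of that pattern follows; it uses String node names and a local copy of the enum instead of OptimizerNode and the real Flink types, which are simplifying assumptions.

```java
import java.util.Objects;

public class ConnectionSketch {

    // Local copy of the enum added in this commit (org.apache.flink.api.common.ExecutionMode).
    enum ExecutionMode { PIPELINED, PIPELINED_FORCED, BATCH, BATCH_FORCED }

    // Hypothetical, simplified analogue of PactConnection.
    static final class Connection {
        private final String source;
        private final String target;              // null for a result with no consumer
        private final ExecutionMode exchangeMode; // null when there is no consumer

        // Connection between two operators; both ends must be set.
        Connection(String source, String target, ExecutionMode exchangeMode) {
            if (source == null || target == null) {
                throw new NullPointerException("Source and target must not be null.");
            }
            this.source = source;
            this.target = target;
            this.exchangeMode = exchangeMode;
        }

        // Result of an operator that no other operator consumes: no data exchange.
        Connection(String source) {
            this.source = Objects.requireNonNull(source);
            this.target = null;
            this.exchangeMode = null;
        }

        ExecutionMode getDataExchangeMode() {
            if (exchangeMode == null) {
                throw new IllegalStateException("This connection does not have a data exchange");
            }
            return exchangeMode;
        }
    }

    public static void main(String[] args) {
        Connection edge = new Connection("map", "join", ExecutionMode.PIPELINED);
        System.out.println(edge.getDataExchangeMode());

        Connection dangling = new Connection("join");
        try {
            dangling.getDataExchangeMode();
        } catch (IllegalStateException e) {
            System.out.println("no exchange: " + e.getMessage());
        }
    }
}
```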
http://git-wip-us.apache.org/repos/asf/flink/blob/7f8e1566/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index d315440..48a9c76 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -25,8 +25,24 @@ import java.util.ArrayList;
import java.util.List;
/**
- * A configuration config for configuring behavior of the system, such as whether to use
- * the closure cleaner, object-reuse mode...
+ * A config to define the behavior of the program execution. It allows defining (among other
+ * options) the following settings:
+ *
+ * <ul>
+ * <li>The default parallelism of the program, i.e., how many parallel tasks to use for
+ * all functions that do not define a specific value directly.</li>
+ * <li>The number of retries in the case of failed executions.</li>
+ * <li>The {@link ExecutionMode} of the program: batch or pipelined.
+ * The default execution mode is {@link ExecutionMode#PIPELINED}.</li>
+ * <li>Enabling or disabling the "closure cleaner". The closure cleaner pre-processes
+ * the implementations of functions. In case they are (anonymous) inner classes,
+ * it removes unused references to the enclosing class to fix certain serialization-related
+ * problems and to reduce the size of the closure.</li>
+ * <li>The registration of types and serializers to increase the efficiency of
+ * handling <i>generic types</i> and <i>POJOs</i>. Registration is usually only needed
+ * when the functions return not only the types declared in their signature, but
+ * also subclasses of those types.</li>
+ * </ul>
*/
public class ExecutionConfig implements Serializable {
@@ -41,9 +57,17 @@ public class ExecutionConfig implements Serializable {
*/
public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
+ // --------------------------------------------------------------------------------------------
+
+ /** Defines whether data exchanges are batched or pipelined. */
+ private ExecutionMode executionMode = ExecutionMode.PIPELINED;
+
private boolean useClosureCleaner = true;
- private int degreeOfParallelism = -1;
+
+ private int parallelism = -1;
+
private int numberOfExecutionRetries = -1;
+
private boolean forceKryo = false;
private boolean objectReuse = false;
@@ -52,17 +76,27 @@ public class ExecutionConfig implements Serializable {
private boolean serializeGenericTypesWithAvro = false;
-
-
// Serializers and types registered with Kryo and the PojoSerializer
// we store them in lists to ensure they are registered in order in all kryo instances.
- private final List<Entry<Class<?>, Serializer<?>>> registeredTypesWithKryoSerializers = new ArrayList<Entry<Class<?>, Serializer<?>>>();
- private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses = new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
- private final List<Entry<Class<?>, Serializer<?>>> defaultKryoSerializers = new ArrayList<Entry<Class<?>, Serializer<?>>>();
- private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses = new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
+
+ private final List<Entry<Class<?>, Serializer<?>>> registeredTypesWithKryoSerializers =
+ new ArrayList<Entry<Class<?>, Serializer<?>>>();
+
+ private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses =
+ new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
+
+ private final List<Entry<Class<?>, Serializer<?>>> defaultKryoSerializers =
+ new ArrayList<Entry<Class<?>, Serializer<?>>>();
+
+ private final List<Entry<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses =
+ new ArrayList<Entry<Class<?>, Class<? extends Serializer<?>>>>();
+
private final List<Class<?>> registeredKryoTypes = new ArrayList<Class<?>>();
+
private final List<Class<?>> registeredPojoTypes = new ArrayList<Class<?>>();
+ // --------------------------------------------------------------------------------------------
+
/**
* Enables the ClosureCleaner. This analyzes user code functions and sets fields to null
* that are not used. This will in most cases make closures or anonymous inner classes
@@ -75,7 +109,9 @@ public class ExecutionConfig implements Serializable {
}
/**
- * Disables the ClosureCleaner. @see #enableClosureCleaner()
+ * Disables the ClosureCleaner.
+ *
+ * @see #enableClosureCleaner()
*/
public ExecutionConfig disableClosureCleaner() {
useClosureCleaner = false;
@@ -83,31 +119,32 @@ public class ExecutionConfig implements Serializable {
}
/**
- * Returns whether the ClosureCleaner is enabled. @see #enableClosureCleaner()
+ * Returns whether the ClosureCleaner is enabled.
+ *
+ * @see #enableClosureCleaner()
*/
public boolean isClosureCleanerEnabled() {
return useClosureCleaner;
}
/**
- * Gets the degree of parallelism with which operation are executed by default. Operations can
- * individually override this value to use a specific degree of parallelism.
- * Other operations may need to run with a different
- * degree of parallelism - for example calling
- * a reduce operation over the entire
- * set will insert eventually an operation that runs non-parallel (degree of parallelism of one).
+ * Gets the parallelism with which operations are executed by default. Operations can
+ * individually override this value to use a specific parallelism.
*
- * @return The degree of parallelism used by operations, unless they override that value. This method
- * returns {@code -1}, if the environments default parallelism should be used.
+ * Other operations may need to run with a different parallelism - for example calling
+ * a reduce operation over the entire data set will involve an operation that runs
+ * with a parallelism of one (the final reduce to the single result value).
+ *
+ * @return The parallelism used by operations, unless they override that value. This method
+ * returns {@code -1}, if the environment's default parallelism should be used.
*/
-
public int getDegreeOfParallelism() {
- return degreeOfParallelism;
+ return parallelism;
}
/**
- * Sets the degree of parallelism (DOP) for operations executed through this environment.
- * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
+ * Sets the parallelism for operations executed through this environment.
+ * Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with
* x parallel instances.
* <p>
* This method overrides the default parallelism for this environment.
@@ -115,14 +152,14 @@ public class ExecutionConfig implements Serializable {
* contexts (CPU cores / threads). When executing the program via the command line client
* from a JAR file, the default degree of parallelism is the one configured for that setup.
*
- * @param degreeOfParallelism The degree of parallelism
+ * @param parallelism The parallelism to use
*/
-
- public ExecutionConfig setDegreeOfParallelism(int degreeOfParallelism) {
- if (degreeOfParallelism < 1) {
- throw new IllegalArgumentException("Degree of parallelism must be at least one.");
+ public ExecutionConfig setDegreeOfParallelism(int parallelism) {
+ if (parallelism < 1 && parallelism != -1) {
+ throw new IllegalArgumentException(
+ "Degree of parallelism must be at least one, or -1 (use system default).");
}
- this.degreeOfParallelism = degreeOfParallelism;
+ this.parallelism = parallelism;
return this;
}
@@ -146,13 +183,38 @@ public class ExecutionConfig implements Serializable {
*/
public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) {
if (numberOfExecutionRetries < -1) {
- throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+ throw new IllegalArgumentException(
+ "The number of execution retries must be non-negative, or -1 (use system default)");
}
this.numberOfExecutionRetries = numberOfExecutionRetries;
return this;
}
/**
+ * Sets the execution mode used to execute the program. The execution mode defines whether
+ * data exchanges are performed in a batched or a pipelined manner.
+ *
+ * The default execution mode is {@link ExecutionMode#PIPELINED}.
+ *
+ * @param executionMode The execution mode to use.
+ */
+ public void setExecutionMode(ExecutionMode executionMode) {
+ this.executionMode = executionMode;
+ }
+
+ /**
+ * Gets the execution mode used to execute the program. The execution mode defines whether
+ * data exchanges are performed in a batched or a pipelined manner.
+ *
+ * The default execution mode is {@link ExecutionMode#PIPELINED}.
+ *
+ * @return The execution mode for the program.
+ */
+ public ExecutionMode getExecutionMode() {
+ return executionMode;
+ }
+
+ /**
* Force TypeExtractor to use Kryo serializer for POJOS even though we could analyze as POJO.
* In some cases this might be preferable. For example, when using interfaces
* with subclasses that cannot be analyzed as POJO.
@@ -161,7 +223,6 @@ public class ExecutionConfig implements Serializable {
forceKryo = true;
}
-
/**
* Disable use of Kryo serializer for all POJOs.
*/
@@ -403,8 +464,12 @@ public class ExecutionConfig implements Serializable {
public static class Entry<K, V> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
private final K k;
private final V v;
+
public Entry(K k, V v) {
this.k = k;
this.v = v;
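The ExecutionConfig changes can be summarized with a trimmed-down, stand-alone class. It keeps only the three settings touched by this commit (execution mode, parallelism, retries) and copies their defaults and validation rules from the diff; the class name and the local enum copy are illustrative and are not Flink's API.

```java
public class ExecutionConfigSketch {

    // Local copy of the ExecutionMode enum added in this commit.
    enum ExecutionMode { PIPELINED, PIPELINED_FORCED, BATCH, BATCH_FORCED }

    private ExecutionMode executionMode = ExecutionMode.PIPELINED; // documented default
    private int parallelism = -1;              // -1 = use the environment's default
    private int numberOfExecutionRetries = -1; // -1 = use the system default

    public int getDegreeOfParallelism() {
        return parallelism;
    }

    // Accepts any value >= 1, or exactly -1 to fall back to the system default.
    public ExecutionConfigSketch setDegreeOfParallelism(int parallelism) {
        if (parallelism < 1 && parallelism != -1) {
            throw new IllegalArgumentException(
                    "Degree of parallelism must be at least one, or -1 (use system default).");
        }
        this.parallelism = parallelism;
        return this;
    }

    public int getNumberOfExecutionRetries() {
        return numberOfExecutionRetries;
    }

    // Accepts any value >= 0, or -1 to fall back to the system default.
    public ExecutionConfigSketch setNumberOfExecutionRetries(int numberOfExecutionRetries) {
        if (numberOfExecutionRetries < -1) {
            throw new IllegalArgumentException(
                    "The number of execution retries must be non-negative, or -1 (use system default)");
        }
        this.numberOfExecutionRetries = numberOfExecutionRetries;
        return this;
    }

    // Returns void, as in the diff, so it cannot be chained like the setters above.
    public void setExecutionMode(ExecutionMode executionMode) {
        this.executionMode = executionMode;
    }

    public ExecutionMode getExecutionMode() {
        return executionMode;
    }

    public static void main(String[] args) {
        ExecutionConfigSketch config = new ExecutionConfigSketch()
                .setDegreeOfParallelism(4)
                .setNumberOfExecutionRetries(3);
        config.setExecutionMode(ExecutionMode.BATCH);
        // prints "4 3 BATCH"
        System.out.println(config.getDegreeOfParallelism() + " "
                + config.getNumberOfExecutionRetries() + " " + config.getExecutionMode());
    }
}
```

In this commit setExecutionMode returns void while the parallelism and retry setters return the config, so only the latter can be chained; the sketch keeps that asymmetry rather than guessing at a later signature.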
http://git-wip-us.apache.org/repos/asf/flink/blob/7f8e1566/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java
new file mode 100644
index 0000000..f3e958e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionMode.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+/**
+ * The execution mode specifies how a batch program is executed in terms
+ * of data exchange: pipelined or batched.
+ */
+public enum ExecutionMode {
+
+ /**
+ * Executes the program in a pipelined fashion (including shuffles and broadcasts),
+ * except for data exchanges that are susceptible to deadlocks when pipelining.
+ * These data exchanges are performed in a batch manner.
+ *
+ * Examples of situations that are susceptible to deadlocks (when executed in a
+ * pipelined manner) are data flows that branch (one data set consumed by multiple
+ * operations) and re-join later:
+ * <pre>{@code
+ * DataSet data = ...;
+ * DataSet mapped1 = data.map(new MyMapper());
+ * DataSet mapped2 = data.map(new AnotherMapper());
+ * mapped1.join(mapped2).where(...).equalTo(...);
+ * }</pre>
+ */
+ PIPELINED,
+
+ /**
+ * Executes the program in a pipelined fashion (including shuffles and broadcasts),
+ * <strong>including</strong> data exchanges that are susceptible to deadlocks when
+ * executed via pipelining.
+ *
+ * Usually, {@link #PIPELINED} is the preferable option, which pipelines most
+ * data exchanges and only uses batch data exchanges in situations that are
+ * susceptible to deadlocks.
+ *
+ * This option should only be used with care and only in situations where the
+ * programmer is sure that the program is safe for full pipelining and that
+ * Flink was too conservative when choosing the batch exchange at a certain
+ * point.
+ */
+ PIPELINED_FORCED,
+
+// This is for later; we are still missing some infrastructure for this mode.
+// /**
+// * The execution mode starts executing the program in a pipelined fashion
+// * (except for deadlock prone situations), similar to the {@link #PIPELINED}
+// * option. In the case of a task failure, re-execution happens in a batched
+// * mode, as defined for the {@link #BATCH} option.
+// */
+// PIPELINED_WITH_BATCH_FALLBACK,
+
+ /**
+ * This mode executes all shuffles and broadcasts in a batch fashion, while
+ * pipelining data between operations that exchange data only locally
+ * between one producer and one consumer.
+ */
+ BATCH,
+
+ /**
+ * This mode executes the program in a strictly batched fashion, including all points
+ * where data is forwarded locally from one producer to one consumer. This mode
+ * is typically more expensive to execute than the {@link #BATCH} mode. In exchange, it
+ * guarantees that no successive operations are ever executed concurrently.
+ */
+ BATCH_FORCED
+}
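The four ExecutionMode constants differ only in which data exchanges they pipeline. The sketch below encodes one reading of the Javadoc above as a decision function. Its inputs (whether an exchange is a local one-producer/one-consumer forward, and whether pipelining it could deadlock) and the ExchangeMode result type are assumptions made for illustration; the commit contains no such function. The BATCH Javadoc does not say how a deadlock-prone local forward is handled, so the sketch conservatively batches it.

```java
public class ExchangeModeSketch {

    // Local copy of the enum added in this commit.
    enum ExecutionMode { PIPELINED, PIPELINED_FORCED, BATCH, BATCH_FORCED }

    // Hypothetical per-exchange outcome.
    enum ExchangeMode { PIPELINED, BATCH }

    /**
     * @param mode          the program-wide execution mode
     * @param localForward  true if one producer forwards data locally to one consumer
     *                      (no shuffle or broadcast)
     * @param deadlockProne true if pipelining this exchange could deadlock, e.g. a
     *                      branch of the data flow that is re-joined later
     */
    static ExchangeMode select(ExecutionMode mode, boolean localForward, boolean deadlockProne) {
        switch (mode) {
            case PIPELINED_FORCED:
                return ExchangeMode.PIPELINED; // pipeline everything, even deadlock-prone exchanges
            case PIPELINED:
                return deadlockProne ? ExchangeMode.BATCH : ExchangeMode.PIPELINED;
            case BATCH:
                // Shuffles and broadcasts are batched; safe local forwards stay pipelined.
                return (localForward && !deadlockProne) ? ExchangeMode.PIPELINED : ExchangeMode.BATCH;
            case BATCH_FORCED:
                return ExchangeMode.BATCH; // no two successive operations run concurrently
            default:
                throw new IllegalArgumentException("Unknown execution mode: " + mode);
        }
    }

    public static void main(String[] args) {
        for (ExecutionMode mode : ExecutionMode.values()) {
            System.out.println(mode
                    + ": forward=" + select(mode, true, false)
                    + ", shuffle=" + select(mode, false, false)
                    + ", deadlock-prone=" + select(mode, false, true));
        }
    }
}
```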