You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/16 22:24:06 UTC
[2/5] incubator-flink git commit: [APIs] Add ExecutionConfig
[APIs] Add ExecutionConfig
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/026311ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/026311ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/026311ae
Branch: refs/heads/master
Commit: 026311ae59b74ac5e5bdbcf79e9b38c683e38110
Parents: fb96f12
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Dec 15 18:37:42 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 16 20:52:25 2014 +0100
----------------------------------------------------------------------
.../flink/api/common/ExecutionConfig.java | 157 +++++++++++++++++++
.../flink/api/java/ExecutionEnvironment.java | 40 +++--
.../org/apache/flink/api/scala/DataSet.scala | 12 +-
.../flink/api/scala/ExecutionEnvironment.scala | 18 ++-
4 files changed, 203 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/026311ae/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
new file mode 100644
index 0000000..3d110bd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import java.io.Serializable;
+
+/**
+ * A configuration object for controlling the behaviour of the system, such as whether to use
+ * the closure cleaner, object-reuse mode...
+ */
+public class ExecutionConfig implements Serializable {
+
+ // Key for storing it in the Job Configuration
+ public static final String CONFIG_KEY = "runtime.config";
+
+ private boolean useClosureCleaner = true;
+ private int degreeOfParallelism = -1;
+ private int numberOfExecutionRetries = -1;
+
+ // For future use...
+// private boolean forceGenericSerializer = false;
+// private boolean objectReuse = false;
+
+ /**
+ * Enables the ClosureCleaner. This analyzes user code functions and sets fields to null
+ * that are not used. This will in most cases make closures or anonymous inner classes
+ * serializable that were not serializable due to some Scala or Java implementation artifact.
+ * User code must be serializable because it needs to be sent to worker nodes.
+ */
+ public ExecutionConfig enableClosureCleaner() {
+ useClosureCleaner = true;
+ return this;
+ }
+
+ /**
+ * Disables the ClosureCleaner. See {@link #enableClosureCleaner()}.
+ */
+ public ExecutionConfig disableClosureCleaner() {
+ useClosureCleaner = false;
+ return this;
+ }
+
+ /**
+ * Returns whether the ClosureCleaner is enabled. See {@link #enableClosureCleaner()}.
+ */
+ public boolean isClosureCleanerEnabled() {
+ return useClosureCleaner;
+ }
+
+ /**
+ * Gets the degree of parallelism with which operations are executed by default. Operations can
+ * individually override this value to use a specific degree of parallelism.
+ * Other operations may need to run with a different
+ * degree of parallelism - for example calling
+ * a reduce operation over the entire
+ * set will eventually insert an operation that runs non-parallel (degree of parallelism of one).
+ *
+ * @return The degree of parallelism used by operations, unless they override that value. This method
+ * returns {@code -1}, if the environment's default parallelism should be used.
+ */
+
+ public int getDegreeOfParallelism() {
+ return degreeOfParallelism;
+ }
+
+ /**
+ * Sets the degree of parallelism (DOP) for operations executed through this environment.
+ * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
+ * x parallel instances.
+ * <p>
+ * This method overrides the default parallelism for this environment.
+ * The local execution environment uses by default a value equal to the number of hardware
+ * contexts (CPU cores / threads). When executing the program via the command line client
+ * from a JAR file, the default degree of parallelism is the one configured for that setup.
+ *
+ * @param degreeOfParallelism The degree of parallelism
+ */
+
+ public ExecutionConfig setDegreeOfParallelism(int degreeOfParallelism) {
+ if (degreeOfParallelism < 1) {
+ throw new IllegalArgumentException("Degree of parallelism must be at least one.");
+ }
+ this.degreeOfParallelism = degreeOfParallelism;
+ return this;
+ }
+
+ /**
+ * Gets the number of times the system will try to re-execute failed tasks. A value
+ * of {@code -1} indicates that the system default value (as defined in the configuration)
+ * should be used.
+ *
+ * @return The number of times the system will try to re-execute failed tasks.
+ */
+ public int getNumberOfExecutionRetries() {
+ return numberOfExecutionRetries;
+ }
+
+ /**
+ * Sets the number of times that failed tasks are re-executed. A value of zero
+ * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+ * default value (as defined in the configuration) should be used.
+ *
+ * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+ */
+ public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+ if (numberOfExecutionRetries < -1) {
+ throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+ }
+ this.numberOfExecutionRetries = numberOfExecutionRetries;
+ return this;
+ }
+
+ // These are for future use...
+// public ExecutionConfig forceGenericSerializer() {
+// forceGenericSerializer = true;
+// return this;
+// }
+//
+// public ExecutionConfig disableForceGenericSerializer() {
+// forceGenericSerializer = false;
+// return this;
+// }
+//
+// public boolean isForceGenericSerializerEnabled() {
+// return forceGenericSerializer;
+// }
+//
+// public ExecutionConfig enableObjectReuse() {
+// objectReuse = true;
+// return this;
+// }
+//
+// public ExecutionConfig disableObjectReuse() {
+// objectReuse = false;
+// return this;
+// }
+//
+// public boolean isObjectReuseEnabled() {
+// return objectReuse;
+// }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/026311ae/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 3804679..c19e9aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.UUID;
import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
@@ -95,9 +96,7 @@ public abstract class ExecutionEnvironment {
private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
- private int degreeOfParallelism = -1;
-
- private int numberOfExecutionRetries = -1;
+ private ExecutionConfig config = new ExecutionConfig();
// --------------------------------------------------------------------------------------------
@@ -110,7 +109,22 @@ public abstract class ExecutionEnvironment {
protected ExecutionEnvironment() {
this.executionId = UUID.randomUUID();
}
-
+
+ /**
+ * Sets the config object.
+ */
+ public void setConfig(ExecutionConfig config) {
+ Validate.notNull(config);
+ this.config = config;
+ }
+
+ /**
+ * Gets the config object.
+ */
+ public ExecutionConfig getConfig() {
+ return config;
+ }
+
/**
* Gets the degree of parallelism with which operation are executed by default. Operations can
* individually override this value to use a specific degree of parallelism via
@@ -123,7 +137,7 @@ public abstract class ExecutionEnvironment {
* returns {@code -1}, if the environments default parallelism should be used.
*/
public int getDegreeOfParallelism() {
- return degreeOfParallelism;
+ return config.getDegreeOfParallelism();
}
/**
@@ -139,11 +153,7 @@ public abstract class ExecutionEnvironment {
* @param degreeOfParallelism The degree of parallelism
*/
public void setDegreeOfParallelism(int degreeOfParallelism) {
- if (degreeOfParallelism < 1) {
- throw new IllegalArgumentException("Degree of parallelism must be at least one.");
- }
-
- this.degreeOfParallelism = degreeOfParallelism;
+ config.setDegreeOfParallelism(degreeOfParallelism);
}
/**
@@ -154,10 +164,7 @@ public abstract class ExecutionEnvironment {
* @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
*/
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
- if (numberOfExecutionRetries < -1) {
- throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
- }
- this.numberOfExecutionRetries = numberOfExecutionRetries;
+ config.setNumberOfExecutionRetries(numberOfExecutionRetries);
}
/**
@@ -168,7 +175,7 @@ public abstract class ExecutionEnvironment {
* @return The number of times the system will try to re-execute failed tasks.
*/
public int getNumberOfExecutionRetries() {
- return numberOfExecutionRetries;
+ return config.getNumberOfExecutionRetries();
}
/**
@@ -742,8 +749,7 @@ public abstract class ExecutionEnvironment {
if (getDegreeOfParallelism() > 0) {
plan.setDefaultParallelism(getDegreeOfParallelism());
}
- plan.setNumberOfExecutionRetries(this.numberOfExecutionRetries);
-
+
try {
registerCachedFilesWithPlan(plan);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/026311ae/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 13d1f08..69bcfde 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -92,8 +92,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* Returns the execution environment associated with the current DataSet.
* @return associated execution environment
*/
- def getExecutionEnvironment: ExecutionEnvironment = new ExecutionEnvironment(set
- .getExecutionEnvironment)
+ def getExecutionEnvironment: ExecutionEnvironment =
+ new ExecutionEnvironment(set.getExecutionEnvironment)
/**
* Returns the underlying Java DataSet.
@@ -110,11 +110,13 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
*
* @param f the closure to clean
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
- * @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
- * serializable
+ * @throws InvalidProgramException if <tt>checkSerializable</tt> is set but <tt>f</tt>
+ * is not serializable
*/
private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
- ClosureCleaner.clean(f, checkSerializable)
+ if (set.getExecutionEnvironment.getConfig.isClosureCleanerEnabled) {
+ ClosureCleaner.clean(f, checkSerializable)
+ }
f
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/026311ae/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index e756e78..43f8609 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala
import java.util.UUID
import org.apache.commons.lang3.Validate
-import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.{ExecutionConfig, JobExecutionResult}
import org.apache.flink.api.java.io._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
@@ -28,7 +28,8 @@ import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase}
import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
import org.apache.flink.core.fs.Path
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, CollectionEnvironment}
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv,
+CollectionEnvironment}
import org.apache.flink.api.common.io.{InputFormat, FileInputFormat}
import org.apache.flink.api.java.operators.DataSource
@@ -59,6 +60,19 @@ import scala.reflect.ClassTag
* be created.
*/
class ExecutionEnvironment(javaEnv: JavaEnv) {
+ /**
+ * Sets the config object.
+ */
+ def setConfig(config: ExecutionConfig): Unit = {
+ javaEnv.setConfig(config)
+ }
+
+ /**
+ * Gets the config object.
+ */
+ def getConfig: ExecutionConfig = {
+ javaEnv.getConfig
+ }
/**
* Sets the degree of parallelism (DOP) for operations executed through this environment.