Posted to commits@flink.apache.org by se...@apache.org on 2014/12/16 22:24:06 UTC

[2/5] incubator-flink git commit: [APIs] Add ExecutionConfig

[APIs] Add ExecutionConfig


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/026311ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/026311ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/026311ae

Branch: refs/heads/master
Commit: 026311ae59b74ac5e5bdbcf79e9b38c683e38110
Parents: fb96f12
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Dec 15 18:37:42 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Dec 16 20:52:25 2014 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 157 +++++++++++++++++++
 .../flink/api/java/ExecutionEnvironment.java    |  40 +++--
 .../org/apache/flink/api/scala/DataSet.scala    |  12 +-
 .../flink/api/scala/ExecutionEnvironment.scala  |  18 ++-
 4 files changed, 203 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/026311ae/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
new file mode 100644
index 0000000..3d110bd
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import java.io.Serializable;
+
+/**
+ * A config object for configuring the behaviour of the system, such as whether to use
+ * the closure cleaner, the object-reuse mode, and so on.
+ */
+public class ExecutionConfig implements Serializable {
+
+	// Key for storing it in the Job Configuration
+	public static final String CONFIG_KEY = "runtime.config";
+
+	private boolean useClosureCleaner = true;
+	private int degreeOfParallelism = -1;
+	private int numberOfExecutionRetries = -1;
+
+	// For future use...
+//	private boolean forceGenericSerializer = false;
+//	private boolean objectReuse = false;
+
+	/**
+	 * Enables the ClosureCleaner. This analyzes user code functions and sets fields that are
+	 * not used to null. In most cases this makes closures or anonymous inner classes serializable
+	 * that were not serializable before, due to some Scala or Java implementation artifact.
+	 * User code must be serializable because it needs to be sent to worker nodes.
+	 */
+	public ExecutionConfig enableClosureCleaner() {
+		useClosureCleaner = true;
+		return this;
+	}
+
+	/**
+	 * Disables the ClosureCleaner. See {@link #enableClosureCleaner()}.
+	 */
+	public ExecutionConfig disableClosureCleaner() {
+		useClosureCleaner = false;
+		return this;
+	}
+
+	/**
+	 * Returns whether the ClosureCleaner is enabled. See {@link #enableClosureCleaner()}.
+	 */
+	public boolean isClosureCleanerEnabled() {
+		return useClosureCleaner;
+	}
+
+	/**
+	 * Gets the degree of parallelism with which operations are executed by default. Operations
+	 * can individually override this value to use a specific degree of parallelism.
+	 * Other operations may need to run with a different degree of parallelism - for example,
+	 * calling a reduce operation over the entire data set will eventually insert an operation
+	 * that runs non-parallel (degree of parallelism of one).
+	 *
+	 * @return The degree of parallelism used by operations, unless they override that value.
+	 *         This method returns {@code -1}, if the environment's default parallelism should
+	 *         be used.
+	 */
+
+	public int getDegreeOfParallelism() {
+		return degreeOfParallelism;
+	}
+
+	/**
+	 * Sets the degree of parallelism (DOP) for operations executed through this environment.
+	 * Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with
+	 * x parallel instances.
+	 * <p>
+	 * This method overrides the default parallelism for this environment.
+	 * By default, the local execution environment uses a value equal to the number of hardware
+	 * contexts (CPU cores / threads). When executing the program via the command line client
+	 * from a JAR file, the default degree of parallelism is the one configured for that setup.
+	 *
+	 * @param degreeOfParallelism The degree of parallelism
+	 */
+
+	public ExecutionConfig setDegreeOfParallelism(int degreeOfParallelism) {
+		if (degreeOfParallelism < 1) {
+			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
+		}
+		this.degreeOfParallelism = degreeOfParallelism;
+		return this;
+	}
+
+	/**
+	 * Gets the number of times the system will try to re-execute failed tasks. A value
+	 * of {@code -1} indicates that the system default value (as defined in the configuration)
+	 * should be used.
+	 *
+	 * @return The number of times the system will try to re-execute failed tasks.
+	 */
+	public int getNumberOfExecutionRetries() {
+		return numberOfExecutionRetries;
+	}
+
+	/**
+	 * Sets the number of times that failed tasks are re-executed. A value of zero
+	 * effectively disables fault tolerance. A value of {@code -1} indicates that the system
+	 * default value (as defined in the configuration) should be used.
+	 *
+	 * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
+	 */
+	public ExecutionConfig setNumberOfExecutionRetries(int numberOfExecutionRetries) {
+		if (numberOfExecutionRetries < -1) {
+			throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+		}
+		this.numberOfExecutionRetries = numberOfExecutionRetries;
+		return this;
+	}
+
+	// These are for future use...
+//	public ExecutionConfig forceGenericSerializer() {
+//		forceGenericSerializer = true;
+//		return this;
+//	}
+//
+//	public ExecutionConfig disableForceGenericSerializer() {
+//		forceGenericSerializer = false;
+//		return this;
+//	}
+//
+//	public boolean isForceGenericSerializerEnabled() {
+//		return forceGenericSerializer;
+//	}
+//
+//	public ExecutionConfig enableObjectReuse() {
+//		objectReuse = true;
+//		return this;
+//	}
+//
+//	public ExecutionConfig disableObjectReuse() {
+//		objectReuse = false;
+//		return this;
+//	}
+//
+//	public boolean isObjectReuseEnabled() {
+//		return objectReuse;
+//	}
+}
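
As a quick illustration of the fluent API in the new class above, here is a minimal usage sketch. It is not part of the commit; the ExecutionEnvironment.getExecutionEnvironment() factory is assumed from the existing Java API, and the job itself is elided.

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ConfigExample {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // The setters return "this", so calls can be chained.
            ExecutionConfig config = new ExecutionConfig()
                    .disableClosureCleaner()          // do not touch user closures
                    .setDegreeOfParallelism(4)        // default DOP for all operators
                    .setNumberOfExecutionRetries(3);  // re-run failed tasks up to 3 times

            env.setConfig(config);

            // ... define sources, transformations, sinks, and call env.execute() as usual ...
        }
    }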

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/026311ae/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 3804679..c19e9aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
@@ -95,9 +96,7 @@ public abstract class ExecutionEnvironment {
 	
 	private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<Tuple2<String, DistributedCacheEntry>>();
 
-	private int degreeOfParallelism = -1;
-	
-	private int numberOfExecutionRetries = -1;
+	private ExecutionConfig config = new ExecutionConfig();
 	
 	
 	// --------------------------------------------------------------------------------------------
@@ -110,7 +109,22 @@ public abstract class ExecutionEnvironment {
 	protected ExecutionEnvironment() {
 		this.executionId = UUID.randomUUID();
 	}
-	
+
+	/**
+	 * Sets the config object.
+	 */
+	public void setConfig(ExecutionConfig config) {
+		Validate.notNull(config);
+		this.config = config;
+	}
+
+	/**
+	 * Gets the config object.
+	 */
+	public ExecutionConfig getConfig() {
+		return config;
+	}
+
 	/**
 	 * Gets the degree of parallelism with which operation are executed by default. Operations can
 	 * individually override this value to use a specific degree of parallelism via
@@ -123,7 +137,7 @@ public abstract class ExecutionEnvironment {
 	 *         returns {@code -1}, if the environments default parallelism should be used.
 	 */
 	public int getDegreeOfParallelism() {
-		return degreeOfParallelism;
+		return config.getDegreeOfParallelism();
 	}
 	
 	/**
@@ -139,11 +153,7 @@ public abstract class ExecutionEnvironment {
 	 * @param degreeOfParallelism The degree of parallelism
 	 */
 	public void setDegreeOfParallelism(int degreeOfParallelism) {
-		if (degreeOfParallelism < 1) {
-			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
-		}
-		
-		this.degreeOfParallelism = degreeOfParallelism;
+		config.setDegreeOfParallelism(degreeOfParallelism);
 	}
 	
 	/**
@@ -154,10 +164,7 @@ public abstract class ExecutionEnvironment {
 	 * @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
 	 */
 	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
-		if (numberOfExecutionRetries < -1) {
-			throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
-		}
-		this.numberOfExecutionRetries = numberOfExecutionRetries;
+		config.setNumberOfExecutionRetries(numberOfExecutionRetries);
 	}
 	
 	/**
@@ -168,7 +175,7 @@ public abstract class ExecutionEnvironment {
 	 * @return The number of times the system will try to re-execute failed tasks.
 	 */
 	public int getNumberOfExecutionRetries() {
-		return numberOfExecutionRetries;
+		return config.getNumberOfExecutionRetries();
 	}
 	
 	/**
@@ -742,8 +749,7 @@ public abstract class ExecutionEnvironment {
 		if (getDegreeOfParallelism() > 0) {
 			plan.setDefaultParallelism(getDegreeOfParallelism());
 		}
-		plan.setNumberOfExecutionRetries(this.numberOfExecutionRetries);
-		
+
 		try {
 			registerCachedFilesWithPlan(plan);
 		} catch (Exception e) {
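
The existing setters on ExecutionEnvironment keep working; as the hunks above show, they now simply delegate to the shared config object. A minimal sketch of the two equivalent access paths (the env handle is assumed to come from the usual getExecutionEnvironment() factory):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Both calls update the same ExecutionConfig instance:
    env.setDegreeOfParallelism(8);             // delegates to config.setDegreeOfParallelism(8)
    env.getConfig().setDegreeOfParallelism(8); // direct access through the new getter

    // Reads go through the config as well:
    int dop = env.getDegreeOfParallelism();    // returns config.getDegreeOfParallelism()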

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/026311ae/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 13d1f08..69bcfde 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -92,8 +92,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    * Returns the execution environment associated with the current DataSet.
    * @return associated execution environment
    */
-  def getExecutionEnvironment: ExecutionEnvironment = new ExecutionEnvironment(set
-    .getExecutionEnvironment)
+  def getExecutionEnvironment: ExecutionEnvironment =
+    new ExecutionEnvironment(set.getExecutionEnvironment)
 
   /**
    * Returns the underlying Java DataSet.
@@ -110,11 +110,13 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
    *
    * @param f the closure to clean
    * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
-   * @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
-   *   serializable
+   * @throws InvalidProgramException if <tt>checkSerializable</tt> is set but <tt>f</tt>
+   *          is not serializable
    */
   private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
-    ClosureCleaner.clean(f, checkSerializable)
+    if (set.getExecutionEnvironment.getConfig.isClosureCleanerEnabled) {
+      ClosureCleaner.clean(f, checkSerializable)
+    }
     f
   }
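
With this change, clean() only runs the ClosureCleaner when the config allows it. Turning the cleaner off from user code is a one-liner; a minimal sketch in Java (again assuming an env obtained from the existing factory method):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Skip the ClosureCleaner entirely; user closures are then shipped as-is,
    // so they must already be serializable.
    env.getConfig().disableClosureCleaner();

    // The Scala DataSet.clean() helper checks this flag before cleaning:
    boolean cleanerEnabled = env.getConfig().isClosureCleanerEnabled();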
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/026311ae/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index e756e78..43f8609 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -20,7 +20,7 @@ package org.apache.flink.api.scala
 import java.util.UUID
 
 import org.apache.commons.lang3.Validate
-import org.apache.flink.api.common.JobExecutionResult
+import org.apache.flink.api.common.{ExecutionConfig, JobExecutionResult}
 import org.apache.flink.api.java.io._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo
@@ -28,7 +28,8 @@ import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase}
 import org.apache.flink.api.scala.operators.ScalaCsvInputFormat
 import org.apache.flink.core.fs.Path
 
-import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, CollectionEnvironment}
+import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv,
+  CollectionEnvironment}
 import org.apache.flink.api.common.io.{InputFormat, FileInputFormat}
 
 import org.apache.flink.api.java.operators.DataSource
@@ -59,6 +60,19 @@ import scala.reflect.ClassTag
  *  be created.
  */
 class ExecutionEnvironment(javaEnv: JavaEnv) {
+  /**
+   * Sets the config object.
+   */
+  def setConfig(config: ExecutionConfig): Unit = {
+    javaEnv.setConfig(config)
+  }
+
+  /**
+   * Gets the config object.
+   */
+  def getConfig: ExecutionConfig = {
+    javaEnv.getConfig
+  }
 
   /**
    * Sets the degree of parallelism (DOP) for operations executed through this environment.