You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/09/24 12:21:34 UTC

flink git commit: [FLINK-4485] close and remove user class loader after job completion [Forced Update!]

Repository: flink
Updated Branches:
  refs/heads/release-1.1 06496439a -> 62c666f57 (forced update)


[FLINK-4485] close and remove user class loader after job completion

Keeping the user class loader around after job completion may lead to
excessive temp space usage because all user jars are kept until the
class loader is garbage collected. Tests showed that garbage collection
can be delayed for a long time after the class loader is no longer
referenced. Note that the class loader only becomes unreferenced once
its job has been removed from the archive.

The fastest way to minimize temp space usage is to close and remove the
URLClassLoader after job completion. This requires us to keep a
serializable copy of all data that would otherwise need the user class
loader after job completion, e.g. to display data on the web interface.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62c666f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62c666f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62c666f5

Branch: refs/heads/release-1.1
Commit: 62c666f5794fa211bf570874b1b77044fd6840ac
Parents: 8fd08bf
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 14 11:00:58 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Sat Sep 24 14:21:07 2016 +0200

----------------------------------------------------------------------
 .../flink/client/program/JobWithJars.java       |  4 +-
 .../webmonitor/handlers/JobConfigHandler.java   | 47 +++++-------
 .../librarycache/BlobLibraryCacheManager.java   | 42 ++++++-----
 .../librarycache/FlinkUserCodeClassLoader.java  | 35 +++++++++
 .../runtime/executiongraph/ExecutionGraph.java  | 34 +++++++++
 .../archive/ExecutionConfigSummary.java         | 75 ++++++++++++++++++++
 .../apache/flink/runtime/taskmanager/Task.java  |  3 +-
 .../flink/test/web/WebFrontendITCase.java       |  2 +-
 8 files changed, 191 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index ef02527..d5a3014 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -22,12 +22,12 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 
 /**
  * A JobWithJars is a Flink dataflow plan, together with a bunch of JAR files that contain
@@ -134,6 +134,6 @@ public class JobWithJars {
 		for (int i = 0; i < classpaths.size(); i++) {
 			urls[i + jars.size()] = classpaths.get(i);
 		}
-		return new URLClassLoader(urls, parent);
+		return new FlinkUserCodeClassLoader(urls, parent);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index cd63630..75389b1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -22,8 +22,8 @@ import java.io.StringWriter;
 import java.util.Map;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 /**
@@ -45,37 +45,28 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 		gen.writeStringField("jid", graph.getJobID().toString());
 		gen.writeStringField("name", graph.getJobName());
 
-		ExecutionConfig ec;
-		try {
-			ec = graph.getSerializedExecutionConfig().deserializeValue(graph.getUserClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Couldn't deserialize ExecutionConfig.", e);
-		}
+		final ExecutionConfigSummary summary = graph.getExecutionConfigSummary();
 
-		if (ec != null) {
+		if (summary != null) {
 			gen.writeObjectFieldStart("execution-config");
-			
-			gen.writeStringField("execution-mode", ec.getExecutionMode().name());
-
-			final String restartStrategyDescription = ec.getRestartStrategy() != null ? ec.getRestartStrategy().getDescription() : "default";
-			gen.writeStringField("restart-strategy", restartStrategyDescription);
-			gen.writeNumberField("job-parallelism", ec.getParallelism());
-			gen.writeBooleanField("object-reuse-mode", ec.isObjectReuseEnabled());
-
-			ExecutionConfig.GlobalJobParameters uc = ec.getGlobalJobParameters();
-			if (uc != null) {
-				Map<String, String> ucVals = uc.toMap();
-				if (ucVals != null) {
-					gen.writeObjectFieldStart("user-config");
-					
-					for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
-						gen.writeStringField(ucVal.getKey(), ucVal.getValue());
-					}
-
-					gen.writeEndObject();
+
+			gen.writeStringField("execution-mode", summary.getExecutionMode());
+
+			gen.writeStringField("restart-strategy", summary.getRestartStrategyDescription());
+			gen.writeNumberField("job-parallelism", summary.getParallelism());
+			gen.writeBooleanField("object-reuse-mode", summary.getObjectReuseEnabled());
+
+			Map<String, String> ucVals = summary.getGlobalJobParameters();
+			if (ucVals != null) {
+				gen.writeObjectFieldStart("user-config");
+
+				for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
+					gen.writeStringField(ucVal.getKey(), ucVal.getValue());
 				}
+
+				gen.writeEndObject();
 			}
-			
+
 			gen.writeEndObject();
 		}
 		gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index d1fbc70..c94768d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.execution.librarycache;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import java.net.URLClassLoader;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -138,8 +138,7 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 					count++;
 				}
 
-				URLClassLoader classLoader = new FlinkUserCodeClassLoader(urls);
-				cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, classLoader, task));
+				cacheEntries.put(jobId, new LibraryCacheEntry(requiredJarFiles, urls, task));
 			}
 			else {
 				entry.register(task, requiredJarFiles);
@@ -156,14 +155,16 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 	public void unregisterTask(JobID jobId, ExecutionAttemptID task) {
 		Preconditions.checkNotNull(jobId, "The JobId must not be null.");
 		Preconditions.checkNotNull(task, "The task execution id must not be null.");
-		
+
 		synchronized (lockObject) {
 			LibraryCacheEntry entry = cacheEntries.get(jobId);
-			
+
 			if (entry != null) {
 				if (entry.unregister(task)) {
 					cacheEntries.remove(jobId);
-					
+
+					entry.releaseClassLoader();
+
 					for (BlobKey key : entry.getLibraries()) {
 						unregisterReferenceToBlobKey(key);
 					}
@@ -286,17 +287,17 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 	 */
 	private static class LibraryCacheEntry {
 		
-		private final ClassLoader classLoader;
+		private final FlinkUserCodeClassLoader classLoader;
 		
 		private final Set<ExecutionAttemptID> referenceHolders;
 		
 		private final Set<BlobKey> libraries;
 		
 		
-		public LibraryCacheEntry(Collection<BlobKey> libraries, ClassLoader classLoader, ExecutionAttemptID initialReference) {
-			this.classLoader = classLoader;
-			this.libraries = new HashSet<BlobKey>(libraries);
-			this.referenceHolders = new HashSet<ExecutionAttemptID>();
+		public LibraryCacheEntry(Collection<BlobKey> libraries, URL[] libraryURLs, ExecutionAttemptID initialReference) {
+			this.classLoader = new FlinkUserCodeClassLoader(libraryURLs);
+			this.libraries = new HashSet<>(libraries);
+			this.referenceHolders = new HashSet<>();
 			this.referenceHolders.add(initialReference);
 		}
 		
@@ -326,15 +327,18 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
 		public int getNumberOfReferenceHolders() {
 			return referenceHolders.size();
 		}
-	}
-
-	/**
-	 * Give the URLClassLoader a nicer name for debugging purposes.
-	 */
-	private static class FlinkUserCodeClassLoader extends URLClassLoader {
 
-		public FlinkUserCodeClassLoader(URL[] urls) {
-			super(urls, FlinkUserCodeClassLoader.class.getClassLoader());
+		/**
+		 * Release the class loader to ensure any file descriptors are closed
+		 * and the cached libraries are deleted immediately.
+		 */
+		void releaseClassLoader() {
+			try {
+				classLoader.close();
+			} catch (IOException e) {
+				LOG.warn("Failed to release user code class loader for " + Arrays.toString(libraries.toArray()));
+			}
 		}
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
new file mode 100644
index 0000000..015f6c7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.execution.librarycache;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+
+/**
+ * Gives the URLClassLoader a nicer name for debugging purposes.
+ */
+public class FlinkUserCodeClassLoader extends URLClassLoader {
+
+	public FlinkUserCodeClassLoader(URL[] urls) {
+		this(urls, FlinkUserCodeClassLoader.class.getClassLoader());
+	}
+
+	public FlinkUserCodeClassLoader(URL[] urls, ClassLoader parent) {
+		super(urls, parent);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 4229105..8cf8354 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -41,6 +41,8 @@ import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
+import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -237,6 +239,9 @@ public class ExecutionGraph implements Serializable {
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private String jsonPlan;
 
+	/** Serializable summary of all job config values, e.g. for web interface */
+	private ExecutionConfigSummary executionConfigSummary;
+
 	// --------------------------------------------------------------------------------------------
 	//   Constructors
 	// --------------------------------------------------------------------------------------------
@@ -314,6 +319,16 @@ public class ExecutionGraph implements Serializable {
 		this.restartStrategy = restartStrategy;
 
 		metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
+
+		// create a summary of all relevant data accessed in the web interface's JobConfigHandler
+		try {
+			ExecutionConfig executionConfig = serializedConfig.deserializeValue(userClassLoader);
+			if (executionConfig != null) {
+				this.executionConfigSummary = new ExecutionConfigSummary(executionConfig);
+			}
+		} catch (IOException | ClassNotFoundException e) {
+			LOG.error("Couldn't create ExecutionConfigSummary for job {} ", jobID, e);
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1014,10 +1029,29 @@ public class ExecutionGraph implements Serializable {
 		jobStatusListenerActors.clear();
 		executionListenerActors.clear();
 
+		if (userClassLoader instanceof FlinkUserCodeClassLoader) {
+			try {
+				// close the classloader to free space of user jars immediately
+				// otherwise we have to wait until garbage collection
+				((FlinkUserCodeClassLoader) userClassLoader).close();
+			} catch (IOException e) {
+				LOG.warn("Failed to close the user classloader for job {}", jobID, e);
+			}
+		}
+		userClassLoader = null;
+
 		isArchived = true;
 	}
 
 	/**
+	 * Returns the serializable ExecutionConfigSummary
+	 * @return ExecutionConfigSummary which may be null in case of errors
+	 */
+	public ExecutionConfigSummary getExecutionConfigSummary() {
+		return executionConfigSummary;
+	}
+
+	/**
 	 * Returns the serialized {@link ExecutionConfig}.
 	 *
 	 * @return ExecutionConfig

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
new file mode 100644
index 0000000..ad4677f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph.archive;
+
+import org.apache.flink.api.common.ExecutionConfig;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Serializable class which is created when archiving the job.
+ * It can be used to display job information on the web interface
+ * without having to keep the classloader around after job completion.
+ */
+public class ExecutionConfigSummary implements Serializable {
+
+	private final String executionMode;
+	private final String restartStrategyDescription;
+	private final int parallelism;
+	private final boolean objectReuseEnabled;
+	private final Map<String, String> globalJobParameters;
+
+	public ExecutionConfigSummary(ExecutionConfig ec) {
+		executionMode = ec.getExecutionMode().name();
+		if (ec.getRestartStrategy() != null) {
+			restartStrategyDescription = ec.getRestartStrategy().getDescription();
+		} else {
+			restartStrategyDescription = "default";
+		}
+		parallelism = ec.getParallelism();
+		objectReuseEnabled = ec.isObjectReuseEnabled();
+		if (ec.getGlobalJobParameters() != null
+				&& ec.getGlobalJobParameters().toMap() != null) {
+			globalJobParameters = ec.getGlobalJobParameters().toMap();
+		} else {
+			globalJobParameters = Collections.emptyMap();
+		}
+	}
+
+	public String getExecutionMode() {
+		return executionMode;
+	}
+
+	public String getRestartStrategyDescription() {
+		return restartStrategyDescription;
+	}
+
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	public boolean getObjectReuseEnabled() {
+		return objectReuseEnabled;
+	}
+
+	public Map<String, String> getGlobalJobParameters() {
+		return globalJobParameters;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index dbc0b62..25a7e29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -459,6 +459,7 @@ public class Task implements Runnable {
 		Map<String, Future<Path>> distributedCacheEntries = new HashMap<String, Future<Path>>();
 		AbstractInvokable invokable = null;
 
+		ClassLoader userCodeClassLoader = null;
 		try {
 			// ----------------------------
 			//  Task Bootstrap - We periodically
@@ -469,7 +470,7 @@ public class Task implements Runnable {
 			// this may involve downloading the job's JAR files and/or classes
 			LOG.info("Loading JAR files for task " + taskNameWithSubtask);
 
-			final ClassLoader userCodeClassLoader = createUserCodeClassloader(libraryCache);
+			userCodeClassLoader = createUserCodeClassloader(libraryCache);
 			final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
 
 			if (executionConfig.getTaskCancellationInterval() >= 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/62c666f5/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 032c8fe..39042d2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -249,7 +249,7 @@ public class WebFrontendITCase {
 			assertEquals(response.getType(), MimeTypes.getMimeTypeForExtension("json"));
 			assertEquals("{\"jid\":\""+jid+"\",\"name\":\"Stoppable streaming test job\"," +
 					"\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"default\"," +
-					"\"job-parallelism\":-1,\"object-reuse-mode\":false}}", response.getContent());
+					"\"job-parallelism\":-1,\"object-reuse-mode\":false,\"user-config\":{}}}", response.getContent());
 		}
 	}