You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/04/05 15:26:00 UTC

flink git commit: [hotfix] [core] Improve error messages of the Java Closure Cleaner

Repository: flink
Updated Branches:
  refs/heads/master 641a0d436 -> b1e508645


[hotfix] [core] Improve error messages of the Java Closure Cleaner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1e50864
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1e50864
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1e50864

Branch: refs/heads/master
Commit: b1e5086455257e5b7d967fd0715f5a2ab30734aa
Parents: 641a0d4
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 5 11:59:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Apr 5 11:59:54 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/api/java/ClosureCleaner.java   | 144 ++++++++++++++-----
 1 file changed, 108 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b1e50864/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index 8eaebb8..2f22a75 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -18,62 +18,110 @@
 
 package org.apache.flink.api.java;
 
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.objectweb.asm.ClassReader;
 import org.objectweb.asm.ClassVisitor;
 import org.objectweb.asm.MethodVisitor;
 import org.objectweb.asm.Opcodes;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
 
+/**
+ * The closure cleaner is a utility that tries to truncate the closure (enclosing instance)
+ * of non-static inner classes (created for inline transformation functions). That makes non-static
+ * inner classes in many cases serializable, where Java's default behavior renders them non-serializable
+ * without good reason.
+ */
 @Internal
 public class ClosureCleaner {
+	
 	private static Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class);
-
-	private static ClassReader getClassReader(Class<?> cls) {
-		String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
-		try {
-			return new ClassReader(cls.getResourceAsStream(className));
-		} catch (IOException e) {
-			throw new RuntimeException("Could not create ClassReader: " + e);
-		}
-	}
-
+	
+	/**
+	 * Tries to clean the closure of the given object, if the object is a non-static inner
+	 * class.
+	 * 
+	 * @param func The object whose closure should be cleaned.
+	 * @param checkSerializable Flag to indicate whether serializability should be checked after
+	 *                          the closure cleaning attempt.
+	 * 
+	 * @throws InvalidProgramException Thrown, if 'checkSerializable' is true, and the object was
+	 *                                 not serializable after the closure cleaning.
+	 * 
+	 * @throws RuntimeException A RuntimeException may be thrown, if the code of the class could not
+	 *                          be loaded, in order to process it during the closure cleaning.
+	 */
 	public static void clean(Object func, boolean checkSerializable) {
-		Class<?> cls = func.getClass();
+		if (func == null) {
+			return;
+		}
+		
+		final Class<?> cls = func.getClass();
 
 		// First find the field name of the "this$0" field, this can
-		// be "field$x" depending on the nesting
+		// be "this$x" depending on the nesting
+		boolean closureAccessed = false;
+		
 		for (Field f: cls.getDeclaredFields()) {
 			if (f.getName().startsWith("this$")) {
-				// found our field:
-				cleanThis0(func, cls, f.getName());
+				// found a closure referencing field - now try to clean
+				closureAccessed |= cleanThis0(func, cls, f.getName());
 			}
 		}
-
+		
 		if (checkSerializable) {
-			ensureSerializable(func);
+			try {
+				InstantiationUtil.serializeObject(func);
+			}
+			catch (Exception e) {
+				String functionType = getSuperClassOrInterfaceName(func.getClass());
+				
+				String msg = functionType == null ?
+						(func + " is not serializable.") :
+						("The implementation of the " + functionType + " is not serializable.");
+				
+				
+				if (closureAccessed) {
+					msg += " The implementation accesses fields of its enclosing class, which is " +
+							"a common reason for non-serializability. " +
+							"A common solution is to make the function a proper (non-inner) class, or" +
+							"a static inner class.";
+				} else {
+					msg += " The object probably contains or references non serializable fields.";
+				}
+				
+				throw new InvalidProgramException(msg, e);
+			}
 		}
 	}
 
-	private static void cleanThis0(Object func, Class<?> cls, String this0Name) {
-
+	public static void ensureSerializable(Object obj) {
+		try {
+			InstantiationUtil.serializeObject(obj);
+		} catch (Exception e) {
+			throw new InvalidProgramException("Object " + obj + " is not serializable", e);
+		}
+	}
+	
+	private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) {
+		
 		This0AccessFinder this0Finder = new This0AccessFinder(this0Name);
-
 		getClassReader(cls).accept(this0Finder, 0);
-
-
+		
+		final boolean accessesClosure = this0Finder.isThis0Accessed();
+				
 		if (LOG.isDebugEnabled()) {
-			LOG.debug(this0Name + " is accessed: " + this0Finder.isThis0Accessed());
+			LOG.debug(this0Name + " is accessed: " + accessesClosure);
 		}
 
-		if (!this0Finder.isThis0Accessed()) {
+		if (!accessesClosure) {
 			Field this0;
 			try {
 				this0 = func.getClass().getDeclaredField(this0Name);
@@ -81,30 +129,54 @@ public class ClosureCleaner {
 				// has no this$0, just return
 				throw new RuntimeException("Could not set " + this0Name + ": " + e);
 			}
-			this0.setAccessible(true);
+			
 			try {
+				this0.setAccessible(true);
 				this0.set(func, null);
-			} catch (IllegalAccessException e) {
+			}
+			catch (Exception e) {
 				// should not happen, since we use setAccessible
-				throw new RuntimeException("Could not set " + this0Name + ": " + e);
+				throw new RuntimeException("Could not set " + this0Name + " to null. " + e.getMessage(), e);
 			}
 		}
+		
+		return accessesClosure;
 	}
-
-
-	public static void ensureSerializable(Object obj) {
+	
+	private static ClassReader getClassReader(Class<?> cls) {
+		String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
 		try {
-			InstantiationUtil.serializeObject(obj);
-		} catch (Exception e) {
-			throw new InvalidProgramException("Object " + obj + " not serializable", e);
+			return new ClassReader(cls.getResourceAsStream(className));
+		} catch (IOException e) {
+			throw new RuntimeException("Could not create ClassReader: " + e.getMessage(), e);
+		}
+	}
+	
+	
+	private static String getSuperClassOrInterfaceName(Class<?> cls) {
+		Class<?> superclass = cls.getSuperclass();
+		if (superclass.getName().startsWith("org.apache.flink")) {
+			return superclass.getSimpleName();
+		} else {
+			for (Class<?> inFace : cls.getInterfaces()) {
+				if (inFace.getName().startsWith("org.apache.flink")) {
+					return inFace.getSimpleName();
+				}
+			}
+			return null;
 		}
 	}
-
 }
 
+/**
+ * This visitor walks methods and finds accesses to the field with the reference to
+ * the enclosing class.
+ */
 class This0AccessFinder extends ClassVisitor {
-	private boolean isThis0Accessed = false;
-	private String this0Name;
+
+	private final String this0Name;
+	private boolean isThis0Accessed;
+	
 
 	public This0AccessFinder(String this0Name) {
 		super(Opcodes.ASM5);