You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/04/05 15:26:00 UTC
flink git commit: [hotfix] [core] Improve error messages of the Java
Closure Cleaner
Repository: flink
Updated Branches:
refs/heads/master 641a0d436 -> b1e508645
[hotfix] [core] Improve error messages of the Java Closure Cleaner
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1e50864
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1e50864
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1e50864
Branch: refs/heads/master
Commit: b1e5086455257e5b7d967fd0715f5a2ab30734aa
Parents: 641a0d4
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Apr 5 11:59:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Apr 5 11:59:54 2016 +0200
----------------------------------------------------------------------
.../apache/flink/api/java/ClosureCleaner.java | 144 ++++++++++++++-----
1 file changed, 108 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b1e50864/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index 8eaebb8..2f22a75 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -18,62 +18,110 @@
package org.apache.flink.api.java;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.util.InstantiationUtil;
+
import org.objectweb.asm.ClassReader;
import org.objectweb.asm.ClassVisitor;
import org.objectweb.asm.MethodVisitor;
import org.objectweb.asm.Opcodes;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Field;
+/**
+ * The closure cleaner is a utility that tries to truncate the closure (enclosing instance)
+ * of non-static inner classes (created for inline transformation functions). That makes non-static
+ * inner classes in many cases serializable, where Java's default behavior renders them non-serializable
+ * without good reason.
+ */
@Internal
public class ClosureCleaner {
+
private static Logger LOG = LoggerFactory.getLogger(ClosureCleaner.class);
-
- private static ClassReader getClassReader(Class<?> cls) {
- String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
- try {
- return new ClassReader(cls.getResourceAsStream(className));
- } catch (IOException e) {
- throw new RuntimeException("Could not create ClassReader: " + e);
- }
- }
-
+
+ /**
+ * Tries to clean the closure of the given object, if the object is a non-static inner
+ * class.
+ *
+ * @param func The object whose closure should be cleaned.
+ * @param checkSerializable Flag to indicate whether serializability should be checked after
+ * the closure cleaning attempt.
+ *
+ * @throws InvalidProgramException Thrown, if 'checkSerializable' is true, and the object was
+ * not serializable after the closure cleaning.
+ *
+ * @throws RuntimeException A RuntimeException may be thrown, if the code of the class could not
+ * be loaded, in order to process during teh closure cleaning.
+ */
public static void clean(Object func, boolean checkSerializable) {
- Class<?> cls = func.getClass();
+ if (func == null) {
+ return;
+ }
+
+ final Class<?> cls = func.getClass();
// First find the field name of the "this$0" field, this can
- // be "field$x" depending on the nesting
+ // be "this$x" depending on the nesting
+ boolean closureAccessed = false;
+
for (Field f: cls.getDeclaredFields()) {
if (f.getName().startsWith("this$")) {
- // found our field:
- cleanThis0(func, cls, f.getName());
+ // found a closure referencing field - now try to clean
+ closureAccessed |= cleanThis0(func, cls, f.getName());
}
}
-
+
if (checkSerializable) {
- ensureSerializable(func);
+ try {
+ InstantiationUtil.serializeObject(func);
+ }
+ catch (Exception e) {
+ String functionType = getSuperClassOrInterfaceName(func.getClass());
+
+ String msg = functionType == null ?
+ (func + " is not serializable.") :
+ ("The implementation of the " + functionType + " is not serializable.");
+
+
+ if (closureAccessed) {
+ msg += " The implementation accesses fields of its enclosing class, which is " +
+ "a common reason for non-serializability. " +
+ "A common solution is to make the function a proper (non-inner) class, or" +
+ "a static inner class.";
+ } else {
+ msg += " The object probably contains or references non serializable fields.";
+ }
+
+ throw new InvalidProgramException(msg, e);
+ }
}
}
- private static void cleanThis0(Object func, Class<?> cls, String this0Name) {
-
+ public static void ensureSerializable(Object obj) {
+ try {
+ InstantiationUtil.serializeObject(obj);
+ } catch (Exception e) {
+ throw new InvalidProgramException("Object " + obj + " is not serializable", e);
+ }
+ }
+
+ private static boolean cleanThis0(Object func, Class<?> cls, String this0Name) {
+
This0AccessFinder this0Finder = new This0AccessFinder(this0Name);
-
getClassReader(cls).accept(this0Finder, 0);
-
-
+
+ final boolean accessesClosure = this0Finder.isThis0Accessed();
+
if (LOG.isDebugEnabled()) {
- LOG.debug(this0Name + " is accessed: " + this0Finder.isThis0Accessed());
+ LOG.debug(this0Name + " is accessed: " + accessesClosure);
}
- if (!this0Finder.isThis0Accessed()) {
+ if (!accessesClosure) {
Field this0;
try {
this0 = func.getClass().getDeclaredField(this0Name);
@@ -81,30 +129,54 @@ public class ClosureCleaner {
// has no this$0, just return
throw new RuntimeException("Could not set " + this0Name + ": " + e);
}
- this0.setAccessible(true);
+
try {
+ this0.setAccessible(true);
this0.set(func, null);
- } catch (IllegalAccessException e) {
+ }
+ catch (Exception e) {
// should not happen, since we use setAccessible
- throw new RuntimeException("Could not set " + this0Name + ": " + e);
+ throw new RuntimeException("Could not set " + this0Name + " to null. " + e.getMessage(), e);
}
}
+
+ return accessesClosure;
}
-
-
- public static void ensureSerializable(Object obj) {
+
+ private static ClassReader getClassReader(Class<?> cls) {
+ String className = cls.getName().replaceFirst("^.*\\.", "") + ".class";
try {
- InstantiationUtil.serializeObject(obj);
- } catch (Exception e) {
- throw new InvalidProgramException("Object " + obj + " not serializable", e);
+ return new ClassReader(cls.getResourceAsStream(className));
+ } catch (IOException e) {
+ throw new RuntimeException("Could not create ClassReader: " + e.getMessage(), e);
+ }
+ }
+
+
+ private static String getSuperClassOrInterfaceName(Class<?> cls) {
+ Class<?> superclass = cls.getSuperclass();
+ if (superclass.getName().startsWith("org.apache.flink")) {
+ return superclass.getSimpleName();
+ } else {
+ for (Class<?> inFace : cls.getInterfaces()) {
+ if (inFace.getName().startsWith("org.apache.flink")) {
+ return inFace.getSimpleName();
+ }
+ }
+ return null;
}
}
-
}
+/**
+ * This visitor walks methods and finds accesses to the field with the reference to
+ * the enclosing class.
+ */
class This0AccessFinder extends ClassVisitor {
- private boolean isThis0Accessed = false;
- private String this0Name;
+
+ private final String this0Name;
+ private boolean isThis0Accessed;
+
public This0AccessFinder(String this0Name) {
super(Opcodes.ASM5);