You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/23 13:20:43 UTC
[flink] 01/02: [FLINK-13369] Track references of already visited
object in ClosureCleaner
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 56a2742c5d0b482170b21236419c5da1bb3a8565
Author: David Moravek <da...@firma.seznam.cz>
AuthorDate: Tue Jul 23 14:56:26 2019 +0200
[FLINK-13369] Track references of already visited object in ClosureCleaner
---
.../org/apache/flink/api/java/ClosureCleaner.java | 13 ++++++++++-
.../api/java/functions/ClosureCleanerTest.java | 25 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index f9d7ab0..4c54291 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -37,6 +37,9 @@ import java.io.IOException;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
/**
* The closure cleaner is a utility that tries to truncate the closure (enclosing instance)
@@ -65,10 +68,18 @@ public class ClosureCleaner {
* be loaded, in order to process during the closure cleaning.
*/
public static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable) {
+ clean(func, level, checkSerializable, Collections.newSetFromMap(new IdentityHashMap<>()));
+ }
+
+ private static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable, Set<Object> visited) {
if (func == null) {
return;
}
+ if (!visited.add(func)) {
+ return;
+ }
+
final Class<?> cls = func.getClass();
if (ClassUtils.isPrimitiveOrWrapper(cls)) {
@@ -112,7 +123,7 @@ public class ClosureCleaner {
LOG.debug("Dig to clean the {}", fieldObject.getClass().getName());
}
- clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited);
}
}
}
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 82a08b1..3c9775a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.functions;
+import java.util.function.Supplier;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.MapFunction;
@@ -133,6 +134,12 @@ public class ClosureCleanerTest {
Assert.assertEquals(result, 4);
}
+ @Test
+ public void testSelfReferencingClean() {
+ final NestedSelfReferencing selfReferencing = new NestedSelfReferencing();
+ ClosureCleaner.clean(selfReferencing, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+ }
+
class InnerCustomMap implements MapFunction<Integer, Integer> {
@Override
@@ -421,3 +428,21 @@ class OuterMapCreator implements MapCreator {
}
}
+@FunctionalInterface
+interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+
+}
+
+class NestedSelfReferencing implements Serializable {
+
+ private final SerializableSupplier<NestedSelfReferencing> cycle;
+
+ NestedSelfReferencing() {
+ this.cycle = () -> this;
+ }
+
+ public SerializableSupplier<NestedSelfReferencing> getCycle() {
+ return cycle;
+ }
+}
+