You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/23 13:20:43 UTC

[flink] 01/02: [FLINK-13369] Track references of already visited object in ClosureCleaner

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 56a2742c5d0b482170b21236419c5da1bb3a8565
Author: David Moravek <da...@firma.seznam.cz>
AuthorDate: Tue Jul 23 14:56:26 2019 +0200

    [FLINK-13369] Track references of already visited object in ClosureCleaner
---
 .../org/apache/flink/api/java/ClosureCleaner.java  | 13 ++++++++++-
 .../api/java/functions/ClosureCleanerTest.java     | 25 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index f9d7ab0..4c54291 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -37,6 +37,9 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
 
 /**
  * The closure cleaner is a utility that tries to truncate the closure (enclosing instance)
@@ -65,10 +68,18 @@ public class ClosureCleaner {
 	 *                          be loaded, in order to process during the closure cleaning.
 	 */
 	public static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable) {
+		clean(func, level, checkSerializable, Collections.newSetFromMap(new IdentityHashMap<>()));
+	}
+
+	private static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable, Set<Object> visited) {
 		if (func == null) {
 			return;
 		}
 
+		if (!visited.add(func)) {
+			return;
+		}
+
 		final Class<?> cls = func.getClass();
 
 		if (ClassUtils.isPrimitiveOrWrapper(cls)) {
@@ -112,7 +123,7 @@ public class ClosureCleaner {
 						LOG.debug("Dig to clean the {}", fieldObject.getClass().getName());
 					}
 
-					clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+					clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited);
 				}
 			}
 		}
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 82a08b1..3c9775a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.functions;
 
+import java.util.function.Supplier;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -133,6 +134,12 @@ public class ClosureCleanerTest {
 		Assert.assertEquals(result, 4);
 	}
 
+	@Test
+	public void testSelfReferencingClean() {
+		final NestedSelfReferencing selfReferencing = new NestedSelfReferencing();
+		ClosureCleaner.clean(selfReferencing, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+	}
+
 	class InnerCustomMap implements MapFunction<Integer, Integer> {
 
 		@Override
@@ -421,3 +428,21 @@ class OuterMapCreator implements MapCreator {
 	}
 }
 
+@FunctionalInterface
+interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+
+}
+
+class NestedSelfReferencing implements Serializable {
+
+	private final SerializableSupplier<NestedSelfReferencing> cycle;
+
+	NestedSelfReferencing() {
+		this.cycle = () -> this;
+	}
+
+	public SerializableSupplier<NestedSelfReferencing> getCycle() {
+		return cycle;
+	}
+}
+