You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/23 13:20:42 UTC

[flink] branch release-1.9 updated (2ee6373 -> 7b62630)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 2ee6373  [FLINK-13074][table-planner] Fix PartitionableTableSink doesn't work for flink and blink planner
     new 56a2742  [FLINK-13369] Track references of already visited object in ClosureCleaner
     new 7b62630  [hotfix] fix checkstyle

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/api/java/ClosureCleaner.java  | 13 ++++++++++-
 .../api/java/functions/ClosureCleanerTest.java     | 25 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)


[flink] 01/02: [FLINK-13369] Track references of already visited object in ClosureCleaner

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 56a2742c5d0b482170b21236419c5da1bb3a8565
Author: David Moravek <da...@firma.seznam.cz>
AuthorDate: Tue Jul 23 14:56:26 2019 +0200

    [FLINK-13369] Track references of already visited object in ClosureCleaner
---
 .../org/apache/flink/api/java/ClosureCleaner.java  | 13 ++++++++++-
 .../api/java/functions/ClosureCleanerTest.java     | 25 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index f9d7ab0..4c54291 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -37,6 +37,9 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
 
 /**
  * The closure cleaner is a utility that tries to truncate the closure (enclosing instance)
@@ -65,10 +68,18 @@ public class ClosureCleaner {
 	 *                          be loaded, in order to process during the closure cleaning.
 	 */
 	public static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable) {
+		clean(func, level, checkSerializable, Collections.newSetFromMap(new IdentityHashMap<>()));
+	}
+
+	private static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable, Set<Object> visited) {
 		if (func == null) {
 			return;
 		}
 
+		if (!visited.add(func)) {
+			return;
+		}
+
 		final Class<?> cls = func.getClass();
 
 		if (ClassUtils.isPrimitiveOrWrapper(cls)) {
@@ -112,7 +123,7 @@ public class ClosureCleaner {
 						LOG.debug("Dig to clean the {}", fieldObject.getClass().getName());
 					}
 
-					clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+					clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited);
 				}
 			}
 		}
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 82a08b1..3c9775a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.functions;
 
+import java.util.function.Supplier;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -133,6 +134,12 @@ public class ClosureCleanerTest {
 		Assert.assertEquals(result, 4);
 	}
 
+	@Test
+	public void testSelfReferencingClean() {
+		final NestedSelfReferencing selfReferencing = new NestedSelfReferencing();
+		ClosureCleaner.clean(selfReferencing, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+	}
+
 	class InnerCustomMap implements MapFunction<Integer, Integer> {
 
 		@Override
@@ -421,3 +428,21 @@ class OuterMapCreator implements MapCreator {
 	}
 }
 
+@FunctionalInterface
+interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+
+}
+
+class NestedSelfReferencing implements Serializable {
+
+	private final SerializableSupplier<NestedSelfReferencing> cycle;
+
+	NestedSelfReferencing() {
+		this.cycle = () -> this;
+	}
+
+	public SerializableSupplier<NestedSelfReferencing> getCycle() {
+		return cycle;
+	}
+}
+


[flink] 02/02: [hotfix] fix checkstyle

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7b62630a56b057bbb339d3f0e69cba649bec341f
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Jul 23 15:09:35 2019 +0200

    [hotfix] fix checkstyle
---
 .../java/org/apache/flink/api/java/functions/ClosureCleanerTest.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 3c9775a..3ff1f7c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.functions;
 
-import java.util.function.Supplier;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -32,6 +31,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.function.Supplier;
 
 /**
  * Tests for {@link ClosureCleaner}.