You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/07/23 13:23:21 UTC

[flink] branch release-1.8 updated (91d036f -> 480875f)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 91d036f  [FLINK-12741] [docs] Update Kafka producer fault tolerance guarantees
     new 616d1b8  [FLINK-13369] Track references of already visited object in ClosureCleaner
     new 480875f  [hotfix] fix checkstyle

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/api/java/ClosureCleaner.java  | 13 ++++++++++-
 .../api/java/functions/ClosureCleanerTest.java     | 25 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)


[flink] 01/02: [FLINK-13369] Track references of already visited object in ClosureCleaner

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 616d1b8f41eb61be31302719d0163aac0887643a
Author: David Moravek <da...@firma.seznam.cz>
AuthorDate: Tue Jul 23 14:56:26 2019 +0200

    [FLINK-13369] Track references of already visited object in ClosureCleaner
---
 .../org/apache/flink/api/java/ClosureCleaner.java  | 13 ++++++++++-
 .../api/java/functions/ClosureCleanerTest.java     | 25 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index 101278e..38a616b 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -37,6 +37,9 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Set;
 
 /**
  * The closure cleaner is a utility that tries to truncate the closure (enclosing instance)
@@ -65,10 +68,18 @@ public class ClosureCleaner {
 	 *                          be loaded, in order to process during the closure cleaning.
 	 */
 	public static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable) {
+		clean(func, level, checkSerializable, Collections.newSetFromMap(new IdentityHashMap<>()));
+	}
+
+	private static void clean(Object func, ExecutionConfig.ClosureCleanerLevel level, boolean checkSerializable, Set<Object> visited) {
 		if (func == null) {
 			return;
 		}
 
+		if (!visited.add(func)) {
+			return;
+		}
+
 		final Class<?> cls = func.getClass();
 
 		if (ClassUtils.isPrimitiveOrWrapper(cls)) {
@@ -112,7 +123,7 @@ public class ClosureCleaner {
 						LOG.debug("Dig to clean the {}", fieldObject.getClass().getName());
 					}
 
-					clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+					clean(fieldObject, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true, visited);
 				}
 			}
 		}
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 82a08b1..3c9775a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.functions;
 
+import java.util.function.Supplier;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -133,6 +134,12 @@ public class ClosureCleanerTest {
 		Assert.assertEquals(result, 4);
 	}
 
+	@Test
+	public void testSelfReferencingClean() {
+		final NestedSelfReferencing selfReferencing = new NestedSelfReferencing();
+		ClosureCleaner.clean(selfReferencing, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+	}
+
 	class InnerCustomMap implements MapFunction<Integer, Integer> {
 
 		@Override
@@ -421,3 +428,21 @@ class OuterMapCreator implements MapCreator {
 	}
 }
 
+@FunctionalInterface
+interface SerializableSupplier<T> extends Supplier<T>, Serializable {
+
+}
+
+class NestedSelfReferencing implements Serializable {
+
+	private final SerializableSupplier<NestedSelfReferencing> cycle;
+
+	NestedSelfReferencing() {
+		this.cycle = () -> this;
+	}
+
+	public SerializableSupplier<NestedSelfReferencing> getCycle() {
+		return cycle;
+	}
+}
+


[flink] 02/02: [hotfix] fix checkstyle

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 480875f045a9777877ed1a90f9e0c6e01b7e03c9
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Jul 23 15:09:35 2019 +0200

    [hotfix] fix checkstyle
---
 .../java/org/apache/flink/api/java/functions/ClosureCleanerTest.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 3c9775a..3ff1f7c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.functions;
 
-import java.util.function.Supplier;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -32,6 +31,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
+import java.util.function.Supplier;
 
 /**
  * Tests for {@link ClosureCleaner}.