You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2019/07/25 15:59:56 UTC

[flink] branch release-1.9 updated: [FLINK-13367] Recognize writeReplace in ClosureCleaner

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 6f8bb5e  [FLINK-13367] Recognize writeReplace in ClosureCleaner
6f8bb5e is described below

commit 6f8bb5e2316f5e9e25092945231f152f0a62c7c6
Author: David Moravek <da...@firma.seznam.cz>
AuthorDate: Mon Jul 22 13:59:08 2019 +0200

    [FLINK-13367] Recognize writeReplace in ClosureCleaner
    
    This closes #9201
---
 .../org/apache/flink/api/java/ClosureCleaner.java  |   5 +
 .../api/java/functions/ClosureCleanerTest.java     | 101 ++++++++++++++++++---
 2 files changed, 93 insertions(+), 13 deletions(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index 4c54291..b47479f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -165,6 +165,11 @@ public class ClosureCleaner {
 			return true;
 		} catch (NoSuchMethodException ignored) {}
 
+		try {
+			cls.getDeclaredMethod("writeReplace");
+			return true;
+		} catch (NoSuchMethodException ignored) {}
+
 		return Externalizable.class.isAssignableFrom(cls);
 	}
 
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 3ff1f7c..7b93e8a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -217,6 +217,21 @@ public class ClosureCleanerTest {
 
 		ClosureCleaner.ensureSerializable(recursiveClass);
 	}
+
+	@Test
+	public void testWriteReplace() {
+		WithWriteReplace.SerializablePayload writeReplace =
+			new WithWriteReplace.SerializablePayload(new WithWriteReplace.Payload("text"));
+		Assert.assertEquals("text", writeReplace.get().getRaw());
+		ClosureCleaner.clean(writeReplace, ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL, true);
+	}
+
+	@Test
+	public void testWriteReplaceRecursive() {
+		WithWriteReplace writeReplace = new WithWriteReplace(new WithWriteReplace.Payload("text"));
+		Assert.assertEquals("text", writeReplace.getPayload().getRaw());
+		ClosureCleaner.clean(writeReplace, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+	}
 }
 
 class CustomMap implements MapFunction<Integer, Integer> {
@@ -234,10 +249,11 @@ class CustomMap implements MapFunction<Integer, Integer> {
 // top level class
 class ComplexMap implements MapFunction<Integer, Integer> {
 
-	static MapFunction<Integer, Integer> map1;
-	transient MapFunction<Integer, Integer> map2;
-	CustomMap map3;
-	LocalMap map4;
+	private static MapFunction<Integer, Integer> map1;
+
+	private transient MapFunction<Integer, Integer> map2;
+	private CustomMap map3;
+	private LocalMap map4;
 
 	class LocalMap implements MapFunction<Integer, Integer> {
 
@@ -253,7 +269,7 @@ class ComplexMap implements MapFunction<Integer, Integer> {
 		}
 	}
 
-	public ComplexMap(MapFunction<Integer, Integer> map2) {
+	ComplexMap(MapFunction<Integer, Integer> map2) {
 		map1 = map2;
 		this.map2 = map2;
 		this.map3 = new CustomMap();
@@ -267,12 +283,13 @@ class ComplexMap implements MapFunction<Integer, Integer> {
 }
 
 class RecursiveClass implements Serializable {
-	RecursiveClass recurse;
 
-	public RecursiveClass() {
+	private RecursiveClass recurse;
+
+	RecursiveClass() {
 	}
 
-	public RecursiveClass(RecursiveClass recurse) {
+	RecursiveClass(RecursiveClass recurse) {
 		this.recurse = recurse;
 	}
 }
@@ -283,8 +300,9 @@ interface MapCreator {
 
 class WrapperMapFunction implements MapFunction<Integer, Integer> {
 
-	MapFunction<Integer, Integer> innerMapFuc;
-	public WrapperMapFunction(MapFunction<Integer, Integer> mapFunction) {
+	private MapFunction<Integer, Integer> innerMapFuc;
+
+	WrapperMapFunction(MapFunction<Integer, Integer> mapFunction) {
 		innerMapFuc = mapFunction;
 	}
 
@@ -314,7 +332,7 @@ class SerializableMapCreator implements MapCreator, Serializable {
 
 	private int add = 0;
 
-	public SerializableMapCreator(int add) {
+	SerializableMapCreator(int add) {
 		this.add = add;
 	}
 
@@ -336,7 +354,7 @@ class NestedSerializableMapCreator implements MapCreator, Serializable {
 	private int add = 0;
 	private InnerSerializableMapCreator inner;
 
-	public NestedSerializableMapCreator(int add) {
+	NestedSerializableMapCreator(int add) {
 		this.add = add;
 		this.inner = new InnerSerializableMapCreator();
 	}
@@ -367,7 +385,7 @@ class NestedNonSerializableMapCreator implements MapCreator {
 	private int add = 0;
 	private InnerSerializableMapCreator inner;
 
-	public NestedNonSerializableMapCreator(int add) {
+	NestedNonSerializableMapCreator(int add) {
 		this.add = add;
 		this.inner = new InnerSerializableMapCreator();
 	}
@@ -446,3 +464,60 @@ class NestedSelfReferencing implements Serializable {
 	}
 }
 
+class WithWriteReplace implements Serializable {
+
+	private final SerializablePayload serializablePayload;
+
+	WithWriteReplace(Payload payload) {
+		this.serializablePayload = new SerializablePayload(payload);
+	}
+
+	Payload getPayload() {
+		return serializablePayload.get();
+	}
+
+	static class Payload {
+
+		private final String raw;
+
+		Payload(String raw) {
+			this.raw = raw;
+		}
+
+		String getRaw() {
+			return raw;
+		}
+	}
+
+	static class SerializablePayload implements Serializable {
+
+		private final Payload payload;
+
+		SerializablePayload(Payload payload) {
+			this.payload = payload;
+		}
+
+		private Object writeReplace() {
+			return new SerializedPayload(payload.getRaw());
+		}
+
+		Payload get() {
+			return payload;
+		}
+	}
+
+	private static class SerializedPayload implements Serializable {
+
+		private final String raw;
+
+		private SerializedPayload(String raw) {
+			this.raw = raw;
+		}
+
+		private Object readResolve() throws IOException, ClassNotFoundException {
+			return new Payload(raw);
+		}
+	}
+
+}
+