You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2019/07/25 15:59:56 UTC
[flink] branch release-1.9 updated: [FLINK-13367] Recognize writeReplace in ClosureCleaner
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 6f8bb5e [FLINK-13367] Recognize writeReplace in ClosureCleaner
6f8bb5e is described below
commit 6f8bb5e2316f5e9e25092945231f152f0a62c7c6
Author: David Moravek <da...@firma.seznam.cz>
AuthorDate: Mon Jul 22 13:59:08 2019 +0200
[FLINK-13367] Recognize writeReplace in ClosureCleaner
This closes #9201
---
.../org/apache/flink/api/java/ClosureCleaner.java | 5 +
.../api/java/functions/ClosureCleanerTest.java | 101 ++++++++++++++++++---
2 files changed, 93 insertions(+), 13 deletions(-)
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index 4c54291..b47479f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -165,6 +165,11 @@ public class ClosureCleaner {
return true;
} catch (NoSuchMethodException ignored) {}
+ try {
+ cls.getDeclaredMethod("writeReplace");
+ return true;
+ } catch (NoSuchMethodException ignored) {}
+
return Externalizable.class.isAssignableFrom(cls);
}
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
index 3ff1f7c..7b93e8a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/ClosureCleanerTest.java
@@ -217,6 +217,21 @@ public class ClosureCleanerTest {
ClosureCleaner.ensureSerializable(recursiveClass);
}
+
+ @Test // runs ClosureCleaner at TOP_LEVEL on an object whose class declares writeReplace()
+ public void testWriteReplace() {
+ WithWriteReplace.SerializablePayload writeReplace =
+ new WithWriteReplace.SerializablePayload(new WithWriteReplace.Payload("text"));
+ Assert.assertEquals("text", writeReplace.get().getRaw()); // sanity check: the wrapper exposes the payload it was constructed with
+ ClosureCleaner.clean(writeReplace, ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL, true); // no assertion follows; presumably the test passes as long as clean() does not throw
+ }
+
+ @Test // runs ClosureCleaner at RECURSIVE level on an object that holds a writeReplace()-declaring field
+ public void testWriteReplaceRecursive() {
+ WithWriteReplace writeReplace = new WithWriteReplace(new WithWriteReplace.Payload("text"));
+ Assert.assertEquals("text", writeReplace.getPayload().getRaw()); // sanity check: the payload is reachable through the nested wrapper
+ ClosureCleaner.clean(writeReplace, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); // no assertion follows; presumably the test passes as long as clean() does not throw
+ }
}
class CustomMap implements MapFunction<Integer, Integer> {
@@ -234,10 +249,11 @@ class CustomMap implements MapFunction<Integer, Integer> {
// top level class
class ComplexMap implements MapFunction<Integer, Integer> {
- static MapFunction<Integer, Integer> map1;
- transient MapFunction<Integer, Integer> map2;
- CustomMap map3;
- LocalMap map4;
+ private static MapFunction<Integer, Integer> map1;
+
+ private transient MapFunction<Integer, Integer> map2;
+ private CustomMap map3;
+ private LocalMap map4;
class LocalMap implements MapFunction<Integer, Integer> {
@@ -253,7 +269,7 @@ class ComplexMap implements MapFunction<Integer, Integer> {
}
}
- public ComplexMap(MapFunction<Integer, Integer> map2) {
+ ComplexMap(MapFunction<Integer, Integer> map2) {
map1 = map2;
this.map2 = map2;
this.map3 = new CustomMap();
@@ -267,12 +283,13 @@ class ComplexMap implements MapFunction<Integer, Integer> {
}
class RecursiveClass implements Serializable {
- RecursiveClass recurse;
- public RecursiveClass() {
+ private RecursiveClass recurse;
+
+ RecursiveClass() {
}
- public RecursiveClass(RecursiveClass recurse) {
+ RecursiveClass(RecursiveClass recurse) {
this.recurse = recurse;
}
}
@@ -283,8 +300,9 @@ interface MapCreator {
class WrapperMapFunction implements MapFunction<Integer, Integer> {
- MapFunction<Integer, Integer> innerMapFuc;
- public WrapperMapFunction(MapFunction<Integer, Integer> mapFunction) {
+ private MapFunction<Integer, Integer> innerMapFuc;
+
+ WrapperMapFunction(MapFunction<Integer, Integer> mapFunction) {
innerMapFuc = mapFunction;
}
@@ -314,7 +332,7 @@ class SerializableMapCreator implements MapCreator, Serializable {
private int add = 0;
- public SerializableMapCreator(int add) {
+ SerializableMapCreator(int add) {
this.add = add;
}
@@ -336,7 +354,7 @@ class NestedSerializableMapCreator implements MapCreator, Serializable {
private int add = 0;
private InnerSerializableMapCreator inner;
- public NestedSerializableMapCreator(int add) {
+ NestedSerializableMapCreator(int add) {
this.add = add;
this.inner = new InnerSerializableMapCreator();
}
@@ -367,7 +385,7 @@ class NestedNonSerializableMapCreator implements MapCreator {
private int add = 0;
private InnerSerializableMapCreator inner;
- public NestedNonSerializableMapCreator(int add) {
+ NestedNonSerializableMapCreator(int add) {
this.add = add;
this.inner = new InnerSerializableMapCreator();
}
@@ -446,3 +464,60 @@ class NestedSelfReferencing implements Serializable {
}
}
+class WithWriteReplace implements Serializable { // test fixture: Serializable holder whose only field is a SerializablePayload
+
+ private final SerializablePayload serializablePayload;
+
+ WithWriteReplace(Payload payload) {
+ this.serializablePayload = new SerializablePayload(payload); // wrap the payload in the writeReplace()-declaring wrapper
+ }
+
+ Payload getPayload() {
+ return serializablePayload.get();
+ }
+
+ static class Payload { // plain value holder for a String; note that it does NOT implement Serializable
+
+ private final String raw;
+
+ Payload(String raw) {
+ this.raw = raw;
+ }
+
+ String getRaw() {
+ return raw;
+ }
+ }
+
+ static class SerializablePayload implements Serializable { // Serializable wrapper around the non-Serializable Payload
+
+ private final Payload payload;
+
+ SerializablePayload(Payload payload) {
+ this.payload = payload;
+ }
+
+ private Object writeReplace() { // serialization hook: substitutes a stand-in object for this instance
+ return new SerializedPayload(payload.getRaw()); // the stand-in carries only the raw String, not the Payload itself
+ }
+
+ Payload get() {
+ return payload;
+ }
+ }
+
+ private static class SerializedPayload implements Serializable { // stand-in returned by SerializablePayload.writeReplace()
+
+ private final String raw;
+
+ private SerializedPayload(String raw) {
+ this.raw = raw;
+ }
+
+ private Object readResolve() throws IOException, ClassNotFoundException { // NOTE(review): the visible body throws neither declared checked exception
+ return new Payload(raw); // resolves to a bare Payload, not a SerializablePayload
+ }
+ }
+
+}
+