You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2017/05/08 15:08:42 UTC
apex-malhar git commit: APEXMALHAR-2481 support lambda expressions
with high level API
Repository: apex-malhar
Updated Branches:
refs/heads/master 09a65c2f9 -> 8eb750053
APEXMALHAR-2481 support lambda expressions with high level API
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8eb75005
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8eb75005
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8eb75005
Branch: refs/heads/master
Commit: 8eb7500537f1491ce35e5a647046c73519948803
Parents: 09a65c2
Author: Thomas Weise <th...@apache.org>
Authored: Sat May 6 23:24:44 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sat May 6 23:24:44 2017 -0700
----------------------------------------------------------------------
.../apex/malhar/lib/function/Function.java | 2 +-
.../malhar/lib/function/FunctionOperator.java | 20 ++++++++++++++------
2 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8eb75005/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java b/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
index 0d43cd2..6d2226c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
@@ -35,7 +35,7 @@ import com.datatorrent.lib.util.KeyValPair;
* @since 3.4.0
*/
@InterfaceStability.Evolving
-public interface Function
+public interface Function extends java.io.Serializable
{
/**
* If the {@link Function} implements this interface.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8eb75005/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
index b6190a0..6f06302 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
@@ -30,8 +30,11 @@ import org.apache.apex.malhar.lib.utils.ByteArrayClassLoader;
import org.apache.apex.malhar.lib.utils.TupleUtil;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.classification.InterfaceStability;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
@@ -55,7 +58,12 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
protected transient FUNCTION statelessF;
- protected FUNCTION statefulF;
+ /**
+ * Kryo cannot handle fields that reference ({@link java.io.Serializable}) lambda classes.
+ * Wrap the reference to keep Kryo happy and delegate to Java serialization.
+ */
+ @Bind(JavaSerializer.class)
+ protected final MutableObject<FUNCTION> statefulF = new MutableObject<>();
protected boolean stateful = false;
@@ -75,7 +83,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
} else if (f instanceof Function.Stateful) {
statelessF = f;
} else {
- statefulF = f;
+ statefulF.setValue(f);
stateful = true;
}
}
@@ -171,7 +179,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
private void readFunction()
{
try {
- if (statelessF != null || statefulF != null) {
+ if (statelessF != null || statefulF.getValue() != null) {
return;
}
DataInputStream input = new DataInputStream(new ByteArrayInputStream(annonymousFunctionClass));
@@ -199,7 +207,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
{
readFunction();
if (stateful) {
- return statefulF;
+ return statefulF.getValue();
} else {
return statelessF;
}
@@ -217,12 +225,12 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
public FUNCTION getStatefulF()
{
- return statefulF;
+ return statefulF.getValue();
}
public void setStatefulF(FUNCTION statefulF)
{
- this.statefulF = statefulF;
+ this.statefulF.setValue(statefulF);
}
public boolean isStateful()