You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2017/05/08 15:08:42 UTC

apex-malhar git commit: APEXMALHAR-2481 support lambda expressions with high level API

Repository: apex-malhar
Updated Branches:
  refs/heads/master 09a65c2f9 -> 8eb750053


APEXMALHAR-2481 support lambda expressions with high level API


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8eb75005
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8eb75005
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8eb75005

Branch: refs/heads/master
Commit: 8eb7500537f1491ce35e5a647046c73519948803
Parents: 09a65c2
Author: Thomas Weise <th...@apache.org>
Authored: Sat May 6 23:24:44 2017 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sat May 6 23:24:44 2017 -0700

----------------------------------------------------------------------
 .../apex/malhar/lib/function/Function.java      |  2 +-
 .../malhar/lib/function/FunctionOperator.java   | 20 ++++++++++++++------
 2 files changed, 15 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8eb75005/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java b/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
index 0d43cd2..6d2226c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/function/Function.java
@@ -35,7 +35,7 @@ import com.datatorrent.lib.util.KeyValPair;
  * @since 3.4.0
  */
 @InterfaceStability.Evolving
-public interface Function
+public interface Function extends java.io.Serializable
 {
   /**
    * If the {@link Function} implements this interface.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8eb75005/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
index b6190a0..6f06302 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/function/FunctionOperator.java
@@ -30,8 +30,11 @@ import org.apache.apex.malhar.lib.utils.ByteArrayClassLoader;
 import org.apache.apex.malhar.lib.utils.TupleUtil;
 import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes;
@@ -55,7 +58,12 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
 
   protected transient FUNCTION statelessF;
 
-  protected FUNCTION statefulF;
+  /**
+   * Kryo cannot handle fields that reference ({@link java.io.Serializable}) lambda classes.
+   * Wrap the reference to keep Kryo happy and delegate to Java serialization.
+   */
+  @Bind(JavaSerializer.class)
+  protected final MutableObject<FUNCTION> statefulF = new MutableObject<>();
 
   protected boolean stateful = false;
 
@@ -75,7 +83,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
     } else if (f instanceof Function.Stateful) {
       statelessF = f;
     } else {
-      statefulF = f;
+      statefulF.setValue(f);
       stateful = true;
     }
   }
@@ -171,7 +179,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
   private void readFunction()
   {
     try {
-      if (statelessF != null || statefulF != null) {
+      if (statelessF != null || statefulF.getValue() != null) {
         return;
       }
       DataInputStream input = new DataInputStream(new ByteArrayInputStream(annonymousFunctionClass));
@@ -199,7 +207,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
   {
     readFunction();
     if (stateful) {
-      return statefulF;
+      return statefulF.getValue();
     } else {
       return statelessF;
     }
@@ -217,12 +225,12 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato
 
   public FUNCTION getStatefulF()
   {
-    return statefulF;
+    return statefulF.getValue();
   }
 
   public void setStatefulF(FUNCTION statefulF)
   {
-    this.statefulF = statefulF;
+    this.statefulF.setValue(statefulF);
   }
 
   public boolean isStateful()