You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/04/13 14:46:16 UTC

flink git commit: [FLINK-1710] [table] Switch compile backend to Janino

Repository: flink
Updated Branches:
  refs/heads/master 92e0c7744 -> f81d9f022


[FLINK-1710] [table] Switch compile backend to Janino

This greatly reduces compile time while still supporting the same
feature set.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f81d9f02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f81d9f02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f81d9f02

Branch: refs/heads/master
Commit: f81d9f02212b05ea7770dc08743c866878f1364c
Parents: 92e0c77
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Apr 7 18:11:16 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Apr 13 14:45:41 2015 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/LICENSE           |   1 +
 flink-dist/src/main/flink-bin/NOTICE            |   8 +
 flink-staging/flink-table/pom.xml               |   6 +
 .../flink/examples/java/JavaTableExample.java   |   1 +
 .../org/apache/flink/api/table/Table.scala      |   2 +-
 .../table/codegen/ExpressionCodeGenerator.scala | 602 +++++++++----------
 .../table/codegen/GenerateBinaryPredicate.scala |  73 ---
 .../codegen/GenerateBinaryResultAssembler.scala |  60 --
 .../api/table/codegen/GenerateFilter.scala      |  82 +++
 .../flink/api/table/codegen/GenerateJoin.scala  | 144 +++++
 .../table/codegen/GenerateResultAssembler.scala |  99 +--
 .../api/table/codegen/GenerateSelect.scala      |  74 +++
 .../table/codegen/GenerateUnaryPredicate.scala  |  67 ---
 .../codegen/GenerateUnaryResultAssembler.scala  |  57 --
 .../flink/api/table/codegen/Indenter.scala      |  54 ++
 .../runtime/ExpressionFilterFunction.scala      |  30 +-
 .../table/runtime/ExpressionJoinFunction.scala  |  56 +-
 .../runtime/ExpressionSelectFunction.scala      |  20 +-
 18 files changed, 753 insertions(+), 683 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index d348bbc..d66922c 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -300,6 +300,7 @@ BSD-style licenses:
 [3-clause BSD license]
  - Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
  - D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
+ - Janino (http://docs.codehaus.org/display/JANINO/Home) - Copyright (c) 2001-2010, Arno Unkrig
  
 [BSD-like License]
  - Scala Library (http://www.scala-lang.org/) - Copyright (c) 2002-2014 EPFL, Copyright (c) 2011-2014 Typesafe, Inc.

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-dist/src/main/flink-bin/NOTICE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/NOTICE b/flink-dist/src/main/flink-bin/NOTICE
index cf3f4fe..a36e2a3 100644
--- a/flink-dist/src/main/flink-bin/NOTICE
+++ b/flink-dist/src/main/flink-bin/NOTICE
@@ -190,6 +190,14 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 
+-----------------------------------------------------------------------
+                            Janino
+-----------------------------------------------------------------------
+
+Janino - An embedded Java[TM] compiler
+
+Copyright (c) 2001-2010, Arno Unkrig
+All rights reserved.
 
 -----------------------------------------------------------------------
                          The Netty Project

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml
index dcdbe83..cbd1c47 100644
--- a/flink-staging/flink-table/pom.xml
+++ b/flink-staging/flink-table/pom.xml
@@ -81,6 +81,12 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.codehaus.janino</groupId>
+			<artifactId>janino</artifactId>
+			<version>2.7.5</version>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-tests</artifactId>
 			<version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java b/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
index 1c1fdca..c29fc67 100644
--- a/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
+++ b/flink-staging/flink-table/src/main/java/org/apache/flink/examples/java/JavaTableExample.java
@@ -47,6 +47,7 @@ public class JavaTableExample {
 			return "WC " + word + " " + count;
 		}
 	}
+
 	public static void main(String[] args) throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
 		TableEnvironment tableEnv = new TableEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
index 3ced3a4..83d5239 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala
@@ -216,7 +216,7 @@ case class Table(private[flink] val operation: PlanNode) {
   }
 
   /**
-   * Joins to [[Table]]s. Similar to an SQL join. The fields of the two joined
+   * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined
    * operations must not overlap, use [[as]] to rename fields if necessary. You can use
    * where and select clauses after a join to further specify the behaviour of the join.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
index 708a3fc..fc0abe4 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/ExpressionCodeGenerator.scala
@@ -18,19 +18,18 @@
 package org.apache.flink.api.table.codegen
 
 import java.util.concurrent.atomic.AtomicInteger
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.typeinfo.{RenamingProxyTypeInfo, RowTypeInfo}
-import org.apache.flink.api.table.{ExpressionException, expressions}
-import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, BasicTypeInfo, TypeInformation}
+
+import org.codehaus.janino.SimpleCompiler
+import org.slf4j.LoggerFactory
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeinfo.{RenamingProxyTypeInfo, RowTypeInfo}
+import org.apache.flink.api.table.{ExpressionException, expressions}
 
 /** Base class for all code generation classes. This provides the functionality for generating
   * code from an [[Expression]] tree. Derived classes must embed this in a lambda function
@@ -48,65 +47,64 @@ abstract class ExpressionCodeGenerator[R](
     cl: ClassLoader) {
   protected val log = LoggerFactory.getLogger(classOf[ExpressionCodeGenerator[_]])
 
-  import scala.reflect.runtime.{universe => ru}
   import scala.reflect.runtime.universe._
+  import scala.reflect.runtime.{universe => ru}
 
   if (cl == null) {
     throw new IllegalArgumentException("ClassLoader must not be null.")
   }
 
-  import scala.tools.reflect.ToolBox
+  val compiler = new SimpleCompiler()
+  compiler.setParentClassLoader(cl)
 
-  protected val (mirror, toolBox) = ReflectionLock.synchronized {
-    val mirror = runtimeMirror(cl)
-    (mirror, mirror.mkToolBox())
-  }
 
   // This is to be implemented by subclasses, we have it like this
   // so that we only call it from here with the Scala Reflection Lock.
   protected def generateInternal(): R
 
   final def generate(): R = {
-    ReflectionLock.synchronized {
-      generateInternal()
-    }
+    generateInternal()
   }
 
-  val cache = mutable.HashMap[Expression, GeneratedExpression]()
-
   protected def generateExpression(expr: Expression): GeneratedExpression = {
-    // doesn't work yet, because we insert the same code twice and reuse variable names
-    //    cache.getOrElseUpdate(expr, generateExpressionInternal(expr))
     generateExpressionInternal(expr)
   }
 
   protected def generateExpressionInternal(expr: Expression): GeneratedExpression = {
     //  protected def generateExpression(expr: Expression): GeneratedExpression = {
-    val nullTerm = freshTermName("isNull")
-    val resultTerm = freshTermName("result")
+    val nullTerm = freshName("isNull")
+    val resultTerm = freshName("result")
 
     // For binary predicates that must only be evaluated when both operands are non-null.
     // This will write to nullTerm and resultTerm, so don't use those term names
     // after using this function
     def generateIfNonNull(left: Expression, right: Expression, resultType: TypeInformation[_])
-                         (expr: (TermName, TermName) => Tree): Seq[Tree] = {
+                         (expr: (String, String) => String): String = {
       val leftCode = generateExpression(left)
       val rightCode = generateExpression(right)
 
+      val leftTpe = typeTermForTypeInfo(left.typeInfo)
+      val rightTpe = typeTermForTypeInfo(right.typeInfo)
+      val resultTpe = typeTermForTypeInfo(resultType)
 
       if (nullCheck) {
-        leftCode.code ++ rightCode.code ++ q"""
-        val $nullTerm = ${leftCode.nullTerm}|| ${rightCode.nullTerm}
-        val $resultTerm = if ($nullTerm) {
-          ${defaultPrimitive(resultType)}
-        } else {
-          ${expr(leftCode.resultTerm, rightCode.resultTerm)}
-        }
-        """.children
+        leftCode.code + "\n" +
+          rightCode.code + "\n" +
+          s"""
+            |boolean $nullTerm = ${leftCode.nullTerm} || ${rightCode.nullTerm};
+            |$resultTpe $resultTerm;
+            |if ($nullTerm) {
+            |  $resultTerm = ${defaultPrimitive(resultType)};
+            |} else {
+            |  $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)};
+            |}
+          """.stripMargin
       } else {
-        leftCode.code ++ rightCode.code :+ q"""
-        val $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)}
-        """
+        leftCode.code + "\n" +
+          rightCode.code + "\n" +
+          s"""
+            |$resultTpe $resultTerm = ${expr(leftCode.resultTerm, rightCode.resultTerm)};
+          """.stripMargin
       }
     }
 
@@ -114,92 +112,94 @@ abstract class ExpressionCodeGenerator[R](
       case expressions.Naming(namedExpr, _) => namedExpr
       case _ => expr
     }
+    
+    val resultTpe = typeTermForTypeInfo(cleanedExpr.typeInfo)
 
-    val code: Seq[Tree] = cleanedExpr match {
+    val code: String = cleanedExpr match {
 
       case expressions.Literal(null, typeInfo) =>
         if (nullCheck) {
-          q"""
-            val $nullTerm = true
-            val resultTerm = null
-          """.children
+          s"""
+            |boolean $nullTerm = true;
+            |$resultTpe $resultTerm = null;
+          """.stripMargin
         } else {
-          Seq( q"""
-            val resultTerm = null
-          """)
+          s"""
+            |$resultTpe $resultTerm = null;
+          """.stripMargin
         }
 
       case expressions.Literal(intValue: Int, INT_TYPE_INFO) =>
         if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $intValue
-          """.children
+          s"""
+            |boolean $nullTerm = false;
+            |$resultTpe $resultTerm = $intValue;
+          """.stripMargin
         } else {
-          Seq( q"""
-            val $resultTerm = $intValue
-          """)
+          s"""
+            |$resultTpe $resultTerm = $intValue;
+          """.stripMargin
         }
 
       case expressions.Literal(longValue: Long, LONG_TYPE_INFO) =>
         if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $longValue
-          """.children
+          s"""
+            |boolean $nullTerm = false;
+            |$resultTpe $resultTerm = ${longValue}L;
+          """.stripMargin
         } else {
-          Seq( q"""
-            val $resultTerm = $longValue
-          """)
+          s"""
+            |$resultTpe $resultTerm = ${longValue}L;
+          """.stripMargin
         }
 
 
       case expressions.Literal(doubleValue: Double, DOUBLE_TYPE_INFO) =>
         if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $doubleValue
-          """.children
+          s"""
+            |boolean $nullTerm = false;
+            |$resultTpe $resultTerm = $doubleValue;
+          """.stripMargin
         } else {
-          Seq( q"""
-              val $resultTerm = $doubleValue
-          """)
+          s"""
+            |$resultTpe $resultTerm = $doubleValue;
+          """.stripMargin
         }
 
       case expressions.Literal(floatValue: Float, FLOAT_TYPE_INFO) =>
         if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $floatValue
-          """.children
+          s"""
+            |boolean $nullTerm = false;
+            |$resultTpe $resultTerm = ${floatValue}f;
+          """.stripMargin
         } else {
-          Seq( q"""
-              val $resultTerm = $floatValue
-          """)
+          s"""
+            |$resultTpe $resultTerm = ${floatValue}f;
+          """.stripMargin
         }
 
       case expressions.Literal(strValue: String, STRING_TYPE_INFO) =>
         if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $strValue
-          """.children
+          s"""
+            |boolean $nullTerm = false;
+            |$resultTpe $resultTerm = "$strValue";
+          """.stripMargin
         } else {
-          Seq( q"""
-              val $resultTerm = $strValue
-          """)
+          s"""
+            |$resultTpe $resultTerm = "$strValue";
+          """.stripMargin
         }
 
       case expressions.Literal(boolValue: Boolean, BOOLEAN_TYPE_INFO) =>
         if (nullCheck) {
-          q"""
-            val $nullTerm = false
-            val $resultTerm = $boolValue
-          """.children
+          s"""
+            |boolean $nullTerm = false;
+            |$resultTpe $resultTerm = $boolValue;
+          """.stripMargin
         } else {
-          Seq( q"""
-              val $resultTerm = $boolValue
-          """)
+          s"""
+            |$resultTpe $resultTerm = $boolValue;
+          """.stripMargin
         }
 
       case Substring(str, beginIndex, endIndex) =>
@@ -207,120 +207,88 @@ abstract class ExpressionCodeGenerator[R](
         val beginIndexCode = generateExpression(beginIndex)
         val endIndexCode = generateExpression(endIndex)
         if (nullCheck) {
-          strCode.code ++ beginIndexCode.code ++ endIndexCode.code ++ q"""
-            val $nullTerm =
-              ${strCode.nullTerm}|| ${beginIndexCode.nullTerm}|| ${endIndexCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(str.typeInfo)}
-            } else {
-              val $resultTerm = if (${endIndexCode.resultTerm} == Int.MaxValue) {
-                 (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm})
+          strCode.code +
+            beginIndexCode.code +
+            endIndexCode.code +
+            s"""
+              boolean $nullTerm =
+                ${strCode.nullTerm} || ${beginIndexCode.nullTerm} || ${endIndexCode.nullTerm};
+              $resultTpe $resultTerm;
+              if ($nullTerm) {
+                $resultTerm = ${defaultPrimitive(str.typeInfo)};
               } else {
-                (${strCode.resultTerm}).substring(
-                  ${beginIndexCode.resultTerm},
-                  ${endIndexCode.resultTerm})
+                if (${endIndexCode.resultTerm} == Integer.MAX_VALUE) {
+                   $resultTerm = (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm});
+                } else {
+                  $resultTerm = (${strCode.resultTerm}).substring(
+                    ${beginIndexCode.resultTerm},
+                    ${endIndexCode.resultTerm});
+                }
               }
-            }
-          """.children
+            """.stripMargin
         } else {
-          strCode.code ++ beginIndexCode.code ++ endIndexCode.code :+ q"""
-            val $resultTerm = if (${endIndexCode.resultTerm} == Int.MaxValue) {
-              (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm})
-            } else {
-              (${strCode.resultTerm}).substring(
-                ${beginIndexCode.resultTerm},
-                ${endIndexCode.resultTerm})
-            }
-          """
+          strCode.code +
+            beginIndexCode.code +
+            endIndexCode.code +
+            s"""
+              $resultTpe $resultTerm;
+
+              if (${endIndexCode.resultTerm} == Integer.MAX_VALUE) {
+                $resultTerm = (${strCode.resultTerm}).substring(${beginIndexCode.resultTerm});
+              } else {
+                $resultTerm = (${strCode.resultTerm}).substring(
+                  ${beginIndexCode.resultTerm},
+                  ${endIndexCode.resultTerm});
+              }
+            """
         }
 
       case expressions.Cast(child: Expression, STRING_TYPE_INFO) =>
         val childGen = generateExpression(child)
         val castCode = if (nullCheck) {
-          q"""
-            val $nullTerm = ${childGen.nullTerm}
-            val $resultTerm = if ($nullTerm == null) {
-              null
-            } else {
-              ${childGen.resultTerm}.toString
-            }
-          """.children
+          s"""
+            |boolean $nullTerm = ${childGen.nullTerm};
+            |$resultTpe $resultTerm;
+            |if ($nullTerm) {
+            |  $resultTerm = null;
+            |} else {
+            |  $resultTerm = "" + ${childGen.resultTerm};
+            |}
+          """.stripMargin
         } else {
-          Seq( q"""
-            val $resultTerm = ${childGen.resultTerm}.toString
-          """)
+          s"""
+            |$resultTpe $resultTerm = "" + ${childGen.resultTerm};
+          """.stripMargin
         }
-        childGen.code ++ castCode
+        childGen.code + castCode
 
-      case expressions.Cast(child: Expression, INT_TYPE_INFO) =>
+      case expressions.Cast(child: Expression, tpe: BasicTypeInfo[_]) =>
         val childGen = generateExpression(child)
         val castCode = if (nullCheck) {
-          q"""
-            val $nullTerm = ${childGen.nullTerm}
-            val $resultTerm = ${childGen.resultTerm}.toInt
-          """.children
+          s"""
+            |boolean $nullTerm = ${childGen.nullTerm};
+            |$resultTpe $resultTerm =
+            |  ${tpe.getTypeClass.getCanonicalName}.valueOf(${childGen.resultTerm});
+          """.stripMargin
         } else {
-          Seq( q"""
-            val $resultTerm = ${childGen.resultTerm}.toInt
-          """)
+          s"""
+            |$resultTpe $resultTerm =
+            |  ${tpe.getTypeClass.getCanonicalName}.valueOf(${childGen.resultTerm});
+          """.stripMargin
         }
-        childGen.code ++ castCode
-
-      case expressions.Cast(child: Expression, LONG_TYPE_INFO) =>
-        val childGen = generateExpression(child)
-        val castCode = if (nullCheck) {
-          q"""
-            val $nullTerm = ${childGen.nullTerm}
-            val $resultTerm = ${childGen.resultTerm}.toLong
-          """.children
-        } else {
-          Seq( q"""
-            val $resultTerm = ${childGen.resultTerm}.toLong
-          """)
-        }
-        childGen.code ++ castCode
-
-      case expressions.Cast(child: Expression, FLOAT_TYPE_INFO) =>
-        val childGen = generateExpression(child)
-        val castCode = if (nullCheck) {
-          q"""
-            val $nullTerm = ${childGen.nullTerm}
-            val $resultTerm = ${childGen.resultTerm}.toFloat
-          """.children
-        } else {
-          Seq( q"""
-            val $resultTerm = ${childGen.resultTerm}.toFloat
-          """)
-        }
-        childGen.code ++ castCode
-
-      case expressions.Cast(child: Expression, DOUBLE_TYPE_INFO) =>
-        val childGen = generateExpression(child)
-        val castCode = if (nullCheck) {
-          q"""
-            val $nullTerm = ${childGen.nullTerm}
-            val $resultTerm = ${childGen.resultTerm}.toDouble
-          """.children
-        } else {
-          Seq( q"""
-            val $resultTerm = ${childGen.resultTerm}.toDouble
-          """)
-        }
-        childGen.code ++ castCode
+        childGen.code + castCode
 
       case ResolvedFieldReference(fieldName, fieldTpe: TypeInformation[_]) =>
         inputs find { i => i._2.hasField(fieldName)} match {
           case Some((inputName, inputTpe)) =>
             val fieldCode = getField(newTermName(inputName), inputTpe, fieldName, fieldTpe)
             if (nullCheck) {
-              q"""
-                val $resultTerm = $fieldCode
-                val $nullTerm = $resultTerm == null
-              """.children
+              s"""
+                |$resultTpe $resultTerm = $fieldCode;
+                |boolean $nullTerm = $resultTerm == null;
+              """.stripMargin
             } else {
-              Seq( q"""
-                val $resultTerm = $fieldCode
-              """)
+              s"""$resultTpe $resultTerm = $fieldCode;"""
             }
 
           case None => throw new ExpressionException("Could not get accessor for " + fieldName
@@ -329,184 +297,196 @@ abstract class ExpressionCodeGenerator[R](
 
       case GreaterThan(left, right) =>
         generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm > $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm > $rightTerm"
         }
 
       case GreaterThanOrEqual(left, right) =>
         generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm >= $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm >= $rightTerm"
         }
 
       case LessThan(left, right) =>
         generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm < $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm < $rightTerm"
         }
 
       case LessThanOrEqual(left, right) =>
         generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm <= $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm <= $rightTerm"
         }
 
       case EqualTo(left, right) =>
         generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm == $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm == $rightTerm"
         }
 
       case NotEqualTo(left, right) =>
         generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm != $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm != $rightTerm"
         }
 
       case And(left, right) =>
         generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm && $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm && $rightTerm"
         }
 
       case Or(left, right) =>
         generateIfNonNull(left, right, BOOLEAN_TYPE_INFO) {
-          (leftTerm, rightTerm) => q"$leftTerm || $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm || $rightTerm"
         }
 
       case Plus(left, right) =>
         generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm + $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm + $rightTerm"
         }
 
       case Minus(left, right) =>
         generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm - $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm - $rightTerm"
         }
 
       case Div(left, right) =>
         generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm / $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm / $rightTerm"
         }
 
       case Mul(left, right) =>
         generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm * $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm * $rightTerm"
         }
 
       case Mod(left, right) =>
         generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm % $rightTerm"
+          (leftTerm, rightTerm) => s"$leftTerm % $rightTerm"
         }
 
       case UnaryMinus(child) =>
         val childCode = generateExpression(child)
         if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = -(${childCode.resultTerm})
-            }
-          """.children
+          childCode.code + s"""
+              |boolean $nullTerm = ${childCode.nullTerm};
+              |$resultTpe $resultTerm;
+              |if ($nullTerm) {
+              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
+              |} else {
+              |  $resultTerm = -(${childCode.resultTerm});
+              |}
+            """.stripMargin
         } else {
-          childCode.code :+ q"""
-              val $resultTerm = -(${childCode.resultTerm})
-          """
+          childCode.code +
+            s"""
+              |$resultTpe $resultTerm = -(${childCode.resultTerm});
+            """.stripMargin
         }
 
       case BitwiseAnd(left, right) =>
         generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm & $rightTerm"
+          (leftTerm, rightTerm) => s"(int) $leftTerm & (int) $rightTerm"
         }
 
       case BitwiseOr(left, right) =>
         generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm | $rightTerm"
+          (leftTerm, rightTerm) => s"(int) $leftTerm | (int) $rightTerm"
         }
 
       case BitwiseXor(left, right) =>
         generateIfNonNull(left, right, expr.typeInfo) {
-          (leftTerm, rightTerm) => q"$leftTerm ^ $rightTerm"
+          (leftTerm, rightTerm) => s"(int) $leftTerm ^ (int) $rightTerm"
         }
 
       case BitwiseNot(child) =>
         val childCode = generateExpression(child)
         if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = ~(${childCode.resultTerm})
-            }
-          """.children
+          childCode.code + s"""
+              |boolean $nullTerm = ${childCode.nullTerm};
+              |$resultTpe $resultTerm;
+              |if ($nullTerm) {
+              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
+              |} else {
+              |  $resultTerm = ~((int) ${childCode.resultTerm});
+              |}
+            """.stripMargin
         } else {
-          childCode.code :+ q"""
-              val $resultTerm = ~(${childCode.resultTerm})
-          """
+          childCode.code +
+            s"""
+              |$resultTpe $resultTerm = ~((int) ${childCode.resultTerm});
+            """.stripMargin
         }
 
       case Not(child) =>
         val childCode = generateExpression(child)
         if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = !(${childCode.resultTerm})
-            }
-          """.children
+          childCode.code + s"""
+              |boolean $nullTerm = ${childCode.nullTerm};
+              |$resultTpe $resultTerm;
+              |if ($nullTerm) {
+              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
+              |} else {
+              |  $resultTerm = !(${childCode.resultTerm});
+              |}
+            """.stripMargin
         } else {
-          childCode.code :+ q"""
-              val $resultTerm = !(${childCode.resultTerm})
-          """
+          childCode.code +
+            s"""
+              |$resultTpe $resultTerm = !(${childCode.resultTerm});
+            """.stripMargin
         }
 
       case IsNull(child) =>
         val childCode = generateExpression(child)
         if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = (${childCode.resultTerm}) == null
-            }
-          """.children
+          childCode.code +
+            s"""
+              |boolean $nullTerm = ${childCode.nullTerm};
+              |if ($nullTerm) {
+              |  ${defaultPrimitive(child.typeInfo)};
+              |} else {
+              |  $resultTpe $resultTerm = (${childCode.resultTerm}) == null;
+              |}
+            """.stripMargin
         } else {
-          childCode.code :+ q"""
-              val $resultTerm = (${childCode.resultTerm}) == null
-          """
+          childCode.code +
+            s"""
+              |$resultTpe $resultTerm = (${childCode.resultTerm}) == null;
+            """.stripMargin
         }
 
       case IsNotNull(child) =>
         val childCode = generateExpression(child)
         if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = (${childCode.resultTerm}) != null
-            }
-          """.children
+          childCode.code +
+            s"""
+              |boolean $nullTerm = ${childCode.nullTerm};
+              |if ($nullTerm) {
+              |  ${defaultPrimitive(child.typeInfo)};
+              |} else {
+              |  $resultTpe $resultTerm = (${childCode.resultTerm}) != null;
+              |}
+            """.stripMargin
         } else {
-          childCode.code :+ q"""
-              val $resultTerm = (${childCode.resultTerm}) != null
-          """
+          childCode.code +
+            s"""
+              |$resultTpe $resultTerm = (${childCode.resultTerm}) != null;
+            """.stripMargin
         }
 
       case Abs(child) =>
         val childCode = generateExpression(child)
         if (nullCheck) {
-          childCode.code ++ q"""
-            val $nullTerm = ${childCode.nullTerm}
-            if ($nullTerm) {
-              ${defaultPrimitive(child.typeInfo)}
-            } else {
-              val $resultTerm = Math.abs(${childCode.resultTerm})
-            }
-          """.children
+          childCode.code + s"""
+              |boolean $nullTerm = ${childCode.nullTerm};
+              |$resultTpe $resultTerm;
+              |if ($nullTerm) {
+              |  $resultTerm = ${defaultPrimitive(child.typeInfo)};
+              |} else {
+              |  $resultTerm = Math.abs(${childCode.resultTerm});
+              |}
+            """.stripMargin
         } else {
-          childCode.code :+ q"""
-              val $resultTerm = Math.abs(${childCode.resultTerm})
-          """
+          childCode.code +
+            s"""
+              |$resultTpe $resultTerm = Math.abs(${childCode.resultTerm});
+            """.stripMargin
         }
 
       case _ => throw new ExpressionException("Could not generate code for expression " + expr)
@@ -515,34 +495,33 @@ abstract class ExpressionCodeGenerator[R](
     GeneratedExpression(code, resultTerm, nullTerm)
   }
 
-  case class GeneratedExpression(code: Seq[Tree], resultTerm: TermName, nullTerm: TermName)
+  case class GeneratedExpression(code: String, resultTerm: String, nullTerm: String)
 
-  // We don't have c.freshName
-  // According to http://docs.scala-lang.org/overviews/quasiquotes/hygiene.html
-  // it's coming for 2.11. We can't wait that long...
-  def freshTermName(name: String): TermName = {
-    newTermName(s"$name$$${freshNameCounter.getAndIncrement}")
+  def freshName(name: String): String = {
+    s"$name$$${freshNameCounter.getAndIncrement}"
   }
 
   val freshNameCounter = new AtomicInteger
 
   protected def getField(
-                          inputTerm: TermName,
-                          inputType: CompositeType[_],
-                          fieldName: String,
-                          fieldType: TypeInformation[_]): Tree = {
+    inputTerm: TermName,
+    inputType: CompositeType[_],
+    fieldName: String,
+    fieldType: TypeInformation[_]): String = {
     val accessor = fieldAccessorFor(inputType, fieldName)
+    val fieldTpe = typeTermForTypeInfo(fieldType)
+
     accessor match {
       case ObjectFieldAccessor(fieldName) =>
         val fieldTerm = newTermName(fieldName)
-        q"$inputTerm.$fieldTerm.asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
+        s"($fieldTpe) $inputTerm.$fieldTerm"
 
       case ObjectMethodAccessor(methodName) =>
         val methodTerm = newTermName(methodName)
-        q"$inputTerm.$methodTerm().asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
+        s"($fieldTpe) $inputTerm.$methodTerm()"
 
       case ProductAccessor(i) =>
-        q"$inputTerm.productElement($i).asInstanceOf[${typeTermForTypeInfo(fieldType)}]"
+        s"($fieldTpe) $inputTerm.productElement($i)"
 
     }
   }
@@ -561,7 +540,7 @@ abstract class ExpressionCodeGenerator[R](
         ProductAccessor(elementType.getFieldIndex(fieldName))
 
       case cc: CaseClassTypeInfo[_] =>
-        ObjectFieldAccessor(fieldName)
+        ObjectMethodAccessor(fieldName)
 
       case javaTup: TupleTypeInfo[_] =>
         ObjectFieldAccessor(fieldName)
@@ -576,60 +555,43 @@ abstract class ExpressionCodeGenerator[R](
     }
   }
 
-  protected def defaultPrimitive(tpe: TypeInformation[_]) = tpe match {
-    case BasicTypeInfo.INT_TYPE_INFO => ru.Literal(Constant(-1))
-    case BasicTypeInfo.LONG_TYPE_INFO => ru.Literal(Constant(1L))
-    case BasicTypeInfo.SHORT_TYPE_INFO => ru.Literal(Constant(-1.toShort))
-    case BasicTypeInfo.BYTE_TYPE_INFO => ru.Literal(Constant(-1.toByte))
-    case BasicTypeInfo.FLOAT_TYPE_INFO => ru.Literal(Constant(-1.0.toFloat))
-    case BasicTypeInfo.DOUBLE_TYPE_INFO => ru.Literal(Constant(-1.toDouble))
-    case BasicTypeInfo.BOOLEAN_TYPE_INFO => ru.Literal(Constant(false))
-    case BasicTypeInfo.STRING_TYPE_INFO => ru.Literal(Constant("<empty>"))
-    case BasicTypeInfo.CHAR_TYPE_INFO => ru.Literal(Constant('\0'))
-    case _ => ru.Literal(Constant(null))
+  protected def defaultPrimitive(tpe: TypeInformation[_]): String = tpe match {
+    case BasicTypeInfo.INT_TYPE_INFO => "-1"
+    case BasicTypeInfo.LONG_TYPE_INFO => "-1"
+    case BasicTypeInfo.SHORT_TYPE_INFO => "-1"
+    case BasicTypeInfo.BYTE_TYPE_INFO => "-1"
+    case BasicTypeInfo.FLOAT_TYPE_INFO => "-1.0f"
+    case BasicTypeInfo.DOUBLE_TYPE_INFO => "-1.0d"
+    case BasicTypeInfo.BOOLEAN_TYPE_INFO => "false"
+    case BasicTypeInfo.STRING_TYPE_INFO => "\"<empty>\""
+    case BasicTypeInfo.CHAR_TYPE_INFO => "'\\0'"
+    case _ => "null"
   }
 
-  protected def typeTermForTypeInfo(typeInfo: TypeInformation[_]): Tree = {
-    val tpe = typeForTypeInfo(typeInfo)
-    tq"$tpe"
-  }
+  protected def typeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
 
-  // We need two separate methods here because typeForTypeInfo is recursive when generating
-  // the type for a type with generic parameters.
-  protected def typeForTypeInfo(tpe: TypeInformation[_]): Type = tpe match {
+//    case BasicTypeInfo.INT_TYPE_INFO => "int"
+//    case BasicTypeInfo.LONG_TYPE_INFO => "long"
+//    case BasicTypeInfo.SHORT_TYPE_INFO => "short"
+//    case BasicTypeInfo.BYTE_TYPE_INFO => "byte"
+//    case BasicTypeInfo.FLOAT_TYPE_INFO => "float"
+//    case BasicTypeInfo.DOUBLE_TYPE_INFO => "double"
+//    case BasicTypeInfo.BOOLEAN_TYPE_INFO => "boolean"
+//    case BasicTypeInfo.CHAR_TYPE_INFO => "char"
 
     // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
     // does not seem to like this, so we manually give the correct type here.
-    case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Int]]
-    case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Long]]
-    case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Short]]
-    case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Byte]]
-    case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Float]]
-    case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Double]]
-    case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Boolean]]
-    case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => typeOf[Array[Char]]
+    case PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case PrimitiveArrayTypeInfo.CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
 
     case _ =>
-      val clazz = mirror.staticClass(tpe.getTypeClass.getCanonicalName)
-
-      clazz.selfType.erasure match {
-        case ExistentialType(_, underlying) => underlying
-
-        case tpe@TypeRef(prefix, sym, Nil) =>
-          // Non-generic type, just return the type
-          tpe
-
-        case TypeRef(prefix, sym, emptyParams) =>
-          val genericTypeInfos = tpe.getGenericParameters.asScala
-          if (emptyParams.length != genericTypeInfos.length) {
-            throw new RuntimeException("Number of type parameters does not match.")
-          }
-          val typeParams = genericTypeInfos.map(typeForTypeInfo)
-          // TODO: remove, added only for migration of the line below, as suggested by the compiler
-          import compat._
-          TypeRef(prefix, sym, typeParams.toList)
-      }
+      tpe.getTypeClass.getCanonicalName
 
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala
deleted file mode 100644
index 801d3ae..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryPredicate.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for binary predicates, i.e. a Join or CoGroup Predicate.
- */
-class GenerateBinaryPredicate[L, R](
-    leftType: CompositeType[L],
-    rightType: CompositeType[R],
-    predicate: Expression,
-    cl: ClassLoader)
-  extends ExpressionCodeGenerator[(L, R) => Boolean](
-    Seq(("input0", leftType), ("input1", rightType)),
-    cl = cl) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  import scala.reflect.runtime.{universe => ru}
-  import scala.reflect.runtime.universe._
-
-  override protected def generateInternal(): ((L, R) => Boolean) = {
-    val pred = generateExpression(predicate)
-
-    val in0 = newTermName("input0")
-    val in1 = newTermName("input1")
-
-    val leftTpe = typeTermForTypeInfo(leftType)
-    val rightTpe = typeTermForTypeInfo(rightType)
-
-    val code = if (nullCheck) {
-      q"""
-        ($in0: $leftTpe, $in1: $rightTpe) => {
-          ..${pred.code}
-          if (${pred.nullTerm}) {
-            false
-          } else {
-            ${pred.resultTerm}
-          }
-        }
-      """
-    } else {
-      q"""
-        ($in0: $leftTpe, $in1: $rightTpe) => {
-          ..${pred.code}
-          ${pred.resultTerm}
-        }
-      """
-    }
-
-    LOG.debug(s"""Generated binary predicate "$predicate":\n$code""")
-    toolBox.eval(code).asInstanceOf[(L, R) => Boolean]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala
deleted file mode 100644
index 3e0aa68..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for assembling the result of a binary operation.
- */
-class GenerateBinaryResultAssembler[L, R, O](
-    leftTypeInfo: CompositeType[L],
-    rightTypeInfo: CompositeType[R],
-    resultTypeInfo: CompositeType[O],
-    outputFields: Seq[Expression],
-    cl: ClassLoader)
-  extends GenerateResultAssembler[(L, R, O) => O](
-    Seq(("input0", leftTypeInfo), ("input1", rightTypeInfo)),
-    cl = cl) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  import scala.reflect.runtime.universe._
-
-
-  override protected def generateInternal(): ((L, R, O) => O) = {
-
-    val leftType = typeTermForTypeInfo(leftTypeInfo)
-    val rightType = typeTermForTypeInfo(rightTypeInfo)
-    val resultType = typeTermForTypeInfo(resultTypeInfo)
-
-    val resultCode = createResult(resultTypeInfo, outputFields)
-
-    val code: Tree =
-      q"""
-        (input0: $leftType, input1: $rightType, out: $resultType) => {
-          ..$resultCode
-        }
-      """
-
-    LOG.debug(s"Generated binary result-assembler:\n$code")
-    toolBox.eval(code).asInstanceOf[(L, R, O) => O]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala
new file mode 100644
index 0000000..06a7076
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateFilter.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+import java.io.StringReader
+
+import org.slf4j.LoggerFactory
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.codegen.Indenter._
+import org.apache.flink.api.table.expressions.Expression
+
+/**
+ * Generates a FilterFunction that evaluates a unary predicate, i.e. a Filter, on each input.
+ */
+class GenerateFilter[T](
+    inputType: CompositeType[T],
+    predicate: Expression,
+    cl: ClassLoader) extends ExpressionCodeGenerator[FilterFunction[T]](
+      Seq(("in0", inputType)),
+      cl = cl) {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override protected def generateInternal(): FilterFunction[T] = {
+    val pred = generateExpression(predicate)
+
+    val tpe = typeTermForTypeInfo(inputType)
+
+    val generatedName = freshName("GeneratedFilter")
+
+    // Janino does not support generics: filter() takes Object and casts it to the input type.
+    val code = if (nullCheck) {
+      j"""
+        public class $generatedName
+            implements org.apache.flink.api.common.functions.FilterFunction<$tpe> {
+          public boolean filter(Object _in0) {
+            $tpe in0 = ($tpe) _in0;
+            ${pred.code}
+            if (${pred.nullTerm}) {
+              return false;
+            } else {
+              return ${pred.resultTerm};
+            }
+          }
+        }
+      """
+    } else {
+      j"""
+        public class $generatedName
+            implements org.apache.flink.api.common.functions.FilterFunction<$tpe> {
+          public boolean filter(Object _in0) {
+            $tpe in0 = ($tpe) _in0;
+            ${pred.code}
+            return ${pred.resultTerm};
+          }
+        }
+      """
+    }
+
+    LOG.debug(s"""Generated unary predicate "$predicate":\n$code""")
+    compiler.cook(new StringReader(code))
+    val clazz = compiler.getClassLoader().loadClass(generatedName)
+    clazz.newInstance().asInstanceOf[FilterFunction[T]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
new file mode 100644
index 0000000..8c0cec3
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+import java.io.StringReader
+
+import org.slf4j.LoggerFactory
+
+import org.apache.flink.api.common.functions.FlatJoinFunction
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.codegen.Indenter._
+import org.apache.flink.api.table.expressions.{Expression, NopExpression}
+
+/**
+ * Code generator for a join: evaluates the (optional) predicate and emits the assembled result.
+ */
+class GenerateJoin[L, R, O](
+    leftTypeInfo: CompositeType[L],
+    rightTypeInfo: CompositeType[R],
+    resultTypeInfo: CompositeType[O],
+    predicate: Expression,
+    outputFields: Seq[Expression],
+    cl: ClassLoader)
+  extends GenerateResultAssembler[FlatJoinFunction[L, R, O]](
+    Seq(("in0", leftTypeInfo), ("in1", rightTypeInfo)),
+    cl = cl) {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  /** Builds the Java source of the join function, compiles it and returns an instance. */
+  override protected def generateInternal(): FlatJoinFunction[L, R, O] = {
+
+    val leftTpe = typeTermForTypeInfo(leftTypeInfo)
+    val rightTpe = typeTermForTypeInfo(rightTypeInfo)
+    val resultTpe = typeTermForTypeInfo(resultTypeInfo)
+
+    // join() is void: results are to be emitted via the collector rather than returned.
+    val resultCode = createResult(resultTypeInfo, outputFields, o => s"coll.collect($o);")
+
+    val generatedName = freshName("GeneratedJoin")
+
+    // A NopExpression means there is no predicate code to emit: every pair produces a result.
+    val code = predicate match {
+      case n: NopExpression =>
+        // Janino does not support generics, that's why we need
+        // manual casting here
+        if (nullCheck) {
+          j"""
+        public class $generatedName
+            implements org.apache.flink.api.common.functions.FlatJoinFunction {
+
+          ${reuseCode(resultTypeInfo)}
+
+          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
+            $leftTpe in0 = ($leftTpe) _in0;
+            $rightTpe in1 = ($rightTpe) _in1;
+
+            $resultCode
+          }
+        }
+      """
+        } else {
+          j"""
+        public class $generatedName
+            implements org.apache.flink.api.common.functions.FlatJoinFunction {
+
+          ${reuseCode(resultTypeInfo)}
+
+          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
+            $leftTpe in0 = ($leftTpe) _in0;
+            $rightTpe in1 = ($rightTpe) _in1;
+
+            $resultCode
+          }
+        }
+      """
+        }
+
+      case _ =>
+        val pred = generateExpression(predicate)
+        // Janino does not support generics, hence the manual casts. With null checks on,
+        // a predicate that evaluated to null is treated as "no match" and emits nothing.
+        if (nullCheck) {
+          j"""
+        public class $generatedName
+            implements org.apache.flink.api.common.functions.FlatJoinFunction {
+
+          ${reuseCode(resultTypeInfo)}
+
+          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
+            $leftTpe in0 = ($leftTpe) _in0;
+            $rightTpe in1 = ($rightTpe) _in1;
+
+            ${pred.code}
+
+            if (!${pred.nullTerm} && ${pred.resultTerm}) {
+              $resultCode
+            }
+          }
+        }
+      """
+        } else {
+          j"""
+        public class $generatedName
+            implements org.apache.flink.api.common.functions.FlatJoinFunction {
+
+          ${reuseCode(resultTypeInfo)}
+
+          public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) {
+            $leftTpe in0 = ($leftTpe) _in0;
+            $rightTpe in1 = ($rightTpe) _in1;
+
+            ${pred.code}
+
+            if (${pred.resultTerm}) {
+              $resultCode
+            }
+          }
+        }
+      """
+        }
+    }
+
+    LOG.debug(s"""Generated join:\n$code""")
+    compiler.cook(new StringReader(code))
+    val clazz = compiler.getClassLoader().loadClass(generatedName)
+    clazz.newInstance().asInstanceOf[FlatJoinFunction[L, R, O]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
index 5dd7f95..5172eab 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala
@@ -17,11 +17,11 @@
  */
 package org.apache.flink.api.table.codegen
 
-import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo}
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
 
 /**
  * Base class for unary and binary result assembler code generators.
@@ -30,12 +30,24 @@ abstract class GenerateResultAssembler[R](
     inputs: Seq[(String, CompositeType[_])],
     cl: ClassLoader)
   extends ExpressionCodeGenerator[R](inputs, cl = cl) {
-  import scala.reflect.runtime.{universe => ru}
-  import scala.reflect.runtime.universe._
+
+  def reuseCode[A](resultTypeInfo: CompositeType[A]) = {
+    // Declaration of `out` spliced into the generated class body; empty for other result types.
+    val outTpe = typeTermForTypeInfo(resultTypeInfo)
+    resultTypeInfo match {
+      case pojoInfo: PojoTypeInfo[_] =>
+        s"$outTpe out = new ${pojoInfo.getTypeClass.getCanonicalName}();"
+      case rowInfo: RowTypeInfo =>
+        val rowClass = "org.apache.flink.api.table.Row"
+        s"$rowClass out = new $rowClass(${rowInfo.getArity});"
+      case _ => ""
+    }
+  }
 
   def createResult[T](
       resultTypeInfo: CompositeType[T],
-      outputFields: Seq[Expression]): Tree = {
+      outputFields: Seq[Expression],
+      result: String => String): String = {
 
     val resultType = typeTermForTypeInfo(resultTypeInfo)
 
@@ -43,54 +55,57 @@ abstract class GenerateResultAssembler[R](
 
     val block = resultTypeInfo match {
       case ri: RowTypeInfo =>
-        val resultSetters: Seq[Tree] = fieldsCode.zipWithIndex map {
+        val resultSetters: String = fieldsCode.zipWithIndex map {
           case (fieldCode, i) =>
-            q"""
-              out.setField($i, { ..${fieldCode.code}; ${fieldCode.resultTerm} })
-            """
-        }
+            s"""
+              |${fieldCode.code}
+              |out.setField($i, ${fieldCode.resultTerm});
+            """.stripMargin
+        } mkString("\n")
 
-        q"""
-          ..$resultSetters
-          out
-        """
+        s"""
+          |$resultSetters
+          |${result("out")}
+        """.stripMargin
 
       case pj: PojoTypeInfo[_] =>
-        val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map {
+        val resultSetters: String = fieldsCode.zip(outputFields) map {
         case (fieldCode, expr) =>
-          val fieldName = newTermName(expr.name)
-          q"""
-              out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} }
-            """
-        }
+          val fieldName = expr.name
+          s"""
+            |${fieldCode.code}
+            |out.$fieldName = ${fieldCode.resultTerm};
+          """.stripMargin
+        } mkString("\n")
 
-        q"""
-          ..$resultSetters
-          out
-        """
+        s"""
+          |$resultSetters
+          |${result("out")}
+        """.stripMargin
 
       case tup: TupleTypeInfo[_] =>
-        val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map {
+        val resultSetters: String = fieldsCode.zip(outputFields) map {
           case (fieldCode, expr) =>
-            val fieldName = newTermName(expr.name)
-            q"""
-              out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} }
-            """
-        }
+            val fieldName = expr.name
+            s"""
+              |${fieldCode.code}
+              |out.$fieldName = ${fieldCode.resultTerm};
+            """.stripMargin
+        } mkString("\n")
 
-        q"""
-          ..$resultSetters
-          out
-        """
+        s"""
+          |$resultSetters
+          |${result("out")}
+        """.stripMargin
 
       case cc: CaseClassTypeInfo[_] =>
-        val resultFields: Seq[Tree] = fieldsCode map {
-          fieldCode =>
-            q"{ ..${fieldCode.code}; ${fieldCode.resultTerm}}"
-        }
-        q"""
-          new $resultType(..$resultFields)
-        """
+        val fields: String = fieldsCode.map(_.code).mkString("\n")
+        val ctorParams: String = fieldsCode.map(_.resultTerm).mkString(",")
+
+        s"""
+          |$fields
+          |return new $resultType($ctorParams);
+        """.stripMargin
     }
 
     block

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
new file mode 100644
index 0000000..5941662
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+import java.io.StringReader
+
+import org.slf4j.LoggerFactory
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.codegen.Indenter._
+import org.apache.flink.api.table.expressions.Expression
+
+/**
+ * Generates a MapFunction that evaluates the output fields of a select and assembles the result.
+ */
+class GenerateSelect[I, O](
+    inputTypeInfo: CompositeType[I],
+    resultTypeInfo: CompositeType[O],
+    outputFields: Seq[Expression],
+    cl: ClassLoader)
+  extends GenerateResultAssembler[MapFunction[I, O]](
+    Seq(("in0", inputTypeInfo)),
+    cl = cl) {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+
+  override protected def generateInternal(): MapFunction[I, O] = {
+
+    val inputTpe = typeTermForTypeInfo(inputTypeInfo)
+    val resultTpe = typeTermForTypeInfo(resultTypeInfo)
+
+    val resultCode = createResult(resultTypeInfo, outputFields, o => s"return $o;")
+
+    val generatedName = freshName("GeneratedSelect")
+
+    // Janino does not support generics: map() is declared on Object and
+    // the input is cast to its real type by hand
+    val code =
+      j"""
+        public class $generatedName
+            implements org.apache.flink.api.common.functions.MapFunction<$inputTpe, $resultTpe> {
+
+          ${reuseCode(resultTypeInfo)}
+
+          @Override
+          public Object map(Object _in0) {
+            $inputTpe in0 = ($inputTpe) _in0;
+            $resultCode
+          }
+        }
+      """
+
+    LOG.debug(s"""Generated select:\n$code""")
+    compiler.cook(new StringReader(code))
+    val clazz = compiler.getClassLoader().loadClass(generatedName)
+    clazz.newInstance().asInstanceOf[MapFunction[I, O]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala
deleted file mode 100644
index 7ab1ab3..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for a unary predicate, i.e. a Filter.
- */
-class GenerateUnaryPredicate[T](
-    inputType: CompositeType[T],
-    predicate: Expression,
-    cl: ClassLoader) extends ExpressionCodeGenerator[T => Boolean](
-      Seq(("input0", inputType)),
-      cl = cl) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  import scala.reflect.runtime.{universe => ru}
-  import scala.reflect.runtime.universe._
-
-  override protected def generateInternal(): (T => Boolean) = {
-    val pred = generateExpression(predicate)
-
-    val tpe = typeTermForTypeInfo(inputType)
-
-    val code = if (nullCheck) {
-      q"""
-        (input0: $tpe) => {
-          ..${pred.code}
-          if (${pred.nullTerm}) {
-            false
-          } else {
-            ${pred.resultTerm}
-          }
-        }
-      """
-    } else {
-      q"""
-        (input0: $tpe) => {
-          ..${pred.code}
-          ${pred.resultTerm}
-        }
-      """
-    }
-
-    LOG.debug(s"""Generated unary predicate "$predicate":\n$code""")
-    toolBox.eval(code).asInstanceOf[(T) => Boolean]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala
deleted file mode 100644
index 4903c12..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen
-
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.slf4j.LoggerFactory
-
-/**
- * Code generator for assembling the result of a unary operation.
- */
-class GenerateUnaryResultAssembler[I, O](
-    inputTypeInfo: CompositeType[I],
-    resultTypeInfo: CompositeType[O],
-    outputFields: Seq[Expression],
-    cl: ClassLoader)
-  extends GenerateResultAssembler[(I, O) => O](
-    Seq(("input0", inputTypeInfo)),
-    cl = cl) {
-
-  val LOG = LoggerFactory.getLogger(this.getClass)
-
-  import scala.reflect.runtime.universe._
-
-  override protected def generateInternal(): ((I, O) => O) = {
-
-    val inputType = typeTermForTypeInfo(inputTypeInfo)
-    val resultType = typeTermForTypeInfo(resultTypeInfo)
-
-    val resultCode = createResult(resultTypeInfo, outputFields)
-
-    val code: Tree =
-      q"""
-        (input0: $inputType, out: $resultType) => {
-          ..$resultCode
-        }
-      """
-
-    LOG.debug(s"Generated unary result-assembler:\n${show(code)}")
-    toolBox.eval(code).asInstanceOf[(I, O) => O]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
new file mode 100644
index 0000000..1319f21
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+/** Adds the `j` interpolator, which re-indents multi-line arguments to their insertion column. */
+class IndentStringContext(sc: StringContext) {
+  /** Joins parts and args; an arg that follows pure-whitespace indent keeps it on every line. */
+  def j(args: Any*): String = {
+    val sb = new StringBuilder()
+    for ((s, a) <- sc.parts zip args) {
+      sb append s
+      // String.valueOf renders a null argument as "null" instead of throwing from a.toString
+      val text = String.valueOf(a)
+      val ind = getindent(s)
+      // literal (non-regex) replace: prefix every continuation line with the indent
+      sb append (if (ind.nonEmpty) text.replace("\n", "\n" + ind) else text)
+    }
+    // zip drops the part that follows the last argument; add it back
+    if (sc.parts.size > args.size) {
+      sb append sc.parts.last
+    }
+
+    sb.toString()
+  }
+
+  /** Returns the text after the last newline of `str` if it is all whitespace, else "". */
+  def getindent(str: String): String = {
+    val lastnl = str.lastIndexOf("\n")
+    if (lastnl == -1) ""
+    else {
+      val ind = str.substring(lastnl + 1)
+      if (ind.trim.isEmpty) ind else ""
+    }
+  }
+}
+
+object Indenter { // import Indenter._ to make the j"..." interpolator available
+  implicit def toISC(sc: StringContext): IndentStringContext = new IndentStringContext(sc)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
index ba376f5..f1ba847 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
@@ -17,31 +17,31 @@
  */
 package org.apache.flink.api.table.runtime
 
-import org.apache.flink.api.table.codegen.GenerateUnaryPredicate
-import org.apache.flink.api.table.expressions.{NopExpression, Expression}
-import org.apache.flink.api.common.functions.RichFilterFunction
+import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
 import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.codegen.GenerateFilter
+import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.configuration.Configuration
 
+/**
+ * Proxy function that takes an expression predicate. The predicate is compiled
+ * at runtime and calls to [[filter()]] are forwarded to the compiled code.
+ */
 class ExpressionFilterFunction[T](
     predicate: Expression,
     inputType: CompositeType[T]) extends RichFilterFunction[T] {
 
-  var compiledPredicate: (T) => Boolean = null
+  var compiledFilter: FilterFunction[T] = null
 
   override def open(config: Configuration): Unit = {
-    if (compiledPredicate == null) {
-      compiledPredicate = predicate match {
-        case n: NopExpression => null
-        case _ =>
-          val codegen = new GenerateUnaryPredicate[T](
-            inputType,
-            predicate,
-            getRuntimeContext.getUserCodeClassLoader)
-          codegen.generate()
-      }
+    if (compiledFilter == null) {
+      val codegen = new GenerateFilter[T](
+        inputType,
+        predicate,
+        getRuntimeContext.getUserCodeClassLoader)
+      compiledFilter = codegen.generate()
     }
   }
 
-  override def filter(in: T) = compiledPredicate(in)
+  override def filter(in: T) = compiledFilter.filter(in)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
index f5616d3..5743211 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
@@ -17,14 +17,17 @@
  */
 package org.apache.flink.api.table.runtime
 
-import org.apache.flink.api.table.expressions.{NopExpression, Expression}
-import org.apache.flink.api.common.functions.RichFlatJoinFunction
+import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.codegen.{GenerateBinaryResultAssembler,
-GenerateBinaryPredicate}
+import org.apache.flink.api.table.codegen.GenerateJoin
+import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.Collector
 
+/**
+ * Proxy function that takes an expression predicate and output fields. These are compiled
+ * at runtime and calls to [[join()]] are forwarded to the compiled code.
+ */
 class ExpressionJoinFunction[L, R, O](
     predicate: Expression,
     leftType: CompositeType[L],
@@ -32,45 +35,20 @@ class ExpressionJoinFunction[L, R, O](
     resultType: CompositeType[O],
     outputFields: Seq[Expression]) extends RichFlatJoinFunction[L, R, O] {
 
-  var compiledPredicate: (L, R) => Boolean = null
-  var resultAssembler: (L, R, O) => O = null
-  var result: O = null.asInstanceOf[O]
+  var compiledJoin: FlatJoinFunction[L, R, O] = null
 
   override def open(config: Configuration): Unit = {
-    result = resultType.createSerializer(getRuntimeContext.getExecutionConfig).createInstance()
-    if (compiledPredicate == null) {
-      compiledPredicate = predicate match {
-        case n: NopExpression => null
-        case _ =>
-          val codegen = new GenerateBinaryPredicate[L, R](
-            leftType,
-            rightType,
-            predicate,
-            getRuntimeContext.getUserCodeClassLoader)
-          codegen.generate()
-      }
-    }
-
-    if (resultAssembler == null) {
-      val resultCodegen = new GenerateBinaryResultAssembler[L, R, O](
-        leftType,
-        rightType,
-        resultType,
-        outputFields,
-        getRuntimeContext.getUserCodeClassLoader)
-
-      resultAssembler = resultCodegen.generate()
-    }
+    val codegen = new GenerateJoin[L, R, O](
+      leftType,
+      rightType,
+      resultType,
+      predicate,
+      outputFields,
+      getRuntimeContext.getUserCodeClassLoader)
+    compiledJoin = codegen.generate()
   }
 
   def join(left: L, right: R, out: Collector[O]) = {
-    if (compiledPredicate == null) {
-      result = resultAssembler(left, right, result)
-      out.collect(result)
-    } else {
-      if (compiledPredicate(left, right)) {
-        result = resultAssembler(left, right, result)
-        out.collect(result)      }
-    }
+    compiledJoin.join(left, right, out)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f81d9f02/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
index 16e256a..32098c3 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
@@ -18,34 +18,36 @@
 package org.apache.flink.api.table.runtime
 
 import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.codegen.GenerateUnaryResultAssembler
+import org.apache.flink.api.table.codegen.GenerateSelect
 import org.apache.flink.configuration.Configuration
 
+/**
+ * Proxy function that takes expressions. These are compiled
+ * at runtime and calls to [[map()]] are forwarded to the compiled code.
+ */
 class ExpressionSelectFunction[I, O](
      inputType: CompositeType[I],
      resultType: CompositeType[O],
      outputFields: Seq[Expression]) extends RichMapFunction[I, O] {
 
-  var resultAssembler: (I, O) => O = null
-  var result: O = null.asInstanceOf[O]
+  var compiledSelect: MapFunction[I, O] = null
 
   override def open(config: Configuration): Unit = {
-    result = resultType.createSerializer(getRuntimeContext.getExecutionConfig).createInstance()
 
-    if (resultAssembler == null) {
-      val resultCodegen = new GenerateUnaryResultAssembler[I, O](
+    if (compiledSelect == null) {
+      val resultCodegen = new GenerateSelect[I, O](
         inputType,
         resultType,
         outputFields,
         getRuntimeContext.getUserCodeClassLoader)
 
-      resultAssembler = resultCodegen.generate()
+      compiledSelect = resultCodegen.generate()
     }
   }
 
   def map(in: I): O = {
-    resultAssembler(in, result)
+    compiledSelect.map(in)
   }
 }