You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:53 UTC

[24/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MethodCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MethodCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MethodCallGen.scala
new file mode 100644
index 0000000..2e0d340
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MethodCallGen.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import java.lang.reflect.Method
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenUtils.qualifyMethod
+import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+  * Emits a call to the given [[java.lang.reflect.Method]], passing the operand terms as arguments.
+  */
+class MethodCallGen(returnType: TypeInformation[_], method: Method) extends CallGenerator {
+
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    val checkNulls = codeGenerator.nullCheck
+    generateCallIfArgsNotNull(checkNulls, returnType, operands) { argTerms =>
+      s"""
+        |${qualifyMethod(method)}(${argTerms.mkString(", ")})
+        |""".stripMargin
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MultiTypeMethodCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MultiTypeMethodCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MultiTypeMethodCallGen.scala
new file mode 100644
index 0000000..e5958a0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MultiTypeMethodCallGen.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import java.lang.reflect.Method
+
+import org.apache.flink.table.codegen.calls.CallGenerator._
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+  * Call generator for methods that return the same type they take as first argument:
+  * the result type of the generated call is read from the first operand.
+  */
+class MultiTypeMethodCallGen(method: Method) extends CallGenerator {
+
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    val firstOperandType = operands.head.resultType
+    generateCallIfArgsNotNull(codeGenerator.nullCheck, firstOperandType, operands) { argTerms =>
+      s"""
+        |${method.getDeclaringClass.getCanonicalName}.
+        |  ${method.getName}(${argTerms.mkString(", ")})
+         """.stripMargin
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/NotCallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/NotCallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/NotCallGenerator.scala
new file mode 100644
index 0000000..52397c9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/NotCallGenerator.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.table.codegen.calls.ScalarOperators.generateNot
+import org.apache.flink.table.codegen.{GeneratedExpression, CodeGenerator}
+
+/**
+  * Wraps another [[CallGenerator]] and negates the boolean expression it generates.
+  */
+class NotCallGenerator(callGenerator: CallGenerator) extends CallGenerator {
+
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    val delegateResult = callGenerator.generate(codeGenerator, operands)
+    val checkNulls = codeGenerator.nullCheck
+    generateNot(checkNulls, delegateResult)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
new file mode 100644
index 0000000..ac840df
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+
+/**
+  * Generates a call to the `eval` method of a user-defined [[ScalarFunction]].
+  *
+  * @param scalarFunction user-defined [[ScalarFunction]] that might be overloaded
+  * @param signature actual signature with which the function is called
+  * @param returnType type used for the result type term and for unboxing the call result
+  */
+class ScalarFunctionCallGen(
+    scalarFunction: ScalarFunction,
+    signature: Seq[TypeInformation[_]],
+    returnType: TypeInformation[_])
+  extends CallGenerator {
+
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    // resolve the signature matching the call (fails if there is none) and its result class
+    val matchingSignature = getSignature(scalarFunction, signature)
+      .getOrElse(throw new CodeGenException("No matching signature found."))
+    val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
+
+    // box operands of non-primitive parameters; with nullCheck a null operand becomes `null`
+    val parameters = matchingSignature
+        .zip(operands)
+        .map { case (paramClass, operandExpr) =>
+          if (paramClass.isPrimitive) {
+            operandExpr
+          } else {
+            val boxedTypeTerm = boxedTypeTermForTypeInfo(operandExpr.resultType)
+            val boxedExpr = codeGenerator.generateOutputFieldBoxing(operandExpr)
+            val exprOrNull: String = if (codeGenerator.nullCheck) {
+              s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
+            } else {
+              boxedExpr.resultTerm
+            }
+            boxedExpr.copy(resultTerm = exprOrNull)
+          }
+        }
+
+    // obtain a reference via addReusableFunction, then emit operand code and the eval call
+    val functionReference = codeGenerator.addReusableFunction(scalarFunction)
+    val resultTypeTerm = if (resultClass.isPrimitive) {
+      primitiveTypeTermForTypeInfo(returnType)
+    } else {
+      boxedTypeTermForTypeInfo(returnType)
+    }
+    val resultTerm = newName("result")
+    val functionCallCode =
+      s"""
+        |${parameters.map(_.code).mkString("\n")}
+        |$resultTypeTerm $resultTerm = $functionReference.eval(
+        |  ${parameters.map(_.resultTerm).mkString(", ")});
+        |""".stripMargin
+
+    // primitive results are wrapped as non-null literal, boxed ones go through input unboxing
+    val resultUnboxing = if (resultClass.isPrimitive) {
+      codeGenerator.generateNonNullLiteral(returnType, resultTerm)
+    } else {
+      codeGenerator.generateInputFieldUnboxing(returnType, resultTerm)
+    }
+    resultUnboxing.copy(code =
+      s"""
+        |$functionCallCode
+        |${resultUnboxing.code}
+        |""".stripMargin
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
new file mode 100644
index 0000000..3f7c91f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -0,0 +1,1025 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen.calls
+
+import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
+import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.{CodeGenerator, CodeGenException, GeneratedExpression}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+
+object ScalarOperators {
+
+  /** String concatenation: emits `left + right` on the two operand terms. */
+  def generateStringConcatOperator(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression =
+    generateOperatorIfNotNull(nullCheck, STRING_TYPE_INFO, left, right) {
+      (lhs, rhs) => s"$lhs + $rhs"
+    }
+
+  /**
+    * Binary arithmetic. Both operand terms are passed through `numericCasting` towards
+    * `resultType`; decimal results call the method named by `arithOpToDecMethod`.
+    */
+  def generateArithmeticOperator(
+      operator: String,
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val castLeft = numericCasting(left.resultType, resultType)
+    val castRight = numericCasting(right.resultType, resultType)
+    val primitiveTerm = primitiveTypeTermForTypeInfo(resultType)
+    generateOperatorIfNotNull(nullCheck, resultType, left, right) { (lhs, rhs) =>
+      val (l, r) = (castLeft(lhs), castRight(rhs))
+      if (isDecimal(resultType)) s"$l.${arithOpToDecMethod(operator)}($r)"
+      else s"($primitiveTerm) ($l $operator $r)"
+    }
+  }
+
+  /**
+    * Unary arithmetic: decimal operands use `negate()` for `-` and are passed through
+    * unchanged for `+`; every other case emits `operator(term)`.
+    */
+  def generateUnaryArithmeticOperator(
+      operator: String,
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      operand: GeneratedExpression)
+    : GeneratedExpression =
+    generateUnaryOperatorIfNotNull(nullCheck, resultType, operand) { term =>
+      (isDecimal(operand.resultType), operator) match {
+        case (true, "-") => s"$term.negate()"
+        case (true, "+") => s"$term"
+        case _ => s"$operator($term)"
+      }
+    }
+
+  /**
+    * Generates an equality test for two operands.
+    *
+    * Numeric, same-type temporal and same-type comparable operands are delegated to
+    * [[generateComparison]]; same-type arrays use `java.util.Arrays.equals`; anything else
+    * calls `equals` on whichever side `isReference` accepts (left side preferred).
+    *
+    * @param nullCheck forwarded to the delegated generators
+    * @throws CodeGenException if none of the cases applies
+    */
+  def generateEquals(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val (leftType, rightType) = (left.resultType, right.resultType)
+    lazy val sameType = leftType == rightType
+    def viaComparison: GeneratedExpression = generateComparison("==", nullCheck, left, right)
+    def viaOperator(expr: (String, String) => String): GeneratedExpression =
+      generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right)(expr)
+
+    if (isNumeric(leftType) && isNumeric(rightType)) {
+      viaComparison
+    } else if (isTemporal(leftType) && sameType) {
+      viaComparison
+    } else if (isArray(leftType) && sameType) {
+      viaOperator((l, r) => s"java.util.Arrays.equals($l, $r)")
+    } else if (isComparable(leftType) && sameType) {
+      viaComparison
+    } else if (isReference(left)) {
+      // non comparable types: equals is invoked on the reference side
+      viaOperator((l, r) => s"$l.equals($r)")
+    } else if (isReference(right)) {
+      viaOperator((l, r) => s"$r.equals($l)")
+    } else {
+      throw new CodeGenException(s"Incomparable types: ${left.resultType} and " +
+        s"${right.resultType}")
+    }
+  }
+
+  /**
+    * Generates an inequality test for two operands.
+    *
+    * Numeric, same-type temporal and same-type comparable operands are delegated to
+    * [[generateComparison]]; same-type arrays negate `java.util.Arrays.equals`; anything
+    * else negates `equals` on whichever side `isReference` accepts (left side preferred).
+    *
+    * @param nullCheck forwarded to the delegated generators
+    * @throws CodeGenException if none of the cases applies
+    */
+  def generateNotEquals(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val (leftType, rightType) = (left.resultType, right.resultType)
+    lazy val sameType = leftType == rightType
+    def viaComparison: GeneratedExpression = generateComparison("!=", nullCheck, left, right)
+    def viaOperator(expr: (String, String) => String): GeneratedExpression =
+      generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right)(expr)
+
+    if (isNumeric(leftType) && isNumeric(rightType)) {
+      viaComparison
+    } else if (isTemporal(leftType) && sameType) {
+      viaComparison
+    } else if (isArray(leftType) && sameType) {
+      viaOperator((l, r) => s"!java.util.Arrays.equals($l, $r)")
+    } else if (isComparable(leftType) && sameType) {
+      viaComparison
+    } else if (isReference(left)) {
+      // non-comparable types: equals is invoked on the reference side
+      viaOperator((l, r) => s"!($l.equals($r))")
+    } else if (isReference(right)) {
+      viaOperator((l, r) => s"!($r.equals($l))")
+    } else {
+      throw new CodeGenException(s"Incomparable types: ${left.resultType} and " +
+        s"${right.resultType}")
+    }
+  }
+
+  /**
+    * Generates comparison code for numeric types and comparable types of same type.
+    *
+    * The comparison expression is selected first, so unsupported operand types fail with a
+    * [[CodeGenException]] before `generateOperatorIfNotNull` is invoked.
+    *
+    * @param operator operator placed between the terms, or after a `compareTo` call
+    * @param nullCheck forwarded to `generateOperatorIfNotNull`
+    * @param left left operand
+    * @param right right operand
+    * @throws CodeGenException for incomparable types or unsupported boolean operators
+    */
+  def generateComparison(
+      operator: String,
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val (leftType, rightType) = (left.resultType, right.resultType)
+    lazy val sameType = leftType == rightType
+    val infix: (String, String) => String = (l, r) => s"$l $operator $r"
+
+    val comparison: (String, String) => String =
+      if (isDecimal(leftType) && isNumeric(rightType)) {
+        // left is decimal or both sides are decimal
+        (l, r) => s"$l.compareTo(${numericCasting(rightType, leftType)(r)}) $operator 0"
+      } else if (isNumeric(leftType) && isDecimal(rightType)) {
+        // right is decimal
+        (l, r) => s"${numericCasting(leftType, rightType)(l)}.compareTo($r) $operator 0"
+      } else if (isNumeric(leftType) && isNumeric(rightType)) {
+        // both sides are numeric
+        infix
+      } else if (isTemporal(leftType) && sameType) {
+        // both sides are temporal of same type
+        infix
+      } else if (isBoolean(leftType) && sameType) {
+        // both sides are boolean: only (in)equality is supported
+        if (operator == "==" || operator == "!=") infix
+        else throw new CodeGenException(s"Unsupported boolean comparison '$operator'.")
+      } else if (isComparable(leftType) && sameType) {
+        // both sides are same comparable type
+        (l, r) => s"$l.compareTo($r) $operator 0"
+      } else {
+        throw new CodeGenException(s"Incomparable types: ${left.resultType} and " +
+            s"${right.resultType}")
+      }
+
+    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right)(comparison)
+  }
+
+  /** `IS NULL`: reads the operand's null flag, or compares a reference with `null`. */
+  def generateIsNull(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val operatorCode =
+      if (nullCheck) {
+        s"""
+          |${operand.code}
+          |boolean $resultTerm = ${operand.nullTerm};
+          |boolean $nullTerm = false;
+          |""".stripMargin
+      } else if (isReference(operand)) {
+        s"""
+          |${operand.code}
+          |boolean $resultTerm = ${operand.resultTerm} == null;
+          |boolean $nullTerm = false;
+          |""".stripMargin
+      } else {
+        // neither null checks nor a reference: the test is constantly false
+        s"""
+          |${operand.code}
+          |boolean $resultTerm = false;
+          |""".stripMargin
+      }
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  /** `IS NOT NULL`: negates the operand's null flag, or compares a reference with `null`. */
+  def generateIsNotNull(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val operatorCode =
+      if (nullCheck) {
+        s"""
+          |${operand.code}
+          |boolean $resultTerm = !${operand.nullTerm};
+          |boolean $nullTerm = false;
+          |""".stripMargin
+      } else if (isReference(operand)) {
+        s"""
+          |${operand.code}
+          |boolean $resultTerm = ${operand.resultTerm} != null;
+          |boolean $nullTerm = false;
+          |""".stripMargin
+      } else {
+        // neither null checks nor a reference: the test is constantly true
+        s"""
+          |${operand.code}
+          |boolean $resultTerm = true;
+          |""".stripMargin
+      }
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  def generateAnd(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+
+    val operatorCode = if (nullCheck) {
+      // Three-valued logic (Unknown = the generated null flag is set to true):
+      // no Unknown -> Two-valued logic (plain Java &&)
+      // True && Unknown -> Unknown
+      // False && Unknown -> False
+      // Unknown && True -> Unknown
+      // Unknown && False -> False
+      // Unknown && Unknown -> Unknown
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm;
+        |boolean $nullTerm;
+        |if (!${left.nullTerm} && !${right.nullTerm}) {
+        |  $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = false;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = false;
+        |}
+        |else {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  def generateOr(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+
+    val operatorCode = if (nullCheck) {
+      // Three-valued logic (Unknown = the generated null flag is set to true):
+      // no Unknown -> Two-valued logic (plain Java ||)
+      // True || Unknown -> True
+      // False || Unknown -> Unknown
+      // Unknown || True -> True
+      // Unknown || False -> Unknown
+      // Unknown || Unknown -> Unknown
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm;
+        |boolean $nullTerm;
+        |if (!${left.nullTerm} && !${right.nullTerm}) {
+        |  $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = true;
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+        |  $resultTerm = true;
+        |  $nullTerm = false;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  /**
+    * Negates a boolean operand. Three-valued logic: without Unknown this is two-valued
+    * logic, Unknown stays Unknown (handling is left to `generateUnaryOperatorIfNotNull`).
+    */
+  def generateNot(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression =
+    generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand) { term =>
+      s"!($term)"
+    }
+
+  /**
+    * `IS TRUE`: reuses the operand's result term, since unknown is always false by default.
+    */
+  def generateIsTrue(operand: GeneratedExpression): GeneratedExpression = {
+    val term = operand.resultTerm
+    GeneratedExpression(term, GeneratedExpression.NEVER_NULL, operand.code, BOOLEAN_TYPE_INFO)
+  }
+
+  /**
+    * `IS NOT TRUE`: negates the operand's result term; unknown is always false by default.
+    */
+  def generateIsNotTrue(operand: GeneratedExpression): GeneratedExpression = {
+    val term = s"(!${operand.resultTerm})"
+    GeneratedExpression(term, GeneratedExpression.NEVER_NULL, operand.code, BOOLEAN_TYPE_INFO)
+  }
+
+  /**
+    * `IS FALSE`: true only if the operand's result term is false and its null term is false.
+    */
+  def generateIsFalse(operand: GeneratedExpression): GeneratedExpression = {
+    val term = s"(!${operand.resultTerm} && !${operand.nullTerm})"
+    GeneratedExpression(term, GeneratedExpression.NEVER_NULL, operand.code, BOOLEAN_TYPE_INFO)
+  }
+
+  /**
+    * `IS NOT FALSE`: true if the operand's result term is true or its null term is true.
+    */
+  def generateIsNotFalse(operand: GeneratedExpression): GeneratedExpression = {
+    val term = s"(${operand.resultTerm} || ${operand.nullTerm})"
+    GeneratedExpression(term, GeneratedExpression.NEVER_NULL, operand.code, BOOLEAN_TYPE_INFO)
+  }
+
+  def generateCast(
+      nullCheck: Boolean,
+      operand: GeneratedExpression,
+      targetType: TypeInformation[_])
+    : GeneratedExpression = (operand.resultType, targetType) match {
+    // identity casting
+    case (fromTp, toTp) if fromTp == toTp =>
+      operand
+
+    // Date/Time/Timestamp -> String
+    case (dtt: SqlTimeTypeInfo[_], STRING_TYPE_INFO) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"${internalToTimePointCode(dtt, operandTerm)}.toString()"
+      }
+
+    // Interval Months -> String
+    case (TimeIntervalTypeInfo.INTERVAL_MONTHS, STRING_TYPE_INFO) =>
+      val method = qualifyMethod(BuiltInMethod.INTERVAL_YEAR_MONTH_TO_STRING.method)
+      val timeUnitRange = qualifyEnum(TimeUnitRange.YEAR_TO_MONTH)
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"$method($operandTerm, $timeUnitRange)"
+      }
+
+    // Interval Millis -> String
+    case (TimeIntervalTypeInfo.INTERVAL_MILLIS, STRING_TYPE_INFO) =>
+      val method = qualifyMethod(BuiltInMethod.INTERVAL_DAY_TIME_TO_STRING.method)
+      val timeUnitRange = qualifyEnum(TimeUnitRange.DAY_TO_SECOND)
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"$method($operandTerm, $timeUnitRange, 3)" // milli second precision
+      }
+
+    // Object array -> String
+    case (_:ObjectArrayTypeInfo[_, _], STRING_TYPE_INFO) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"java.util.Arrays.deepToString($operandTerm)"
+      }
+
+    // Primitive array -> String
+    case (_:PrimitiveArrayTypeInfo[_], STRING_TYPE_INFO) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"java.util.Arrays.toString($operandTerm)"
+      }
+
+    // * (not Date/Time/Timestamp) -> String
+    case (_, STRING_TYPE_INFO) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s""" "" + $operandTerm"""
+      }
+
+    // * -> Character
+    case (_, CHAR_TYPE_INFO) =>
+      throw new CodeGenException("Character type not supported.")
+
+    // String -> NUMERIC TYPE (not Character), Boolean
+    case (STRING_TYPE_INFO, _: NumericTypeInfo[_])
+        | (STRING_TYPE_INFO, BOOLEAN_TYPE_INFO) =>
+      val wrapperClass = targetType.getTypeClass.getCanonicalName
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"$wrapperClass.valueOf($operandTerm)"
+      }
+
+    // String -> BigDecimal
+    case (STRING_TYPE_INFO, BIG_DEC_TYPE_INFO) =>
+      val wrapperClass = targetType.getTypeClass.getCanonicalName
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"new $wrapperClass($operandTerm)"
+      }
+
+    // String -> Date
+    case (STRING_TYPE_INFO, SqlTimeTypeInfo.DATE) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_DATE.method)}($operandTerm)"
+      }
+
+    // String -> Time
+    case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIME) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_TIME.method)}($operandTerm)"
+      }
+
+    // String -> Timestamp
+    case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_TIMESTAMP.method)}" +
+          s"($operandTerm)"
+      }
+
+    // Boolean -> NUMERIC TYPE
+    case (BOOLEAN_TYPE_INFO, nti: NumericTypeInfo[_]) =>
+      val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"($targetTypeTerm) ($operandTerm ? 1 : 0)"
+      }
+
+    // Boolean -> BigDecimal
+    case (BOOLEAN_TYPE_INFO, BIG_DEC_TYPE_INFO) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"$operandTerm ? java.math.BigDecimal.ONE : java.math.BigDecimal.ZERO"
+      }
+
+    // NUMERIC TYPE -> Boolean
+    case (_: NumericTypeInfo[_], BOOLEAN_TYPE_INFO) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"$operandTerm != 0"
+      }
+
+    // BigDecimal -> Boolean
+    case (BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"$operandTerm.compareTo(java.math.BigDecimal.ZERO) != 0"
+      }
+
+    // NUMERIC TYPE, BigDecimal -> NUMERIC TYPE, BigDecimal
+    case (_: NumericTypeInfo[_], _: NumericTypeInfo[_])
+        | (BIG_DEC_TYPE_INFO, _: NumericTypeInfo[_])
+        | (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) =>
+      val operandCasting = numericCasting(operand.resultType, targetType)
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"${operandCasting(operandTerm)}"
+      }
+
+    // Date -> Timestamp
+    case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) =>
+          s"$operandTerm * ${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY"
+      }
+
+    // Timestamp -> Date
+    case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
+      val targetTypeTerm = primitiveTypeTermForTypeInfo(targetType)
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) =>
+          s"($targetTypeTerm) ($operandTerm / " +
+            s"${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY)"
+      }
+
+    // Time -> Timestamp
+    case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.TIMESTAMP) =>
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) => s"$operandTerm"
+      }
+
+    // Timestamp -> Time
+    case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIME) =>
+      val targetTypeTerm = primitiveTypeTermForTypeInfo(targetType)
+      generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+        (operandTerm) =>
+          s"($targetTypeTerm) ($operandTerm % " +
+            s"${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY)"
+      }
+
+    // internal temporal casting
+    // Date -> Integer
+    // Time -> Integer
+    // Timestamp -> Long
+    // Integer -> Date
+    // Integer -> Time
+    // Long -> Timestamp
+    // Integer -> Interval Months
+    // Long -> Interval Millis
+    // Interval Months -> Integer
+    // Interval Millis -> Long
+    case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) |
+         (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) |
+         (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) |
+         (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) |
+         (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) |
+         (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) |
+         (INT_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MONTHS) |
+         (LONG_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
+         (TimeIntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) |
+         (TimeIntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) =>
+      internalExprCasting(operand, targetType)
+
+    // internal reinterpretation of temporal types
+    // Date, Time, Interval Months -> Long
+    case  (SqlTimeTypeInfo.DATE, LONG_TYPE_INFO)
+        | (SqlTimeTypeInfo.TIME, LONG_TYPE_INFO)
+        | (TimeIntervalTypeInfo.INTERVAL_MONTHS, LONG_TYPE_INFO) =>
+      internalExprCasting(operand, targetType)
+
+    case (from, to) =>
+      throw new CodeGenException(s"Unsupported cast from '$from' to '$to'.")
+  }
+
+  /**
+    * Generates a chained if/else expression from a flat operand list of the form
+    * `condition, value, condition, value, ..., elseValue`.
+    *
+    * Each value is cast to `resultType`. The null term of a condition is not inspected; only
+    * its result term is tested, so a null condition behaves like its default value.
+    *
+    * @param nullCheck whether the generated code propagates the null flag of the chosen branch
+    * @param operands alternating conditions and values, terminated by the else value
+    * @param resultType type that every branch value is cast to
+    * @param i index of the operand this recursion step starts at
+    * @return expression whose result term holds the value of the chosen branch
+    */
+  def generateIfElse(
+      nullCheck: Boolean,
+      operands: Seq[GeneratedExpression],
+      resultType: TypeInformation[_],
+      i: Int = 0)
+    : GeneratedExpression = {
+    val lastIndex = operands.size - 1
+    if (i == lastIndex) {
+      // trailing operand: the else value only needs to be cast
+      generateCast(nullCheck, operands(i), resultType)
+    } else {
+      // the condition must be boolean; its null flag is ignored on purpose, so a null
+      // condition falls back to the default value and therefore counts as false
+      val condition = operands(i)
+      requireBoolean(condition)
+      val thenBranch = generateCast(nullCheck, operands(i + 1), resultType)
+      val elseBranch = generateIfElse(nullCheck, operands, resultType, i + 2)
+
+      val resultTerm = newName("result")
+      val nullTerm = newName("isNull")
+      val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+
+      val operatorCode =
+        if (!nullCheck) {
+          // only the result value is assigned
+          s"""
+            |${condition.code}
+            |$resultTypeTerm $resultTerm;
+            |if (${condition.resultTerm}) {
+            |  ${thenBranch.code}
+            |  $resultTerm = ${thenBranch.resultTerm};
+            |}
+            |else {
+            |  ${elseBranch.code}
+            |  $resultTerm = ${elseBranch.resultTerm};
+            |}
+            |""".stripMargin
+        } else {
+          // result value and null flag are both taken from the chosen branch
+          s"""
+            |${condition.code}
+            |$resultTypeTerm $resultTerm;
+            |boolean $nullTerm;
+            |if (${condition.resultTerm}) {
+            |  ${thenBranch.code}
+            |  $resultTerm = ${thenBranch.resultTerm};
+            |  $nullTerm = ${thenBranch.nullTerm};
+            |}
+            |else {
+            |  ${elseBranch.code}
+            |  $resultTerm = ${elseBranch.resultTerm};
+            |  $nullTerm = ${elseBranch.nullTerm};
+            |}
+            |""".stripMargin
+        }
+
+      GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
+    }
+  }
+
+  /**
+    * Generates addition or subtraction involving temporal types.
+    *
+    * Supported operand combinations (left, right):
+    *  - two intervals of the same type
+    *  - date and interval of milliseconds or months
+    *  - time and interval of milliseconds
+    *  - timestamp and interval of milliseconds or months
+    *
+    * @param plus true to generate an addition, false to generate a subtraction
+    * @param nullCheck whether the generated code checks the operands for null
+    * @param left left operand
+    * @param right right operand
+    * @return expression of the left operand's type
+    * @throws CodeGenException if the operand types are not one of the combinations above
+    */
+  def generateTemporalPlusMinus(
+      plus: Boolean,
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+
+    val op = plus match {
+      case true => "+"
+      case false => "-"
+    }
+
+    // month arithmetic is delegated to the ADD_MONTHS built-in; the sign is applied to the
+    // number of months
+    val shiftByMonths: (String, String) => String =
+      (temporal, months) =>
+        s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($temporal, $op($months))"
+
+    (left.resultType, right.resultType) match {
+      case (leftInterval: TimeIntervalTypeInfo[_], rightInterval: TimeIntervalTypeInfo[_])
+          if leftInterval == rightInterval =>
+        generateArithmeticOperator(op, nullCheck, leftInterval, left, right)
+
+      case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+        // the interval is converted from milliseconds to whole days first
+        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) {
+          (date, millis) => s"$date $op ((int) ($millis / ${MILLIS_PER_DAY}L))"
+        }
+
+      case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MONTHS) =>
+        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right)(shiftByMonths)
+
+      case (SqlTimeTypeInfo.TIME, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIME, left, right) {
+          (time, millis) => s"$time $op ((int) ($millis))"
+        }
+
+      case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+        generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left, right) {
+          (timestamp, millis) => s"$timestamp $op $millis"
+        }
+
+      case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MONTHS) =>
+        generateOperatorIfNotNull(
+          nullCheck, SqlTimeTypeInfo.TIMESTAMP, left, right)(shiftByMonths)
+
+      case _ =>
+        throw new CodeGenException("Unsupported temporal arithmetic.")
+    }
+  }
+
+  /**
+    * Generates a unary plus or minus; the result keeps the operand's type.
+    *
+    * @param plus true for unary plus, false for unary minus
+    * @param nullCheck forwarded to the unary arithmetic generator
+    * @param operand expression the sign is applied to
+    */
+  def generateUnaryIntervalPlusMinus(
+      plus: Boolean,
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression =
+    generateUnaryArithmeticOperator(
+      if (plus) "+" else "-",
+      nullCheck,
+      operand.resultType,
+      operand)
+
+  /**
+    * Generates code that fills a reusable array with the given element expressions.
+    *
+    * For object arrays the elements are boxed so that null elements can be represented; for
+    * primitive arrays the elements are stored as they are.
+    *
+    * @param codeGenerator generator that owns the reusable array and performs the boxing
+    * @param resultType array type of the result
+    * @param elements expressions of the array elements, in order
+    * @return expression whose result term is the reusable array; it is never null
+    * @throws CodeGenException if `resultType` is neither an object nor a primitive array type
+    */
+  def generateArray(
+      codeGenerator: CodeGenerator,
+      resultType: TypeInformation[_],
+      elements: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    val arrayTerm = codeGenerator.addReusableArray(resultType.getTypeClass, elements.size)
+
+    val boxedElements: Seq[GeneratedExpression] = resultType match {
+
+      case oati: ObjectArrayTypeInfo[_, _] =>
+        // we box the elements to also represent null values
+        val boxedTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
+
+        elements.map { e =>
+          val boxedExpr = codeGenerator.generateOutputFieldBoxing(e)
+          val exprOrNull: String = if (codeGenerator.nullCheck) {
+            s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
+          } else {
+            boxedExpr.resultTerm
+          }
+          boxedExpr.copy(resultTerm = exprOrNull)
+        }
+
+      // no boxing necessary
+      case _: PrimitiveArrayTypeInfo[_] => elements
+
+      // fail with a descriptive code generation error instead of a scala.MatchError
+      case other =>
+        throw new CodeGenException(s"Unsupported array type '$other'.")
+    }
+
+    // one assignment per element, each preceded by the code that computes the element
+    val code = boxedElements
+      .zipWithIndex
+      .map { case (element, idx) =>
+        s"""
+          |${element.code}
+          |$arrayTerm[$idx] = ${element.resultTerm};
+          |""".stripMargin
+      }
+      .mkString("\n")
+
+    GeneratedExpression(arrayTerm, GeneratedExpression.NEVER_NULL, code, resultType)
+  }
+
+  /**
+    * Generates access to a single array element.
+    *
+    * The index expression is one-based: the generated code subtracts 1 before indexing.
+    * Elements of object arrays are read in boxed form and unboxed afterwards.
+    *
+    * @param codeGenerator generator providing the null-check setting and the unboxing code
+    * @param array expression of the array
+    * @param index expression of the one-based element position
+    * @return expression of the array's component type
+    * @throws CodeGenException if `array` is neither an object nor a primitive array
+    */
+  def generateArrayElementAt(
+      codeGenerator: CodeGenerator,
+      array: GeneratedExpression,
+      index: GeneratedExpression)
+    : GeneratedExpression = {
+
+    val resultTerm = newName("result")
+
+    array.resultType match {
+
+      // unbox object array types
+      case oati: ObjectArrayTypeInfo[_, _] =>
+        // get boxed array element
+        val resultTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
+
+        val arrayAccessCode = if (codeGenerator.nullCheck) {
+          // a null array or a null index yields a null element
+          s"""
+            |${array.code}
+            |${index.code}
+            |$resultTypeTerm $resultTerm = (${array.nullTerm} || ${index.nullTerm}) ?
+            |  null : ${array.resultTerm}[${index.resultTerm} - 1];
+            |""".stripMargin
+        } else {
+          s"""
+            |${array.code}
+            |${index.code}
+            |$resultTypeTerm $resultTerm = ${array.resultTerm}[${index.resultTerm} - 1];
+            |""".stripMargin
+        }
+
+        // generate unbox code
+        val unboxing = codeGenerator.generateInputFieldUnboxing(oati.getComponentInfo, resultTerm)
+
+        unboxing.copy(code =
+          s"""
+            |$arrayAccessCode
+            |${unboxing.code}
+            |""".stripMargin
+        )
+
+      // no unboxing necessary
+      case pati: PrimitiveArrayTypeInfo[_] =>
+        generateOperatorIfNotNull(codeGenerator.nullCheck, pati.getComponentType, array, index) {
+          (leftTerm, rightTerm) => s"$leftTerm[$rightTerm - 1]"
+        }
+
+      // fail with a descriptive code generation error instead of a scala.MatchError
+      case other =>
+        throw new CodeGenException(s"Unsupported array type '$other' for element access.")
+    }
+  }
+
+  /**
+    * Generates code that returns the sole element of an array.
+    *
+    * The generated code switches on the array length (a null array counts as length 0 when
+    * null checking is enabled):
+    *  - length 0: the result is the default value and, with null checking, flagged as null
+    *  - length 1: the result is the (unboxed) first element
+    *  - otherwise: a RuntimeException is thrown at runtime
+    *
+    * @param codeGenerator generator providing the null-check setting and the unboxing code
+    * @param array expression of the array
+    * @return expression of the array's component type
+    * @throws CodeGenException if `array` is neither an object nor a primitive array
+    */
+  def generateArrayElement(
+      codeGenerator: CodeGenerator,
+      array: GeneratedExpression)
+    : GeneratedExpression = {
+
+    val nullTerm = newName("isNull")
+    val resultTerm = newName("result")
+    val resultType = array.resultType match {
+      case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+      case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
+      // fail with a descriptive code generation error instead of a scala.MatchError
+      case other =>
+        throw new CodeGenException(s"Unsupported array type '$other' for element access.")
+    }
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val defaultValue = primitiveDefaultValue(resultType)
+    val nullCheck = codeGenerator.nullCheck
+
+    val arrayLengthCode = if (nullCheck) {
+      s"${array.nullTerm} ? 0 : ${array.resultTerm}.length"
+    } else {
+      s"${array.resultTerm}.length"
+    }
+
+    // fragments that are only emitted when null checking is enabled
+    val nullTermDeclaration = if (nullCheck) s"boolean $nullTerm;" else ""
+    val markAsNull = if (nullCheck) s"$nullTerm = true;" else ""
+
+    // unsupported array types were already rejected when resolving the result type above
+    val arrayAccessCode = array.resultType match {
+      case oati: ObjectArrayTypeInfo[_, _] =>
+        // generate unboxing code
+        val unboxing = codeGenerator.generateInputFieldUnboxing(
+          oati.getComponentInfo,
+          s"${array.resultTerm}[0]")
+        val copyNullFlag = if (nullCheck) s"$nullTerm = ${unboxing.nullTerm};" else ""
+
+        s"""
+          |${array.code}
+          |$nullTermDeclaration
+          |$resultTypeTerm $resultTerm;
+          |switch ($arrayLengthCode) {
+          |  case 0:
+          |    $markAsNull
+          |    $resultTerm = $defaultValue;
+          |    break;
+          |  case 1:
+          |    ${unboxing.code}
+          |    $copyNullFlag
+          |    $resultTerm = ${unboxing.resultTerm};
+          |    break;
+          |  default:
+          |    throw new RuntimeException("Array has more than one element.");
+          |}
+          |""".stripMargin
+
+      case pati: PrimitiveArrayTypeInfo[_] =>
+        val markAsNotNull = if (nullCheck) s"$nullTerm = false;" else ""
+
+        s"""
+          |${array.code}
+          |$nullTermDeclaration
+          |$resultTypeTerm $resultTerm;
+          |switch ($arrayLengthCode) {
+          |  case 0:
+          |    $markAsNull
+          |    $resultTerm = $defaultValue;
+          |    break;
+          |  case 1:
+          |    $markAsNotNull
+          |    $resultTerm = ${array.resultTerm}[0];
+          |    break;
+          |  default:
+          |    throw new RuntimeException("Array has more than one element.");
+          |}
+          |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, arrayAccessCode, resultType)
+  }
+
+  /**
+    * Generates an integer expression holding the length of the given array.
+    *
+    * @param nullCheck whether the generated code checks the array for null
+    * @param array expression of the array
+    */
+  def generateArrayCardinality(
+      nullCheck: Boolean,
+      array: GeneratedExpression)
+    : GeneratedExpression =
+    generateUnaryOperatorIfNotNull(nullCheck, INT_TYPE_INFO, array) {
+      // the term handed in is the array's result term
+      arrayTerm => s"$arrayTerm.length"
+    }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Generates a unary operation that is only evaluated if the operand is not null.
+    *
+    * With null checking, a null operand yields the default value of the result type and a
+    * null flag of true. Without null checking, the operation is always evaluated.
+    *
+    * @param nullCheck whether the generated code checks the operand for null
+    * @param resultType type of the generated result
+    * @param operand expression the operation is applied to
+    * @param expr builds the operation code from the operand's result term
+    */
+  private def generateUnaryOperatorIfNotNull(
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      operand: GeneratedExpression)
+      (expr: (String) => String)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val defaultValue = primitiveDefaultValue(resultType)
+    val operationCode = expr(operand.resultTerm)
+
+    val operatorCode =
+      if (!nullCheck) {
+        s"""
+          |${operand.code}
+          |$resultTypeTerm $resultTerm = $operationCode;
+          |""".stripMargin
+      } else {
+        s"""
+          |${operand.code}
+          |$resultTypeTerm $resultTerm;
+          |boolean $nullTerm;
+          |if (!${operand.nullTerm}) {
+          |  $resultTerm = $operationCode;
+          |  $nullTerm = false;
+          |}
+          |else {
+          |  $resultTerm = $defaultValue;
+          |  $nullTerm = true;
+          |}
+          |""".stripMargin
+      }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
+  }
+
+  /**
+    * Generates a binary operation that is only evaluated if both operands are not null.
+    *
+    * With null checking, the result is null if either operand is null and then holds the
+    * default value of the result type. Without null checking, the operation is always
+    * evaluated.
+    *
+    * @param nullCheck whether the generated code checks the operands for null
+    * @param resultType type of the generated result
+    * @param left left operand
+    * @param right right operand
+    * @param expr builds the operation code from the result terms of both operands
+    */
+  private def generateOperatorIfNotNull(
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+      (expr: (String, String) => String)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val defaultValue = primitiveDefaultValue(resultType)
+    val operationCode = expr(left.resultTerm, right.resultTerm)
+
+    val resultCode =
+      if (!nullCheck) {
+        s"""
+          |${left.code}
+          |${right.code}
+          |$resultTypeTerm $resultTerm = $operationCode;
+          |""".stripMargin
+      } else {
+        s"""
+          |${left.code}
+          |${right.code}
+          |boolean $nullTerm = ${left.nullTerm} || ${right.nullTerm};
+          |$resultTypeTerm $resultTerm;
+          |if ($nullTerm) {
+          |  $resultTerm = $defaultValue;
+          |}
+          |else {
+          |  $resultTerm = $operationCode;
+          |}
+          |""".stripMargin
+      }
+
+    GeneratedExpression(resultTerm, nullTerm, resultCode, resultType)
+  }
+
+  /**
+    * Re-labels an expression with another result type; terms and code are left untouched.
+    *
+    * @param expr expression to re-label
+    * @param typeInfo new result type of the expression
+    */
+  private def internalExprCasting(
+      expr: GeneratedExpression,
+      typeInfo: TypeInformation[_])
+    : GeneratedExpression =
+    expr.copy(resultType = typeInfo)
+
+  /**
+    * Maps an arithmetic operator symbol to the name of a decimal arithmetic method.
+    *
+    * @param operator one of "+", "-", "*", "/" or "%"
+    * @throws CodeGenException for any other operator
+    */
+  private def arithOpToDecMethod(operator: String): String = {
+    val decimalMethods = Map(
+      "+" -> "add",
+      "-" -> "subtract",
+      "*" -> "multiply",
+      "/" -> "divide",
+      "%" -> "remainder")
+
+    decimalMethods.getOrElse(
+      operator,
+      throw new CodeGenException("Unsupported decimal arithmetic operator."))
+  }
+
+  /**
+    * Creates a function that wraps an operand term into code converting it from
+    * `operandType` to `resultType`.
+    *
+    * Handled cases: identical types (no conversion), non-decimal numeric to decimal,
+    * decimal to non-decimal numeric, and non-decimal numeric to non-decimal numeric.
+    *
+    * @param operandType type of the term that is converted
+    * @param resultType type the term is converted to
+    * @return function from an operand term to the converting code
+    * @throws CodeGenException if the combination of types is not handled
+    */
+  private def numericCasting(
+      operandType: TypeInformation[_],
+      resultType: TypeInformation[_])
+    : (String) => String = {
+
+    // name of the decimal method that converts to the given primitive type
+    def decToPrimMethod(targetType: TypeInformation[_]): String = targetType match {
+      case BYTE_TYPE_INFO => "byteValueExact"
+      case SHORT_TYPE_INFO => "shortValueExact"
+      case INT_TYPE_INFO => "intValueExact"
+      case LONG_TYPE_INFO => "longValueExact"
+      case FLOAT_TYPE_INFO => "floatValue"
+      case DOUBLE_TYPE_INFO => "doubleValue"
+      case _ => throw new CodeGenException("Unsupported decimal casting type.")
+    }
+
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+
+    // "plain" means numeric but not decimal
+    def operandIsPlainNumeric: Boolean = isNumeric(operandType) && !isDecimal(operandType)
+    def resultIsPlainNumeric: Boolean = isNumeric(resultType) && !isDecimal(resultType)
+
+    if (operandType == resultType) {
+      // no casting necessary
+      operandTerm => s"$operandTerm"
+    } else if (isDecimal(resultType) && operandIsPlainNumeric) {
+      // plain numeric -> decimal
+      operandTerm =>
+        s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) $operandTerm)"
+    } else if (resultIsPlainNumeric && isDecimal(operandType)) {
+      // decimal -> plain numeric
+      operandTerm => s"$operandTerm.${decToPrimMethod(resultType)}()"
+    } else if (operandIsPlainNumeric && resultIsPlainNumeric) {
+      // plain numeric -> plain numeric
+      operandTerm => s"(($resultTypeTerm) $operandTerm)"
+    } else {
+      throw new CodeGenException(s"Unsupported casting from $operandType to $resultType.")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
new file mode 100644
index 0000000..50c569f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.GeneratedExpression.NEVER_NULL
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+
+/**
+  * Generates a call to a user-defined [[TableFunction]].
+  *
+  * The generated code evaluates the parameters, calls `clear()` on the function and then
+  * invokes its `eval` method.
+  *
+  * @param tableFunction user-defined [[TableFunction]] that might be overloaded
+  * @param signature actual signature with which the function is called
+  * @param returnType return type that is attached to the generated expression
+  */
+class TableFunctionCallGen(
+    tableFunction: TableFunction[_],
+    signature: Seq[TypeInformation[_]],
+    returnType: TypeInformation[_])
+  extends CallGenerator {
+
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    // resolve the eval signature that matches the call
+    val matchingSignature = getSignature(tableFunction, signature)
+      .getOrElse(throw new CodeGenException("No matching signature found."))
+
+    // primitive parameters are passed as they are, all others are boxed (output boxing)
+    val parameters = matchingSignature.zip(operands).map {
+      case (paramClass, operandExpr) if paramClass.isPrimitive =>
+        operandExpr
+
+      case (_, operandExpr) =>
+        val boxedTypeTerm = boxedTypeTermForTypeInfo(operandExpr.resultType)
+        val boxedExpr = codeGenerator.generateOutputFieldBoxing(operandExpr)
+        val exprOrNull: String =
+          if (codeGenerator.nullCheck) {
+            s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
+          } else {
+            boxedExpr.resultTerm
+          }
+        boxedExpr.copy(resultTerm = exprOrNull)
+    }
+
+    // generate function call
+    val functionReference = codeGenerator.addReusableFunction(tableFunction)
+    val parameterCode = parameters.map(_.code).mkString("\n")
+    val parameterTerms = parameters.map(_.resultTerm).mkString(", ")
+    val functionCallCode =
+      s"""
+        |$parameterCode
+        |$functionReference.clear();
+        |$functionReference.eval($parameterTerms);
+        |""".stripMargin
+
+    // the call has no result; the function reference serves as the result term
+    GeneratedExpression(
+      functionReference,
+      NEVER_NULL,
+      functionCallCode,
+      returnType)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TrimCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TrimCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TrimCallGen.scala
new file mode 100644
index 0000000..9d50bf9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TrimCallGen.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, TRAILING}
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.calls.CallGenerator._
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+  * Generates a TRIM function call.
+  *
+  * Expected operands, in order:
+  *  1. trim mode (see [[org.apache.calcite.sql.fun.SqlTrimFunction.Flag]])
+  *  2. String to be removed
+  *  3. String to be trimmed
+  */
+class TrimCallGen extends CallGenerator {
+
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) { terms =>
+      // the trim mode is resolved here and passed to the call as two boolean literals
+      val modeTerm = terms.head
+      val trimLeading = compareEnum(modeTerm, BOTH) || compareEnum(modeTerm, LEADING)
+      val trimTrailing = compareEnum(modeTerm, BOTH) || compareEnum(modeTerm, TRAILING)
+      val trimMethod = qualifyMethod(BuiltInMethod.TRIM.method)
+      s"""
+        |$trimMethod(
+        |  $trimLeading, $trimTrailing, ${terms(1)}, ${terms(2)})
+        |""".stripMargin
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
new file mode 100644
index 0000000..0d60dc1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+  * Result of generating code for a single expression.
+  *
+  * @param resultTerm term that gives access to the value of the expression
+  * @param nullTerm boolean term that tells whether the expression is null
+  * @param code code that has to run to produce `resultTerm` and `nullTerm`
+  * @param resultType type of the value behind `resultTerm`
+  */
+case class GeneratedExpression(
+    resultTerm: String,
+    nullTerm: String,
+    code: String,
+    resultType: TypeInformation[_])
+
+/**
+  * Constant terms and code snippets for building a [[GeneratedExpression]].
+  */
+object GeneratedExpression {
+
+  /** Null term of an expression that is always null. */
+  val ALWAYS_NULL: String = "true"
+
+  /** Null term of an expression that is never null. */
+  val NEVER_NULL: String = "false"
+
+  /** Code of an expression that needs no code. */
+  val NO_CODE: String = ""
+}
+
+/**
+  * Describes a generated function.
+  *
+  * NOTE(review): the parameter meanings below are inferred from their names only and should
+  * be confirmed against the code that creates and consumes this class.
+  *
+  * @param name name of the generated function (presumably of the generated class)
+  * @param returnType type information of the values the function returns
+  * @param code source code of the generated function
+  * @tparam T not referenced by any parameter; presumably the type of the function
+  */
+case class GeneratedFunction[T](
+    name: String,
+    returnType: TypeInformation[Any],
+    code: String)

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/package.scala
new file mode 100644
index 0000000..743f846
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+package object codegen {
+
+  /**
+    * Lock object for guarding reflection operations.
+    *
+    * Used in ExpressionCodeGenerator because Scala 2.10 reflection is not thread safe. Several
+    * parallel expression operators might run in one TaskManager, therefore these operations
+    * need to be guarded.
+    */
+  object ReflectionLock
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
new file mode 100644
index 0000000..6d16722
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.TableEnvironment
+
+/**
+  * Simple example that demonstrates running SQL on stream tables.
+  *
+  * The example shows how to:
+  *  - convert DataStreams to Tables
+  *  - register a Table under a name
+  *  - run a StreamSQL query on the registered Tables
+  *
+  */
+object StreamSQLExample {
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  case class Order(user: Long, product: String, amount: Int)
+
+  // *************************************************************************
+  //     PROGRAM
+  // *************************************************************************
+
+  def main(args: Array[String]): Unit = {
+
+    // set up execution environment
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // sample input of both streams
+    val ordersA = Seq(
+      Order(1L, "beer", 3),
+      Order(1L, "diaper", 4),
+      Order(3L, "rubber", 2))
+    val ordersB = Seq(
+      Order(2L, "pen", 3),
+      Order(2L, "rubber", 3),
+      Order(4L, "beer", 1))
+
+    val orderA: DataStream[Order] = env.fromCollection(ordersA)
+    val orderB: DataStream[Order] = env.fromCollection(ordersB)
+
+    // register the DataStreams under the name "OrderA" and "OrderB"
+    tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
+    tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
+
+    // union the filtered rows of both tables
+    val query =
+      "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
+        "SELECT * FROM OrderB WHERE amount < 2"
+    val result = tEnv.sql(query)
+
+    // convert the result back to a DataStream and print it
+    result.toDataStream[Order].print()
+
+    env.execute()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
new file mode 100644
index 0000000..6c1467f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.TableEnvironment
+
+/**
+  * Simple example for demonstrating the use of Table API on a Stream Table.
+  *
+  * This example shows how to:
+  *  - Convert DataStreams to Tables
+  *  - Apply union, select, and filter operations
+  *
+  */
+object StreamTableExample {
+
+  // *************************************************************************
+  //     PROGRAM
+  // *************************************************************************
+
+  /** Unions two in-memory order streams with the Table API and prints orders with amount > 2. */
+  def main(args: Array[String]): Unit = {
+    // create the streaming environment and the table environment that wraps it
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // first input: sample orders turned into a Table
+    val streamA = env.fromCollection(
+      Seq(Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2)))
+    val orderA = streamA.toTable(tEnv)
+
+    // second input, built the same way
+    val streamB = env.fromCollection(
+      Seq(Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1)))
+    val orderB = streamB.toTable(tEnv)
+
+    // union both tables, then keep only the rows whose amount exceeds 2
+    val unioned = orderA.unionAll(orderB)
+    val filtered = unioned.select('user, 'product, 'amount).where('amount > 2)
+    val result: DataStream[Order] = filtered.toDataStream[Order]
+
+    result.print()
+
+    env.execute()
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  /** Order record with the fields `user`, `product` and `amount` used by this example. */
+  case class Order(user: Long, product: String, amount: Int)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala
new file mode 100644
index 0000000..74afb06
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+
+/**
+  * This program implements a modified version of the TPC-H query 3. The
+  * example demonstrates how to assign names to fields when converting DataSets to Tables.
+  * The original query can be found at
+  * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
+  * (page 29).
+  *
+  * This program implements the following SQL equivalent:
+  *
+  * {{{
+  * SELECT
+  *      l_orderkey,
+  *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
+  *      o_orderdate,
+  *      o_shippriority
+  * FROM customer,
+  *      orders,
+  *      lineitem
+  * WHERE
+  *      c_mktsegment = '[SEGMENT]'
+  *      AND c_custkey = o_custkey
+  *      AND l_orderkey = o_orderkey
+  *      AND o_orderdate < date '[DATE]'
+  *      AND l_shipdate > date '[DATE]'
+  * GROUP BY
+  *      l_orderkey,
+  *      o_orderdate,
+  *      o_shippriority
+  * ORDER BY
+  *      revenue desc,
+  *      o_orderdate;
+  * }}}
+  *
+  * Input files are plain text CSV files using the pipe character ('|') as field separator
+  * as generated by the TPC-H data generator which is available at
+  * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
+  *
+  * Usage:
+  * {{{
+  * TPCHQuery3Table <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+  * }}}
+  *
+  * This example shows how to:
+  *  - Convert DataSets to Tables
+  *  - Use Table API expressions
+  *
+  */
+object TPCHQuery3Table {
+
+  // *************************************************************************
+  //     PROGRAM
+  // *************************************************************************
+
+  /** Parses the path arguments and, if all four are present, builds and runs the query. */
+  def main(args: Array[String]): Unit = {
+    if (parseParameters(args)) {
+      // set filter date
+      val date = "1995-03-12".toDate
+
+      // get execution environment
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      val tEnv = TableEnvironment.getTableEnvironment(env)
+
+      // line items whose ship date lies after the filter date
+      val lineitems = getLineitemDataSet(env)
+        .toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate)
+        .filter('shipDate.toDate > date)
+
+      // customers of the "AUTOMOBILE" market segment
+      val customers = getCustomerDataSet(env)
+        .toTable(tEnv, 'id, 'mktSegment)
+        .filter('mktSegment === "AUTOMOBILE")
+
+      // orders whose order date lies before the filter date
+      val orders = getOrdersDataSet(env)
+        .toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio)
+        .filter('orderDate.toDate < date)
+
+      // join orders with customers, then with line items, and compute per-item revenue
+      val items =
+        orders.join(customers)
+          .where('custId === 'id)
+          .select('orderId, 'orderDate, 'shipPrio)
+        .join(lineitems)
+          .where('orderId === 'id)
+          .select(
+            'orderId,
+            'extdPrice * (1.0f.toExpr - 'discount) as 'revenue,
+            'orderDate,
+            'shipPrio)
+
+      // sum revenue per (orderId, orderDate, shipPrio); sort by revenue desc, order date asc
+      val result = items
+        .groupBy('orderId, 'orderDate, 'shipPrio)
+        .select('orderId, 'revenue.sum as 'revenue, 'orderDate, 'shipPrio)
+        .orderBy('revenue.desc, 'orderDate.asc)
+
+      // emit result
+      result.writeAsCsv(outputPath, "\n", "|")
+
+      // execute program
+      env.execute("Scala TPCH Query 3 (Table API Expression) Example")
+    }
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
+  case class Customer(id: Long, mktSegment: String)
+  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+
+  // *************************************************************************
+  //     UTIL METHODS
+  // *************************************************************************
+
+  // input and output locations, assigned by parseParameters before the query is built
+  private var lineitemPath: String = _
+  private var customerPath: String = _
+  private var ordersPath: String = _
+  private var outputPath: String = _
+
+  /** Stores the four path arguments; prints usage to stderr and returns false otherwise. */
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length == 4) {
+      lineitemPath = args(0)
+      customerPath = args(1)
+      ordersPath = args(2)
+      outputPath = args(3)
+      true
+    } else {
+      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+          " Due to legal restrictions, we can not ship generated data.\n" +
+          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+          " Usage: TPCHQuery3Table <lineitem-csv path> <customer-csv path> " +
+                             "<orders-csv path> <result path>")
+      false
+    }
+  }
+
+  /** Reads columns 0, 5, 6 and 10 of the pipe-delimited lineitem file into `Lineitem`s. */
+  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] =
+    env.readCsvFile[Lineitem](
+      lineitemPath, fieldDelimiter = "|", includedFields = Array(0, 5, 6, 10))
+
+  /** Reads columns 0 and 6 of the pipe-delimited customer file into `Customer`s. */
+  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] =
+    env.readCsvFile[Customer](
+      customerPath, fieldDelimiter = "|", includedFields = Array(0, 6))
+
+  /** Reads columns 0, 1, 4 and 7 of the pipe-delimited orders file into `Order`s. */
+  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] =
+    env.readCsvFile[Order](
+      ordersPath, fieldDelimiter = "|", includedFields = Array(0, 1, 4, 7))
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
new file mode 100644
index 0000000..a8b8268
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+
+/**
+  * Simple example that shows how the Batch SQL API is used in Scala.
+  *
+  * This example shows how to:
+  *  - Convert DataSets to Tables
+  *  - Register a Table under a name
+  *  - Run a SQL query on the registered Table
+  *
+  */
+object WordCountSQL {
+
+  // *************************************************************************
+  //     PROGRAM
+  // *************************************************************************
+
+  /** Sums the frequency per word with a SQL query over three sample records and prints it. */
+  def main(args: Array[String]): Unit = {
+    // create the batch environment and the table environment that wraps it
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // sample input, registered as table "WordCount" with the fields word and frequency
+    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+    tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
+
+    // the query result is a new Table; convert it to a DataSet[WC] and print it
+    val query = "SELECT word, SUM(frequency) FROM WordCount GROUP BY word"
+    val wordCounts = tEnv.sql(query)
+    wordCounts.toDataSet[WC].print()
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  /** A word together with its frequency. */
+  case class WC(word: String, frequency: Long)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
new file mode 100644
index 0000000..75ea8ce
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+
+/**
+  * Simple example for demonstrating the use of the Table API for a Word Count in Scala.
+  *
+  * This example shows how to:
+  *  - Convert DataSets to Tables
+  *  - Apply group, aggregate, select, and filter operations
+  *
+  */
+object WordCountTable {
+
+  // *************************************************************************
+  //     PROGRAM
+  // *************************************************************************
+
+  /** Sums the frequency per word with the Table API and prints words whose total is 2. */
+  def main(args: Array[String]): Unit = {
+    // create the batch environment and the table environment that wraps it
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // sample input converted to a Table
+    val words = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1)).toTable(tEnv)
+
+    // total frequency per word; only words whose total equals 2 are kept
+    val counts = words.groupBy('word).select('word, 'frequency.sum as 'frequency)
+    val result = counts.filter('frequency === 2).toDataSet[WC]
+
+    result.print()
+  }
+
+  // *************************************************************************
+  //     USER DATA TYPES
+  // *************************************************************************
+
+  /** A word together with its frequency. */
+  case class WC(word: String, frequency: Long)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala
new file mode 100644
index 0000000..14d899d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.plan.TreeNode
+import org.apache.flink.table.validate.{ValidationResult, ValidationSuccess}
+
+abstract class Expression extends TreeNode[Expression] {
+  /**
+    * Returns the [[TypeInformation]] for evaluating this expression.
+    * It is sometimes not available until the expression is valid.
+    */
+  private[flink] def resultType: TypeInformation[_]
+
+  /**
+    * One-pass validation of the expression tree in post order: all children must be
+    * valid before the input of this expression is checked.
+    */
+  private[flink] lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+  /** Whether every child expression is valid. */
+  private[flink] def childrenValid: Boolean = children.forall(_.valid)
+
+  /**
+    * Checks input data types, the number of inputs or other properties specified by this
+    * expression. Returns `ValidationSuccess` if the check passes, or `ValidationFailure`
+    * with a supplementary message explaining the error.
+    * Note: this method should only be called once `childrenValid == true`.
+    */
+  private[flink] def validateInput(): ValidationResult = ValidationSuccess
+
+  /**
+    * Converts this expression to its Calcite counterpart (a RexNode); unsupported by default.
+    */
+  private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val className = this.getClass.getName
+    throw new UnsupportedOperationException(s"$className cannot be transformed to RexNode")
+  }
+
+  /**
+    * Structural equality: same runtime class and pairwise equal product elements, where
+    * nested expressions and sequences are compared recursively.
+    */
+  private[flink] def checkEquals(other: Expression): Boolean = {
+    def sameElements(xs: Seq[Any], ys: Seq[Any]): Boolean =
+      xs.length == ys.length && xs.zip(ys).forall {
+        case (x: Expression, y: Expression) => x.checkEquals(y)
+        case (x: Seq[_], y: Seq[_]) => sameElements(x, y)
+        case (x, y) => x == y
+      }
+    this.getClass == other.getClass &&
+      sameElements(this.productIterator.toSeq, other.productIterator.toSeq)
+  }
+}
+
+/** Base class for expressions whose children are exactly `left` and `right`, in that order. */
+abstract class BinaryExpression extends Expression {
+  private[flink] def left: Expression
+  private[flink] def right: Expression
+  private[flink] def children = Seq(left, right)
+}
+
+/** Base class for expressions whose only child is `child`. */
+abstract class UnaryExpression extends Expression {
+  private[flink] def child: Expression
+  private[flink] def children = Seq(child)
+}
+
+/** Base class for expressions without children (`children` is `Nil`). */
+abstract class LeafExpression extends Expression {
+  private[flink] val children = Nil
+}