You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:53 UTC
[24/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MethodCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MethodCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MethodCallGen.scala
new file mode 100644
index 0000000..2e0d340
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MethodCallGen.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import java.lang.reflect.Method
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenUtils.qualifyMethod
+import org.apache.flink.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+ * Generates a function call by using a [[java.lang.reflect.Method]].
+ */
+class MethodCallGen(returnType: TypeInformation[_], method: Method) extends CallGenerator {
+
+ override def generate(
+ codeGenerator: CodeGenerator,
+ operands: Seq[GeneratedExpression])
+ : GeneratedExpression = {
+ generateCallIfArgsNotNull(codeGenerator.nullCheck, returnType, operands) {
+ (terms) =>
+ s"""
+ |${qualifyMethod(method)}(${terms.mkString(", ")})
+ |""".stripMargin
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MultiTypeMethodCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MultiTypeMethodCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MultiTypeMethodCallGen.scala
new file mode 100644
index 0000000..e5958a0
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/MultiTypeMethodCallGen.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import java.lang.reflect.Method
+
+import org.apache.flink.table.codegen.calls.CallGenerator._
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+ * Generates a function call that calls a method which returns the same type that it
+ * takes as first argument.
+ */
+class MultiTypeMethodCallGen(method: Method) extends CallGenerator {
+
+ override def generate(
+ codeGenerator: CodeGenerator,
+ operands: Seq[GeneratedExpression])
+ : GeneratedExpression = {
+ generateCallIfArgsNotNull(codeGenerator.nullCheck, operands.head.resultType, operands) {
+ (operandResultTerms) =>
+ s"""
+ |${method.getDeclaringClass.getCanonicalName}.
+ | ${method.getName}(${operandResultTerms.mkString(", ")})
+ """.stripMargin
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/NotCallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/NotCallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/NotCallGenerator.scala
new file mode 100644
index 0000000..52397c9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/NotCallGenerator.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.table.codegen.calls.ScalarOperators.generateNot
+import org.apache.flink.table.codegen.{GeneratedExpression, CodeGenerator}
+
+/**
+ * Inverts the boolean value of a CallGenerator result.
+ */
+class NotCallGenerator(callGenerator: CallGenerator) extends CallGenerator {
+
+ override def generate(
+ codeGenerator: CodeGenerator,
+ operands: Seq[GeneratedExpression])
+ : GeneratedExpression = {
+ val expr = callGenerator.generate(codeGenerator, operands)
+ generateNot(codeGenerator.nullCheck, expr)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
new file mode 100644
index 0000000..ac840df
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+
+/**
+ * Generates a call to user-defined [[ScalarFunction]].
+ *
+ * @param scalarFunction user-defined [[ScalarFunction]] that might be overloaded
+ * @param signature actual signature with which the function is called
+ * @param returnType actual return type required by the surrounding
+ */
+class ScalarFunctionCallGen(
+ scalarFunction: ScalarFunction,
+ signature: Seq[TypeInformation[_]],
+ returnType: TypeInformation[_])
+ extends CallGenerator {
+
+ override def generate(
+ codeGenerator: CodeGenerator,
+ operands: Seq[GeneratedExpression])
+ : GeneratedExpression = {
+ // determine function signature and result class
+ val matchingSignature = getSignature(scalarFunction, signature)
+ .getOrElse(throw new CodeGenException("No matching signature found."))
+ val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
+
+ // convert parameters for function (output boxing)
+ val parameters = matchingSignature
+ .zip(operands)
+ .map { case (paramClass, operandExpr) =>
+ if (paramClass.isPrimitive) {
+ operandExpr
+ } else {
+ val boxedTypeTerm = boxedTypeTermForTypeInfo(operandExpr.resultType)
+ val boxedExpr = codeGenerator.generateOutputFieldBoxing(operandExpr)
+ val exprOrNull: String = if (codeGenerator.nullCheck) {
+ s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
+ } else {
+ boxedExpr.resultTerm
+ }
+ boxedExpr.copy(resultTerm = exprOrNull)
+ }
+ }
+
+ // generate function call
+ val functionReference = codeGenerator.addReusableFunction(scalarFunction)
+ val resultTypeTerm = if (resultClass.isPrimitive) {
+ primitiveTypeTermForTypeInfo(returnType)
+ } else {
+ boxedTypeTermForTypeInfo(returnType)
+ }
+ val resultTerm = newName("result")
+ val functionCallCode =
+ s"""
+ |${parameters.map(_.code).mkString("\n")}
+ |$resultTypeTerm $resultTerm = $functionReference.eval(
+ | ${parameters.map(_.resultTerm).mkString(", ")});
+ |""".stripMargin
+
+ // convert result of function to internal representation (input unboxing)
+ val resultUnboxing = if (resultClass.isPrimitive) {
+ codeGenerator.generateNonNullLiteral(returnType, resultTerm)
+ } else {
+ codeGenerator.generateInputFieldUnboxing(returnType, resultTerm)
+ }
+ resultUnboxing.copy(code =
+ s"""
+ |$functionCallCode
+ |${resultUnboxing.code}
+ |""".stripMargin
+ )
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
new file mode 100644
index 0000000..3f7c91f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -0,0 +1,1025 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen.calls
+
+import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
+import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.{CodeGenerator, CodeGenException, GeneratedExpression}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+
+object ScalarOperators {
+
+ def generateStringConcatOperator(
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ generateOperatorIfNotNull(nullCheck, STRING_TYPE_INFO, left, right) {
+ (leftTerm, rightTerm) => s"$leftTerm + $rightTerm"
+ }
+ }
+
+ def generateArithmeticOperator(
+ operator: String,
+ nullCheck: Boolean,
+ resultType: TypeInformation[_],
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ val leftCasting = numericCasting(left.resultType, resultType)
+ val rightCasting = numericCasting(right.resultType, resultType)
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+
+ generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+ (leftTerm, rightTerm) =>
+ if (isDecimal(resultType)) {
+ s"${leftCasting(leftTerm)}.${arithOpToDecMethod(operator)}(${rightCasting(rightTerm)})"
+ } else {
+ s"($resultTypeTerm) (${leftCasting(leftTerm)} $operator ${rightCasting(rightTerm)})"
+ }
+ }
+ }
+
+ def generateUnaryArithmeticOperator(
+ operator: String,
+ nullCheck: Boolean,
+ resultType: TypeInformation[_],
+ operand: GeneratedExpression)
+ : GeneratedExpression = {
+ generateUnaryOperatorIfNotNull(nullCheck, resultType, operand) {
+ (operandTerm) =>
+ if (isDecimal(operand.resultType) && operator == "-") {
+ s"$operandTerm.negate()"
+ } else if (isDecimal(operand.resultType) && operator == "+") {
+ s"$operandTerm"
+ } else {
+ s"$operator($operandTerm)"
+ }
+ }
+ }
+
+ def generateEquals(
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ // numeric types
+ if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
+ generateComparison("==", nullCheck, left, right)
+ }
+ // temporal types
+ else if (isTemporal(left.resultType) && left.resultType == right.resultType) {
+ generateComparison("==", nullCheck, left, right)
+ }
+ // array types
+ else if (isArray(left.resultType) && left.resultType == right.resultType) {
+ generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+ (leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, $rightTerm)"
+ }
+ }
+ // comparable types of same type
+ else if (isComparable(left.resultType) && left.resultType == right.resultType) {
+ generateComparison("==", nullCheck, left, right)
+ }
+ // non comparable types
+ else {
+ generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+ if (isReference(left)) {
+ (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
+ }
+ else if (isReference(right)) {
+ (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)"
+ }
+ else {
+ throw new CodeGenException(s"Incomparable types: ${left.resultType} and " +
+ s"${right.resultType}")
+ }
+ }
+ }
+ }
+
+ def generateNotEquals(
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ // numeric types
+ if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
+ generateComparison("!=", nullCheck, left, right)
+ }
+ // temporal types
+ else if (isTemporal(left.resultType) && left.resultType == right.resultType) {
+ generateComparison("!=", nullCheck, left, right)
+ }
+ // array types
+ else if (isArray(left.resultType) && left.resultType == right.resultType) {
+ generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+ (leftTerm, rightTerm) => s"!java.util.Arrays.equals($leftTerm, $rightTerm)"
+ }
+ }
+ // comparable types
+ else if (isComparable(left.resultType) && left.resultType == right.resultType) {
+ generateComparison("!=", nullCheck, left, right)
+ }
+ // non-comparable types
+ else {
+ generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+ if (isReference(left)) {
+ (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))"
+ }
+ else if (isReference(right)) {
+ (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))"
+ }
+ else {
+ throw new CodeGenException(s"Incomparable types: ${left.resultType} and " +
+ s"${right.resultType}")
+ }
+ }
+ }
+ }
+
+ /**
+ * Generates comparison code for numeric types and comparable types of same type.
+ */
+ def generateComparison(
+ operator: String,
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+ // left is decimal or both sides are decimal
+ if (isDecimal(left.resultType) && isNumeric(right.resultType)) {
+ (leftTerm, rightTerm) => {
+ val operandCasting = numericCasting(right.resultType, left.resultType)
+ s"$leftTerm.compareTo(${operandCasting(rightTerm)}) $operator 0"
+ }
+ }
+ // right is decimal
+ else if (isNumeric(left.resultType) && isDecimal(right.resultType)) {
+ (leftTerm, rightTerm) => {
+ val operandCasting = numericCasting(left.resultType, right.resultType)
+ s"${operandCasting(leftTerm)}.compareTo($rightTerm) $operator 0"
+ }
+ }
+ // both sides are numeric
+ else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
+ (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+ }
+ // both sides are temporal of same type
+ else if (isTemporal(left.resultType) && left.resultType == right.resultType) {
+ (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+ }
+ // both sides are boolean
+ else if (isBoolean(left.resultType) && left.resultType == right.resultType) {
+ operator match {
+ case "==" | "!=" => (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+ case _ => throw new CodeGenException(s"Unsupported boolean comparison '$operator'.")
+ }
+ }
+ // both sides are same comparable type
+ else if (isComparable(left.resultType) && left.resultType == right.resultType) {
+ (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0"
+ }
+ else {
+ throw new CodeGenException(s"Incomparable types: ${left.resultType} and " +
+ s"${right.resultType}")
+ }
+ }
+ }
+
+ def generateIsNull(
+ nullCheck: Boolean,
+ operand: GeneratedExpression)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val operatorCode = if (nullCheck) {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = ${operand.nullTerm};
+ |boolean $nullTerm = false;
+ |""".stripMargin
+ }
+ else if (!nullCheck && isReference(operand)) {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = ${operand.resultTerm} == null;
+ |boolean $nullTerm = false;
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = false;
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+ }
+
+ def generateIsNotNull(
+ nullCheck: Boolean,
+ operand: GeneratedExpression)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val operatorCode = if (nullCheck) {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = !${operand.nullTerm};
+ |boolean $nullTerm = false;
+ |""".stripMargin
+ }
+ else if (!nullCheck && isReference(operand)) {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = ${operand.resultTerm} != null;
+ |boolean $nullTerm = false;
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${operand.code}
+ |boolean $resultTerm = true;
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+ }
+
+ def generateAnd(
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+
+ val operatorCode = if (nullCheck) {
+ // Three-valued logic:
+ // no Unknown -> Two-valued logic
+ // True && Unknown -> Unknown
+ // False && Unknown -> False
+ // Unknown && True -> Unknown
+ // Unknown && False -> False
+ // Unknown && Unknown -> Unknown
+ s"""
+ |${left.code}
+ |${right.code}
+ |boolean $resultTerm;
+ |boolean $nullTerm;
+ |if (!${left.nullTerm} && !${right.nullTerm}) {
+ | $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+ | $nullTerm = false;
+ |}
+ |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = false;
+ |}
+ |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = false;
+ |}
+ |else {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${left.code}
+ |${right.code}
+ |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+ }
+
+ def generateOr(
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+
+ val operatorCode = if (nullCheck) {
+ // Three-valued logic:
+ // no Unknown -> Two-valued logic
+ // True && Unknown -> True
+ // False && Unknown -> Unknown
+ // Unknown && True -> True
+ // Unknown && False -> Unknown
+ // Unknown && Unknown -> Unknown
+ s"""
+ |${left.code}
+ |${right.code}
+ |boolean $resultTerm;
+ |boolean $nullTerm;
+ |if (!${left.nullTerm} && !${right.nullTerm}) {
+ | $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+ | $nullTerm = false;
+ |}
+ |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+ | $resultTerm = true;
+ | $nullTerm = false;
+ |}
+ |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+ | $resultTerm = true;
+ | $nullTerm = false;
+ |}
+ |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |else {
+ | $resultTerm = false;
+ | $nullTerm = true;
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${left.code}
+ |${right.code}
+ |boolean $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+ }
+
+ def generateNot(
+ nullCheck: Boolean,
+ operand: GeneratedExpression)
+ : GeneratedExpression = {
+ // Three-valued logic:
+ // no Unknown -> Two-valued logic
+ // Unknown -> Unknown
+ generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand) {
+ (operandTerm) => s"!($operandTerm)"
+ }
+ }
+
+ def generateIsTrue(operand: GeneratedExpression): GeneratedExpression = {
+ GeneratedExpression(
+ operand.resultTerm, // unknown is always false by default
+ GeneratedExpression.NEVER_NULL,
+ operand.code,
+ BOOLEAN_TYPE_INFO)
+ }
+
+ def generateIsNotTrue(operand: GeneratedExpression): GeneratedExpression = {
+ GeneratedExpression(
+ s"(!${operand.resultTerm})", // unknown is always false by default
+ GeneratedExpression.NEVER_NULL,
+ operand.code,
+ BOOLEAN_TYPE_INFO)
+ }
+
+ def generateIsFalse(operand: GeneratedExpression): GeneratedExpression = {
+ GeneratedExpression(
+ s"(!${operand.resultTerm} && !${operand.nullTerm})",
+ GeneratedExpression.NEVER_NULL,
+ operand.code,
+ BOOLEAN_TYPE_INFO)
+ }
+
+ def generateIsNotFalse(operand: GeneratedExpression): GeneratedExpression = {
+ GeneratedExpression(
+ s"(${operand.resultTerm} || ${operand.nullTerm})",
+ GeneratedExpression.NEVER_NULL,
+ operand.code,
+ BOOLEAN_TYPE_INFO)
+ }
+
+ def generateCast(
+ nullCheck: Boolean,
+ operand: GeneratedExpression,
+ targetType: TypeInformation[_])
+ : GeneratedExpression = (operand.resultType, targetType) match {
+ // identity casting
+ case (fromTp, toTp) if fromTp == toTp =>
+ operand
+
+ // Date/Time/Timestamp -> String
+ case (dtt: SqlTimeTypeInfo[_], STRING_TYPE_INFO) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"${internalToTimePointCode(dtt, operandTerm)}.toString()"
+ }
+
+ // Interval Months -> String
+ case (TimeIntervalTypeInfo.INTERVAL_MONTHS, STRING_TYPE_INFO) =>
+ val method = qualifyMethod(BuiltInMethod.INTERVAL_YEAR_MONTH_TO_STRING.method)
+ val timeUnitRange = qualifyEnum(TimeUnitRange.YEAR_TO_MONTH)
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"$method($operandTerm, $timeUnitRange)"
+ }
+
+ // Interval Millis -> String
+ case (TimeIntervalTypeInfo.INTERVAL_MILLIS, STRING_TYPE_INFO) =>
+ val method = qualifyMethod(BuiltInMethod.INTERVAL_DAY_TIME_TO_STRING.method)
+ val timeUnitRange = qualifyEnum(TimeUnitRange.DAY_TO_SECOND)
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"$method($operandTerm, $timeUnitRange, 3)" // milli second precision
+ }
+
+ // Object array -> String
+ case (_:ObjectArrayTypeInfo[_, _], STRING_TYPE_INFO) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"java.util.Arrays.deepToString($operandTerm)"
+ }
+
+ // Primitive array -> String
+ case (_:PrimitiveArrayTypeInfo[_], STRING_TYPE_INFO) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"java.util.Arrays.toString($operandTerm)"
+ }
+
+ // * (not Date/Time/Timestamp) -> String
+ case (_, STRING_TYPE_INFO) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s""" "" + $operandTerm"""
+ }
+
+ // * -> Character
+ case (_, CHAR_TYPE_INFO) =>
+ throw new CodeGenException("Character type not supported.")
+
+ // String -> NUMERIC TYPE (not Character), Boolean
+ case (STRING_TYPE_INFO, _: NumericTypeInfo[_])
+ | (STRING_TYPE_INFO, BOOLEAN_TYPE_INFO) =>
+ val wrapperClass = targetType.getTypeClass.getCanonicalName
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"$wrapperClass.valueOf($operandTerm)"
+ }
+
+ // String -> BigDecimal
+ case (STRING_TYPE_INFO, BIG_DEC_TYPE_INFO) =>
+ val wrapperClass = targetType.getTypeClass.getCanonicalName
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"new $wrapperClass($operandTerm)"
+ }
+
+ // String -> Date
+ case (STRING_TYPE_INFO, SqlTimeTypeInfo.DATE) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_DATE.method)}($operandTerm)"
+ }
+
+ // String -> Time
+ case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIME) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_TIME.method)}($operandTerm)"
+ }
+
+ // String -> Timestamp
+ case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"${qualifyMethod(BuiltInMethod.STRING_TO_TIMESTAMP.method)}" +
+ s"($operandTerm)"
+ }
+
+ // Boolean -> NUMERIC TYPE
+ case (BOOLEAN_TYPE_INFO, nti: NumericTypeInfo[_]) =>
+ val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"($targetTypeTerm) ($operandTerm ? 1 : 0)"
+ }
+
+ // Boolean -> BigDecimal
+ case (BOOLEAN_TYPE_INFO, BIG_DEC_TYPE_INFO) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"$operandTerm ? java.math.BigDecimal.ONE : java.math.BigDecimal.ZERO"
+ }
+
+ // NUMERIC TYPE -> Boolean
+ case (_: NumericTypeInfo[_], BOOLEAN_TYPE_INFO) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"$operandTerm != 0"
+ }
+
+ // BigDecimal -> Boolean
+ case (BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"$operandTerm.compareTo(java.math.BigDecimal.ZERO) != 0"
+ }
+
+ // NUMERIC TYPE, BigDecimal -> NUMERIC TYPE, BigDecimal
+ case (_: NumericTypeInfo[_], _: NumericTypeInfo[_])
+ | (BIG_DEC_TYPE_INFO, _: NumericTypeInfo[_])
+ | (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) =>
+ val operandCasting = numericCasting(operand.resultType, targetType)
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"${operandCasting(operandTerm)}"
+ }
+
+ // Date -> Timestamp
+ case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) =>
+ s"$operandTerm * ${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY"
+ }
+
+ // Timestamp -> Date
+ case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
+ val targetTypeTerm = primitiveTypeTermForTypeInfo(targetType)
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) =>
+ s"($targetTypeTerm) ($operandTerm / " +
+ s"${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY)"
+ }
+
+ // Time -> Timestamp
+ case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.TIMESTAMP) =>
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) => s"$operandTerm"
+ }
+
+ // Timestamp -> Time
+ case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIME) =>
+ val targetTypeTerm = primitiveTypeTermForTypeInfo(targetType)
+ generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
+ (operandTerm) =>
+ s"($targetTypeTerm) ($operandTerm % " +
+ s"${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY)"
+ }
+
+ // internal temporal casting
+ // Date -> Integer
+ // Time -> Integer
+ // Timestamp -> Long
+ // Integer -> Date
+ // Integer -> Time
+ // Long -> Timestamp
+ // Integer -> Interval Months
+ // Long -> Interval Millis
+ // Interval Months -> Integer
+ // Interval Millis -> Long
+ case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO) |
+ (SqlTimeTypeInfo.TIME, INT_TYPE_INFO) |
+ (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO) |
+ (INT_TYPE_INFO, SqlTimeTypeInfo.DATE) |
+ (INT_TYPE_INFO, SqlTimeTypeInfo.TIME) |
+ (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) |
+ (INT_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MONTHS) |
+ (LONG_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MILLIS) |
+ (TimeIntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO) |
+ (TimeIntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) =>
+ internalExprCasting(operand, targetType)
+
+ // internal reinterpretation of temporal types
+ // Date, Time, Interval Months -> Long
+ case (SqlTimeTypeInfo.DATE, LONG_TYPE_INFO)
+ | (SqlTimeTypeInfo.TIME, LONG_TYPE_INFO)
+ | (TimeIntervalTypeInfo.INTERVAL_MONTHS, LONG_TYPE_INFO) =>
+ internalExprCasting(operand, targetType)
+
+ case (from, to) =>
+ throw new CodeGenException(s"Unsupported cast from '$from' to '$to'.")
+ }
+
+ def generateIfElse(
+ nullCheck: Boolean,
+ operands: Seq[GeneratedExpression],
+ resultType: TypeInformation[_],
+ i: Int = 0)
+ : GeneratedExpression = {
+ // else part
+ if (i == operands.size - 1) {
+ generateCast(nullCheck, operands(i), resultType)
+ }
+ else {
+ // check that the condition is boolean
+ // we do not check for null instead we use the default value
+ // thus null is false
+ requireBoolean(operands(i))
+ val condition = operands(i)
+ val trueAction = generateCast(nullCheck, operands(i + 1), resultType)
+ val falseAction = generateIfElse(nullCheck, operands, resultType, i + 2)
+
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+
+ val operatorCode = if (nullCheck) {
+ s"""
+ |${condition.code}
+ |$resultTypeTerm $resultTerm;
+ |boolean $nullTerm;
+ |if (${condition.resultTerm}) {
+ | ${trueAction.code}
+ | $resultTerm = ${trueAction.resultTerm};
+ | $nullTerm = ${trueAction.nullTerm};
+ |}
+ |else {
+ | ${falseAction.code}
+ | $resultTerm = ${falseAction.resultTerm};
+ | $nullTerm = ${falseAction.nullTerm};
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${condition.code}
+ |$resultTypeTerm $resultTerm;
+ |if (${condition.resultTerm}) {
+ | ${trueAction.code}
+ | $resultTerm = ${trueAction.resultTerm};
+ |}
+ |else {
+ | ${falseAction.code}
+ | $resultTerm = ${falseAction.resultTerm};
+ |}
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
+ }
+ }
+
+ def generateTemporalPlusMinus(
+ plus: Boolean,
+ nullCheck: Boolean,
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ : GeneratedExpression = {
+
+ val op = if (plus) "+" else "-"
+
+ (left.resultType, right.resultType) match {
+ case (l: TimeIntervalTypeInfo[_], r: TimeIntervalTypeInfo[_]) if l == r =>
+ generateArithmeticOperator(op, nullCheck, l, left, right)
+
+ case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+ generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) {
+ (l, r) => s"$l $op ((int) ($r / ${MILLIS_PER_DAY}L))"
+ }
+
+ case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MONTHS) =>
+ generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.DATE, left, right) {
+ (l, r) => s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($l, $op($r))"
+ }
+
+ case (SqlTimeTypeInfo.TIME, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+ generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIME, left, right) {
+ (l, r) => s"$l $op ((int) ($r))"
+ }
+
+ case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+ generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left, right) {
+ (l, r) => s"$l $op $r"
+ }
+
+ case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MONTHS) =>
+ generateOperatorIfNotNull(nullCheck, SqlTimeTypeInfo.TIMESTAMP, left, right) {
+ (l, r) => s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($l, $op($r))"
+ }
+
+ case _ =>
+ throw new CodeGenException("Unsupported temporal arithmetic.")
+ }
+ }
+
+ def generateUnaryIntervalPlusMinus(
+ plus: Boolean,
+ nullCheck: Boolean,
+ operand: GeneratedExpression)
+ : GeneratedExpression = {
+ val operator = if (plus) "+" else "-"
+ generateUnaryArithmeticOperator(operator, nullCheck, operand.resultType, operand)
+ }
+
+ def generateArray(
+ codeGenerator: CodeGenerator,
+ resultType: TypeInformation[_],
+ elements: Seq[GeneratedExpression])
+ : GeneratedExpression = {
+ val arrayTerm = codeGenerator.addReusableArray(resultType.getTypeClass, elements.size)
+
+ val boxedElements: Seq[GeneratedExpression] = resultType match {
+
+ case oati: ObjectArrayTypeInfo[_, _] =>
+ // we box the elements to also represent null values
+ val boxedTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
+
+ elements.map { e =>
+ val boxedExpr = codeGenerator.generateOutputFieldBoxing(e)
+ val exprOrNull: String = if (codeGenerator.nullCheck) {
+ s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
+ } else {
+ boxedExpr.resultTerm
+ }
+ boxedExpr.copy(resultTerm = exprOrNull)
+ }
+
+ // no boxing necessary
+ case _: PrimitiveArrayTypeInfo[_] => elements
+ }
+
+ val code = boxedElements
+ .zipWithIndex
+ .map { case (element, idx) =>
+ s"""
+ |${element.code}
+ |$arrayTerm[$idx] = ${element.resultTerm};
+ |""".stripMargin
+ }
+ .mkString("\n")
+
+ GeneratedExpression(arrayTerm, GeneratedExpression.NEVER_NULL, code, resultType)
+ }
+
+ def generateArrayElementAt(
+ codeGenerator: CodeGenerator,
+ array: GeneratedExpression,
+ index: GeneratedExpression)
+ : GeneratedExpression = {
+
+ val resultTerm = newName("result")
+
+ array.resultType match {
+
+ // unbox object array types
+ case oati: ObjectArrayTypeInfo[_, _] =>
+ // get boxed array element
+ val resultTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
+
+ val arrayAccessCode = if (codeGenerator.nullCheck) {
+ s"""
+ |${array.code}
+ |${index.code}
+ |$resultTypeTerm $resultTerm = (${array.nullTerm} || ${index.nullTerm}) ?
+ | null : ${array.resultTerm}[${index.resultTerm} - 1];
+ |""".stripMargin
+ } else {
+ s"""
+ |${array.code}
+ |${index.code}
+ |$resultTypeTerm $resultTerm = ${array.resultTerm}[${index.resultTerm} - 1];
+ |""".stripMargin
+ }
+
+ // generate unbox code
+ val unboxing = codeGenerator.generateInputFieldUnboxing(oati.getComponentInfo, resultTerm)
+
+ unboxing.copy(code =
+ s"""
+ |$arrayAccessCode
+ |${unboxing.code}
+ |""".stripMargin
+ )
+
+ // no unboxing necessary
+ case pati: PrimitiveArrayTypeInfo[_] =>
+ generateOperatorIfNotNull(codeGenerator.nullCheck, pati.getComponentType, array, index) {
+ (leftTerm, rightTerm) => s"$leftTerm[$rightTerm - 1]"
+ }
+ }
+ }
+
+ def generateArrayElement(
+ codeGenerator: CodeGenerator,
+ array: GeneratedExpression)
+ : GeneratedExpression = {
+
+ val nullTerm = newName("isNull")
+ val resultTerm = newName("result")
+ val resultType = array.resultType match {
+ case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+ case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
+ }
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+ val defaultValue = primitiveDefaultValue(resultType)
+
+ val arrayLengthCode = if (codeGenerator.nullCheck) {
+ s"${array.nullTerm} ? 0 : ${array.resultTerm}.length"
+ } else {
+ s"${array.resultTerm}.length"
+ }
+
+ val arrayAccessCode = array.resultType match {
+ case oati: ObjectArrayTypeInfo[_, _] =>
+ // generate unboxing code
+ val unboxing = codeGenerator.generateInputFieldUnboxing(
+ oati.getComponentInfo,
+ s"${array.resultTerm}[0]")
+
+ s"""
+ |${array.code}
+ |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" }
+ |$resultTypeTerm $resultTerm;
+ |switch ($arrayLengthCode) {
+ | case 0:
+ | ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" }
+ | $resultTerm = $defaultValue;
+ | break;
+ | case 1:
+ | ${unboxing.code}
+ | ${if (codeGenerator.nullCheck) s"$nullTerm = ${unboxing.nullTerm};" else "" }
+ | $resultTerm = ${unboxing.resultTerm};
+ | break;
+ | default:
+ | throw new RuntimeException("Array has more than one element.");
+ |}
+ |""".stripMargin
+
+ case pati: PrimitiveArrayTypeInfo[_] =>
+ s"""
+ |${array.code}
+ |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" }
+ |$resultTypeTerm $resultTerm;
+ |switch ($arrayLengthCode) {
+ | case 0:
+ | ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" }
+ | $resultTerm = $defaultValue;
+ | break;
+ | case 1:
+ | ${if (codeGenerator.nullCheck) s"$nullTerm = false;" else "" }
+ | $resultTerm = ${array.resultTerm}[0];
+ | break;
+ | default:
+ | throw new RuntimeException("Array has more than one element.");
+ |}
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, arrayAccessCode, resultType)
+ }
+
+ def generateArrayCardinality(
+ nullCheck: Boolean,
+ array: GeneratedExpression)
+ : GeneratedExpression = {
+
+ generateUnaryOperatorIfNotNull(nullCheck, INT_TYPE_INFO, array) {
+ (operandTerm) => s"${array.resultTerm}.length"
+ }
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ private def generateUnaryOperatorIfNotNull(
+ nullCheck: Boolean,
+ resultType: TypeInformation[_],
+ operand: GeneratedExpression)
+ (expr: (String) => String)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+ val defaultValue = primitiveDefaultValue(resultType)
+
+ val operatorCode = if (nullCheck) {
+ s"""
+ |${operand.code}
+ |$resultTypeTerm $resultTerm;
+ |boolean $nullTerm;
+ |if (!${operand.nullTerm}) {
+ | $resultTerm = ${expr(operand.resultTerm)};
+ | $nullTerm = false;
+ |}
+ |else {
+ | $resultTerm = $defaultValue;
+ | $nullTerm = true;
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${operand.code}
+ |$resultTypeTerm $resultTerm = ${expr(operand.resultTerm)};
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
+ }
+
+ private def generateOperatorIfNotNull(
+ nullCheck: Boolean,
+ resultType: TypeInformation[_],
+ left: GeneratedExpression,
+ right: GeneratedExpression)
+ (expr: (String, String) => String)
+ : GeneratedExpression = {
+ val resultTerm = newName("result")
+ val nullTerm = newName("isNull")
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+ val defaultValue = primitiveDefaultValue(resultType)
+
+ val resultCode = if (nullCheck) {
+ s"""
+ |${left.code}
+ |${right.code}
+ |boolean $nullTerm = ${left.nullTerm} || ${right.nullTerm};
+ |$resultTypeTerm $resultTerm;
+ |if ($nullTerm) {
+ | $resultTerm = $defaultValue;
+ |}
+ |else {
+ | $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
+ |}
+ |""".stripMargin
+ }
+ else {
+ s"""
+ |${left.code}
+ |${right.code}
+ |$resultTypeTerm $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
+ |""".stripMargin
+ }
+
+ GeneratedExpression(resultTerm, nullTerm, resultCode, resultType)
+ }
+
+ private def internalExprCasting(
+ expr: GeneratedExpression,
+ typeInfo: TypeInformation[_])
+ : GeneratedExpression = {
+ GeneratedExpression(expr.resultTerm, expr.nullTerm, expr.code, typeInfo)
+ }
+
+ private def arithOpToDecMethod(operator: String): String = operator match {
+ case "+" => "add"
+ case "-" => "subtract"
+ case "*" => "multiply"
+ case "/" => "divide"
+ case "%" => "remainder"
+ case _ => throw new CodeGenException("Unsupported decimal arithmetic operator.")
+ }
+
+ private def numericCasting(
+ operandType: TypeInformation[_],
+ resultType: TypeInformation[_])
+ : (String) => String = {
+
+ def decToPrimMethod(targetType: TypeInformation[_]): String = targetType match {
+ case BYTE_TYPE_INFO => "byteValueExact"
+ case SHORT_TYPE_INFO => "shortValueExact"
+ case INT_TYPE_INFO => "intValueExact"
+ case LONG_TYPE_INFO => "longValueExact"
+ case FLOAT_TYPE_INFO => "floatValue"
+ case DOUBLE_TYPE_INFO => "doubleValue"
+ case _ => throw new CodeGenException("Unsupported decimal casting type.")
+ }
+
+ val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+ // no casting necessary
+ if (operandType == resultType) {
+ (operandTerm) => s"$operandTerm"
+ }
+ // result type is decimal but numeric operand is not
+ else if (isDecimal(resultType) && !isDecimal(operandType) && isNumeric(operandType)) {
+ (operandTerm) =>
+ s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) $operandTerm)"
+ }
+ // numeric result type is not decimal but operand is
+ else if (isNumeric(resultType) && !isDecimal(resultType) && isDecimal(operandType) ) {
+ (operandTerm) => s"$operandTerm.${decToPrimMethod(resultType)}()"
+ }
+ // result type and operand type are numeric but not decimal
+ else if (isNumeric(operandType) && isNumeric(resultType)
+ && !isDecimal(operandType) && !isDecimal(resultType)) {
+ (operandTerm) => s"(($resultTypeTerm) $operandTerm)"
+ }
+ else {
+ throw new CodeGenException(s"Unsupported casting from $operandType to $resultType.")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
new file mode 100644
index 0000000..50c569f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.GeneratedExpression.NEVER_NULL
+import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+
+/**
+ * Generates a call to user-defined [[TableFunction]].
+ *
+ * @param tableFunction user-defined [[TableFunction]] that might be overloaded
+ * @param signature actual signature with which the function is called
+ * @param returnType actual return type required by the surrounding
+ */
+class TableFunctionCallGen(
+ tableFunction: TableFunction[_],
+ signature: Seq[TypeInformation[_]],
+ returnType: TypeInformation[_])
+ extends CallGenerator {
+
+ override def generate(
+ codeGenerator: CodeGenerator,
+ operands: Seq[GeneratedExpression])
+ : GeneratedExpression = {
+ // determine function signature
+ val matchingSignature = getSignature(tableFunction, signature)
+ .getOrElse(throw new CodeGenException("No matching signature found."))
+
+ // convert parameters for function (output boxing)
+ val parameters = matchingSignature
+ .zip(operands)
+ .map { case (paramClass, operandExpr) =>
+ if (paramClass.isPrimitive) {
+ operandExpr
+ } else {
+ val boxedTypeTerm = boxedTypeTermForTypeInfo(operandExpr.resultType)
+ val boxedExpr = codeGenerator.generateOutputFieldBoxing(operandExpr)
+ val exprOrNull: String = if (codeGenerator.nullCheck) {
+ s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
+ } else {
+ boxedExpr.resultTerm
+ }
+ boxedExpr.copy(resultTerm = exprOrNull)
+ }
+ }
+
+ // generate function call
+ val functionReference = codeGenerator.addReusableFunction(tableFunction)
+ val functionCallCode =
+ s"""
+ |${parameters.map(_.code).mkString("\n")}
+ |$functionReference.clear();
+ |$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")});
+ |""".stripMargin
+
+ // has no result
+ GeneratedExpression(
+ functionReference,
+ NEVER_NULL,
+ functionCallCode,
+ returnType)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TrimCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TrimCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TrimCallGen.scala
new file mode 100644
index 0000000..9d50bf9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TrimCallGen.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, TRAILING}
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.calls.CallGenerator._
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+ * Generates a TRIM function call.
+ *
+ * First operand: trim mode (see [[org.apache.calcite.sql.fun.SqlTrimFunction.Flag]])
+ * Second operand: String to be removed
+ * Third operand: String to be trimmed
+ */
+class TrimCallGen extends CallGenerator {
+
+ override def generate(
+ codeGenerator: CodeGenerator,
+ operands: Seq[GeneratedExpression])
+ : GeneratedExpression = {
+ generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) {
+ (terms) =>
+ val leading = compareEnum(terms.head, BOTH) || compareEnum(terms.head, LEADING)
+ val trailing = compareEnum(terms.head, BOTH) || compareEnum(terms.head, TRAILING)
+ s"""
+ |${qualifyMethod(BuiltInMethod.TRIM.method)}(
+ | $leading, $trailing, ${terms(1)}, ${terms(2)})
+ |""".stripMargin
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
new file mode 100644
index 0000000..0d60dc1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Describes a generated expression.
+ *
+ * @param resultTerm term to access the result of the expression
+ * @param nullTerm boolean term that indicates if expression is null
+ * @param code code necessary to produce resultTerm and nullTerm
+ * @param resultType type of the resultTerm
+ */
+case class GeneratedExpression(
+ resultTerm: String,
+ nullTerm: String,
+ code: String,
+ resultType: TypeInformation[_])
+
+object GeneratedExpression {
+ val ALWAYS_NULL = "true"
+ val NEVER_NULL = "false"
+ val NO_CODE = ""
+}
+
+case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/package.scala
new file mode 100644
index 0000000..743f846
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table
+
+package object codegen {
+ // Used in ExpressionCodeGenerator because Scala 2.10 reflection is not thread safe. We might
+ // have several parallel expression operators in one TaskManager, therefore we need to guard
+ // these operations.
+ object ReflectionLock
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
new file mode 100644
index 0000000..6d16722
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamSQLExample.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.TableEnvironment
+
+/**
+ * Simple example for demonstrating the use of SQL on a Stream Table.
+ *
+ * This example shows how to:
+ * - Convert DataStreams to Tables
+ * - Register a Table under a name
+ * - Run a StreamSQL query on the registered Table
+ *
+ */
+object StreamSQLExample {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ def main(args: Array[String]): Unit = {
+
+ // set up execution environment
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val orderA: DataStream[Order] = env.fromCollection(Seq(
+ Order(1L, "beer", 3),
+ Order(1L, "diaper", 4),
+ Order(3L, "rubber", 2)))
+
+ val orderB: DataStream[Order] = env.fromCollection(Seq(
+ Order(2L, "pen", 3),
+ Order(2L, "rubber", 3),
+ Order(4L, "beer", 1)))
+
+ // register the DataStreams under the name "OrderA" and "OrderB"
+ tEnv.registerDataStream("OrderA", orderA, 'user, 'product, 'amount)
+ tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
+
+ // union the two tables
+ val result = tEnv.sql(
+ "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
+ "SELECT * FROM OrderB WHERE amount < 2")
+
+ result.toDataStream[Order].print()
+
+ env.execute()
+ }
+
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ case class Order(user: Long, product: String, amount: Int)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
new file mode 100644
index 0000000..6c1467f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/StreamTableExample.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.TableEnvironment
+
+/**
+ * Simple example for demonstrating the use of Table API on a Stream Table.
+ *
+ * This example shows how to:
+ * - Convert DataStreams to Tables
+ * - Apply union, select, and filter operations
+ *
+ */
+object StreamTableExample {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ def main(args: Array[String]): Unit = {
+
+ // set up execution environment
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val orderA = env.fromCollection(Seq(
+ Order(1L, "beer", 3),
+ Order(1L, "diaper", 4),
+ Order(3L, "rubber", 2))).toTable(tEnv)
+
+ val orderB = env.fromCollection(Seq(
+ Order(2L, "pen", 3),
+ Order(2L, "rubber", 3),
+ Order(4L, "beer", 1))).toTable(tEnv)
+
+ // union the two tables
+ val result: DataStream[Order] = orderA.unionAll(orderB)
+ .select('user, 'product, 'amount)
+ .where('amount > 2)
+ .toDataStream[Order]
+
+ result.print()
+
+ env.execute()
+ }
+
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ case class Order(user: Long, product: String, amount: Int)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala
new file mode 100644
index 0000000..74afb06
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/TPCHQuery3Table.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+
+/**
+ * This program implements a modified version of the TPC-H query 3. The
+ * example demonstrates how to assign names to fields by extending the Tuple class.
+ * The original query can be found at
+ * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
+ * (page 29).
+ *
+ * This program implements the following SQL equivalent:
+ *
+ * {{{
+ * SELECT
+ * l_orderkey,
+ * SUM(l_extendedprice*(1-l_discount)) AS revenue,
+ * o_orderdate,
+ * o_shippriority
+ * FROM customer,
+ * orders,
+ * lineitem
+ * WHERE
+ * c_mktsegment = '[SEGMENT]'
+ * AND c_custkey = o_custkey
+ * AND l_orderkey = o_orderkey
+ * AND o_orderdate < date '[DATE]'
+ * AND l_shipdate > date '[DATE]'
+ * GROUP BY
+ * l_orderkey,
+ * o_orderdate,
+ * o_shippriority
+ * ORDER BY
+ * revenue desc,
+ * o_orderdate;
+ * }}}
+ *
+ * Input files are plain text CSV files using the pipe character ('|') as field separator
+ * as generated by the TPC-H data generator which is available at
+ * [http://www.tpc.org/tpch/](a href="http://www.tpc.org/tpch/).
+ *
+ * Usage:
+ * {{{
+ * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
+ * }}}
+ *
+ * This example shows how to:
+ * - Convert DataSets to Tables
+ * - Use Table API expressions
+ *
+ */
+object TPCHQuery3Table {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ def main(args: Array[String]) {
+ if (!parseParameters(args)) {
+ return
+ }
+
+ // set filter date
+ val date = "1995-03-12".toDate
+
+ // get execution environment
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val lineitems = getLineitemDataSet(env)
+ .toTable(tEnv, 'id, 'extdPrice, 'discount, 'shipDate)
+ .filter('shipDate.toDate > date)
+
+ val customers = getCustomerDataSet(env)
+ .toTable(tEnv, 'id, 'mktSegment)
+ .filter('mktSegment === "AUTOMOBILE")
+
+ val orders = getOrdersDataSet(env)
+ .toTable(tEnv, 'orderId, 'custId, 'orderDate, 'shipPrio)
+ .filter('orderDate.toDate < date)
+
+ val items =
+ orders.join(customers)
+ .where('custId === 'id)
+ .select('orderId, 'orderDate, 'shipPrio)
+ .join(lineitems)
+ .where('orderId === 'id)
+ .select(
+ 'orderId,
+ 'extdPrice * (1.0f.toExpr - 'discount) as 'revenue,
+ 'orderDate,
+ 'shipPrio)
+
+ val result = items
+ .groupBy('orderId, 'orderDate, 'shipPrio)
+ .select('orderId, 'revenue.sum as 'revenue, 'orderDate, 'shipPrio)
+ .orderBy('revenue.desc, 'orderDate.asc)
+
+ // emit result
+ result.writeAsCsv(outputPath, "\n", "|")
+
+ // execute program
+ env.execute("Scala TPCH Query 3 (Table API Expression) Example")
+ }
+
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
+ case class Customer(id: Long, mktSegment: String)
+ case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private var lineitemPath: String = _
+ private var customerPath: String = _
+ private var ordersPath: String = _
+ private var outputPath: String = _
+
+ private def parseParameters(args: Array[String]): Boolean = {
+ if (args.length == 4) {
+ lineitemPath = args(0)
+ customerPath = args(1)
+ ordersPath = args(2)
+ outputPath = args(3)
+ true
+ } else {
+ System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
+ " Due to legal restrictions, we can not ship generated data.\n" +
+ " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
+ " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> " +
+ "<orders-csv path> <result path>")
+ false
+ }
+ }
+
+ private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
+ env.readCsvFile[Lineitem](
+ lineitemPath,
+ fieldDelimiter = "|",
+ includedFields = Array(0, 5, 6, 10) )
+ }
+
+ private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
+ env.readCsvFile[Customer](
+ customerPath,
+ fieldDelimiter = "|",
+ includedFields = Array(0, 6) )
+ }
+
+ private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
+ env.readCsvFile[Order](
+ ordersPath,
+ fieldDelimiter = "|",
+ includedFields = Array(0, 1, 4, 7) )
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
new file mode 100644
index 0000000..a8b8268
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountSQL.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+
+/**
+ * Simple example that shows how the Batch SQL API is used in Scala.
+ *
+ * This example shows how to:
+ * - Convert DataSets to Tables
+ * - Register a Table under a name
+ * - Run a SQL query on the registered Table
+ *
+ */
+object WordCountSQL {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ def main(args: Array[String]): Unit = {
+
+ // set up execution environment
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+
+ // register the DataSet as table "WordCount"
+ tEnv.registerDataSet("WordCount", input, 'word, 'frequency)
+
+ // run a SQL query on the Table and retrieve the result as a new Table
+ val table = tEnv.sql("SELECT word, SUM(frequency) FROM WordCount GROUP BY word")
+
+ table.toDataSet[WC].print()
+ }
+
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ case class WC(word: String, frequency: Long)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
new file mode 100644
index 0000000..75ea8ce
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/examples/scala/WordCountTable.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.scala
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+
+/**
+ * Simple example for demonstrating the use of the Table API for a Word Count in Scala.
+ *
+ * This example shows how to:
+ * - Convert DataSets to Tables
+ * - Apply group, aggregate, select, and filter operations
+ *
+ */
+object WordCountTable {
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ def main(args: Array[String]): Unit = {
+
+ // set up execution environment
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
+ val expr = input.toTable(tEnv)
+ val result = expr
+ .groupBy('word)
+ .select('word, 'frequency.sum as 'frequency)
+ .filter('frequency === 2)
+ .toDataSet[WC]
+
+ result.print()
+ }
+
+ // *************************************************************************
+ // USER DATA TYPES
+ // *************************************************************************
+
+ case class WC(word: String, frequency: Long)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala
new file mode 100644
index 0000000..14d899d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/Expression.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.plan.TreeNode
+import org.apache.flink.table.validate.{ValidationResult, ValidationSuccess}
+
+abstract class Expression extends TreeNode[Expression] {
+ /**
+ * Returns the [[TypeInformation]] for evaluating this expression.
+ * It is sometimes not available until the expression is valid.
+ */
+ private[flink] def resultType: TypeInformation[_]
+
+ /**
+ * One pass validation of the expression tree in post order.
+ */
+ private[flink] lazy val valid: Boolean = childrenValid && validateInput().isSuccess
+
+ private[flink] def childrenValid: Boolean = children.forall(_.valid)
+
+ /**
+ * Check input data types, inputs number or other properties specified by this expression.
+ * Return `ValidationSuccess` if it pass the check,
+ * or `ValidationFailure` with supplement message explaining the error.
+ * Note: we should only call this method until `childrenValid == true`
+ */
+ private[flink] def validateInput(): ValidationResult = ValidationSuccess
+
+ /**
+ * Convert Expression to its counterpart in Calcite, i.e. RexNode
+ */
+ private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+ throw new UnsupportedOperationException(
+ s"${this.getClass.getName} cannot be transformed to RexNode"
+ )
+
+ private[flink] def checkEquals(other: Expression): Boolean = {
+ if (this.getClass != other.getClass) {
+ false
+ } else {
+ def checkEquality(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
+ elements1.length == elements2.length && elements1.zip(elements2).forall {
+ case (e1: Expression, e2: Expression) => e1.checkEquals(e2)
+ case (t1: Seq[_], t2: Seq[_]) => checkEquality(t1, t2)
+ case (i1, i2) => i1 == i2
+ }
+ }
+ val elements1 = this.productIterator.toSeq
+ val elements2 = other.productIterator.toSeq
+ checkEquality(elements1, elements2)
+ }
+ }
+}
+
+abstract class BinaryExpression extends Expression {
+ private[flink] def left: Expression
+ private[flink] def right: Expression
+ private[flink] def children = Seq(left, right)
+}
+
+abstract class UnaryExpression extends Expression {
+ private[flink] def child: Expression
+ private[flink] def children = Seq(child)
+}
+
+abstract class LeafExpression extends Expression {
+ private[flink] val children = Nil
+}