Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2016/02/05 17:53:32 UTC

[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/1595

    [FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet programs

    This PR implements a CodeGenerator for Table API on Calcite.
    
    It includes the following:
    - FilterITCase, SelectITCase and ExpressionITCase are mostly working again
    - Null values are now fully supported and tested with the new `TableProgramsTestBase`.
    - Arithmetic operators `+-*/ +X -X` and logical operators `== != AND, OR, NOT, IS NULL, NOT NULL` are supported.
    - Logical operators implement 3-valued logic.
    - A new configuration parameter `efficientTypeUsage` allows generated DataSet programs to be as efficient as ordinary DataSet programs by avoiding unnecessary type conversions and using the most efficient type for every operator.
    
    Limitations:
    - String functions are not yet supported. They will be forwarded to Calcite's runtime functions once I have implemented the necessary interfaces for that.
    - Expression type casting is not yet implemented.
    - Date literals are not yet supported.
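
For readers unfamiliar with the 3-valued logic mentioned in the list above, here is a minimal sketch of the SQL truth tables. It is an illustration only, not the code this PR generates: `ThreeValuedLogic`, `and3`, `or3` and `not3` are hypothetical names, and `None` is assumed to stand for NULL.

```scala
// Sketch of SQL-style three-valued logic; None stands for NULL (unknown).
// All names here are hypothetical and are not part of Flink or Calcite.
object ThreeValuedLogic {

  type Tri = Option[Boolean]

  // FALSE dominates AND: FALSE AND NULL is FALSE, TRUE AND NULL is NULL.
  def and3(a: Tri, b: Tri): Tri = (a, b) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }

  // TRUE dominates OR: TRUE OR NULL is TRUE, FALSE OR NULL is NULL.
  def or3(a: Tri, b: Tri): Tri = (a, b) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }

  // NOT NULL stays NULL; known values are negated.
  def not3(a: Tri): Tri = a.map(!_)
}
```

The point of the sketch is that a NULL operand does not always make the result NULL: a known FALSE decides an AND, and a known TRUE decides an OR.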

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink CodeGen

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1595.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1595
    
----
commit c1a63287de0014f0b5bd23d1e0bc31e679f02fbe
Author: twalthr <tw...@apache.org>
Date:   2016-02-05T16:40:54Z

    [FLINK-3226] Implement a CodeGenerator for an efficient translation to DataSet programs

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52293343
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.codegen
    +
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
    +import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
    +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +
    +object CodeGenUtils {
    +
    +  private val nameCounter = new AtomicInteger
    +
    +  def newName(name: String): String = {
    +    s"$name$$${nameCounter.getAndIncrement}"
    +  }
    +
    +  // when casting we first need to unbox Primitives, for example,
    +  // float a = 1.0f;
    +  // byte b = (byte) a;
    +  // works, but for boxed types we need this:
    +  // Float a = 1.0f;
    +  // Byte b = (byte)(float) a;
    +  def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
    +    case INT_TYPE_INFO => "int"
    +    case LONG_TYPE_INFO => "long"
    +    case SHORT_TYPE_INFO => "short"
    +    case BYTE_TYPE_INFO => "byte"
    +    case FLOAT_TYPE_INFO => "float"
    +    case DOUBLE_TYPE_INFO => "double"
    +    case BOOLEAN_TYPE_INFO => "boolean"
    +    case CHAR_TYPE_INFO => "char"
    +
    +    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
    +    // does not seem to like this, so we manually give the correct type here.
    +    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
    +    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
    +    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
    +    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
    +    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
    +    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
    +    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
    +    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
    +
    +    case _ =>
    +      tpe.getTypeClass.getCanonicalName
    +  }
    +
    +  def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
    +    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
    +    // does not seem to like this, so we manually give the correct type here.
    +    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
    +    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
    +    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
    +    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
    +    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
    +    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
    +    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
    +    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
    +
    +    case _ =>
    +      tpe.getTypeClass.getCanonicalName
    +  }
    +
    +  def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
    +    case INT_TYPE_INFO => "-1"
    +    case LONG_TYPE_INFO => "-1"
    +    case SHORT_TYPE_INFO => "-1"
    +    case BYTE_TYPE_INFO => "-1"
    +    case FLOAT_TYPE_INFO => "-1.0f"
    +    case DOUBLE_TYPE_INFO => "-1.0d"
    +    case BOOLEAN_TYPE_INFO => "false"
    +    case STRING_TYPE_INFO => "\"<empty>\""
    +    case CHAR_TYPE_INFO => "'\\0'"
    +    case _ => "null"
    +  }
    +
    +  def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType match {
    +    case nti: NumericTypeInfo[_] => // ok
    +    case _ => throw new CodeGenException("Numeric expression type expected.")
    +  }
    +
    +  def requireString(genExpr: GeneratedExpression) = genExpr.resultType match {
    +    case STRING_TYPE_INFO => // ok
    +    case _ => throw new CodeGenException("String expression type expected.")
    +  }
    +
    +  def requireBoolean(genExpr: GeneratedExpression) = genExpr.resultType match {
    +    case BOOLEAN_TYPE_INFO => // ok
    +    case _ => throw new CodeGenException("Boolean expression type expected.")
    +  }
    +
    +  def isReference(genExpr: GeneratedExpression): Boolean = isReference(genExpr.resultType)
    +
    +  def isReference(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
    +    case INT_TYPE_INFO
    +         | LONG_TYPE_INFO
    +         | SHORT_TYPE_INFO
    +         | BYTE_TYPE_INFO
    +         | FLOAT_TYPE_INFO
    +         | DOUBLE_TYPE_INFO
    +         | BOOLEAN_TYPE_INFO
    +         | CHAR_TYPE_INFO => false
    +    case _ => true
    +  }
    +
    +  def isNumeric(genExpr: GeneratedExpression): Boolean = isNumeric(genExpr.resultType)
    +
    +  def isNumeric(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
    +    case nti: NumericTypeInfo[_] => true
    +    case _ => false
    +  }
    +
    +  def isString(genExpr: GeneratedExpression): Boolean = isString(genExpr.resultType)
    +
    +  def isString(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
    +    case STRING_TYPE_INFO => true
    +    case _ => false
    +  }
    +
    +  def isBoolean(genExpr: GeneratedExpression): Boolean = isBoolean(genExpr.resultType)
    +
    +  def isBoolean(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
    +    case BOOLEAN_TYPE_INFO => true
    +    case _ => false
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  sealed abstract class FieldAccessor
    +
    +  case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
    +
    +  case class ObjectMethodAccessor(methodName: String) extends FieldAccessor
    +
    +  case class ProductAccessor(i: Int) extends FieldAccessor
    +
    +  def fieldAccessorFor(compType: CompositeType[_], index: Int): FieldAccessor = {
    +    compType match {
    +      case ri: RowTypeInfo =>
    +        ProductAccessor(index)
    +
    +      case cc: CaseClassTypeInfo[_] =>
    +        ObjectMethodAccessor(cc.getFieldNames()(index))
    +
    +      case javaTup: TupleTypeInfo[_] =>
    +        ObjectFieldAccessor("f" + index)
    +
    +      case pj: PojoTypeInfo[_] =>
    +        ObjectFieldAccessor(pj.getFieldNames()(index))
    --- End diff --
    
    POJO fields may be private.
    You need to use reflection and make the field accessible first.
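
As a rough sketch of what the comment above suggests, a private field can be read through `java.lang.reflect.Field` once `setAccessible(true)` has been called. `PrivateFieldAccess`, `Account` and `readField` are hypothetical names for illustration; how the code generator would embed such a lookup is not shown in this thread.

```scala
import java.lang.reflect.Field

object PrivateFieldAccess {

  // Hypothetical POJO-like class; `balance` is private and has no getter.
  class Account(initial: Long) {
    private var balance: Long = initial
    def deposit(amount: Long): Unit = balance += amount
  }

  // Looks up a declared field by name and makes it accessible before reading.
  // Without setAccessible(true), get() on a private field throws
  // IllegalAccessException.
  def readField(target: AnyRef, fieldName: String): AnyRef = {
    val field: Field = target.getClass.getDeclaredField(fieldName)
    field.setAccessible(true)
    field.get(target) // primitives come back boxed, e.g. java.lang.Long
  }
}
```

In practice the `Field` lookup would be done once and reused, since `getDeclaredField` is comparatively expensive.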



[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52303126
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---
    +      case pj: PojoTypeInfo[_] =>
    +        ObjectFieldAccessor(pj.getFieldNames()(index))
    --- End diff --
    
    The PojoSerializer and PojoComparator always use reflection to access the fields to avoid any side effects when calling the getters and setters.
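
To illustrate the side-effect concern, here is a small sketch with a getter that mutates state on every call, next to a reflective read that bypasses it. `GetterSideEffects`, `Counter` and `readValueReflectively` are made-up names; this is not how `PojoSerializer` is implemented, only the general idea.

```scala
object GetterSideEffects {

  // Hypothetical POJO whose getter changes state each time it is called.
  class Counter {
    private var value: Int = 7
    private var reads: Int = 0
    def getValue: Int = { reads += 1; value }
    def readCount: Int = reads
  }

  // Reads `value` straight from the field, so getValue is never invoked
  // and `reads` stays untouched.
  def readValueReflectively(c: Counter): Int = {
    val f = classOf[Counter].getDeclaredField("value")
    f.setAccessible(true)
    f.getInt(c)
  }
}
```

Calling `readValueReflectively` leaves `readCount` at 0, while each call to `getValue` increments it.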



[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr closed the pull request at:

    https://github.com/apache/flink/pull/1595



[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1595#issuecomment-181824403
  
    Hi Timo, the PR looks really good :-)
    
    I found the following issues / questions:
    - Accessing POJO fields might not work.
    - Can you add method comments to the code generation methods in `CodeGenerator` and `CodeGenUtils`?
    - Would it make sense to separate the function and expression code gen, i.e., split the `CodeGenerator` class? 



[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52301780
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---
    +      case pj: PojoTypeInfo[_] =>
    +        ObjectFieldAccessor(pj.getFieldNames()(index))
    --- End diff --
    
    I forgot to mention this in the limitations list. I will implement a case distinction and call the getters for private fields in my next commit.
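
One possible shape for the case distinction described above is sketched below, assuming JavaBean-style getter names. `PojoAccessorChoice`, `FieldAccess`, `GetterAccess`, `accessorFor` and `User` are hypothetical; the actual follow-up commit may look quite different.

```scala
import java.lang.reflect.Modifier
import scala.beans.BeanProperty

object PojoAccessorChoice {

  sealed trait Accessor
  case class FieldAccess(fieldName: String) extends Accessor
  case class GetterAccess(methodName: String) extends Accessor

  // Hypothetical POJO: @BeanProperty yields a private field `name`
  // plus public getName/setName methods.
  class User {
    @BeanProperty var name: String = "anonymous"
  }

  // Public fields are read directly; anything else goes through a getter.
  // Assumption: getters follow the "get" + capitalized field name pattern
  // (boolean "is" getters are ignored in this sketch).
  def accessorFor(clazz: Class[_], fieldName: String): Accessor = {
    val field = clazz.getDeclaredField(fieldName)
    if (Modifier.isPublic(field.getModifiers)) FieldAccess(fieldName)
    else GetterAccess("get" + fieldName.capitalize)
  }
}
```

For example, `accessorFor(classOf[PojoAccessorChoice.User], "name")` yields `GetterAccess("getName")`, whereas a class with a public field, such as `java.io.StreamTokenizer` and its `nval` field, yields `FieldAccess("nval")`.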



[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52301613
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
    @@ -0,0 +1,661 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.codegen
    +
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable._
    +import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
    +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
    +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
    +import org.apache.flink.api.table.TableConfig
    +import org.apache.flink.api.table.codegen.CodeGenUtils._
    +import org.apache.flink.api.table.codegen.Indenter._
    --- End diff --
    
    I had the same problem with IntelliJ. But if you remove the import, the `j"..."` will not work anymore. Actually I think we don't need the Indenter at all.
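
The dependency between the import and `j"..."` can be sketched as follows. A custom string interpolator is typically an implicit class over `StringContext`, so it only exists where that implicit is in scope. `InterpolatorImport`, `Helpers` and `JInterpolator` are hypothetical stand-ins and not Flink's `Indenter`; the margin-stripping behavior is likewise an assumption.

```scala
object InterpolatorImport {

  // Hypothetical stand-in for an Indenter-like helper object.
  object Helpers {
    implicit class JInterpolator(val sc: StringContext) extends AnyVal {
      // Makes j"..." available wherever Helpers._ is imported.
      // Substitutes the arguments, then strips the leading `|` margin.
      def j(args: Any*): String = sc.s(args: _*).stripMargin
    }
  }

  def render(term: String): String = {
    import Helpers._ // removing this import makes j"..." fail to compile
    j"""|int $term = 0;
        |return $term;"""
  }
}
```

Because the import is only used through the implicit conversion, it has no explicit reference in the code, which may be why a tool could mistake it for unused.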



[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1595#issuecomment-182390675
  
    Hey @twalthr,
    I merged this yesterday but the PR didn't automatically close. Could you please close it manually? Thanks!



[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52291201
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
    @@ -0,0 +1,661 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.codegen
    +
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable._
    +import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
    +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
    +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
    +import org.apache.flink.api.table.TableConfig
    +import org.apache.flink.api.table.codegen.CodeGenUtils._
    +import org.apache.flink.api.table.codegen.Indenter._
    +import org.apache.flink.api.table.codegen.OperatorCodeGen._
    +import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable
    +
    +class CodeGenerator(
    +    config: TableConfig,
    +    input1: TypeInformation[Any],
    +    input2: Option[TypeInformation[Any]] = None)
    +  extends RexVisitor[GeneratedExpression] {
    +
    +  // set of member statements that will be added only once
    +  // we use a LinkedHashSet to keep the insertion order
    +  private val reusableMemberStatements = mutable.LinkedHashSet[String]()
    +
    +  // set of constructor statements that will be added only once
    +  // we use a LinkedHashSet to keep the insertion order
    +  private val reusableInitStatements = mutable.LinkedHashSet[String]()
    +
    +  // map of initial input unboxing expressions that will be added only once
    +  // (inputTerm, index) -> expr
    +  private val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]()
    +
    +  def reuseMemberCode(): String = {
    +    reusableMemberStatements.mkString("", "\n", "\n")
    +  }
    +
    +  def reuseInitCode(): String = {
    +    reusableInitStatements.mkString("", "\n", "\n")
    +  }
    +
    +  def reuseInputUnboxingCode(): String = {
    +    reusableInputUnboxingExprs.values.map(_.code).mkString("", "\n", "\n")
    +  }
    +
    +  def input1Term = "in1"
    +
    +  def input2Term = "in2"
    +
    +  def collectorTerm = "c"
    +
    +  def outRecordTerm = "out"
    +
    +  def nullCheck: Boolean = config.getNullCheck
    +
    +  def generateExpression(rex: RexNode): GeneratedExpression = {
    +    rex.accept(this)
    +  }
    +
    +  def generateFunction[T <: Function](
    +      name: String,
    +      clazz: Class[T],
    +      bodyCode: String,
    +      returnType: TypeInformation[Any])
    +    : GeneratedFunction[T] = {
    +    val funcName = newName(name)
    +
    +    // Janino does not support generics, that's why we need
    +    // manual casting here
    +    val samHeader =
    +      if (clazz == classOf[FlatMapFunction[_,_]]) {
    +        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
    +        (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
    +          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
    +      } else if (clazz == classOf[MapFunction[_,_]]) {
    +        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
    +        ("Object map(Object _in1)",
    +          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
    +      } else {
    +        // TODO more functions
    +        throw new CodeGenException("Unsupported Function.")
    +      }
    +
    +    val funcCode = j"""
    +      public class $funcName
    +          implements ${clazz.getCanonicalName} {
    +
    +        ${reuseMemberCode()}
    +
    +        public $funcName() {
    +          ${reuseInitCode()}
    +        }
    +
    +        @Override
    +        public ${samHeader._1} {
    +          ${samHeader._2}
    +          ${reuseInputUnboxingCode()}
    +          $bodyCode
    +        }
    +      }
    +    """.stripMargin
    +
    +    GeneratedFunction(funcName, returnType, funcCode)
    +  }
    +
    +  def generateConverterResultExpression(
    +      returnType: TypeInformation[_ <: Any])
    +    : GeneratedExpression = {
    +    val input1AccessExprs = for (i <- 0 until input1.getArity)
    +      yield generateInputAccess(input1, input1Term, i)
    +
    +    val input2AccessExprs = input2 match {
    +      case Some(ti) => for (i <- 0 until ti.getArity)
    +        yield generateInputAccess(ti, input2Term, i)
    +      case None => Seq() // add nothing
    +    }
    +
    +    generateResultExpression(input1AccessExprs ++ input2AccessExprs, returnType)
    +  }
    +
    +  def generateResultExpression(
    +      returnType: TypeInformation[_ <: Any],
    +      rexNodes: Seq[RexNode])
    +    : GeneratedExpression = {
    +    val fieldExprs = rexNodes.map(generateExpression)
    +    generateResultExpression(fieldExprs, returnType)
    +  }
    +
    +  def generateResultExpression(
    +      fieldExprs: Seq[GeneratedExpression],
    +      returnType: TypeInformation[_ <: Any])
    +    : GeneratedExpression = {
    +    // initial type check
    +    if (returnType.getArity != fieldExprs.length) {
    +      throw new CodeGenException("Arity of result type does not match number of expressions.")
    +    }
    +    // type check
    +    returnType match {
    +      case ct: CompositeType[_] =>
    +        fieldExprs.zipWithIndex foreach {
    +          case (fieldExpr, i) if fieldExpr.resultType != ct.getTypeAt(i) =>
    +            throw new CodeGenException("Incompatible types of expression and result type.")
    +          case _ => // ok
    +        }
    +      case at: AtomicType[_] if at != fieldExprs.head.resultType =>
    +        throw new CodeGenException("Incompatible types of expression and result type.")
    +      case _ => // ok
    +    }
    +
    +    val returnTypeTerm = boxedTypeTermForTypeInfo(returnType)
    +
    +    // generate result expression
    +    returnType match {
    +      case ri: RowTypeInfo =>
    +        addReusableOutRecord(ri)
    +        val resultSetters: String = fieldExprs.zipWithIndex map {
    +          case (fieldExpr, i) =>
    +            if (nullCheck) {
    +              s"""
    +              |${fieldExpr.code}
    +              |if (${fieldExpr.nullTerm}) {
    +              |  $outRecordTerm.setField($i, null);
    +              |}
    +              |else {
    +              |  $outRecordTerm.setField($i, ${fieldExpr.resultTerm});
    +              |}
    +              |""".stripMargin
    +            }
    +            else {
    +              s"""
    +              |${fieldExpr.code}
    +              |$outRecordTerm.setField($i, ${fieldExpr.resultTerm});
    +              |""".stripMargin
    +            }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
    +
    +      case pj: PojoTypeInfo[_] =>
    +        addReusableOutRecord(pj)
    +        val resultSetters: String = fieldExprs.zip(pj.getFieldNames) map {
    +        case (fieldExpr, fieldName) =>
    +          if (nullCheck) {
    +            s"""
    +            |${fieldExpr.code}
    +            |if (${fieldExpr.nullTerm}) {
    +            |  $outRecordTerm.$fieldName = null;
    +            |}
    +            |else {
    +            |  $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +            |}
    +            |""".stripMargin
    +          }
    +          else {
    +            s"""
    +            |${fieldExpr.code}
    +            |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +            |""".stripMargin
    +          }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
    +
    +      case tup: TupleTypeInfo[_] =>
    +        addReusableOutRecord(tup)
    +        val resultSetters: String = fieldExprs.zipWithIndex map {
    +          case (fieldExpr, i) =>
    +            val fieldName = "f" + i
    +            if (nullCheck) {
    +              s"""
    +                |${fieldExpr.code}
    +                |if (${fieldExpr.nullTerm}) {
    +                |  throw new NullPointerException("Null result cannot be stored in a Tuple.");
    +                |}
    +                |else {
    +                |  $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +                |}
    +                |""".stripMargin
    +            }
    +            else {
    +              s"""
    +                |${fieldExpr.code}
    +                |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +                |""".stripMargin
    +            }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
    +
    +      case cc: CaseClassTypeInfo[_] =>
    +        val fieldCodes: String = fieldExprs.map(_.code).mkString("\n")
    +        val constructorParams: String = fieldExprs.map(_.resultTerm).mkString(", ")
    +        val resultTerm = newName(outRecordTerm)
    +
    +        val nullCheckCode = if (nullCheck) {
    +        fieldExprs map { (fieldExpr) =>
    +          s"""
    +              |if (${fieldExpr.nullTerm}) {
    +              |  throw new NullPointerException("Null result cannot be stored in a Case Class.");
    +              |}
    +              |""".stripMargin
    +          } mkString "\n"
    +        } else {
    +          ""
    +        }
    +
    +        val resultCode =
    +          s"""
    +            |$fieldCodes
    +            |$nullCheckCode
    +            |$returnTypeTerm $resultTerm = new $returnTypeTerm($constructorParams);
    +            |""".stripMargin
    +
    +        GeneratedExpression(resultTerm, "false", resultCode, returnType)
    +
    +      case a: AtomicType[_] =>
    +        val fieldExpr = fieldExprs.head
    +        val nullCheckCode = if (nullCheck) {
    +          s"""
    +          |if (${fieldExpr.nullTerm}) {
    +          |  throw new NullPointerException("Null result cannot be used for atomic types.");
    +          |}
    +          |""".stripMargin
    +        } else {
    +          ""
    +        }
    +        val resultCode =
    +          s"""
    +            |${fieldExpr.code}
    +            |$nullCheckCode
    +            |""".stripMargin
    +
    +        GeneratedExpression(fieldExpr.resultTerm, "false", resultCode, returnType)
    +
    +      case _ =>
    +        throw new CodeGenException(s"Unsupported result type: $returnType")
    +    }
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  override def visitInputRef(inputRef: RexInputRef): GeneratedExpression = {
    +    // if inputRef index is within size of input1 we work with input1, input2 otherwise
    +    val input = if (inputRef.getIndex < input1.getArity) {
    +      (input1, input1Term)
    +    } else {
    +      (input2.getOrElse(throw new CodeGenException("Invalid input access.")), input2Term)
    +    }
    +
    +    val index = if (input._1 == input1) {
    +      inputRef.getIndex
    +    } else {
    +      inputRef.getIndex - input1.getArity
    +    }
    +
    +    generateInputAccess(input._1, input._2, index)
    +  }
    +
    +  override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression = ???
    +
    +  override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
    +    val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName)
    +    val value = literal.getValue3
    +    literal.getType.getSqlTypeName match {
    +      case BOOLEAN =>
    +        generateNonNullLiteral(resultType, literal.getValue3.toString)
    +      case TINYINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidByte) {
    +          generateNonNullLiteral(resultType, decimal.byteValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to byte.")
    +        }
    +      case SMALLINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidShort) {
    +          generateNonNullLiteral(resultType, decimal.shortValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to short.")
    +        }
    +      case INTEGER =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidShort) {
    +          generateNonNullLiteral(resultType, decimal.intValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to integer.")
    +        }
    +      case BIGINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidLong) {
    +          generateNonNullLiteral(resultType, decimal.longValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to long.")
    +        }
    +      case FLOAT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidFloat) {
    +          generateNonNullLiteral(resultType, decimal.floatValue().toString + "f")
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to float.")
    +        }
    +      case DOUBLE =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidDouble) {
    +          generateNonNullLiteral(resultType, decimal.doubleValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to double.")
    +        }
    +      case VARCHAR | CHAR =>
    +        generateNonNullLiteral(resultType, value.toString)
    --- End diff --
    
    Do we need quotes around the string value?
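
The question above is about how a string value ends up in generated Java source: emitted verbatim, `abc` would be read as an identifier rather than a string literal. A minimal sketch of one way to quote and escape it follows. `LiteralQuoting` and `quoteJavaString` are hypothetical names, not part of the PR, and only the most common escapes are handled.

```scala
object LiteralQuoting {
  // Hypothetical helper (an assumption for illustration): turns a runtime
  // string into the text of a Java string literal for generated code.
  def quoteJavaString(value: String): String = {
    val sb = new StringBuilder("\"")
    value.foreach {
      case '"'  => sb.append("\\\"") // escape embedded double quotes
      case '\\' => sb.append("\\\\") // escape backslashes
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      case c    => sb.append(c)      // other characters are copied unchanged
    }
    sb.append('"').toString
  }
}
```

With such a helper, the `VARCHAR | CHAR` branch could pass `quoteJavaString(value.toString)` instead of the bare `value.toString`. Whether the final fix took this form is not stated in the thread.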



[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52293004
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
    @@ -0,0 +1,661 @@
    +import org.apache.flink.api.table.codegen.Indenter._
    --- End diff --
    
    IntelliJ tells me this import is unused.



[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1595#issuecomment-181955939
  
    Thanks for the quick update @twalthr! Some tests are failing because the wrong type of exception is expected. I'll fix those and then merge this.



[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1595#discussion_r52301458
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
    @@ -0,0 +1,661 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.codegen
    +
    +import org.apache.calcite.rex._
    +import org.apache.calcite.sql.`type`.SqlTypeName._
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable._
    +import org.apache.flink.api.common.functions.{FlatMapFunction, Function, MapFunction}
    +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
    +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
    +import org.apache.flink.api.table.TableConfig
    +import org.apache.flink.api.table.codegen.CodeGenUtils._
    +import org.apache.flink.api.table.codegen.Indenter._
    +import org.apache.flink.api.table.codegen.OperatorCodeGen._
    +import org.apache.flink.api.table.plan.TypeConverter.sqlTypeToTypeInfo
    +import org.apache.flink.api.table.typeinfo.RowTypeInfo
    +
    +import scala.collection.JavaConversions._
    +import scala.collection.mutable
    +
    +class CodeGenerator(
    +    config: TableConfig,
    +    input1: TypeInformation[Any],
    +    input2: Option[TypeInformation[Any]] = None)
    +  extends RexVisitor[GeneratedExpression] {
    +
    +  // set of member statements that will be added only once
    +  // we use a LinkedHashSet to keep the insertion order
    +  private val reusableMemberStatements = mutable.LinkedHashSet[String]()
    +
    +  // set of constructor statements that will be added only once
    +  // we use a LinkedHashSet to keep the insertion order
    +  private val reusableInitStatements = mutable.LinkedHashSet[String]()
    +
    +  // map of initial input unboxing expressions that will be added only once
    +  // (inputTerm, index) -> expr
    +  private val reusableInputUnboxingExprs = mutable.Map[(String, Int), GeneratedExpression]()
    +
    +  def reuseMemberCode(): String = {
    +    reusableMemberStatements.mkString("", "\n", "\n")
    +  }
    +
    +  def reuseInitCode(): String = {
    +    reusableInitStatements.mkString("", "\n", "\n")
    +  }
    +
    +  def reuseInputUnboxingCode(): String = {
    +    reusableInputUnboxingExprs.values.map(_.code).mkString("", "\n", "\n")
    +  }
    +
    +  def input1Term = "in1"
    +
    +  def input2Term = "in2"
    +
    +  def collectorTerm = "c"
    +
    +  def outRecordTerm = "out"
    +
    +  def nullCheck: Boolean = config.getNullCheck
    +
    +  def generateExpression(rex: RexNode): GeneratedExpression = {
    +    rex.accept(this)
    +  }
    +
    +  def generateFunction[T <: Function](
    +      name: String,
    +      clazz: Class[T],
    +      bodyCode: String,
    +      returnType: TypeInformation[Any])
    +    : GeneratedFunction[T] = {
    +    val funcName = newName(name)
    +
    +    // Janino does not support generics, that's why we need
    +    // manual casting here
    +    val samHeader =
    +      if (clazz == classOf[FlatMapFunction[_,_]]) {
    +        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
    +        (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
    +          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
    +      } else if (clazz == classOf[MapFunction[_,_]]) {
    +        val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
    +        ("Object map(Object _in1)",
    +          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
    +      } else {
    +        // TODO more functions
    +        throw new CodeGenException("Unsupported Function.")
    +      }
    +
    +    val funcCode = j"""
    +      public class $funcName
    +          implements ${clazz.getCanonicalName} {
    +
    +        ${reuseMemberCode()}
    +
    +        public $funcName() {
    +          ${reuseInitCode()}
    +        }
    +
    +        @Override
    +        public ${samHeader._1} {
    +          ${samHeader._2}
    +          ${reuseInputUnboxingCode()}
    +          $bodyCode
    +        }
    +      }
    +    """.stripMargin
    +
    +    GeneratedFunction(funcName, returnType, funcCode)
    +  }
    +
    +  def generateConverterResultExpression(
    +      returnType: TypeInformation[_ <: Any])
    +    : GeneratedExpression = {
    +    val input1AccessExprs = for (i <- 0 until input1.getArity)
    +      yield generateInputAccess(input1, input1Term, i)
    +
    +    val input2AccessExprs = input2 match {
    +      case Some(ti) => for (i <- 0 until ti.getArity)
    +        yield generateInputAccess(ti, input2Term, i)
    +      case None => Seq() // add nothing
    +    }
    +
    +    generateResultExpression(input1AccessExprs ++ input2AccessExprs, returnType)
    +  }
    +
    +  def generateResultExpression(
    +      returnType: TypeInformation[_ <: Any],
    +      rexNodes: Seq[RexNode])
    +    : GeneratedExpression = {
    +    val fieldExprs = rexNodes.map(generateExpression)
    +    generateResultExpression(fieldExprs, returnType)
    +  }
    +
    +  def generateResultExpression(
    +      fieldExprs: Seq[GeneratedExpression],
    +      returnType: TypeInformation[_ <: Any])
    +    : GeneratedExpression = {
    +    // initial type check
    +    if (returnType.getArity != fieldExprs.length) {
    +      throw new CodeGenException("Arity of result type does not match number of expressions.")
    +    }
    +    // type check
    +    returnType match {
    +      case ct: CompositeType[_] =>
    +        fieldExprs.zipWithIndex foreach {
    +          case (fieldExpr, i) if fieldExpr.resultType != ct.getTypeAt(i) =>
    +            throw new CodeGenException("Incompatible types of expression and result type.")
    +          case _ => // ok
    +        }
    +      case at: AtomicType[_] if at != fieldExprs.head.resultType =>
    +        throw new CodeGenException("Incompatible types of expression and result type.")
    +      case _ => // ok
    +    }
    +
    +    val returnTypeTerm = boxedTypeTermForTypeInfo(returnType)
    +
    +    // generate result expression
    +    returnType match {
    +      case ri: RowTypeInfo =>
    +        addReusableOutRecord(ri)
    +        val resultSetters: String = fieldExprs.zipWithIndex map {
    +          case (fieldExpr, i) =>
    +            if (nullCheck) {
    +              s"""
    +              |${fieldExpr.code}
    +              |if (${fieldExpr.nullTerm}) {
    +              |  $outRecordTerm.setField($i, null);
    +              |}
    +              |else {
    +              |  $outRecordTerm.setField($i, ${fieldExpr.resultTerm});
    +              |}
    +              |""".stripMargin
    +            }
    +            else {
    +              s"""
    +              |${fieldExpr.code}
    +              |$outRecordTerm.setField($i, ${fieldExpr.resultTerm});
    +              |""".stripMargin
    +            }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
    +
    +      case pj: PojoTypeInfo[_] =>
    +        addReusableOutRecord(pj)
    +        val resultSetters: String = fieldExprs.zip(pj.getFieldNames) map {
    +        case (fieldExpr, fieldName) =>
    +          if (nullCheck) {
    +            s"""
    +            |${fieldExpr.code}
    +            |if (${fieldExpr.nullTerm}) {
    +            |  $outRecordTerm.$fieldName = null;
    +            |}
    +            |else {
    +            |  $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +            |}
    +            |""".stripMargin
    +          }
    +          else {
    +            s"""
    +            |${fieldExpr.code}
    +            |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +            |""".stripMargin
    +          }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
    +
    +      case tup: TupleTypeInfo[_] =>
    +        addReusableOutRecord(tup)
    +        val resultSetters: String = fieldExprs.zipWithIndex map {
    +          case (fieldExpr, i) =>
    +            val fieldName = "f" + i
    +            if (nullCheck) {
    +              s"""
    +                |${fieldExpr.code}
    +                |if (${fieldExpr.nullTerm}) {
    +                |  throw new NullPointerException("Null result cannot be stored in a Tuple.");
    +                |}
    +                |else {
    +                |  $outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +                |}
    +                |""".stripMargin
    +            }
    +            else {
    +              s"""
    +                |${fieldExpr.code}
    +                |$outRecordTerm.$fieldName = ${fieldExpr.resultTerm};
    +                |""".stripMargin
    +            }
    +        } mkString "\n"
    +
    +        GeneratedExpression(outRecordTerm, "false", resultSetters, returnType)
    +
    +      case cc: CaseClassTypeInfo[_] =>
    +        val fieldCodes: String = fieldExprs.map(_.code).mkString("\n")
    +        val constructorParams: String = fieldExprs.map(_.resultTerm).mkString(", ")
    +        val resultTerm = newName(outRecordTerm)
    +
    +        val nullCheckCode = if (nullCheck) {
    +        fieldExprs map { (fieldExpr) =>
    +          s"""
    +              |if (${fieldExpr.nullTerm}) {
    +              |  throw new NullPointerException("Null result cannot be stored in a Case Class.");
    +              |}
    +              |""".stripMargin
    +          } mkString "\n"
    +        } else {
    +          ""
    +        }
    +
    +        val resultCode =
    +          s"""
    +            |$fieldCodes
    +            |$nullCheckCode
    +            |$returnTypeTerm $resultTerm = new $returnTypeTerm($constructorParams);
    +            |""".stripMargin
    +
    +        GeneratedExpression(resultTerm, "false", resultCode, returnType)
    +
    +      case a: AtomicType[_] =>
    +        val fieldExpr = fieldExprs.head
    +        val nullCheckCode = if (nullCheck) {
    +          s"""
    +          |if (${fieldExpr.nullTerm}) {
    +          |  throw new NullPointerException("Null result cannot be used for atomic types.");
    +          |}
    +          |""".stripMargin
    +        } else {
    +          ""
    +        }
    +        val resultCode =
    +          s"""
    +            |${fieldExpr.code}
    +            |$nullCheckCode
    +            |""".stripMargin
    +
    +        GeneratedExpression(fieldExpr.resultTerm, "false", resultCode, returnType)
    +
    +      case _ =>
    +        throw new CodeGenException(s"Unsupported result type: $returnType")
    +    }
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  override def visitInputRef(inputRef: RexInputRef): GeneratedExpression = {
    +    // if inputRef index is within size of input1 we work with input1, input2 otherwise
    +    val input = if (inputRef.getIndex < input1.getArity) {
    +      (input1, input1Term)
    +    } else {
    +      (input2.getOrElse(throw new CodeGenException("Invalid input access.")), input2Term)
    +    }
    +
    +    val index = if (input._1 == input1) {
    +      inputRef.getIndex
    +    } else {
    +      inputRef.getIndex - input1.getArity
    +    }
    +
    +    generateInputAccess(input._1, input._2, index)
    +  }
    +
    +  override def visitFieldAccess(rexFieldAccess: RexFieldAccess): GeneratedExpression = ???
    +
    +  override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
    +    val resultType = sqlTypeToTypeInfo(literal.getType.getSqlTypeName)
    +    val value = literal.getValue3
    +    literal.getType.getSqlTypeName match {
    +      case BOOLEAN =>
    +        generateNonNullLiteral(resultType, literal.getValue3.toString)
    +      case TINYINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidByte) {
    +          generateNonNullLiteral(resultType, decimal.byteValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to byte.")
    +        }
    +      case SMALLINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidShort) {
    +          generateNonNullLiteral(resultType, decimal.shortValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to short.")
    +        }
    +      case INTEGER =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidInt) {
    +          generateNonNullLiteral(resultType, decimal.intValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to integer.")
    +        }
    +      case BIGINT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidLong) {
    +          generateNonNullLiteral(resultType, decimal.longValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to long.")
    +        }
    +      case FLOAT =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidFloat) {
    +          generateNonNullLiteral(resultType, decimal.floatValue().toString + "f")
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to float.")
    +        }
    +      case DOUBLE =>
    +        val decimal = BigDecimal(value.asInstanceOf[java.math.BigDecimal])
    +        if (decimal.isValidDouble) {
    +          generateNonNullLiteral(resultType, decimal.doubleValue().toString)
    +        }
    +        else {
    +          throw new CodeGenException("Decimal can not be converted to double.")
    +        }
    +      case VARCHAR | CHAR =>
    +        generateNonNullLiteral(resultType, value.toString)
    --- End diff --
    
    Yes, this is a bug. Thanks.
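
A minimal sketch of one way the `VARCHAR | CHAR` case could be repaired, assuming the bug in question is that `value.toString` is spliced into the generated Java source without surrounding quotes or escaping. The object and method names below are hypothetical and are not part of the PR; the actual fix may differ.

```scala
object StringLiteralSketch {

  /**
   * Turns a raw string value into a Java string literal by escaping
   * quotes, backslashes and common control characters and wrapping
   * the result in double quotes.
   */
  def toJavaStringLiteral(raw: String): String = {
    val sb = new StringBuilder("\"")
    raw.foreach {
      case '"'  => sb.append("\\\"") // embedded quote
      case '\\' => sb.append("\\\\") // backslash
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      case c    => sb.append(c)
    }
    sb.append('"').toString
  }
}
```

Under that assumption, the literal case would pass `toJavaStringLiteral(value.toString)` to `generateNonNullLiteral` instead of the raw `value.toString`, so that a value such as `abc` is emitted as `"abc"` rather than as a bare identifier.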



[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1595#issuecomment-181848000
  
    Thanks for reviewing @fhueske.
    
    I will add more comments to my code.
    
    Regarding the splitting of the `CodeGenerator`, I think it is not that easy because functions and expressions both access reusable code parts. I will think about it again...
    




[GitHub] flink pull request: [FLINK-3226] Implement a CodeGenerator for an ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1595#issuecomment-181902296
  
    I have added comments and fixed the string bug. I will solve the other issues at the end of the week.
    If you have no objections, you can merge the PR ;-)

