Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/07/26 17:26:41 UTC

[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/4404

    [FLINK-4565] [table] Support for SQL IN operator

    ## What is the purpose of the change
    
    *This PR adds support for the IN operator in both SQL and the Table API, both for testing membership in a set of elements (`f0.in(1, 2, 3)`) and in a sub-query (`f0.in(table)`).*
    
    
    ## Brief change log
    
    *Various changes in plan translation, code generation, and API.*
    
    
    ## Verifying this change
    
    This change adds tests and can be verified as follows:
    
      - *Unit tests: org.apache.flink.table.expressions.ScalarOperatorsTest*
      - *Unit tests: org.apache.flink.table.expressions.validation.ScalarOperatorsValidationTest*
      - *Integration tests: org.apache.flink.table.runtime.batch.sql.SetOperatorsITCase*
      - *Integration tests: org.apache.flink.table.runtime.batch.table.SetOperatorsITCase*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? not documented
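For reference, standard SQL treats `x IN (a, b, c)` like `x = a OR x = b OR x = c` under three-valued logic: TRUE on a match, NULL (unknown) when there is no match but `x` or some element is NULL, and FALSE otherwise. The sketch below models only those semantics in plain Scala; it is not the code Flink generates. `InList` is a hypothetical name, `Option[Boolean]` stands in for a nullable SQL BOOLEAN, and precomputing a `Set` of the constant elements is just one plausible strategy for literal lists.

```scala
// Hypothetical model of SQL `x IN (e1, ..., en)` over constant elements.
// None plays the role of SQL NULL; Some(b) is a non-null BOOLEAN.
final class InList[T](elements: Seq[Option[T]]) {
  // Precomputed once, so each lookup is a hash probe instead of a list scan.
  private val nonNullElements: Set[T] = elements.flatten.toSet
  private val listContainsNull: Boolean = elements.exists(_.isEmpty)

  def apply(x: Option[T]): Option[Boolean] = x match {
    case None => None                                          // NULL IN (...) is unknown
    case Some(v) if nonNullElements.contains(v) => Some(true)  // a match wins over NULL elements
    case Some(_) if listContainsNull => None                   // no match, but x = NULL was unknown
    case Some(_) => Some(false)
  }
}
```

For example, `new InList(Seq(Some(1), Some(2), Some(3))).apply(Some(5))` yields `Some(false)`, while adding a `None` element to the list turns the same call into `None`.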
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-4565

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4404.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4404
    
----
commit c0e7f58973cd4237382811b7ae1ee4fb4cdcaebf
Author: DmytroShkvyra <ds...@gmail.com>
Date:   2017-03-09T19:37:46Z

    [FLINK-4565] [table] Support for SQL IN operator

commit 14ff3c23e04c03c018118cc8ab33ea7255277ef9
Author: twalthr <tw...@apache.org>
Date:   2017-07-26T16:11:38Z

    [FLINK-4565] [table] Full IN operator support for literals and sub-queries

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r131654308
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.expressions
    +
    +import com.google.common.collect.ImmutableList
    +import org.apache.calcite.rex.{RexNode, RexSubQuery}
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.StreamTableEnvironment
    +import org.apache.flink.table.typeutils.TypeCheckUtils._
    +import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
    +
    +case class In(expression: Expression, elements: Seq[Expression]) extends Expression  {
    +
    +  override def toString = s"$expression.in(${elements.mkString(", ")})"
    +
    +  override private[flink] def children: Seq[Expression] = expression +: elements.distinct
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +    // check if this is a sub-query expression or an element list
    +    elements.head match {
    +
    +      case TableReference(name, table) =>
    +        RexSubQuery.in(table.getRelNode, ImmutableList.of(expression.toRexNode))
    +
    +      case _ =>
    +        relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*)
    +    }
    +  }
    +
    +  override private[flink] def validateInput(): ValidationResult = {
    +    // check if this is a sub-query expression or an element list
    +    elements.head match {
    +
    +      case TableReference(name, table) =>
    +        if (elements.length != 1) {
    +          return ValidationFailure("IN operator supports only one table reference.")
    +        }
    +        if (table.tableEnv.isInstanceOf[StreamTableEnvironment]) {
    +          return ValidationFailure(
    +            "Sub-query IN operator on stream tables is currently not supported.")
    +        }
    +        val tableOutput = table.logicalPlan.output
    +        if (tableOutput.length > 1) {
    +          return ValidationFailure(
    +            s"The sub-query table '$name' must not have more than one column.")
    +        }
    +        (expression.resultType, tableOutput.head.resultType) match {
    +          case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
    +          case (lType, rType) if lType == rType => ValidationSuccess
    --- End diff --
    
    Should I open a new JIRA?
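The sub-query branch of `validateInput` quoted above performs three structural checks before comparing types: exactly one table reference, no stream table environment, and a single-column sub-query table. A hypothetical plain-Scala model of just those checks follows; `Operand`, `Lit`, `TableRef`, and `SubQueryInChecks` are illustrative names, not Flink classes, and the element-list branch is simply accepted here.

```scala
// Illustrative stand-ins for IN operands; these are not Flink classes.
sealed trait Operand
final case class Lit(value: Any) extends Operand
final case class TableRef(name: String, columns: Seq[String], isStreaming: Boolean)
  extends Operand

object SubQueryInChecks {
  // Follows the order of the structural checks in the quoted validateInput;
  // the type comparison that comes next in the diff is left out of this sketch.
  def validate(elements: Seq[Operand]): Either[String, Unit] =
    elements.headOption match {
      case Some(TableRef(name, columns, isStreaming)) =>
        if (elements.length != 1)
          Left("IN operator supports only one table reference.")
        else if (isStreaming)
          Left("Sub-query IN operator on stream tables is currently not supported.")
        else if (columns.length > 1)
          Left(s"The sub-query table '$name' must not have more than one column.")
        else
          Right(())
      case _ =>
        Right(()) // element list (or empty input): accepted without further checks here
    }
}
```

As in the diff, only the head element decides which branch applies, so a table reference that appears after a literal is not caught by this branch.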


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r131511583
  
    --- Diff: docs/dev/table/sql.md ---
    @@ -497,6 +497,23 @@ FROM (
     {% endhighlight %}
           </td>
         </tr>
    +
    +    <tr>
    +      <td>
    +        <strong>In</strong><br>
    +        <span class="label label-primary">Batch</span>
    +      </td>
    +      <td>
    +      Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.
    --- End diff --
    
    Should the syntax be enhanced so that the user can specify one column of a table with multiple columns?


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r130860371
  
    --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/util/CollectionDataSets.scala ---
    @@ -17,9 +17,9 @@
      */
     package org.apache.flink.api.scala.util
     
    +import java.sql.{Date, Time, Timestamp}
    --- End diff --
    
    remove unused imports


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r131634081
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala ---
    @@ -0,0 +1,95 @@
    +case class In(expression: Expression, elements: Seq[Expression]) extends Expression  {
    +
    +  override def toString = s"$expression.in(${elements.mkString(", ")})"
    +
    +  override private[flink] def children: Seq[Expression] = expression +: elements.distinct
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +    // check if this is a sub-query expression or an element list
    +    elements.head match {
    +
    +      case TableReference(name, table) =>
    +        RexSubQuery.in(table.getRelNode, ImmutableList.of(expression.toRexNode))
    +
    +      case _ =>
    +        relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*)
    +    }
    +  }
    +
    +  override private[flink] def validateInput(): ValidationResult = {
    +    // check if this is a sub-query expression or an element list
    +    elements.head match {
    +
    +      case TableReference(name, table) =>
    +        if (elements.length != 1) {
    +          return ValidationFailure("IN operator supports only one table reference.")
    +        }
    +        if (table.tableEnv.isInstanceOf[StreamTableEnvironment]) {
    +          return ValidationFailure(
    +            "Sub-query IN operator on stream tables is currently not supported.")
    +        }
    +        val tableOutput = table.logicalPlan.output
    +        if (tableOutput.length > 1) {
    +          return ValidationFailure(
    +            s"The sub-query table '$name' must not have more than one column.")
    +        }
    +        (expression.resultType, tableOutput.head.resultType) match {
    +          case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
    +          case (lType, rType) if lType == rType => ValidationSuccess
    --- End diff --
    
    Good point, yes we could do it. Feel free to submit a patch for it.


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r130858415
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SetOperatorsITCase.scala ---
    @@ -225,4 +227,62 @@ class SetOperatorsITCase(
         val expected = "2,1,Hi\n" + "3,2,Hello\n" + "4,2,Hello world\n"
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
    +
    +  @Test
    +  def testInWithFilter(): Unit = {
    --- End diff --
    
    Add `stringexpr` tests to validate the equivalence of the Scala expression and Java String Table APIs.
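The idea behind such a test is that the Scala DSL and the string form should produce the same expression tree. A toy sketch under heavy assumptions: `Expr`, `Field`, `IntLit`, `InExpr`, and the regex-based `StringExprParser` are hypothetical stand-ins, not Flink's expression classes or its string expression parser, and only integer literal lists are recognized.

```scala
// Hypothetical, heavily simplified expression tree (not Flink's classes).
sealed trait Expr {
  // Scala-DSL style builder, loosely modeled on `f0.in(1, 2, 3)`.
  def in(elements: Expr*): Expr = InExpr(this, elements)
}
final case class Field(name: String) extends Expr
final case class IntLit(value: Int) extends Expr
final case class InExpr(expr: Expr, elements: Seq[Expr]) extends Expr

object StringExprParser {
  // Accepts only the shape "<field>.in(<int>, <int>, ...)"; Regex extractors match the whole string.
  private val InPattern = """\s*(\w+)\.in\(([^)]*)\)\s*""".r

  def parse(s: String): Option[Expr] = s match {
    case InPattern(field, args) =>
      val parts = args.split(",").map(_.trim).filter(_.nonEmpty).toSeq
      if (parts.nonEmpty && parts.forall(_.matches("-?\\d+")))
        Some(InExpr(Field(field), parts.map(p => IntLit(p.toInt))))
      else None
    case _ => None
  }
}
```

An equivalence test then asserts that `StringExprParser.parse("f0.in(1, 2, 3)")` equals `Some(Field("f0").in(IntLit(1), IntLit(2), IntLit(3)))`; case-class equality makes the comparison structural.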


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r130859046
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---
    @@ -260,4 +260,108 @@ class SetOperatorsITCase(
         val results = result.toDataSet[Row].collect()
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
    +
    +  @Test
    +  def testInWithFilter(): Unit = {
    --- End diff --
    
    Should we validate the feature with plan tests (`TableTestBase`) rather than runtime tests?
    The feature does not require new operators, only expressions (covered by dedicated tests) and joins (which are also already tested).
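The join in question has semi-join semantics: a left row is kept at most once, no matter how many sub-query rows match it, so a plain inner join only agrees with IN once the sub-query side is made distinct. A plain-Scala sketch on in-memory collections (NULL handling ignored; `InAsSemiJoin` and its methods are hypothetical names) makes the difference concrete:

```scala
object InAsSemiJoin {
  // Semi-join: keep each left row whose key appears on the right; the right side's
  // multiplicity does not matter. Mirrors `SELECT * FROM l WHERE k IN (SELECT k FROM r)`.
  def semiJoin[A, K](left: Seq[A], rightKeys: Seq[K])(key: A => K): Seq[A] = {
    val keys = rightKeys.toSet
    left.filter(a => keys.contains(key(a)))
  }

  // Naive inner join on the same key: duplicates in `rightKeys` duplicate left rows,
  // so it only matches IN semantics after the right side has been made distinct.
  def innerJoin[A, K](left: Seq[A], rightKeys: Seq[K])(key: A => K): Seq[A] =
    for {
      a <- left
      k <- rightKeys
      if key(a) == k
    } yield a
}
```

With `left = Seq((1, "a"), (2, "b"), (3, "c"))` and sub-query keys `Seq(1, 1, 3)`, `semiJoin` keeps `(1, "a")` once, while `innerJoin` emits it twice.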


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r131823213
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala ---
    @@ -0,0 +1,95 @@
    +case class In(expression: Expression, elements: Seq[Expression]) extends Expression  {
    +
    +  override def toString = s"$expression.in(${elements.mkString(", ")})"
    +
    +  override private[flink] def children: Seq[Expression] = expression +: elements.distinct
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +    // check if this is a sub-query expression or an element list
    +    elements.head match {
    +
    +      case TableReference(name, table) =>
    +        RexSubQuery.in(table.getRelNode, ImmutableList.of(expression.toRexNode))
    +
    +      case _ =>
    +        relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*)
    +    }
    +  }
    +
    +  override private[flink] def validateInput(): ValidationResult = {
    +    // check if this is a sub-query expression or an element list
    +    elements.head match {
    +
    +      case TableReference(name, table) =>
    +        if (elements.length != 1) {
    +          return ValidationFailure("IN operator supports only one table reference.")
    +        }
    +        if (table.tableEnv.isInstanceOf[StreamTableEnvironment]) {
    +          return ValidationFailure(
    +            "Sub-query IN operator on stream tables is currently not supported.")
    +        }
    +        val tableOutput = table.logicalPlan.output
    +        if (tableOutput.length > 1) {
    +          return ValidationFailure(
    +            s"The sub-query table '$name' must not have more than one column.")
    +        }
    +        (expression.resultType, tableOutput.head.resultType) match {
    +          case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
    +          case (lType, rType) if lType == rType => ValidationSuccess
    --- End diff --
    
    @twalthr:
    Can you take a look at my PR?


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r130860454
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestData.scala ---
    @@ -0,0 +1,49 @@
    +package org.apache.flink.table.runtime.utils
    +
    +import java.sql.Timestamp
    +
    +import org.apache.flink.api.scala._
    +
    +import scala.collection.mutable
    +import scala.util.Random
    +
    +object BatchTestData {
    --- End diff --
    
    Not needed if we switch to plan tests.


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r130713721
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala ---
    @@ -137,6 +133,72 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
       }
     
       @Test
    +  def testIn(): Unit = {
    --- End diff --
    
    Also add tests for expressions that result in `false`?


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r131633926
  
    --- Diff: docs/dev/table/sql.md ---
    @@ -497,6 +497,23 @@ FROM (
     {% endhighlight %}
           </td>
         </tr>
    +
    +    <tr>
    +      <td>
    +        <strong>In</strong><br>
    +        <span class="label label-primary">Batch</span>
    +      </td>
    +      <td>
    +      Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.
    --- End diff --
    
    What do you mean by "enhanced"? We cannot change the SQL standard. The user can nest another query that reduces the table to one column.
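In SQL the nesting looks like `name IN (SELECT customer FROM orders)`: the inner query projects the multi-column table down to the one column being compared. The plain-Scala sketch below mirrors that two-step shape; the `Order` row type and `ProjectThenIn` are hypothetical, and NULLs are ignored.

```scala
// Hypothetical row type of a multi-column table `orders`.
final case class Order(id: Int, customer: String, amount: Double)

object ProjectThenIn {
  // Mirrors: SELECT name FROM customers WHERE name IN (SELECT customer FROM orders)
  def customersWithOrders(customerNames: Seq[String], orders: Seq[Order]): Seq[String] = {
    // Step 1: the nested query reduces `orders` to exactly one column.
    val customerColumn: Set[String] = orders.map(_.customer).toSet
    // Step 2: IN tests each outer value against that single column.
    customerNames.filter(customerColumn.contains)
  }
}
```

The single-column restriction in the docs therefore costs no expressiveness: any column of a wider table can be selected by the nested projection.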


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r130854117
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala ---
    @@ -138,6 +138,25 @@ case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None)
       override def toString: String = s"'$name"
     }
     
    +case class TableReference(name: String, table: Table) extends Attribute {
    --- End diff --
    
    IMO, `TableReference` shouldn't extend `Attribute`.
    We can make it extend `LeafExpression with NamedExpression` instead.
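The suggestion can be illustrated with a deliberately simplified, hypothetical trait hierarchy; none of these are Flink's actual definitions. Here a `LeafExpression` has no children, a `NamedExpression` carries a name, and an `Attribute` is assumed to additionally denote a single typed field that can be renamed. A table reference is named and childless but is not a single field, so in this model it fits `LeafExpression with NamedExpression` without taking on the `Attribute` contract.

```scala
// Deliberately simplified, hypothetical hierarchy; not Flink's actual definitions.
trait Expression {
  def children: Seq[Expression]
}

trait LeafExpression extends Expression {
  final def children: Seq[Expression] = Nil
}

trait NamedExpression extends Expression {
  def name: String
}

// Assumed contract for this sketch: an attribute denotes one typed field and can be renamed.
trait Attribute extends LeafExpression with NamedExpression {
  def fieldType: String
  def withName(newName: String): Attribute
}

final case class FieldAttribute(name: String, fieldType: String) extends Attribute {
  def withName(newName: String): Attribute = copy(name = newName)
}

// Stand-in for a table; a reference to it is named and childless but is not one field.
final case class TableHandle(columns: Seq[String])

final case class TableReference(name: String, table: TableHandle)
  extends LeafExpression with NamedExpression
```

In this model `TableReference` never has to invent a `fieldType` or a `withName` result, which is the kind of awkward obligation extending `Attribute` would create.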


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r131519235
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala ---
    @@ -0,0 +1,95 @@
    +case class In(expression: Expression, elements: Seq[Expression]) extends Expression  {
    +
    +  override def toString = s"$expression.in(${elements.mkString(", ")})"
    +
    +  override private[flink] def children: Seq[Expression] = expression +: elements.distinct
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +    // check if this is a sub-query expression or an element list
    +    elements.head match {
    +
    +      case TableReference(name, table) =>
    +        RexSubQuery.in(table.getRelNode, ImmutableList.of(expression.toRexNode))
    +
    +      case _ =>
    +        relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*)
    +    }
    +  }
    +
    +  override private[flink] def validateInput(): ValidationResult = {
    +    // check if this is a sub-query expression or an element list
    +    elements.head match {
    +
    +      case TableReference(name, table) =>
    +        if (elements.length != 1) {
    +          return ValidationFailure("IN operator supports only one table reference.")
    +        }
    +        if (table.tableEnv.isInstanceOf[StreamTableEnvironment]) {
    +          return ValidationFailure(
    +            "Sub-query IN operator on stream tables is currently not supported.")
    +        }
    +        val tableOutput = table.logicalPlan.output
    +        if (tableOutput.length > 1) {
    +          return ValidationFailure(
    +            s"The sub-query table '$name' must not have more than one column.")
    +        }
    +        (expression.resultType, tableOutput.head.resultType) match {
    +          case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
    +          case (lType, rType) if lType == rType => ValidationSuccess
    --- End diff --
    
    Should this be moved ahead of the `isNumeric()` check, since this check is cheaper?
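Both guards lead to `ValidationSuccess`, so swapping them cannot change which type pairs are accepted; putting the equality test first only lets the same-type case skip the numeric test. A minimal sketch with a simplified, hypothetical type model (`SqlType`, `InTypeCheck`, and the failure message are illustrative, not Flink's `TypeInformation` or `TypeCheckUtils`, and the fallback case is assumed because the quoted diff ends before it):

```scala
// Simplified, hypothetical type model; Flink uses TypeInformation and TypeCheckUtils instead.
sealed trait SqlType
case object IntType extends SqlType
case object LongType extends SqlType
case object DoubleType extends SqlType
case object StringType extends SqlType

object InTypeCheck {
  private def isNumeric(t: SqlType): Boolean = t match {
    case IntType | LongType | DoubleType => true
    case _ => false
  }

  // Equality first, then numeric compatibility. Both success branches are independent,
  // so their order only affects which guard is evaluated first, not the outcome.
  def check(lType: SqlType, rType: SqlType): Either[String, Unit] = (lType, rType) match {
    case (l, r) if l == r => Right(())
    case (l, r) if isNumeric(l) && isNumeric(r) => Right(())
    case (l, r) => Left(s"IN operand types $l and $r are not comparable.") // assumed message
  }
}
```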


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r130859202
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/SetOperatorsITCase.scala ---
    @@ -225,4 +227,62 @@ class SetOperatorsITCase(
         val expected = "2,1,Hi\n" + "3,2,Hello\n" + "4,2,Hello world\n"
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
    +
    +  @Test
    +  def testInWithFilter(): Unit = {
    --- End diff --
    
    These runtime tests could also be replaced by plan tests, IMO.


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4404


[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r131726341
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala ---
    @@ -0,0 +1,95 @@
    +case class In(expression: Expression, elements: Seq[Expression]) extends Expression  {
    +
    +  override def toString = s"$expression.in(${elements.mkString(", ")})"
    +
    +  override private[flink] def children: Seq[Expression] = expression +: elements.distinct
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +    // check if this is a sub-query expression or an element list
    +    elements.head match {
    +
    +      case TableReference(name, table) =>
    +        RexSubQuery.in(table.getRelNode, ImmutableList.of(expression.toRexNode))
    +
    +      case _ =>
    +        relBuilder.call(SqlStdOperatorTable.IN, children.map(_.toRexNode): _*)
    +    }
    +  }
    +
    +  override private[flink] def validateInput(): ValidationResult = {
    +    // check if this is a sub-query expression or an element list
    +    elements.head match {
    +
    +      case TableReference(name, table) =>
    +        if (elements.length != 1) {
    +          return ValidationFailure("IN operator supports only one table reference.")
    +        }
    +        if (table.tableEnv.isInstanceOf[StreamTableEnvironment]) {
    +          return ValidationFailure(
    +            "Sub-query IN operator on stream tables is currently not supported.")
    +        }
    +        val tableOutput = table.logicalPlan.output
    +        if (tableOutput.length > 1) {
    +          return ValidationFailure(
    +            s"The sub-query table '$name' must not have more than one column.")
    +        }
    +        (expression.resultType, tableOutput.head.resultType) match {
    +          case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
    +          case (lType, rType) if lType == rType => ValidationSuccess
    --- End diff --
    
    Created #4493
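In the diff, `toRexNode` and `validateInput` first decide whether `In` is a sub-query (the first element is a `TableReference`) or a plain element list. Separately, `children` drops duplicate list elements with `distinct`. The plain-Scala sketch below models only that dispatch and the membership semantics that follow from it. `Expr`, `Lit`, `TableRef`, and `eval` are hypothetical names, not Flink or Calcite classes. The real code evaluates nothing: it builds Calcite `RexNode`s, using either `RexSubQuery.in` or a call to `SqlStdOperatorTable.IN`. The sub-query table is modeled here as an in-memory column.

```scala
// Hypothetical, simplified model of the `In` expression in the diff above.
// These types are illustrative stand-ins, not Flink or Calcite classes.
sealed trait Expr
final case class Lit(value: Any) extends Expr
// Stands in for Flink's TableReference; the "table" is one in-memory column.
final case class TableRef(name: String, column: Seq[Any]) extends Expr

final case class In(expression: Lit, elements: Seq[Expr]) {
  // Mirrors `expression +: elements.distinct`: duplicate list elements are dropped.
  def children: Seq[Expr] = expression +: elements.distinct

  // Mirrors the `elements.head match { ... }` dispatch; like the original,
  // this assumes `elements` is non-empty (`head` throws otherwise).
  def eval: Boolean = elements.head match {
    case TableRef(_, column) =>
      // Sub-query form: membership in the single column of the table.
      column.contains(expression.value)
    case _ =>
      // Element-list form: compare against every distinct element.
      children.tail.exists {
        case Lit(v) => v == expression.value
        case _      => false
      }
  }
}

object InDemo extends App {
  val list = In(Lit(2), Seq(Lit(1), Lit(2), Lit(2)))
  println(list.eval)          // true
  println(list.children.size) // 3: the duplicate Lit(2) in the list is removed
  println(In(Lit(5), Seq(TableRef("t", Seq(1, 2, 3)))).eval) // false
}
```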


---

[GitHub] flink pull request #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4404#discussion_r131655114
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/subquery.scala ---
    @@ -0,0 +1,95 @@
    [Lines identical to the hunk quoted in the previous message, from the license header through the column-count check in `validateInput`, are omitted here.]
    +        (expression.resultType, tableOutput.head.resultType) match {
    +          case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
    +          case (lType, rType) if lType == rType => ValidationSuccess
    --- End diff --
    
    I think a hotfix is enough. This part is not performance critical anyway, because it happens before runtime.
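The two quoted cases accept a sub-query `IN` in two situations: when both sides are numeric, or when both sides have exactly the same type. The sketch below models only that rule in plain Scala. `SqlType`, `Validation`, `checkInTypes`, and the local `isNumeric` are hypothetical stand-ins for Flink's `TypeInformation`, `ValidationResult`, and `TypeCheckUtils.isNumeric`. The fallback failure branch and its message are assumptions, because the quoted diff stops before that case.

```scala
// Hypothetical model of the type check quoted above; not Flink's API.
sealed trait SqlType
case object IntType extends SqlType
case object LongType extends SqlType
case object DoubleType extends SqlType
case object StringType extends SqlType

sealed trait Validation
case object Success extends Validation
final case class Failure(message: String) extends Validation

object InTypeCheck {
  // Stand-in for TypeCheckUtils.isNumeric.
  def isNumeric(t: SqlType): Boolean = t match {
    case IntType | LongType | DoubleType => true
    case _                               => false
  }

  // Mirrors the two quoted cases. The final Failure branch is an assumption:
  // the quoted diff ends before the fallback case.
  def checkInTypes(lType: SqlType, rType: SqlType): Validation =
    (lType, rType) match {
      case (l, r) if isNumeric(l) && isNumeric(r) => Success
      case (l, r) if l == r                       => Success
      case (l, r) => Failure(s"IN operator on incompatible types: $l and $r.")
    }
}

object InTypeCheckDemo extends App {
  // A check like this runs at plan-validation time, not once per record.
  println(InTypeCheck.checkInTypes(IntType, DoubleType))    // Success
  println(InTypeCheck.checkInTypes(StringType, StringType)) // Success
  println(InTypeCheck.checkInTypes(IntType, StringType))    // Failure(...)
}
```

The check runs once, while the expression is validated. Its cost therefore does not affect per-record processing, which is why a simple hotfix is sufficient here.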


---

[GitHub] flink issue #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4404
  
    Thanks for reviewing, @fhueske. I addressed your feedback and added documentation. I will merge this now...


---

[GitHub] flink issue #4404: [FLINK-4565] [table] Support for SQL IN operator

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on the issue:

    https://github.com/apache/flink/pull/4404
  
    @fhueske, can you take a look at #4493?


---