Posted to issues@flink.apache.org by wuchong <gi...@git.apache.org> on 2016/06/24 12:44:35 UTC

[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

GitHub user wuchong opened a pull request:

    https://github.com/apache/flink/pull/2159

    [FLINK-3942] [tableAPI] Add support for INTERSECT

    Internally, I translate INTERSECT into a Join on all fields followed by a distinct to remove duplicate records.
    
    Since the Calcite SQL parser doesn't support `INTERSECT ALL`, I didn't add an `intersectAll()` method to `Table`.
    
    I can add the corresponding documentation if needed.
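    
    A minimal sketch of that translation on the DataSet API, assuming two hypothetical tuple-typed inputs (illustration only, not the generated planner code):
    
    ```scala
    import org.apache.flink.api.scala._
    
    // INTERSECT ~ join on all fields, keep the left record, then distinct()
    def intersectSketch(
        left: DataSet[(Int, String)],
        right: DataSet[(Int, String)]): DataSet[(Int, String)] = {
      left
        .join(right)
        .where(0, 1)        // join on every field
        .equalTo(0, 1)
        .apply((l, _) => l) // forward only the left record
        .distinct()         // drop duplicates introduced by the join
    }
    ```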

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wuchong/flink INTERSECT

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2159.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2159
    
----
commit a666a3aabb01090eb3c6205b28e54a0dbb2e8a05
Author: Jark Wu <wu...@alibaba-inc.com>
Date:   2016-06-24T11:57:09Z

    [FLINK-3942] [tableAPI] Add support for INTERSECT

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2159
  
    Hi @wuchong, thanks for the update! The PR looks mostly good. I left a few minor comments.
    
    Thanks, Fabian



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r68474420
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/IntersectITCase.scala ---
    @@ -0,0 +1,108 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.batch.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
    +import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.scala.util.CollectionDataSets
    +import org.apache.flink.api.table.{Row, TableEnvironment}
    +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit._
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +import scala.util.Random
    +
    +@RunWith(classOf[Parameterized])
    +class IntersectITCase(
    +    mode: TestExecutionMode,
    +    configMode: TableConfigMode)
    +  extends TableProgramsTestBase(mode, configMode) {
    +
    +  @Test
    +  def testIntersect(): Unit = {
    --- End diff --
    
    I think we need only this test to verify that intersect works correctly. However, for each test class (child of `TableProgramsTestBase`) a new test cluster is started, which causes significant overhead. Can you add this test to `UnionITCase` and rename that class to `SetOperatorsITCase`?



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r68473826
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -446,6 +446,31 @@ class Table(
       }
     
       /**
    +    * Intersect two [[Table]]s with duplicate records removed. Intersect returns rows only
    --- End diff --
    
    The description of intersect suggests that the same row is emitted twice if it appears two times in the left table, which is not correct. Rows that appear in both inputs are emitted exactly once.



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r69039071
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -446,9 +446,9 @@ class Table(
       }
     
       /**
    -    * Intersect two [[Table]]s with duplicate records removed. Intersect returns rows only
    -    * from the left table that are identical to a row in the right table.
    -    * Similar to an SQL INTERSECT. The fields of the two union operations must fully overlap.
    +    * Intersect two [[Table]]s with duplicate records removed. Intersect returns records that
    +    * exist in both tables, and emit exactly once. Similar to an SQL INTERSECT. The fields of
    --- End diff --
    
    I would rephrase this to "Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records."



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r69036989
  
    --- Diff: docs/apis/table.md ---
    @@ -535,6 +535,30 @@ Table result = left.unionAll(right);
           </td>
         </tr>
     
    +	<tr>
    +      <td><strong>Intersect</strong></td>
    +      <td>
    +        <p>Similar to a SQL INTERSECT clause. Intersects two tables with duplicate records removed. Both tables must have identical field types.</p>
    +{% highlight scala %}
    --- End diff --
    
    This should be Java code + `highlight java`



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r68474570
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
    @@ -260,10 +260,14 @@ class CodeGenerator(
         val input1AccessExprs = for (i <- 0 until input1.getArity)
           yield generateInputAccess(input1, input1Term, i)
     
    -    val input2AccessExprs = input2 match {
    -      case Some(ti) => for (i <- 0 until ti.getArity)
    -        yield generateInputAccess(ti, input2Term, i)
    -      case None => Seq() // add nothing
    +    val input2AccessExprs = if (input1.getArity < resultFieldNames.length) {
    --- End diff --
    
    Can this change be reverted if we use coGroup and an optional converter map? Or would you consider this a bug fix?



[GitHub] flink issue #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2159
  
    Hi @wuchong, thanks for the PR. I think it is better to use a CoGroup instead of a Join for `INTERSECT` (even though I said "Join" in the original JIRA). For `INTERSECT ALL`, CoGroup is required. It would also be good to update the Table API and SQL documentation as @mushketyk suggested. 
    Thanks, Fabian



[GitHub] flink issue #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2159
  
    I think you need to update the documentation on this page: https://ci.apache.org/projects/flink/flink-docs-master/apis/table.html to cover the newly added operations.



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r68473250
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.codegen.CodeGenerator
    +import org.apache.flink.api.table.runtime.FlatJoinRunner
    +import org.apache.flink.api.table.typeutils.TypeConverter._
    +import org.apache.flink.api.table.BatchTableEnvironment
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode which translate Intersect into Join Operator.
    +  *
    +  */
    +class DataSetIntersect(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    left: RelNode,
    +    right: RelNode,
    +    rowType: RelDataType,
    +    all: Boolean,
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, left, right)
    +    with DataSetRel {
    +
    +  override def deriveRowType() = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetIntersect(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      rowType,
    +      all,
    +      ruleDescription
    +    )
    +  }
    +
    +  override def toString: String = {
    +    s"Intersect(intersect: ($intersectSelectionToString))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw).item("intersect", intersectSelectionToString)
    +  }
    +
    +  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    +    val children = this.getInputs
    +    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
    +      val rowCnt = metadata.getRowCount(child)
    +      val rowSize = this.estimateRowSize(child.getRowType)
    +      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
    +    }
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: BatchTableEnvironment,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    var leftDataSet: DataSet[Any] = null
    +    var rightDataSet: DataSet[Any] = null
    +
    +    expectedType match {
    +      case None =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    +        rightDataSet =
    +          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
    +      case _ =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    }
    +
    +    val config = tableEnv.getConfig
    +
    +    val returnType = determineReturnType(
    +      getRowType,
    +      expectedType,
    +      config.getNullCheck,
    +      config.getEfficientTypeUsage)
    +
    +    val generator = new CodeGenerator(
    +      config,
    +      false,
    +      leftDataSet.getType,
    +      Some(rightDataSet.getType))
    +
    +    val conversion = generator.generateConverterResultExpression(
    +      returnType,
    +      left.getRowType.getFieldNames)
    +
    +
    +    val body = s"""
    +            |${conversion.code}
    +            |${generator.collectorTerm}.collect(${conversion.resultTerm});
    +            |""".stripMargin
    +
    +    val genFunction = generator.generateFunction(
    +      ruleDescription,
    +      classOf[FlatJoinFunction[Any, Any, Any]],
    +      body,
    +      returnType)
    +
    +    val joinFun = new FlatJoinRunner[Any, Any, Any](
    +      genFunction.name,
    +      genFunction.code,
    +      genFunction.returnType)
    +
    +    val joinOpName = s"intersect: ($intersectSelectionToString)"
    +
    +    val leftKeys = 0 until left.getRowType.getFieldCount
    +    val rightKeys = 0 until left.getRowType.getFieldCount
    +
    +    val intersectDS = leftDataSet.join(rightDataSet).where(leftKeys: _*).equalTo(rightKeys: _*)
    +        .`with`(joinFun).name(joinOpName)
    +
    +    if (all) {
    +      intersectDS
    +    } else {
    +      intersectDS.distinct()
    --- End diff --
    
    I would also use a coGroup for `INTERSECT` (without `ALL`) because we would not need a distinct afterwards. It's the more robust choice because it won't create a huge intermediate result in case of many duplicate records.
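    
    A rough sketch of such a coGroup function (hypothetical class name, assuming identical input types; not the PR's code):
    
    ```scala
    import java.lang.{Iterable => JIterable}
    
    import org.apache.flink.api.common.functions.CoGroupFunction
    import org.apache.flink.util.Collector
    
    // For INTERSECT (without ALL): per key group, emit one record iff both sides are non-empty.
    class IntersectCoGroupFunction[T] extends CoGroupFunction[T, T, T] {
      override def coGroup(first: JIterable[T], second: JIterable[T], out: Collector[T]): Unit = {
        val leftIt = first.iterator()
        if (leftIt.hasNext && second.iterator().hasNext) {
          out.collect(leftIt.next()) // at most one output per key, so no distinct() is needed
        }
      }
    }
    ```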



[GitHub] flink issue #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/2159
  
    @wuchong Fabian is on vacation at the moment. I will review your code in the next few days.



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r68473031
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.codegen.CodeGenerator
    +import org.apache.flink.api.table.runtime.FlatJoinRunner
    +import org.apache.flink.api.table.typeutils.TypeConverter._
    +import org.apache.flink.api.table.BatchTableEnvironment
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode which translate Intersect into Join Operator.
    +  *
    +  */
    +class DataSetIntersect(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    left: RelNode,
    +    right: RelNode,
    +    rowType: RelDataType,
    +    all: Boolean,
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, left, right)
    +    with DataSetRel {
    +
    +  override def deriveRowType() = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetIntersect(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      rowType,
    +      all,
    +      ruleDescription
    +    )
    +  }
    +
    +  override def toString: String = {
    +    s"Intersect(intersect: ($intersectSelectionToString))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw).item("intersect", intersectSelectionToString)
    +  }
    +
    +  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    +    val children = this.getInputs
    +    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
    +      val rowCnt = metadata.getRowCount(child)
    +      val rowSize = this.estimateRowSize(child.getRowType)
    +      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
    +    }
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: BatchTableEnvironment,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    var leftDataSet: DataSet[Any] = null
    +    var rightDataSet: DataSet[Any] = null
    +
    +    expectedType match {
    +      case None =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    +        rightDataSet =
    +          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
    +      case _ =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    }
    +
    +    val config = tableEnv.getConfig
    +
    +    val returnType = determineReturnType(
    +      getRowType,
    +      expectedType,
    +      config.getNullCheck,
    +      config.getEfficientTypeUsage)
    +
    +    val generator = new CodeGenerator(
    +      config,
    +      false,
    +      leftDataSet.getType,
    +      Some(rightDataSet.getType))
    +
    +    val conversion = generator.generateConverterResultExpression(
    +      returnType,
    +      left.getRowType.getFieldNames)
    +
    +
    +    val body = s"""
    +            |${conversion.code}
    +            |${generator.collectorTerm}.collect(${conversion.resultTerm});
    +            |""".stripMargin
    +
    +    val genFunction = generator.generateFunction(
    +      ruleDescription,
    +      classOf[FlatJoinFunction[Any, Any, Any]],
    +      body,
    +      returnType)
    +
    +    val joinFun = new FlatJoinRunner[Any, Any, Any](
    +      genFunction.name,
    +      genFunction.code,
    +      genFunction.returnType)
    +
    +    val joinOpName = s"intersect: ($intersectSelectionToString)"
    +
    +    val leftKeys = 0 until left.getRowType.getFieldCount
    +    val rightKeys = 0 until left.getRowType.getFieldCount
    +
    +    val intersectDS = leftDataSet.join(rightDataSet).where(leftKeys: _*).equalTo(rightKeys: _*)
    +        .`with`(joinFun).name(joinOpName)
    +
    +    if (all) {
    +      intersectDS
    --- End diff --
    
    An `INTERSECT ALL` cannot be executed as a join because a join builds the Cartesian product within a join key. However, `INTERSECT ALL` should return as many identical records as are in both inputs, i.e., with `A = [1, 1, 1, 2, 2]` and `B = [1, 2, 2, 3]`, `A INTERSECT ALL B` should return `[1, 2, 2]`. Using a join we'd get `[1, 1, 1, 2, 2, 2, 2]`.
    
    `INTERSECT ALL` can be executed with a `CoGroup` which alternately forwards both iterators and only emits a record if both iterators had an element. We do not need a code-generated function for this, as the records are not modified but simply forwarded.
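    
    A rough sketch of the coGroup described above (hypothetical class name, assuming identical input types):
    
    ```scala
    import java.lang.{Iterable => JIterable}
    
    import org.apache.flink.api.common.functions.CoGroupFunction
    import org.apache.flink.util.Collector
    
    // For INTERSECT ALL: per key group, step through both iterators in lockstep and emit a
    // record only while both sides still have one, i.e. min(leftCount, rightCount) copies.
    class IntersectAllCoGroupFunction[T] extends CoGroupFunction[T, T, T] {
      override def coGroup(first: JIterable[T], second: JIterable[T], out: Collector[T]): Unit = {
        val leftIt = first.iterator()
        val rightIt = second.iterator()
        while (leftIt.hasNext && rightIt.hasNext) {
          rightIt.next()             // consume one record from the right side
          out.collect(leftIt.next()) // forward the matching left record unmodified
        }
      }
    }
    ```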



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r69037753
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala ---
    @@ -83,66 +82,68 @@ class DataSetIntersect(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    var leftDataSet: DataSet[Any] = null
    -    var rightDataSet: DataSet[Any] = null
    +    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    +    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     
    -    expectedType match {
    -      case None =>
    -        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -        rightDataSet =
    -          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
    -      case _ =>
    -        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    -        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    -    }
    -
    -    val config = tableEnv.getConfig
    -
    -    val returnType = determineReturnType(
    -      getRowType,
    -      expectedType,
    -      config.getNullCheck,
    -      config.getEfficientTypeUsage)
    -
    -    val generator = new CodeGenerator(
    -      config,
    -      false,
    -      leftDataSet.getType,
    -      Some(rightDataSet.getType))
    -
    -    val conversion = generator.generateConverterResultExpression(
    -      returnType,
    -      left.getRowType.getFieldNames)
    +    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
     
    +    val leftType = leftDataSet.getType
    +    val rightType = rightDataSet.getType
     
    -    val body = s"""
    -            |${conversion.code}
    -            |${generator.collectorTerm}.collect(${conversion.resultTerm});
    -            |""".stripMargin
    +    // If it is atomic type, the field expression need to be "*".
    +    // Otherwise, we use int-based field position keys
    +    val coGroupedPredicateDs =
    +      if (leftType.isTupleType || leftType.isInstanceOf[CompositeType[Any]]) {
    +      coGroupedDs.where(0 until left.getRowType.getFieldCount: _*)
    +    } else {
    +      coGroupedDs.where("*")
    +    }
     
    -    val genFunction = generator.generateFunction(
    -      ruleDescription,
    -      classOf[FlatJoinFunction[Any, Any, Any]],
    -      body,
    -      returnType)
    +    val coGroupedWithoutFunctionDs =
    --- End diff --
    
    Same here.



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r68474131
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/IntersectITCase.java ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.batch.table;
    +
    +import java.util.List;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.table.BatchTableEnvironment;
    +import org.apache.flink.api.java.tuple.Tuple3;
    +import org.apache.flink.api.table.Row;
    +import org.apache.flink.api.table.Table;
    +import org.apache.flink.api.table.TableEnvironment;
    +import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.junit.Test;
    +import org.junit.runner.RunWith;
    +import org.junit.runners.Parameterized;
    +
    +@RunWith(Parameterized.class)
    +public class IntersectITCase extends MultipleProgramsTestBase {
    --- End diff --
    
    We try to keep the number of integration tests for the Table API to a minimum to not exceed Travis' build time threshold of 2 hours. Actually, we try to remove as many as possible without cutting back test coverage. The Java Table API tests for intersect overlap with the Scala Table API tests and can be removed, IMO.



[GitHub] flink issue #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/2159
  
    @twalthr   Thanks a lot. 



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r69039259
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -471,6 +471,32 @@ class Table(
       }
     
       /**
    +    * Intersect two [[Table]]s. Intersect All returns records that exist in both tables, but not
    --- End diff --
    
    I would rephrase this to "Intersect All returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables (the minimum of its two frequencies), i.e., the resulting table might have duplicate records."



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r69037698
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala ---
    @@ -83,66 +82,68 @@ class DataSetIntersect(
           tableEnv: BatchTableEnvironment,
           expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
     
    -    var leftDataSet: DataSet[Any] = null
    -    var rightDataSet: DataSet[Any] = null
    +    val leftDataSet: DataSet[Any] = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    +    val rightDataSet: DataSet[Any] = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     
    -    expectedType match {
    -      case None =>
    -        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    -        rightDataSet =
    -          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
    -      case _ =>
    -        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    -        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    -    }
    -
    -    val config = tableEnv.getConfig
    -
    -    val returnType = determineReturnType(
    -      getRowType,
    -      expectedType,
    -      config.getNullCheck,
    -      config.getEfficientTypeUsage)
    -
    -    val generator = new CodeGenerator(
    -      config,
    -      false,
    -      leftDataSet.getType,
    -      Some(rightDataSet.getType))
    -
    -    val conversion = generator.generateConverterResultExpression(
    -      returnType,
    -      left.getRowType.getFieldNames)
    +    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
     
    +    val leftType = leftDataSet.getType
    +    val rightType = rightDataSet.getType
     
    -    val body = s"""
    -            |${conversion.code}
    -            |${generator.collectorTerm}.collect(${conversion.resultTerm});
    -            |""".stripMargin
    +    // If it is atomic type, the field expression need to be "*".
    --- End diff --
    
    I think you can use `*` for all cases. Composite types should be (recursively) expanded to all atomic member types.
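    
    A small sketch of that simplification (hypothetical datasets; "*" selects all fields of a composite type and the value itself for an atomic type):
    
    ```scala
    import org.apache.flink.api.scala._
    
    def coGroupOnAllFields(left: DataSet[(Int, String)], right: DataSet[(Int, String)]) =
      left.coGroup(right).where("*").equalTo("*") // the same key expression covers both cases
    ```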



[GitHub] flink issue #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/2159
  
    Thanks @fhueske for your review. I have addressed all the comments and squashed the commits.



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r68473636
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -446,6 +446,31 @@ class Table(
       }
     
       /**
    +    * Intersect two [[Table]]s with duplicate records removed. Intersect returns rows only
    +    * from the left table that are identical to a row in the right table.
    +    * Similar to an SQL INTERSECT. The fields of the two union operations must fully overlap.
    --- End diff --
    
    "The fields of the two union operations must fully overlap." -> "The fields of the two intersected tables must fully overlap."



[GitHub] flink issue #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on the issue:

    https://github.com/apache/flink/pull/2159
  
    Hi @fhueske, using CoGroup instead of Join is a good idea. I updated the PR according to your advice and updated the documentation as well (correct me if I described anything wrong).
    
    NOTE:
    
    1. use CoGroup instead of Join, no code generation
    2. add the INTERSECT-related tests to `UnionITCase` and rename it to `SetOperatorsITCase`
    3. remove the INTERSECT Java API tests
    4. add the `intersectAll` function to `Table`
    5. mark `testIntersectAll` as `@Ignore` in `sql/SetOperatorsITCase`, because the Calcite SQL parser doesn't support INTERSECT ALL and throws the following exception:
    
    ```
    java.lang.AssertionError: Internal error: set operator INTERSECT ALL not suported
    
      at org.apache.calcite.util.Util.newInternal(Util.java:777)
      at org.apache.calcite.sql2rel.SqlToRelConverter.convertSetOp(SqlToRelConverter.java:2920)
      at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:2885)
      at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
      at org.apache.flink.api.table.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:114)
      at org.apache.flink.api.table.BatchTableEnvironment.sql(BatchTableEnvironment.scala:132)
      at org.apache.flink.api.scala.batch.sql.SetOperatorsITCase.testIntersectAll(SetOperatorsITCase.scala:169)
    ```
    




[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r68471662
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.codegen.CodeGenerator
    +import org.apache.flink.api.table.runtime.FlatJoinRunner
    +import org.apache.flink.api.table.typeutils.TypeConverter._
    +import org.apache.flink.api.table.BatchTableEnvironment
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode which translate Intersect into Join Operator.
    +  *
    +  */
    +class DataSetIntersect(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    left: RelNode,
    +    right: RelNode,
    +    rowType: RelDataType,
    +    all: Boolean,
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, left, right)
    +    with DataSetRel {
    +
    +  override def deriveRowType() = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetIntersect(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      rowType,
    +      all,
    +      ruleDescription
    +    )
    +  }
    +
    +  override def toString: String = {
    +    s"Intersect(intersect: ($intersectSelectionToString))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw).item("intersect", intersectSelectionToString)
    +  }
    +
    +  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    +    val children = this.getInputs
    +    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
    +      val rowCnt = metadata.getRowCount(child)
    +      val rowSize = this.estimateRowSize(child.getRowType)
    +      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
    +    }
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: BatchTableEnvironment,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    var leftDataSet: DataSet[Any] = null
    +    var rightDataSet: DataSet[Any] = null
    +
    +    expectedType match {
    +      case None =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    +        rightDataSet =
    +          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
    +      case _ =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    }
    +
    +    val config = tableEnv.getConfig
    +
    +    val returnType = determineReturnType(
    +      getRowType,
    +      expectedType,
    +      config.getNullCheck,
    +      config.getEfficientTypeUsage)
    +
    +    val generator = new CodeGenerator(
    +      config,
    +      false,
    +      leftDataSet.getType,
    +      Some(rightDataSet.getType))
    +
    +    val conversion = generator.generateConverterResultExpression(
    +      returnType,
    +      left.getRowType.getFieldNames)
    +
    +
    +    val body = s"""
    +            |${conversion.code}
    +            |${generator.collectorTerm}.collect(${conversion.resultTerm});
    +            |""".stripMargin
    +
    +    val genFunction = generator.generateFunction(
    +      ruleDescription,
    +      classOf[FlatJoinFunction[Any, Any, Any]],
    +      body,
    +      returnType)
    +
    +    val joinFun = new FlatJoinRunner[Any, Any, Any](
    +      genFunction.name,
    +      genFunction.code,
    +      genFunction.returnType)
    +
    +    val joinOpName = s"intersect: ($intersectSelectionToString)"
    +
    +    val leftKeys = 0 until left.getRowType.getFieldCount
    +    val rightKeys = 0 until left.getRowType.getFieldCount
    --- End diff --
    
    This should be `right` instead of `left`, but both inputs need to have the same number of fields anyway.



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r69037003
  
    --- Diff: docs/apis/table.md ---
    @@ -535,6 +535,30 @@ Table result = left.unionAll(right);
           </td>
         </tr>
     
    +	<tr>
    +      <td><strong>Intersect</strong></td>
    +      <td>
    +        <p>Similar to a SQL INTERSECT clause. Intersects two tables with duplicate records removed. Both tables must have identical field types.</p>
    +{% highlight scala %}
    +val left = ds1.toTable(tableEnv, "a, b, c");
    +val right = ds2.toTable(tableEnv, "d, e, f");
    +val result = left.intersect(right);
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +
    +	<tr>
    +      <td><strong>IntersectAll</strong></td>
    +      <td>
    +        <p>Similar to a SQL INTERSECT ALL clause. Intersects two tables. It returns as many identical records as are in both tables. Both tables must have identical field types.</p>
    +{% highlight scala %}
    +val left = ds1.toTable(tableEnv, "a, b, c");
    --- End diff --
    
    This should be Java code + `highlight java`



[GitHub] flink issue #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/2159
  
    Thanks for the contribution @wuchong! I have fixed some typos, but the PR looks good to merge.
    
    Merging...



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2159#discussion_r68473118
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetIntersect.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{RelWriter, BiRel, RelNode}
    +import org.apache.flink.api.common.functions.FlatJoinFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.codegen.CodeGenerator
    +import org.apache.flink.api.table.runtime.FlatJoinRunner
    +import org.apache.flink.api.table.typeutils.TypeConverter._
    +import org.apache.flink.api.table.BatchTableEnvironment
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode which translate Intersect into Join Operator.
    +  *
    +  */
    +class DataSetIntersect(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    left: RelNode,
    +    right: RelNode,
    +    rowType: RelDataType,
    +    all: Boolean,
    +    ruleDescription: String)
    +  extends BiRel(cluster, traitSet, left, right)
    +    with DataSetRel {
    +
    +  override def deriveRowType() = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetIntersect(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      rowType,
    +      all,
    +      ruleDescription
    +    )
    +  }
    +
    +  override def toString: String = {
    +    s"Intersect(intersect: ($intersectSelectionToString))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw).item("intersect", intersectSelectionToString)
    +  }
    +
    +  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    +    val children = this.getInputs
    +    children.foldLeft(planner.getCostFactory.makeCost(0, 0, 0)) { (cost, child) =>
    +      val rowCnt = metadata.getRowCount(child)
    +      val rowSize = this.estimateRowSize(child.getRowType)
    +      cost.plus(planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * rowSize))
    +    }
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: BatchTableEnvironment,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    var leftDataSet: DataSet[Any] = null
    +    var rightDataSet: DataSet[Any] = null
    +
    +    expectedType match {
    --- End diff --
    
    We can do the type conversion after the co-group with a map.
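    
    A rough sketch of that idea, with a hypothetical stand-in for the converter (the real planner would code-generate the map function):
    
    ```scala
    import org.apache.flink.api.scala._
    
    // Run the coGroup on the natural row type and only append a converting map
    // if a different output type is requested downstream.
    def maybeConvert(intersected: DataSet[(Int, String)], needsConversion: Boolean): DataSet[_] =
      if (needsConversion) intersected.map(r => s"${r._1},${r._2}") // stand-in converter
      else intersected
    ```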



[GitHub] flink pull request #2159: [FLINK-3942] [tableAPI] Add support for INTERSECT

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2159

