Posted to issues@flink.apache.org by mushketyk <gi...@git.apache.org> on 2016/06/27 19:47:10 UTC

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

GitHub user mushketyk opened a pull request:

    https://github.com/apache/flink/pull/2169

    [FLINK-3943] Add support for EXCEPT operator

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mushketyk/flink except-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2169.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2169
    
----
commit d6ad27965068bfc3dde0324e2b4c75c07b30d193
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-06-26T14:20:25Z

    Not yet done

commit 4925340f17fb8327d9ba38704d4281ae760121db
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-06-27T19:27:33Z

    [FLINK-3943] Add EXCEPT operator

commit daea09fc0946d676c5ec153e3c9a90202f2d0687
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-06-27T19:43:38Z

    [FLINK-3943] Add EXCEPT documentation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r70070564
  
    --- Diff: docs/apis/table.md ---
    @@ -695,6 +718,30 @@ val result = left.unionAll(right);
         </tr>
     
         <tr>
    +      <td><strong>Minus</strong></td>
    +      <td>
    +        <p>Similar to a SQL EXCEPT clause. Returns elements from the first table that do not exist in the second table. Both tables must have identical schema(field names and types).</p>
    --- End diff --
    
    Can you copy the description from the Java part also in here?


---

[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/2169
  
    @mushketyk Thanks for the PR.
    I also reviewed the current status. I think it would be good to compare your code with #2159 before you rework it.


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r70071856
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala ---
    @@ -0,0 +1,150 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import java.lang.Iterable
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.flink.api.common.functions.CoGroupFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.BatchTableEnvironment
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode which matches along with DataSetOperator.
    --- End diff --
    
    This description does not make much sense.


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69048797
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperationsITCase.scala ---
    @@ -139,4 +154,105 @@ class UnionITCase(
         // Must fail. Tables are bound to different TableEnvironments.
         ds1.unionAll(ds2).select('c)
       }
    +
    +  @Test
    +  def testSetMinusAll(): Unit = {
    --- End diff --
    
    Can you combine this and the next test by using test data that covers both cases, i.e., some records with duplicates in the first data set only, in the second only, in both, and in neither.
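    
    For example (just a sketch, the concrete values are made up; `env` and `tEnv` as set up in the other tests of this file), one pair of inputs that covers all of these cases:
    
        // left: duplicates in the left input only ("Hi"), no duplicates ("Hello"),
        //       duplicates in both inputs ("Hello world")
        val left = env.fromElements(
          (1, 1L, "Hi"), (1, 1L, "Hi"),
          (2, 2L, "Hello"),
          (3, 2L, "Hello world"), (3, 2L, "Hello world")
        ).toTable(tEnv, 'a, 'b, 'c)

        // right: duplicates in the right input only ("I am fine.")
        val right = env.fromElements(
          (3, 2L, "Hello world"), (3, 2L, "Hello world"),
          (4, 3L, "I am fine."), (4, 3L, "I am fine.")
        ).toTable(tEnv, 'a, 'b, 'c)

        // expected: minusAll -> "Hi", "Hi", "Hello";  minus -> "Hi", "Hello"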


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2169


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69047875
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -426,6 +426,49 @@ class Table(
       }
     
       /**
    +    * Set minus between two [[Table]]s. Similar to an SQL EXCEPT ALL.
    +    * The fields of the two minus operands must fully overlap.
    +    *
    +    * Note: Both tables must be bound to the same [[TableEnvironment]].
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   left.minusAll(right)
    +    * }}}
    +    */
    +  def minusAll(right: Table): Table = {
    +    // check that right table belongs to the same TableEnvironment
    +    if (right.tableEnv != this.tableEnv) {
    +      throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
    +    }
    +    new Table(tableEnv, SetMinus(logicalPlan, right.logicalPlan, true).validate(tableEnv))
    +  }
    +
    +  /**
    +    * Perform set minus between [[Table]]s with duplicate records removed.
    --- End diff --
    
    Please describe the semantics of minus all in more detail:
    - records of the first table are returned
    - how many identical records are returned
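    
    For example, for `minusAll`, one possible wording in line with the documentation text proposed elsewhere in this PR:
    
        /**
          * Minus of two [[Table]]s without removing duplicates. Similar to a SQL EXCEPT ALL clause.
          * MinusAll returns the records of the left table that do not exist in the right table.
          * A record that is present n times in the left table and m times in the right table is
          * returned (n - m) times, i.e., as many duplicates as are present in the right table
          * are removed. Both tables must have identical field types.
          *
          * Note: Both tables must be bound to the same [[TableEnvironment]].
          */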


---

[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2169
  
    @twalthr Thank you for accepting my changes!


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69047378
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.BatchTableEnvironment
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode which matches along with DataSetOperator.
    +  *
    +  */
    +class DataSetMinus(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    left: RelNode,
    +    right: RelNode,
    +    rowType: RelDataType,
    +    all: Boolean)
    +  extends BiRel(cluster, traitSet, left, right)
    +    with DataSetRel {
    +
    +  override def deriveRowType() = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetMinus(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      rowType,
    +      all
    +    )
    +  }
    +
    +  override def toString: String = {
    +    s"SetMinus(setMinus: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
    +  }
    +
    +  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    +
    +    val children = this.getInputs
    +    val rowCnt = children.foldLeft(0D) { (rows, child) =>
    +      rows + metadata.getRowCount(child)
    +    }
    +
    +    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: BatchTableEnvironment,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    var leftDataSet: DataSet[Any] = null
    +    var rightDataSet: DataSet[Any] = null
    +
    +    expectedType match {
    +      case None =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    +        rightDataSet =
    +          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
    +      case _ =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    }
    +
    +    val minusRes = leftDataSet.minus(rightDataSet)
    +    if (!all) {
    +      minusRes.distinct()
    --- End diff --
    
    I would like to move the code of `DataSet.minus()` here. However, I think the semantics of `EXCEPT ALL` are a bit different from your implementation. `EXCEPT ALL` does not simply check whether there is a match in the second input and forward everything if there is none. It basically removes, for each match in the second input, one matching record from the first input (see also the [PostgreSQL docs](https://www.postgresql.org/docs/9.4/static/sql-select.html#SQL-EXCEPT)).
    
    I would be in favor of an implementation that is similar to @wuchong's implementation of `INTERSECT` / `INTERSECT ALL` in PR #2159.
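    
    As a rough sketch of the `EXCEPT ALL` semantics described above (for illustration only, not the code to merge; the class name is made up), a coGroup on all fields could count the occurrences per group and emit the difference:
    
        import org.apache.flink.api.common.functions.CoGroupFunction
        import org.apache.flink.util.Collector

        // Sketch: per group of identical records, emit (n - m) copies, where n is the
        // number of occurrences in the first input and m the number in the second input.
        class ExceptAllCoGroup[T] extends CoGroupFunction[T, T, T] {
          override def coGroup(
              first: java.lang.Iterable[T],
              second: java.lang.Iterable[T],
              out: Collector[T]): Unit = {
            val leftIt = first.iterator()
            if (leftIt.hasNext) {
              val sample = leftIt.next()
              var n = 1
              while (leftIt.hasNext) { leftIt.next(); n += 1 }
              var m = 0
              val rightIt = second.iterator()
              while (rightIt.hasNext) { rightIt.next(); m += 1 }
              var i = 0
              while (i < n - m) { out.collect(sample); i += 1 }
            }
          }
        }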


---

[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2169
  
    Hi @mushketyk, thanks for the PR! I added a few comments inline.
    
    Best, Fabian
    @wuchong, thanks for reviewing!


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69043528
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -1193,21 +1196,46 @@ public long count() throws Exception {
     		operation.setInput(this);
     		return operation.createResult();
     	}
    -	
    +
     	// --------------------------------------------------------------------------------------------
     	//  Union
     	// --------------------------------------------------------------------------------------------
     
     	/**
     	 * Creates a union of this DataSet with an other DataSet. The other DataSet must be of the same data type.
    -	 * 
    +	 *
     	 * @param other The other DataSet which is unioned with the current DataSet.
     	 * @return The resulting DataSet.
     	 */
     	public UnionOperator<T> union(DataSet<T> other){
     		return new UnionOperator<>(this, other, Utils.getCallLocationName());
     	}
     
    +	/**
    +	* Creates a set minus of this DataSet with an other DataSet. The other DataSet must be of the same data type.
    +	*
    +	* @param other The other DataSet which is set minus with the current DataSet.
    +	* @return The resulting DataSet.
    +	*/
    +	public CoGroupOperator<T, T, T> minus(DataSet<T> other){
    --- End diff --
    
    This issue is about adding `EXCEPT` to the Table API. The DataSet API, which is touched here, is a rather low-level API and we are quite careful about adding new operators. Therefore, changes to the DataSet API should go through a separate JIRA issue. Please move this code to the `DataSetMinus` class and revert the changes to this file. You can open a JIRA issue to discuss adding a `minus` operator to the DataSet API. Thank you.


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69046225
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -1193,21 +1196,46 @@ public long count() throws Exception {
     		operation.setInput(this);
     		return operation.createResult();
     	}
    -	
    +
     	// --------------------------------------------------------------------------------------------
     	//  Union
     	// --------------------------------------------------------------------------------------------
     
     	/**
     	 * Creates a union of this DataSet with an other DataSet. The other DataSet must be of the same data type.
    -	 * 
    +	 *
     	 * @param other The other DataSet which is unioned with the current DataSet.
     	 * @return The resulting DataSet.
     	 */
     	public UnionOperator<T> union(DataSet<T> other){
     		return new UnionOperator<>(this, other, Utils.getCallLocationName());
     	}
     
    +	/**
    +	* Creates a set minus of this DataSet with an other DataSet. The other DataSet must be of the same data type.
    +	*
    +	* @param other The other DataSet which is set minus with the current DataSet.
    +	* @return The resulting DataSet.
    +	*/
    +	public CoGroupOperator<T, T, T> minus(DataSet<T> other){
    +		return coGroup(other)
    +			.where("*")
    +			.equalTo("*")
    +			.with(new RichCoGroupFunction<T, T, T>() {
    --- End diff --
    
    A non-rich `CoGroupFunction` is sufficient.


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69048368
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperationsITCase.scala ---
    @@ -121,4 +121,66 @@ class UnionITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  @Test
    +  def testExcept(): Unit = {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +
    +    val sqlQuery = "SELECT c FROM t1 EXCEPT (SELECT c FROM t2)"
    +
    +    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
    +    val ds2 = CollectionDataSets.getOneElement3TupleDataSet(env)
    +    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
    +    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
    +
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected = "Hello\n" + "Hello world\n"
    +    val results = result.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testExceptWithFilter(): Unit = {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +
    +    val sqlQuery = "SELECT c FROM (" +
    +      "SELECT * FROM t1 EXCEPT (SELECT a, b, c FROM t2))" +
    +      "WHERE b < 2"
    +
    +    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
    +    val ds2 = CollectionDataSets.get5TupleDataSet(env)
    +    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
    +    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e)
    +
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected = "Hi\n"
    +    val results = result.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testExceptWithAggregation(): Unit = {
    --- End diff --
    
    Please remove this test.


---

[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2169
  
    Renamed UnionITCase to SetOperationsITCase as suggested here: https://github.com/apache/flink/pull/2159


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69048019
  
    --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
    @@ -1353,6 +1353,16 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
         getCallLocationName()))
     
       // --------------------------------------------------------------------------------------------
    +  //  Minus
    +  // --------------------------------------------------------------------------------------------
    +
    +  /**
    +    * Creates a new DataSet containing the elements from `this` DataSet minus elements from `other`
    +    * DataSet.
    +    */
    +  def minus(other: DataSet[T]): DataSet[T] = wrap(javaSet.minus(other.javaSet))
    --- End diff --
    
    Please remove these changes as well. Thanks


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r70075759
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala ---
    @@ -0,0 +1,150 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import java.lang.Iterable
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.flink.api.common.functions.CoGroupFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.BatchTableEnvironment
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode which matches along with DataSetOperator.
    +  *
    +  */
    +class DataSetMinus(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    left: RelNode,
    +    right: RelNode,
    +    rowType: RelDataType,
    +    all: Boolean)
    +  extends BiRel(cluster, traitSet, left, right)
    +    with DataSetRel {
    +
    +  override def deriveRowType() = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetMinus(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      rowType,
    +      all
    +    )
    +  }
    +
    +  override def toString: String = {
    +    s"SetMinus(setMinus: ($setMinusSelectionToString}))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
    +  }
    +
    +  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    +
    +    val children = this.getInputs
    +    val rowCnt = children.foldLeft(0D) { (rows, child) =>
    +      rows + metadata.getRowCount(child)
    +    }
    +
    +    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: BatchTableEnvironment,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    var leftDataSet: DataSet[Any] = null
    +    var rightDataSet: DataSet[Any] = null
    +
    +    expectedType match {
    +      case None =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    +        rightDataSet =
    +          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
    +      case _ =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    }
    +
    +    val leftType = leftDataSet.getType
    +    val rightType = rightDataSet.getType
    +    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
    +
    +    // If it is atomic type, the field expression need to be "*".
    +    // Otherwise, we use int-based field position keys
    +    val coGroupedPredicateDs =
    +    if (leftType.isTupleType || leftType.isInstanceOf[CompositeType[Any]]) {
    +      coGroupedDs.where(0 until left.getRowType.getFieldCount: _*)
    +    } else {
    +      coGroupedDs.where("*")
    +    }
    +
    +    val coGroupedWithoutFunctionDs =
    +    if (rightType.isTupleType || rightType.isInstanceOf[CompositeType[Any]]) {
    +      coGroupedPredicateDs.equalTo(0 until right.getRowType.getFieldCount: _*)
    +    } else {
    +      coGroupedPredicateDs.equalTo("*")
    +    }
    +
    +    coGroupedWithoutFunctionDs.`with`(new MinusCoGroupFunction[Any](all))
    +      .name(s"intersect: $setMinusSelectionToString")
    +  }
    +
    +  private def setMinusSelectionToString: String = {
    +    rowType.getFieldNames.asScala.toList.mkString(", ")
    +  }
    +
    +}
    +
    +class MinusCoGroupFunction[T](all: Boolean) extends CoGroupFunction[T, T, T] {
    --- End diff --
    
    This belongs in the table runtime package.


---

[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2169
  
    @twalthr Updated PR according to your commits and rebased on top of the master branch to avoid merge conflicts.


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69047691
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---
    @@ -426,6 +426,49 @@ class Table(
       }
     
       /**
    +    * Set minus between two [[Table]]s. Similar to an SQL EXCEPT ALL.
    --- End diff --
    
    Please describe the semantics of minus all in more detail.


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69045985
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -236,6 +236,32 @@ case class Aggregate(
       }
     }
     
    +case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
    +  override def output: Seq[Attribute] = left.output
    +
    +  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    +    left.construct(relBuilder)
    +    right.construct(relBuilder)
    +    relBuilder.minus(all)
    +  }
    +
    +  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    +    val resolvedMinus = super.validate(tableEnv).asInstanceOf[SetMinus]
    +    if (left.output.length != right.output.length) {
    +      failValidation(s"Set minus two table of different column sizes:" +
    +        s" ${left.output.size} and ${right.output.size}")
    +    }
    +    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
    +      l.resultType == r.resultType && l.name == r.name }
    --- End diff --
    
    I think @wuchong refers exactly to the last case. If the number of fields and the field types are identical, it should be possible to do an `EXCEPT` even if the field names are not the same.
    The situation for `UNION` is a bit special due to Flink's internals and the way `RowTypeInfo` is implemented, but I think we can remove that restriction in the future.
    
    So we must keep the checks for the number of fields and the field types, but we can remove the check of the field names, IMO.
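    
    Concretely, that would mean keeping the arity check from the diff above and comparing only the field types, e.g. (sketch):
    
        if (left.output.length != right.output.length) {
          failValidation(s"Set minus of two tables with different column sizes: " +
            s"${left.output.size} and ${right.output.size}")
        }
        // compare field types positionally, but do not require matching field names
        val sameSchema = left.output.zip(right.output).forall {
          case (l, r) => l.resultType == r.resultType
        }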


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69048325
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SetOperationsITCase.scala ---
    @@ -121,4 +121,66 @@ class UnionITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  @Test
    +  def testExcept(): Unit = {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +
    +    val sqlQuery = "SELECT c FROM t1 EXCEPT (SELECT c FROM t2)"
    +
    +    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
    +    val ds2 = CollectionDataSets.getOneElement3TupleDataSet(env)
    +    tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c)
    +    tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c)
    +
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected = "Hello\n" + "Hello world\n"
    +    val results = result.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testExceptWithFilter(): Unit = {
    --- End diff --
    
    Please remove this test. We need to be very careful about the build time of the project (we are very close to hitting the 2h build timeout of Travis), and integration tests are quite time consuming. So we try to avoid integration tests that do not add to the test coverage.


---

[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2169
  
    @fhueske Hey, sorry to bother you. Do I need to change anything else in this PR?


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69044111
  
    --- Diff: docs/apis/table.md ---
    @@ -536,6 +536,29 @@ Table result = left.unionAll(right);
         </tr>
     
         <tr>
    +      <td><strong>Minus</strong></td>
    +      <td>
    +        <p>Similar to a SQL EXCEPT clause. Returns elements from the first table that do not exist in the second table. Both tables must have identical schema, i.e., field names and types.</p>
    --- End diff --
    
    Please rephrase to "Except returns records from the first table that do not exist in the second table. Records that exist multiple times in the first table are returned exactly once, i.e., duplicates are removed.".


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69048653
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperationsITCase.scala ---
    @@ -61,14 +62,28 @@ class UnionITCase(
         val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
         val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     
    -    val unionDs = ds1.union(ds2).select('c)
    +    val minusDs = ds1.union(ds2).select('c)
     
    -    val results = unionDs.toDataSet[Row].collect()
    +    val results = minusDs.toDataSet[Row].collect()
         val expected = "Hi\n" + "Hello\n" + "Hello world\n"
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
       @Test
    +  def testSetMinusAllSameSets(): Unit = {
    --- End diff --
    
    Please remove this test.


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69065352
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -236,6 +236,32 @@ case class Aggregate(
       }
     }
     
    +case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
    +  override def output: Seq[Attribute] = left.output
    +
    +  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    +    left.construct(relBuilder)
    +    right.construct(relBuilder)
    +    relBuilder.minus(all)
    +  }
    +
    +  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    +    val resolvedMinus = super.validate(tableEnv).asInstanceOf[SetMinus]
    +    if (left.output.length != right.output.length) {
    +      failValidation(s"Set minus two table of different column sizes:" +
    +        s" ${left.output.size} and ${right.output.size}")
    +    }
    +    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
    +      l.resultType == r.resultType && l.name == r.name }
    --- End diff --
    
    Yes, I refer to the last case. I agree with @fhueske's opinion: we can remove the check of field names in `EXCEPT` and `INTERSECT` now, and remove the restriction in `UNION` in the future.


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69044780
  
    --- Diff: docs/apis/table.md ---
    @@ -536,6 +536,29 @@ Table result = left.unionAll(right);
         </tr>
     
         <tr>
    +      <td><strong>Minus</strong></td>
    +      <td>
    +        <p>Similar to a SQL EXCEPT clause. Returns elements from the first table that do not exist in the second table. Both tables must have identical schema, i.e., field names and types.</p>
    +{% highlight java %}
    +Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    +Table right = tableEnv.fromDataSet(ds2, "a, b, c");
    +Table result = left.minus(right);
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>MinusAll</strong></td>
    +      <td>
    +        <p>Similar to a SQL EXCEPT ALL clause. Returns elements from the first table that do not exist in the second table without removing duplicates. Both tables must have identical schema, i.e., field names and types.</p>
    --- End diff --
    
    Please rephrase to "Except All returns the records that do not exist in the second table. A record that is present n times in the first table and m times in the second table is returned (n - m) times, i.e., as many duplicates as are present in the second table are removed.".


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69048813
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperationsITCase.scala ---
    @@ -139,4 +154,105 @@ class UnionITCase(
         // Must fail. Tables are bound to different TableEnvironments.
         ds1.unionAll(ds2).select('c)
       }
    +
    +  @Test
    +  def testSetMinusAll(): Unit = {
    +    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +
    +    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
    +    val ds2 = CollectionDataSets.getOneElement3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
    +
    +    val minusDs = ds1.minusAll(ds2).select('c)
    +
    +    val results = minusDs.toDataSet[Row].collect()
    +    val expected = "Hello\n" + "Hello world\n"
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testSetMinusAllWithDuplicates(): Unit = {
    +    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +
    +    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
    +    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
    +    val ds3 = CollectionDataSets.getOneElement3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
    +
    +    val minusDs = ds1.unionAll(ds2).minusAll(ds3).select('c)
    +
    +    val results = minusDs.toDataSet[Row].collect()
    +    val expected = "Hello\n" + "Hello world\n" +
    +      "Hello\n" + "Hello world\n"
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testSetMinus(): Unit = {
    --- End diff --
    
    Can you combine this and the next test by using test data that covers both cases, i.e., some records with duplicates in the first data set only, in the second only, in both, and in neither.


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69021580
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala ---
    @@ -0,0 +1,106 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.BatchTableEnvironment
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode which matches along with DataSetOperator.
    +  *
    +  */
    +class DataSetMinus(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    left: RelNode,
    +    right: RelNode,
    +    rowType: RelDataType,
    +    all: Boolean)
    +  extends BiRel(cluster, traitSet, left, right)
    +    with DataSetRel {
    +
    +  override def deriveRowType() = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetMinus(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      rowType,
    +      all
    +    )
    +  }
    +
    +  override def toString: String = {
    +    s"SetMinus(setMinus: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
    +  }
    +
    +  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    +
    +    val children = this.getInputs
    +    val rowCnt = children.foldLeft(0D) { (rows, child) =>
    +      rows + metadata.getRowCount(child)
    +    }
    +
    +    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: BatchTableEnvironment,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    var leftDataSet: DataSet[Any] = null
    +    var rightDataSet: DataSet[Any] = null
    +
    +    expectedType match {
    +      case None =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    +        rightDataSet =
    +          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
    +      case _ =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    }
    +
    +    val minusRes = leftDataSet.minus(rightDataSet)
    +    if (!all) {
    +      minusRes.distinct()
    --- End diff --
    
    On the other hand, the "union" method in DataSet.java preserves duplicates and is used to implement UNION ALL. UNION is implemented by applying the "distinct" operation on top of UNION ALL.
    It seems that what you suggest would only add code duplication, since it would basically implement a specialized version of the "distinct" operator specifically for the "minus" method.
    
    I have no strong preference though. @fhueske, what do you think?


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r70071730
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -236,6 +236,32 @@ case class Aggregate(
       }
     }
     
    +case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
    --- End diff --
    
    I would still call it `Minus` to be in sync with `Union`, `Intersect`, etc. but that's just my personal opinion.


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69047560
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetMinusRule.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.rules.dataSet
    +
    +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.convert.ConverterRule
    +import org.apache.calcite.rel.logical.LogicalMinus
    +import org.apache.calcite.rel.rules.UnionToDistinctRule
    +import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetMinus}
    +
    +class DataSetMinusRule
    +  extends ConverterRule(
    +    classOf[LogicalMinus],
    +    Convention.NONE,
    +    DataSetConvention.INSTANCE,
    +    "DataSetMinusRule")
    +{
    +
    +  /**
    +    * Translate EXCEPT and EXCEPT ALL.
    +    */
    +  override def matches(call: RelOptRuleCall): Boolean = {
    --- End diff --
    
    This method can be removed. It always returns `true`, which is the default implementation that is overridden here.


---

[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r70069883
  
    --- Diff: docs/apis/table.md ---
    @@ -536,6 +536,29 @@ Table result = left.unionAll(right);
         </tr>
     
         <tr>
    +      <td><strong>Minus</strong></td>
    +      <td>
    +        <p>Similar to a SQL EXCEPT clause. Except returns records from the first table that do not exist in the second table. Duplicate records in the first table are returned exactly once, i.e., duplicates are removed. Both tables must have identical schema, i.e., field names and types.</p>
    --- End diff --
    
    "Except returns records" should be "Minus returns records" to be consistent. I would also use "left/right" table instead of first and second according to your example code. 


---

[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2169
  
    Updated documentation as suggested.


---

[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2169
  
    @fhueske Thank you for the detailed review! I've updated my code according to your comments.
    
    I noticed that @wuchong is performing some type conversions in his INTERSECT implementation: https://github.com/apache/flink/pull/2159/files#diff-a6c2112ca46d26fcf49f1edba1c73f75R121
    
    Should I do something similar in the EXCEPT case? If yes, does that mean my test coverage is insufficient because it misses a particular case?



[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/2169
  
    Thanks for updating the PR. I reviewed the code again. There were still some issues regarding the documentation and the `expectedType` handling in `DataSetMinus`; I fixed them myself and will merge now...



[GitHub] flink issue #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2169
  
    @twalthr Thank you for your review! I've updated the PR according to your comments.



[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69045241
  
    --- Diff: docs/apis/table.md ---
    @@ -873,7 +920,7 @@ val result = tableEnv.sql(
     
     #### Limitations
     
    -The current version of streaming SQL only supports `SELECT`, `FROM`, `WHERE`, and `UNION` clauses. Aggregations or joins are not supported yet.
    +The current version of streaming SQL only supports `SELECT`, `FROM`, `WHERE` and `UNION` clauses. Aggregations or joins are not supported yet.
    --- End diff --
    
    We try to consistently use [serial commas](https://en.wikipedia.org/wiki/Serial_comma) in this document.



[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r68704013
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -236,6 +236,32 @@ case class Aggregate(
       }
     }
     
    +case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
    +  override def output: Seq[Attribute] = left.output
    +
    +  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    +    left.construct(relBuilder)
    +    right.construct(relBuilder)
    +    relBuilder.minus(all)
    +  }
    +
    +  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    +    val resolvedMinus = super.validate(tableEnv).asInstanceOf[SetMinus]
    +    if (left.output.length != right.output.length) {
    +      failValidation(s"Set minus two table of different column sizes:" +
    +        s" ${left.output.size} and ${right.output.size}")
    +    }
    +    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
    +      l.resultType == r.resultType && l.name == r.name }
    --- End diff --
    
    Hm... I don't think I fully understand the reasoning behind this. Could you please elaborate?



[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r69048442
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SetOperationsITCase.scala ---
    @@ -61,14 +62,28 @@ class UnionITCase(
         val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
         val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     
    -    val unionDs = ds1.union(ds2).select('c)
    +    val minusDs = ds1.union(ds2).select('c)
    --- End diff --
    
    Why this change?



[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r70073020
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala ---
    @@ -0,0 +1,150 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import java.lang.Iterable
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.flink.api.common.functions.CoGroupFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.BatchTableEnvironment
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode that implements the set minus (EXCEPT) operation on DataSets.
    +  *
    +  */
    +class DataSetMinus(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    left: RelNode,
    +    right: RelNode,
    +    rowType: RelDataType,
    +    all: Boolean)
    +  extends BiRel(cluster, traitSet, left, right)
    +    with DataSetRel {
    +
    +  override def deriveRowType() = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetMinus(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      rowType,
    +      all
    +    )
    +  }
    +
    +  override def toString: String = {
    +    s"SetMinus(setMinus: ($setMinusSelectionToString}))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
    +  }
    +
    +  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    +
    +    val children = this.getInputs
    +    val rowCnt = children.foldLeft(0D) { (rows, child) =>
    +      rows + metadata.getRowCount(child)
    +    }
    +
    +    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    --- End diff --
    
    Shouldn't the cost be similar to an Intersect because both are using a CoGroup?
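
    For comparison, a sketch of what a CoGroup-style cost could look like; the CPU weight used here is an assumption for illustration, not the actual Intersect cost model:

    ```scala
    // Charge CPU in addition to the summed row count, since the CoGroup has to
    // group and compare both inputs (the weight of rowCnt is illustrative only).
    override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
      val children = this.getInputs
      val rowCnt = children.foldLeft(0D) { (rows, child) =>
        rows + metadata.getRowCount(child)
      }
      planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
    }
    ```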



[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r70075580
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMinus.scala ---
    @@ -0,0 +1,150 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.plan.nodes.dataset
    +
    +import java.lang.Iterable
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.metadata.RelMetadataQuery
    +import org.apache.calcite.rel.{BiRel, RelNode, RelWriter}
    +import org.apache.flink.api.common.functions.CoGroupFunction
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.common.typeutils.CompositeType
    +import org.apache.flink.api.java.DataSet
    +import org.apache.flink.api.table.BatchTableEnvironment
    +import org.apache.flink.util.Collector
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.JavaConversions._
    +
    +/**
    +  * Flink RelNode that implements the set minus (EXCEPT) operation on DataSets.
    +  *
    +  */
    +class DataSetMinus(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    left: RelNode,
    +    right: RelNode,
    +    rowType: RelDataType,
    +    all: Boolean)
    +  extends BiRel(cluster, traitSet, left, right)
    +    with DataSetRel {
    +
    +  override def deriveRowType() = rowType
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataSetMinus(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      inputs.get(1),
    +      rowType,
    +      all
    +    )
    +  }
    +
    +  override def toString: String = {
    +    s"SetMinus(setMinus: ($setMinusSelectionToString}))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw).item("setMinus", setMinusSelectionToString)
    +  }
    +
    +  override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
    +
    +    val children = this.getInputs
    +    val rowCnt = children.foldLeft(0D) { (rows, child) =>
    +      rows + metadata.getRowCount(child)
    +    }
    +
    +    planner.getCostFactory.makeCost(rowCnt, 0, 0)
    +  }
    +
    +  override def translateToPlan(
    +      tableEnv: BatchTableEnvironment,
    +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
    +
    +    var leftDataSet: DataSet[Any] = null
    +    var rightDataSet: DataSet[Any] = null
    +
    +    expectedType match {
    +      case None =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
    +        rightDataSet =
    +          right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, Some(leftDataSet.getType))
    +      case _ =>
    +        leftDataSet = left.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +        rightDataSet = right.asInstanceOf[DataSetRel].translateToPlan(tableEnv, expectedType)
    +    }
    +
    +    val leftType = leftDataSet.getType
    +    val rightType = rightDataSet.getType
    +    val coGroupedDs = leftDataSet.coGroup(rightDataSet)
    +
    +    // If it is atomic type, the field expression need to be "*".
    +    // Otherwise, we use int-based field position keys
    +    val coGroupedPredicateDs =
    --- End diff --
    
    You can use "*" for all types, so the case distinction is not necessary.
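
    A minimal sketch of the simplification (here `minusCoGroupFunction` is a placeholder name for the CoGroupFunction built in this class, not an identifier from the PR):

    ```scala
    // "*" is a valid key expression for composite and atomic types alike,
    // so both inputs can be keyed the same way without a case distinction.
    val coGroupedPredicateDs = leftDataSet
      .coGroup(rightDataSet)
      .where("*")
      .equalTo("*")
      .`with`(minusCoGroupFunction)
    ```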



[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r70071069
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---
    @@ -18,8 +18,8 @@
     
     package org.apache.flink.api.java;
    --- End diff --
    
    I think you did not intend to change this file. Can you remove it from this PR?



[GitHub] flink pull request #2169: [FLINK-3943] Add support for EXCEPT operator

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2169#discussion_r68706865
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
    @@ -236,6 +236,32 @@ case class Aggregate(
       }
     }
     
    +case class SetMinus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
    +  override def output: Seq[Attribute] = left.output
    +
    +  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
    +    left.construct(relBuilder)
    +    right.construct(relBuilder)
    +    relBuilder.minus(all)
    +  }
    +
    +  override def validate(tableEnv: TableEnvironment): LogicalNode = {
    +    val resolvedMinus = super.validate(tableEnv).asInstanceOf[SetMinus]
    +    if (left.output.length != right.output.length) {
    +      failValidation(s"Set minus two table of different column sizes:" +
    +        s" ${left.output.size} and ${right.output.size}")
    +    }
    +    val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
    +      l.resultType == r.resultType && l.name == r.name }
    --- End diff --
    
    I think both tables must have the same number of fields with matching data types, but they can have different field names. E.g., `testMinusDifferentFieldNames` should pass without an exception. Correct me if I'm wrong.
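
    A sketch of the relaxed check this would imply (illustrative only, not the code currently in the PR):

    ```scala
    // Require equal arity and matching field types, but allow different field names.
    if (left.output.length != right.output.length) {
      failValidation(s"Minus on tables with different column counts: " +
        s"${left.output.length} and ${right.output.length}")
    }
    val sameTypes = left.output.zip(right.output).forall {
      case (l, r) => l.resultType == r.resultType
    }
    if (!sameTypes) {
      failValidation("Minus on tables with different field types.")
    }
    ```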

