Posted to issues@flink.apache.org by vasia <gi...@git.apache.org> on 2016/02/12 21:22:40 UTC

[GitHub] flink pull request: [FLINK-3226] Translate logical joins to physic...

GitHub user vasia opened a pull request:

    https://github.com/apache/flink/pull/1632

    [FLINK-3226] Translate logical joins to physical

    This PR contains the logical to physical translation for joins.
    - Joins with only equality conditions do not generate a join function.
    - Joins with equality and non-equality conditions evaluate the non-equality conditions inside a `FlatJoinFunction`.
    - Joins without any equality conditions, e.g. `in1.join(in2).where(a > b)`, are currently not supported. We could consider supporting them later by generating a cross function.
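The three cases above can be sketched as a small classification step. This is a simplified, hypothetical model and not the code in this PR: `JoinPlanning`, `Cmp`, `Eq`, `Gt`, `JoinPlan`, `planJoin` and `residualHolds` are illustrative names, and a real join condition is a Calcite expression analyzed with `join.analyzeCondition`. Equality conjuncts become left/right key pairs; any remaining conjuncts must be checked per key-matched pair, which is the role of the generated `FlatJoinFunction`.

```scala
object JoinPlanning {
  // Hypothetical, simplified join predicate: a conjunction of comparisons
  // between a field index of the left input and a field index of the right input.
  sealed trait Cmp
  final case class Eq(left: Int, right: Int) extends Cmp
  final case class Gt(left: Int, right: Int) extends Cmp

  sealed trait JoinPlan
  // No equality condition at all: rejected (it would need a cross product).
  case object Unsupported extends JoinPlan
  // Only equality conditions: the keys alone decide the join, no join function.
  final case class EquiJoin(leftKeys: List[Int], rightKeys: List[Int]) extends JoinPlan
  // Equality keys plus residual predicates evaluated inside a join function.
  final case class EquiJoinWithFilter(
      leftKeys: List[Int],
      rightKeys: List[Int],
      residual: List[Cmp]) extends JoinPlan

  def planJoin(conjuncts: List[Cmp]): JoinPlan = {
    // Equality conjuncts become (left key, right key) pairs.
    val pairs = conjuncts.collect { case Eq(l, r) => (l, r) }
    // Everything else is a residual (non-equality) condition.
    val residual = conjuncts.filter {
      case Eq(_, _) => false
      case _        => true
    }
    if (pairs.isEmpty) Unsupported
    else {
      val (leftKeys, rightKeys) = pairs.unzip
      if (residual.isEmpty) EquiJoin(leftKeys, rightKeys)
      else EquiJoinWithFilter(leftKeys, rightKeys, residual)
    }
  }

  // What a generated join function would check for each key-matched pair.
  def residualHolds(residual: List[Cmp], left: IndexedSeq[Int], right: IndexedSeq[Int]): Boolean =
    residual.forall {
      case Eq(l, r) => left(l) == right(r)
      case Gt(l, r) => left(l) > right(r)
    }
}
```

For example, `planJoin(List(Eq(0, 1), Gt(2, 0)))` yields the key lists `List(0)` and `List(1)` and keeps `Gt(2, 0)` as the residual filter.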

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vasia/flink translateJoin

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1632.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1632
    
----
commit f4563cf2ea78a993b00f6398006b1a064ad83857
Author: vasia <va...@apache.org>
Date:   2016-02-11T17:04:45Z

    [FLINK-3226] Translate logical joins to physical

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3226] Translate logical joins to physic...

Posted by vasia <gi...@git.apache.org>.
GitHub user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1632#issuecomment-184170164
  
    Great, merging this also.



[GitHub] flink pull request: [FLINK-3226] Translate logical joins to physic...

Posted by vasia <gi...@git.apache.org>.
GitHub user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1632#issuecomment-183957839
  
    Thanks for the review @twalthr! I've addressed your comments.



[GitHub] flink pull request: [FLINK-3226] Translate logical joins to physic...

Posted by twalthr <gi...@git.apache.org>.
GitHub user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1632#discussion_r52826951
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala ---
    @@ -63,7 +63,7 @@ class DataSetMap(
           config: TableConfig,
           expectedType: Option[TypeInformation[Any]])
         : DataSet[Any] = {
    -    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config)
    +    val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
    --- End diff --
    
    You cannot forward the expected type, since you don't know what the previous operator does.
    For example, the expected type might be `Tuple2` while the previous operator outputs records with only one field.
    Why did you change this call?
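The mismatch described in this comment can be illustrated with a toy model in which each node checks only its own output against the expected type. Everything here (`ExpectedTypeSketch`, `RowType`, `translateSource`, `translateMap`, `translateMapForwarding`) is hypothetical and tracks only the number of fields, not real Flink `TypeInformation`.

```scala
object ExpectedTypeSketch {
  // Hypothetical stand-in for TypeInformation: only the number of fields.
  final case class RowType(arity: Int)

  // A source operator whose records always have exactly one field.
  def translateSource(expected: Option[RowType]): RowType = {
    val produced = RowType(1)
    expected.foreach { e =>
      if (e.arity != produced.arity)
        throw new IllegalArgumentException(
          s"expected ${e.arity} fields, but the source produces ${produced.arity}")
    }
    produced
  }

  // A map node: its input is translated without an expected type, because the
  // map's own generated function is what converts records into the expected type.
  def translateMap(expected: Option[RowType]): RowType = {
    val input = translateSource(None)
    expected.getOrElse(input)
  }

  // The problematic variant: forwarding the expected type to the input.
  def translateMapForwarding(expected: Option[RowType]): RowType = {
    val input = translateSource(expected)
    expected.getOrElse(input)
  }
}
```

With a `Tuple2`-like expected type (`RowType(2)`), `translateMapForwarding` fails at the one-field source, while `translateMap` succeeds.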



[GitHub] flink pull request: [FLINK-3226] Translate logical joins to physic...

Posted by twalthr <gi...@git.apache.org>.
GitHub user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1632#discussion_r52826898
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---
    @@ -148,16 +148,24 @@ class CodeGenerator(
           if (clazz == classOf[FlatMapFunction[_,_]]) {
             val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
             (s"void flatMap(Object _in1, org.apache.flink.util.Collector $collectorTerm)",
    -          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
    +          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
           }
     
           // MapFunction
           else if (clazz == classOf[MapFunction[_,_]]) {
             val inputTypeTerm = boxedTypeTermForTypeInfo(input1)
             ("Object map(Object _in1)",
    -          s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")
    +          List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;"))
           }
     
    +      // FlatJoinFunction
    +      else if (clazz == classOf[FlatJoinFunction[_,_,_]]) {
    +        val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1)
    +        val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.get) //TODO check this
    --- End diff --
    
    Why don't you call `getOrElse` here and throw a `CodeGenException` if `input2` is not present? Then we can get rid of the TODO.
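A minimal sketch of this suggestion, assuming a simplified helper that only builds strings: `CodeGenSketch`, `flatJoinPrologue` and its parameters are hypothetical, and the local `CodeGenException` merely stands in for the Table API's exception class. It also hints at why the input declarations presumably became a `List` in this diff: a `FlatJoinFunction` needs one cast per input. The generated signature follows `FlatJoinFunction.join(first, second, out)`.

```scala
object CodeGenSketch {
  // Local stand-in for the exception class suggested in the review.
  class CodeGenException(msg: String) extends RuntimeException(msg)

  // Returns the method signature and one input declaration per input.
  def flatJoinPrologue(
      inputTypeTerm1: String,
      inputTypeTerm2: Option[String],
      input1Term: String,
      input2Term: String,
      collectorTerm: String): (String, List[String]) = {
    // Fail with a descriptive error instead of calling .get on an empty Option.
    val typeTerm2 = inputTypeTerm2.getOrElse(
      throw new CodeGenException("A FlatJoinFunction requires a second input type."))
    val signature =
      s"void join(Object _in1, Object _in2, org.apache.flink.util.Collector $collectorTerm)"
    val declarations = List(
      s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;",
      s"$typeTerm2 $input2Term = ($typeTerm2) _in2;")
    (signature, declarations)
  }
}
```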



[GitHub] flink pull request: [FLINK-3226] Translate logical joins to physic...

Posted by twalthr <gi...@git.apache.org>.
GitHub user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1632#issuecomment-184113402
  
    Thanks for the changes. Looks good to merge ;-)



[GitHub] flink pull request: [FLINK-3226] Translate logical joins to physic...

Posted by twalthr <gi...@git.apache.org>.
GitHub user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1632#discussion_r52827003
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala ---
    @@ -39,18 +49,80 @@ class DataSetJoinRule
         val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
         val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
     
    -    new DataSetJoin(
    -      rel.getCluster,
    -      traitSet,
    -      convLeft,
    -      convRight,
    -      rel.getRowType,
    -      join.toString,
    -      Array[Int](),
    -      Array[Int](),
    -      JoinType.INNER,
    -      null,
    -      null)
    +    // get the equality keys
    +    val joinInfo = join.analyzeCondition
    +    val keyPairs = joinInfo.pairs
    +
    +    if (keyPairs.isEmpty) { // if no equality keys => not supported
    +      throw new TableException("Joins should have at least one equality condition")
    +    }
    +    else { // at least one equality expression => generate a join function
    +      val conditionType = join.getCondition.getType
    +      val func = getJoinFunction(join, joinInfo)
    +      val leftKeys = ArrayBuffer.empty[Int]
    +      val rightKeys = ArrayBuffer.empty[Int]
    +
    +      keyPairs.foreach(pair => {
    +        leftKeys.add(pair.source)
    +        rightKeys.add(pair.target)}
    +      )
    +
    +      new DataSetJoin(
    +        rel.getCluster,
    +        traitSet,
    +        convLeft,
    +        convRight,
    +        rel.getRowType,
    +        join.toString,
    +        leftKeys.toArray,
    +        rightKeys.toArray,
    +        JoinType.INNER,
    +        null,
    +        func)
    +    }
    +  }
    +
    +  def getJoinFunction(join: FlinkJoin, joinInfo: JoinInfo):
    +      ((TableConfig, TypeInformation[Any], TypeInformation[Any], TypeInformation[Any]) =>
    +      FlatJoinFunction[Any, Any, Any]) = {
    +
    +    if (joinInfo.isEqui) {
    +      // only equality condition => no join function necessary
    +      null
    --- End diff --
    
    In general, `null` is not very welcome in Scala. Could you return a `FlatJoinFunction` (containing only a `ConverterResultExpression`) here too? We can then get rid of the `Tuple2RowMapper` and support any type as output type of the join operation.
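One way to follow this suggestion is to make the factory total, so the equi-join case returns a function that only converts the matched pair into an output row. A minimal sketch with plain Scala functions standing in for `FlatJoinFunction`; `JoinFunctionSketch`, `Row`, `JoinFn` and the concatenating conversion are assumptions, not the PR's code.

```scala
object JoinFunctionSketch {
  // Hypothetical stand-ins: a row is a vector of values, and a join function
  // turns a key-matched (left, right) pair into zero or more output rows.
  type Row = Vector[Any]
  type JoinFn = (Row, Row) => List[Row]

  // Always returns a function, never null.
  def getJoinFunction(isEqui: Boolean, residual: (Row, Row) => Boolean): JoinFn =
    if (isEqui) {
      // Only equality keys: the pair already matched, so just convert it.
      (left, right) => List(left ++ right)
    } else {
      // Non-equality conditions: filter the pair, then convert it.
      (left, right) => if (residual(left, right)) List(left ++ right) else Nil
    }
}
```

Because both branches produce the output row, the caller can treat equi and non-equi joins uniformly, which is what would allow dropping a separate conversion step such as the `Tuple2RowMapper`.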



[GitHub] flink pull request: [FLINK-3226] Translate logical joins to physic...

Posted by twalthr <gi...@git.apache.org>.
GitHub user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1632#discussion_r52827029
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetJoinRule.scala ---
    @@ -39,18 +49,80 @@ class DataSetJoinRule
         val convLeft: RelNode = RelOptRule.convert(join.getInput(0), DataSetConvention.INSTANCE)
         val convRight: RelNode = RelOptRule.convert(join.getInput(1), DataSetConvention.INSTANCE)
     
    -    new DataSetJoin(
    -      rel.getCluster,
    -      traitSet,
    -      convLeft,
    -      convRight,
    -      rel.getRowType,
    -      join.toString,
    -      Array[Int](),
    -      Array[Int](),
    -      JoinType.INNER,
    -      null,
    -      null)
    +    // get the equality keys
    +    val joinInfo = join.analyzeCondition
    +    val keyPairs = joinInfo.pairs
    +
    +    if (keyPairs.isEmpty) { // if no equality keys => not supported
    +      throw new TableException("Joins should have at least one equality condition")
    +    }
    +    else { // at least one equality expression => generate a join function
    +      val conditionType = join.getCondition.getType
    +      val func = getJoinFunction(join, joinInfo)
    +      val leftKeys = ArrayBuffer.empty[Int]
    +      val rightKeys = ArrayBuffer.empty[Int]
    +
    +      keyPairs.foreach(pair => {
    +        leftKeys.add(pair.source)
    +        rightKeys.add(pair.target)}
    +      )
    +
    +      new DataSetJoin(
    +        rel.getCluster,
    +        traitSet,
    +        convLeft,
    +        convRight,
    +        rel.getRowType,
    +        join.toString,
    +        leftKeys.toArray,
    +        rightKeys.toArray,
    +        JoinType.INNER,
    +        null,
    +        func)
    +    }
    +  }
    +
    +  def getJoinFunction(join: FlinkJoin, joinInfo: JoinInfo):
    +      ((TableConfig, TypeInformation[Any], TypeInformation[Any], TypeInformation[Any]) =>
    +      FlatJoinFunction[Any, Any, Any]) = {
    +
    +    if (joinInfo.isEqui) {
    +      // only equality condition => no join function necessary
    +      null
    +    }
    +    else {
    +      val func = (
    +        config: TableConfig,
    +        leftInputType: TypeInformation[Any],
    +        rightInputType: TypeInformation[Any],
    +        returnType: TypeInformation[Any]) => {
    +
    +      val generator = new CodeGenerator(config, leftInputType, Some(rightInputType))
    +      val condition = generator.generateExpression(join.getCondition)
    +      val body = {
    --- End diff --
    
    Unnecessary braces?
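For context: in Scala, a braced block whose only content is one expression evaluates to that expression, so the braces around `body` can simply be dropped. A minimal illustration with a hypothetical generated-code string (`BracesSketch` and `conditionTerm` are illustrative; the actual body is not shown in the diff):

```scala
object BracesSketch {
  // Hypothetical generated condition term.
  val conditionTerm = "cond1"

  // A block containing a single expression...
  val bodyWithBraces = {
    s"if ($conditionTerm) { collector.collect(out); }"
  }

  // ...has the same value as the bare expression.
  val body = s"if ($conditionTerm) { collector.collect(out); }"
}
```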



[GitHub] flink pull request: [FLINK-3226] Translate logical joins to physic...

Posted by vasia <gi...@git.apache.org>.
GitHub user vasia closed the pull request at:

    https://github.com/apache/flink/pull/1632

