You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by vasia <gi...@git.apache.org> on 2016/04/07 17:13:59 UTC

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

GitHub user vasia opened a pull request:

    https://github.com/apache/flink/pull/1862

    [FLINK-3640] Add support for SQL in DataSet programs

    This PR adds basic support for batch SQL queries embedded in Table API programs.
    In order to run a SQL query, a `DataSet` or `Table` needs to be registered in the `TableEnvironment` and then the query is executed using the `sql` method:
    
    ```
    val tEnv = getScalaTableEnvironment
    val t = getDataSet(env).toTable
    tEnv.registerTable("MyTable", t)
    val sqlQuery = "SELECT * FROM MyTable"
    val result = tEnv.sql(sqlQuery)
    ```
    The result of the `sql` method is a `Table` which can be used in subsequent Table API or SQL queries.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vasia/flink batch-sql

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1862.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1862
    
----

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1862#issuecomment-208272375
  
    Thanks, I will make the change and merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1862


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/1862#issuecomment-206986445
  
    Awesome! Really cool to see that coming to Flink.
    I played a bit around with it and it seems to work ;)
    
    ```java
    Table table = tableEnv.fromDataSet(input);
    tableEnv.registerTable("tab", table);
    tableEnv.registerTable("tab1", table);
    Table res = tableEnv.sql("SELECT COUNT(tab1.acount) AS acount, tab.word " +
    	"FROM tab, tab1 " +
    	"WHERE tab.word = tab1.word GROUP BY tab.word");
    res = res.filter("acount > 2");
    ```
    
    ```java
    Table res = tableEnv.sql("SELECT COUNT(acount) AS acount, word " +
    	"FROM (SELECT * FROM tab WHERE acount = 1 ) " +
    	"GROUP BY word");
    ```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1862#discussion_r59181718
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---
    @@ -53,21 +55,26 @@ object TranslationContext {
         // configure sql parser
         // we use Java lex because back ticks are easier than double quotes in programming
         // and cases are preserved
    -    val parserConfig = SqlParser.configBuilder().setLex(Lex.JAVA).build()
    +    val parserConfig = SqlParser
    +      .configBuilder()
    +      .setLex(Lex.JAVA)
    +      .build()
     
         // initialize RelBuilder
         frameworkConfig = Frameworks
           .newConfigBuilder
           .defaultSchema(tables)
           .parserConfig(parserConfig)
           .costFactory(new DataSetCostFactory)
    -      .traitDefs(ConventionTraitDef.INSTANCE)
    +      .programs(Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES))
    --- End diff --
    
    I think this line can be removed because we set the rules explicitly before calling the optimizer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1862#discussion_r59025219
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala ---
    @@ -0,0 +1,264 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.sql.test
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.scala.util.CollectionDataSets
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.plan.TranslationContext
    +import org.apache.flink.api.table.test.utils.TableProgramsTestBase
    +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit._
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +
    +@RunWith(classOf[Parameterized])
    +class AggregationsITCase(
    --- End diff --
    
    Do we need each test for `DataSet` and `Table`? Wouldn't it be sufficient to test for `Table` and have one or two tests for `DataSet`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1862#discussion_r59007631
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala ---
    @@ -83,4 +84,17 @@ class AbstractTableEnvironment {
         )
         TranslationContext.registerTable(dataSetTable, name)
       }
    +
    +  /**
    +   * Execute a SQL query on a batch [[Table]].
    +   * The [[Table]] has to be registered in the tables registry
    --- End diff --
    
    We can query more than one table (all tables need to be registered). Tables should be registered at the TableEnvironment. I think the term `tables registry` is not defined.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1862#issuecomment-208266838
  
    +1 to merge after resolving one last minor comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1862#discussion_r59007483
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala ---
    @@ -83,4 +84,17 @@ class AbstractTableEnvironment {
         )
         TranslationContext.registerTable(dataSetTable, name)
       }
    +
    +  /**
    +   * Execute a SQL query on a batch [[Table]].
    --- End diff --
    
    Keep the description of this method more generic as it will be the entry point for stream SQL queries as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1862#discussion_r59025508
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala ---
    @@ -0,0 +1,239 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.sql.test
    +
    +import org.apache.calcite.tools.ValidationException
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.scala.util.CollectionDataSets
    +import org.apache.flink.api.table.{TableException, Row}
    +import org.apache.flink.api.table.plan.TranslationContext
    +import org.apache.flink.api.table.test.utils.TableProgramsTestBase
    +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode
    +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
    +import org.apache.flink.test.util.TestBaseUtils
    +import org.junit._
    +import org.junit.runner.RunWith
    +import org.junit.runners.Parameterized
    +
    +import scala.collection.JavaConverters._
    +
    +@RunWith(classOf[Parameterized])
    +class JoinITCase(
    +    mode: TestExecutionMode,
    +    configMode: TableConfigMode)
    +  extends TableProgramsTestBase(mode, configMode) {
    +
    +  @Test
    +  def testJoin(): Unit = {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = getScalaTableEnvironment
    +    TranslationContext.reset()
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
    +
    +    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
    +    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
    +    val results = result.toDataSet[Row](getConfig).collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testJoinWithFilter(): Unit = {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = getScalaTableEnvironment
    +    TranslationContext.reset()
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
    +
    +    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c)
    +    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected = "Hi,Hallo\n"
    +    val results = result.toDataSet[Row](getConfig).collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testJoinWithJoinFilter(): Unit = {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = getScalaTableEnvironment
    +    TranslationContext.reset()
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"
    +
    +    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
    +    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected = "Hello world, how are you?,Hallo Welt wie\n" +
    +      "I am fine.,Hallo Welt wie\n"
    +    val results = result.toDataSet[Row](getConfig).collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testJoinWithMultipleKeys(): Unit = {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = getScalaTableEnvironment
    +    TranslationContext.reset()
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
    +
    +    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
    +    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sql(sqlQuery)
    +
    +    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
    +      "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
    +    val results = result.toDataSet[Row](getConfig).collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test(expected = classOf[ValidationException])
    +  def testJoinNonExistingKey(): Unit = {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = getScalaTableEnvironment
    +    TranslationContext.reset()
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e"
    +
    +    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
    +    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    tEnv.sql(sqlQuery)
    +  }
    +
    +  @Test(expected = classOf[TableException])
    +  def testJoinNonMatchingKeyTypes(): Unit = {
    +
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = getScalaTableEnvironment
    +    TranslationContext.reset()
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g"
    +
    +    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c)
    +    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    tEnv.sql(sqlQuery).toDataSet[Row].collect()
    +  }
    +
    +  @Test(expected = classOf[ValidationException])
    +  def testJoinWithAmbiguousFields(): Unit = {
    --- End diff --
    
    Can you add another test that resolves the ambiguous field using a table alias, i.e., `SELECT Table5.c, g FROM Table3, Table5 WHERE a = d`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1862#issuecomment-207438727
  
    Thanks for the PR. I had a few minor comments but otherwise it looks really good. 
    
    There are a few follow up issues, IMO:
    - Check if we somehow can get around the `EnumerableToLogicalTableScan`. Maybe the Calcite community can help. I will open a JIRA for this once the PR is merged.
    - Check how we can exclude unsupported SQL features such as outer joins, intersection, etc. Also here, the Calcite community should be able to help. I will open a JIRA for this once the PR is merged.
    - Refactor `TranslationContext` and `TableEnvironment` to prevent that the same planner is used several times. I'll start a discussion about this soon.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3640] Add support for SQL in DataSet pr...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1862#issuecomment-207761336
  
    Thanks for the review @fhueske. I've addressed your comments :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---