You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by vasia <gi...@git.apache.org> on 2016/04/20 18:30:37 UTC

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

GitHub user vasia opened a pull request:

    https://github.com/apache/flink/pull/1917

    [FLINK-3727] Embedded streaming SQL projection, filtering, union

    This PR adds support for embedded streaming SQL (projection, filtering, union):
    - methods to register DataStreams
    - sql translation method in StreamTableEnvironment
    - a custom rule to convert to streamable table
    - docs for streaming table and sql
    - java SQL tests
    
    A streaming SQL query can be executed on a streaming Table by simply adding the `STREAM` keyword in front of the table name. Registering DataStream tables and conversions work in a similar way to that of DataStream tables.
    Here's a filtering example:
    ```
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    
    val dataStream = env.addSource(...)
    val t = dataStream.toTable(tEnv).as('a, 'b, 'c)
    tEnv.registerTable("MyTable", t)
    
    val sqlQuery = "SELECT STREAM * FROM MyTable WHERE a = 3"
    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    ```


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vasia/flink stream-sql

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1917.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1917
    
----
commit 6b747bd6902074d0475de9519c1c3bd693487eef
Author: vasia <va...@apache.org>
Date:   2016-04-15T11:35:24Z

    [FLINK-3727] Add support for embedded streaming SQL (projection, filter, union)
    
    - add methods to register DataStreams
    - add sql translation method in StreamTableEnvironment
    - add a custom rule to convert to streamable table
    - add docs for streaming table and sql

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1917#discussion_r60477908
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/SelectITCase.scala ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.sql.streaming.test
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.scala.table.streaming.test.utils.{StreamITCase, StreamTestData}
    +import org.apache.flink.api.table.{Row, TableEnvironment}
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +import org.junit.Assert._
    +import org.junit._
    +
    +import scala.collection.mutable
    +
    +class SelectITCase extends StreamingMultipleProgramsTestBase {
    +
    +  @Test
    +  def testSelectStarFromTable(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val sqlQuery = "SELECT STREAM * FROM MyTable"
    +
    +    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,1,Hi",
    +      "2,2,Hello",
    +      "3,2,Hello world")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testSelectFirstFromTable(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val sqlQuery = "SELECT STREAM a FROM MyTable"
    +
    +    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("1", "2", "3")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testSelectExpressionFromTable(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val sqlQuery = "SELECT STREAM a * 2, b - 1 FROM MyTable"
    +
    +    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("2,0", "4,1", "6,1")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +
    +  @Test
    +  def testSelectSecondFromDataStream(): Unit = {
    --- End diff --
    
    This test might be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1917#issuecomment-212583555
  
    Thanks for the PR. Looks good. I had a few minor comments. 
    I also marked a few tests that could be removed because they do not increase test coverage, IMO. In general, we should be more careful when adding tests. Builds start to fail on Travis because the 2h threshold is exceeded.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1917#issuecomment-212892867
  
    Thanks for the update @vasia! +1 to merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1917#discussion_r60477818
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/FilterITCase.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.sql.streaming.test
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.scala.table.streaming.test.utils.{StreamITCase, StreamTestData}
    +import org.apache.flink.api.table.{Row, TableEnvironment}
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +import org.junit.Assert._
    +import org.junit._
    +
    +import scala.collection.mutable
    +
    +class FilterITCase extends StreamingMultipleProgramsTestBase {
    +
    +  @Test
    +  def testSimpleFilter(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val sqlQuery = "SELECT STREAM * FROM MyTable WHERE a = 3"
    +
    +    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("3,2,Hello world")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testAllRejectingFilter(): Unit = {
    --- End diff --
    
    This test might be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1917


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1917#discussion_r60477897
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/SelectITCase.scala ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.sql.streaming.test
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.scala.table.streaming.test.utils.{StreamITCase, StreamTestData}
    +import org.apache.flink.api.table.{Row, TableEnvironment}
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +import org.junit.Assert._
    +import org.junit._
    +
    +import scala.collection.mutable
    +
    +class SelectITCase extends StreamingMultipleProgramsTestBase {
    +
    +  @Test
    +  def testSelectStarFromTable(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val sqlQuery = "SELECT STREAM * FROM MyTable"
    +
    +    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,1,Hi",
    +      "2,2,Hello",
    +      "3,2,Hello world")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testSelectFirstFromTable(): Unit = {
    --- End diff --
    
    This test might be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1917#discussion_r60473805
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---
    @@ -98,8 +98,42 @@ abstract class TableEnvironment(val config: TableConfig) {
     
         checkValidTableName(name)
     
    -    val tableTable = new TableTable(table.getRelNode)
    -    registerTableInternal(name, tableTable)
    +    table.tableEnv match {
    +      case e: BatchTableEnvironment =>
    +        val tableTable = new TableTable(table.getRelNode)
    +        registerTableInternal(name, tableTable)
    +      case e: StreamTableEnvironment =>
    +        val sTableTable = new TransStreamTable(table.getRelNode, true)
    +        tables.add(name, sTableTable)
    +    }
    +
    +  }
    +
    +  protected def registerStreamTableInternal(name: String, table: AbstractTable): Unit = {
    +
    +    if (isRegistered(name)) {
    +      throw new TableException(s"Table \'$name\' already exists. " +
    +        s"Please, choose a different name.")
    +    } else {
    +      tables.add(name, table)
    +    }
    +  }
    +
    +  /**
    +   * Replaces a registered Table with another Table under the same name.
    +   * We use this method to replace a [[org.apache.flink.api.table.plan.schema.DataStreamTable]]
    +   * with a [[org.apache.calcite.schema.TranslatableTable]].
    +   *
    +   * @param name
    +   * @param table
    +   */
    +  protected def replaceRegisteredStreamTable(name: String, table: AbstractTable): Unit = {
    --- End diff --
    
    rename to `replaceRegisteredTable()` since it is not specific for stream tables?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1917#discussion_r60473002
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---
    @@ -112,19 +122,31 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
         *
         * @param name The name under which the table is registered in the catalog.
         * @param dataStream The [[DataStream]] to register as table in the catalog.
    +    * @param wrapper True if the registration has to wrap the datastreamTable
    +   *                into a [[org.apache.calcite.schema.StreamableTable]]
    --- End diff --
    
    comment indention


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1917#discussion_r60472476
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala ---
    @@ -83,11 +83,54 @@ class StreamTableEnvironment(
           .toArray
     
         val name = createUniqueTableName()
    -    registerDataStreamInternal(name, dataStream, exprs)
    +    registerDataStreamInternal(name, dataStream, exprs, false)
         ingest(name)
       }
     
       /**
    +   * Registers the given [[DataStream]] as table in the
    --- End diff --
    
    nitpicking, ScalaDoc indents the `*` under the second `*` of the first line, i.e.,
    ```
    /**
      *
      */
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1917#discussion_r60471750
  
    --- Diff: docs/apis/batch/libs/table.md ---
    @@ -419,8 +438,8 @@ Only the types `LONG` and `STRING` can be casted to `DATE` and vice versa. A `LO
     SQL
     ----
     The Table API also supports embedded SQL queries.
    -In order to use a `Table` or `DataSet` in a SQL query, it has to be registered in the `TableEnvironment`, using a unique name.
    -A registered `Table` can be retrieved back from the `TableEnvironment` using the `scan()` method:
    +In order to use a `Table`, `DataSet`, or `DataStream` in a SQL query, it has to be registered in the `TableEnvironment`, using a unique name.
    +A registered Dataset `Table` can be retrieved back from the `TableEnvironment` using the `scan()` method:
    --- End diff --
    
    Add a sentence about the `ingest()` method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1917#discussion_r60473701
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---
    @@ -98,8 +98,42 @@ abstract class TableEnvironment(val config: TableConfig) {
     
         checkValidTableName(name)
     
    -    val tableTable = new TableTable(table.getRelNode)
    -    registerTableInternal(name, tableTable)
    +    table.tableEnv match {
    +      case e: BatchTableEnvironment =>
    +        val tableTable = new TableTable(table.getRelNode)
    +        registerTableInternal(name, tableTable)
    +      case e: StreamTableEnvironment =>
    +        val sTableTable = new TransStreamTable(table.getRelNode, true)
    +        tables.add(name, sTableTable)
    +    }
    +
    +  }
    +
    +  protected def registerStreamTableInternal(name: String, table: AbstractTable): Unit = {
    --- End diff --
    
    same as `registerTableInternal()`, no? So can be removed, IMO.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1917#discussion_r60477843
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/FilterITCase.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.sql.streaming.test
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.scala.table.streaming.test.utils.{StreamITCase, StreamTestData}
    +import org.apache.flink.api.table.{Row, TableEnvironment}
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +import org.junit.Assert._
    +import org.junit._
    +
    +import scala.collection.mutable
    +
    +class FilterITCase extends StreamingMultipleProgramsTestBase {
    +
    +  @Test
    +  def testSimpleFilter(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val sqlQuery = "SELECT STREAM * FROM MyTable WHERE a = 3"
    +
    +    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("3,2,Hello world")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testAllRejectingFilter(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val sqlQuery = "SELECT STREAM * FROM MyTable WHERE false"
    +
    +    val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    assertEquals(true, StreamITCase.testResults.isEmpty)
    +  }
    +
    +  @Test
    +  def testAllPassingFilter(): Unit = {
    --- End diff --
    
    This test might be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1917#discussion_r60477888
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/streaming/test/SelectITCase.scala ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.scala.sql.streaming.test
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.table._
    +import org.apache.flink.api.scala.table.streaming.test.utils.{StreamITCase, StreamTestData}
    +import org.apache.flink.api.table.{Row, TableEnvironment}
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
    +import org.junit.Assert._
    +import org.junit._
    +
    +import scala.collection.mutable
    +
    +class SelectITCase extends StreamingMultipleProgramsTestBase {
    +
    +  @Test
    +  def testSelectStarFromTable(): Unit = {
    --- End diff --
    
    This test might be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1917#issuecomment-212828521
  
    Thank you for the review @fhueske! I've addressed your comments. We can definitely try to figure out a better way to handle the sql tests, as there is a lot of overlap with the table api tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3727] Embedded streaming SQL projection...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1917#discussion_r60472854
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---
    @@ -86,6 +86,8 @@ abstract class StreamTableEnvironment(config: TableConfig) extends TableEnvironm
     
         if (isRegistered(tableName)) {
           relBuilder.scan(tableName)
    +      //val delta: LogicalDelta = LogicalDelta.create(relBuilder.build())
    --- End diff --
    
    can be removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---