You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2018/08/30 15:23:00 UTC

[jira] [Updated] (FLINK-10261) INSERT INTO does not work with ORDER BY clause

     [ https://issues.apache.org/jira/browse/FLINK-10261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther updated FLINK-10261:
---------------------------------
    Description: 
It seems that INSERT INTO and ORDER BY do not work well together.

An AssertionError is thrown and the ORDER BY clause is duplicated. I guess this is a Calcite issue.

Example:
{code}
@Test
  def testInsertIntoMemoryTable(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    MemoryTableSourceSinkUtil.clear()

    val t = StreamTestData.getSmall3TupleDataStream(env)
        .assignAscendingTimestamps(x => x._2)
      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
    tEnv.registerTable("sourceTable", t)

    val fieldNames = Array("d", "e", "f", "t")
    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
      .asInstanceOf[Array[TypeInformation[_]]]
    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)

    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable ORDER BY a"
    tEnv.sqlUpdate(sql)
    env.execute()
{code}

Error:
{code}
java.lang.AssertionError: not a query: SELECT `sourceTable`.`a`, `sourceTable`.`b`, `sourceTable`.`c`, `sourceTable`.`rowtime`
FROM `sourceTable` AS `sourceTable`
ORDER BY `a`
ORDER BY `a`

	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3069)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:557)
	at org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:104)
	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:717)
	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
	at org.apache.flink.table.runtime.stream.sql.SqlITCase.testInsertIntoMemoryTable(SqlITCase.scala:735)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
{code}


  was:
It seems that INSERT INTO and ORDER BY do not work well together.

An AssertionError is thrown and the ORDER BY clause is duplicated.

Example:
{code}
@Test
  def testInsertIntoMemoryTable(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    MemoryTableSourceSinkUtil.clear()

    val t = StreamTestData.getSmall3TupleDataStream(env)
        .assignAscendingTimestamps(x => x._2)
      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
    tEnv.registerTable("sourceTable", t)

    val fieldNames = Array("d", "e", "f", "t")
    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
      .asInstanceOf[Array[TypeInformation[_]]]
    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)

    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable ORDER BY a"
    tEnv.sqlUpdate(sql)
    env.execute()
{code}

Error:
{code}
java.lang.AssertionError: not a query: SELECT `sourceTable`.`a`, `sourceTable`.`b`, `sourceTable`.`c`, `sourceTable`.`rowtime`
FROM `sourceTable` AS `sourceTable`
ORDER BY `a`
ORDER BY `a`

	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3069)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:557)
	at org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:104)
	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:717)
	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
	at org.apache.flink.table.runtime.stream.sql.SqlITCase.testInsertIntoMemoryTable(SqlITCase.scala:735)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
{code}



> INSERT INTO does not work with ORDER BY clause
> ----------------------------------------------
>
>                 Key: FLINK-10261
>                 URL: https://issues.apache.org/jira/browse/FLINK-10261
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API &amp; SQL
>            Reporter: Timo Walther
>            Priority: Major
>
> It seems that INSERT INTO and ORDER BY do not work well together.
> An AssertionError is thrown and the ORDER BY clause is duplicated. I guess this is a Calcite issue.
> Example:
> {code}
> @Test
>   def testInsertIntoMemoryTable(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     MemoryTableSourceSinkUtil.clear()
>     val t = StreamTestData.getSmall3TupleDataStream(env)
>         .assignAscendingTimestamps(x => x._2)
>       .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
>     tEnv.registerTable("sourceTable", t)
>     val fieldNames = Array("d", "e", "f", "t")
>     val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
>       .asInstanceOf[Array[TypeInformation[_]]]
>     val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
>     tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
>     val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime FROM sourceTable ORDER BY a"
>     tEnv.sqlUpdate(sql)
>     env.execute()
> {code}
> Error:
> {code}
> java.lang.AssertionError: not a query: SELECT `sourceTable`.`a`, `sourceTable`.`b`, `sourceTable`.`c`, `sourceTable`.`rowtime`
> FROM `sourceTable` AS `sourceTable`
> ORDER BY `a`
> ORDER BY `a`
> 	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3069)
> 	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:557)
> 	at org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:104)
> 	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:717)
> 	at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
> 	at org.apache.flink.table.runtime.stream.sql.SqlITCase.testInsertIntoMemoryTable(SqlITCase.scala:735)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)