You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2017/08/11 08:52:00 UTC

[jira] [Created] (FLINK-7426) Table API does not support null values in keys

Timo Walther created FLINK-7426:
-----------------------------------

             Summary: Table API does not support null values in keys
                 Key: FLINK-7426
                 URL: https://issues.apache.org/jira/browse/FLINK-7426
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Timo Walther


The Table API uses {{keyBy}} internally; however, the generated {{KeySelector}} uses instances of {{Tuple}}. The {{TupleSerializer}} is not able to serialize null values. This causes issues during checkpointing or when using the RocksDB state backend. We need to replace all {{keyBy}} calls with a custom {{RowKeySelector}}.

{code}
class AggregateITCase extends StreamingWithStateTestBase {
  private val queryConfig = new StreamQueryConfig()
  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))


  @Test
  def testDistinct(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
      .select('b, Null(Types.LONG)).distinct()

    val results = t.toRetractStream[Row](queryConfig)
    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    env.execute()

    val expected = mutable.MutableList("1,null", "2,null", "3,null", "4,null", "5,null", "6,null")
    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
  }
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)