Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2017/08/11 08:52:00 UTC
[jira] [Created] (FLINK-7426) Table API does not support null values in keys
Timo Walther created FLINK-7426:
-----------------------------------
Summary: Table API does not support null values in keys
Key: FLINK-7426
URL: https://issues.apache.org/jira/browse/FLINK-7426
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Timo Walther
The Table API uses {{keyBy}} internally. However, the generated {{KeySelector}} produces instances of {{Tuple}}, and the {{TupleSerializer}} cannot serialize null values. This causes failures whenever keys have to be serialized, i.e. during checkpointing or when using the RocksDB state backend. We need to change all {{keyBy}} calls so that they use a custom {{RowKeySelector}}. The following test applies {{distinct()}} to a null-valued column, so the grouping key contains a null:
{code}
class AggregateITCase extends StreamingWithStateTestBase {

  private val queryConfig = new StreamQueryConfig()
  queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))

  @Test
  def testDistinct(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    // distinct() groups on both selected fields, so every key contains a null value
    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
      .select('b, Null(Types.LONG)).distinct()

    val results = t.toRetractStream[Row](queryConfig)
    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    env.execute()

    val expected = mutable.MutableList("1,null", "2,null", "3,null", "4,null", "5,null", "6,null")
    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
  }
}
{code}
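To illustrate why the key representation matters, here is a minimal, Flink-independent sketch. It is a simplified model and not Flink's actual serializer code: the object {{NullKeyDemo}} and all of its method names are hypothetical, and keys are modelled as sequences of boxed longs. The tuple-style writer encodes each field directly and therefore has no way to represent a null, whereas the row-style writer stores one null flag per field before the values.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Simplified model (assumption, not Flink code): a key is a sequence of boxed Long fields.
object NullKeyDemo {

  // Tuple-style: every field is written directly, so a null field cannot be encoded.
  def writeTupleStyle(key: Seq[java.lang.Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    key.foreach { field =>
      if (field == null) {
        throw new IllegalArgumentException("tuple-style key cannot hold a null field")
      }
      out.writeLong(field.longValue())
    }
    out.flush()
    bytes.toByteArray
  }

  // Row-style: one null flag per field first, then only the non-null values.
  def writeRowStyle(key: Seq[java.lang.Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    key.foreach(field => out.writeBoolean(field == null))
    key.filter(_ != null).foreach(field => out.writeLong(field.longValue()))
    out.flush()
    bytes.toByteArray
  }

  // Reads the null flags back, then restores each field as null or as its stored value.
  def readRowStyle(data: Array[Byte], arity: Int): Seq[java.lang.Long] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val isNull = List.fill(arity)(in.readBoolean())
    isNull.map(flag => if (flag) null else java.lang.Long.valueOf(in.readLong()))
  }
}
```

With a key such as {{(1, null)}}, {{writeRowStyle}} followed by {{readRowStyle}} returns the original fields, while {{writeTupleStyle}} throws. The proposed {{RowKeySelector}} would presumably rely on the same idea by emitting a {{Row}} key, whose serializer can record which fields are null.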