Posted to jira@kafka.apache.org by "Andy Coates (Jira)" <ji...@apache.org> on 2020/08/10 17:45:00 UTC

[jira] [Updated] (KAFKA-10077) Filter downstream of state-store results in spurious tombstones

     [ https://issues.apache.org/jira/browse/KAFKA-10077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andy Coates updated KAFKA-10077:
--------------------------------
    Summary: Filter downstream of state-store results in spurious tombstones  (was: Filter downstream of state-store results in suprious tombstones)

> Filter downstream of state-store results in spurious tombstones
> ---------------------------------------------------------------
>
>                 Key: KAFKA-10077
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10077
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Andy Coates
>            Assignee: Andy Coates
>            Priority: Major
>
> Adding a `filter` call downstream of anything that has a state store, e.g. a table source, results in spurious tombstones being emitted from the topology for any key where a new entry doesn't match the filter, _even when no previous value existed for the row_.
> To put this another way: a filter downstream of a state store outputs a tombstone for an INSERT that doesn't match the filter, when it should only output a tombstone for an UPDATE (i.e. when a row that previously passed the filter changes to a value that doesn't).
>  
> This code shows the problem:
> {code:java}
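> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.TestInputTopic;
> import org.apache.kafka.streams.TestOutputTopic;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.TopologyTestDriver;
> import org.apache.kafka.streams.kstream.Materialized;
>
> final StreamsBuilder builder = new StreamsBuilder();
> builder
>  .table("table", Materialized.with(Serdes.Long(), Serdes.Long()))
>  .filter((k, v) -> v % 2 == 0) // keep only rows with even values
>  .toStream()
>  .to("bob");
> final Topology topology = builder.build();
> final Properties props = new Properties();
> props.put("application.id", "fred");
> props.put("bootstrap.servers", "who cares"); // never contacted by TopologyTestDriver
> final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
> final TestInputTopic<Long, Long> input = driver
>  .createInputTopic("table", Serdes.Long().serializer(), Serdes.Long().serializer());
> input.pipeInput(1L, 2L); // INSERT, passes the filter
> input.pipeInput(1L, 1L); // UPDATE, no longer passes the filter
> input.pipeInput(2L, 1L); // INSERT, never passes the filter
> final TestOutputTopic<Long, Long> output = driver
>  .createOutputTopic("bob", Serdes.Long().deserializer(), Serdes.Long().deserializer());
> final List<KeyValue<Long, Long>> keyValues = output.readKeyValuesToList();
> driver.close();
> // keyValues contains:
> // 1 -> 2
> // 1 -> null <-- correct tombstone: deletes previous row.
> // 2 -> null <-- spurious tombstone: no previous row.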
> {code}
>  
> These spurious tombstones can cause a LOT of noise when, for example, the filter is looking for a specific key. In such a situation _every input record that does not have that key results in a tombstone_, meaning there are far more tombstones than useful data.
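> I believe the fix is to turn on {{KTableImpl::enableSendingOldValues}} for any filter that is downstream of a state store, so that the filter can see each row's previous value and suppress the tombstone when neither the old nor the new value passes the filter.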
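The behavior described above, and the effect of the proposed fix, can be sketched with a small Kafka-free model. This is a simplification under stated assumptions, not Kafka's actual `KTableFilter` code: the class `FilterTombstoneModel`, its `run` method, and the `HashMap` standing in for the upstream state store are all hypothetical, and the `sendOldValues` flag stands in for the effect of `enableSendingOldValues`. It assumes that, once old values are available, the filter drops an update when neither the old nor the new value passes the predicate. It replays the three inputs from the test above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical model of "table -> filter"; not Kafka's implementation.
public class FilterTombstoneModel {

    // Replays (key, value) inputs and returns what the filter would forward,
    // formatted as "key -> value", where a null value is a tombstone.
    static List<String> run(long[][] inputs, BiPredicate<Long, Long> predicate, boolean sendOldValues) {
        Map<Long, Long> store = new HashMap<>(); // stands in for the upstream table's state store
        List<String> out = new ArrayList<>();
        for (long[] in : inputs) {
            Long key = in[0];
            Long value = in[1];
            Long oldValue = store.put(key, value); // previous row, or null on an INSERT
            Long newFiltered = predicate.test(key, value) ? value : null;
            if (sendOldValues) {
                // With the old value available, the filter knows whether the row
                // was visible downstream before this update.
                Long oldFiltered = (oldValue != null && predicate.test(key, oldValue)) ? oldValue : null;
                if (newFiltered == null && oldFiltered == null) {
                    continue; // not visible before, not visible now: emit nothing
                }
            }
            // Without old values the filter cannot tell INSERT from UPDATE,
            // so every non-matching record becomes a tombstone.
            out.add(key + " -> " + newFiltered);
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] inputs = {{1L, 2L}, {1L, 1L}, {2L, 1L}};
        BiPredicate<Long, Long> isEven = (k, v) -> v % 2 == 0;
        System.out.println(run(inputs, isEven, false)); // [1 -> 2, 1 -> null, 2 -> null]
        System.out.println(run(inputs, isEven, true));  // [1 -> 2, 1 -> null]
    }
}
```

Passing a key-based predicate such as `(k, v) -> k == 42L` with `sendOldValues = false` reproduces the noisy case from the issue: every input for any other key produces a `null` record. The design point the model tries to capture is that a filter can only tell a first-time non-match (nothing to delete) from a match-then-non-match (a real delete) if it can see the previous value.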



--
This message was sent by Atlassian Jira
(v8.3.4#803005)