Posted to dev@kafka.apache.org by "Phil Derome (JIRA)" <ji...@apache.org> on 2016/06/22 11:01:04 UTC

[jira] [Created] (KAFKA-3891) A KTable with Long values with a numeric filter apparently may retain null values

Phil Derome created KAFKA-3891:
----------------------------------

             Summary: A KTable with Long values with a numeric filter apparently may retain null values
                 Key: KAFKA-3891
                 URL: https://issues.apache.org/jira/browse/KAFKA-3891
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 0.10.0.0
            Reporter: Phil Derome
            Assignee: Guozhang Wang
            Priority: Minor


See Confluent's UserRegionLambdaExample for full details. I am not sure whether this qualifies as a bug, as I am new to the community, but it looks like one to me (the resolved issues KAFKA-739 and KAFKA-2026 also concern undesirable nulls, and they were classified as Major bugs).

The first filter on the KTable below, the one on count, should correctly drop null values, since null does not satisfy the predicate count >= 2.
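One caveat on that reasoning: in Java the predicate as written cannot simply evaluate to false for null. The expression count >= 2 auto-unboxes the Long, so a null count would throw a NullPointerException rather than fail the test, which suggests the nulls seen downstream are not values that slipped through this predicate. The sketch below uses java.util.function.BiPredicate as a stand-in for the Kafka Streams Predicate interface (an assumption made only to keep it self-contained) and contrasts the predicate from the report with a null-safe variant; the class name is hypothetical.

```java
import java.util.function.BiPredicate;

// Hypothetical illustration; BiPredicate stands in for the Kafka Streams Predicate interface.
public class NullUnboxingSketch {

    // Same shape as the predicate in the report: (regionName, count) -> count >= 2.
    // Comparing a Long with >= unboxes it, so a null count throws NullPointerException.
    public static final BiPredicate<String, Long> AT_LEAST_TWO =
        (regionName, count) -> count >= 2;

    // Null-safe variant: a null count is treated as "does not satisfy the predicate".
    public static final BiPredicate<String, Long> AT_LEAST_TWO_NULL_SAFE =
        (regionName, count) -> count != null && count >= 2;

    // Returns true if evaluating the unguarded predicate on a null count throws.
    public static boolean unguardedThrowsOnNull() {
        try {
            AT_LEAST_TWO.test("europe", null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(unguardedThrowsOnNull());                      // true
        System.out.println(AT_LEAST_TWO_NULL_SAFE.test("europe", null)); // false
        System.out.println(AT_LEAST_TWO_NULL_SAFE.test("europe", 3L));   // true
    }
}
```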

Judging by the second filter that follows, the variable regionCounts apparently still contains some null values despite the filter on count. This is quite confusing: why would we want to publish these null values to any topic when the intent of the filter is so clear?
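One possible explanation, offered as a hedged sketch: a KTable is a changelog in which a null value is a tombstone meaning "delete this key". The KTable#filter Javadoc describes this: tombstones are forwarded without evaluating the predicate, and for each record that does not satisfy the predicate a tombstone is forwarded instead. Assuming 0.10.0.0 behaves that way, regionCounts would carry (region, null) updates for regions whose count is below 2. The plain-Java model below (class and method names are hypothetical, and it does not use the Kafka Streams API) simplifies matters by always forwarding the tombstone, even when nothing needs deleting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical, simplified model of filtering a KTable changelog. Not the Kafka Streams API.
public class ChangelogFilterSketch {

    // One changelog entry for a key; a null value is a tombstone meaning "delete this key".
    public static final class Update {
        public final String key;
        public final Long value;

        public Update(String key, Long value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    // Tombstones are forwarded without evaluating the predicate; values that satisfy the
    // predicate are forwarded unchanged; values that fail it become a tombstone for the key.
    public static List<Update> filter(List<Update> changelog, BiPredicate<String, Long> predicate) {
        List<Update> out = new ArrayList<>();
        for (Update u : changelog) {
            if (u.value != null && predicate.test(u.key, u.value)) {
                out.add(u);
            } else {
                out.add(new Update(u.key, null));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Successive counts per region, as an upstream count() might emit them.
        List<Update> counts = Arrays.asList(
            new Update("europe", 1L),
            new Update("europe", 2L),
            new Update("asia", 1L));
        // Same predicate as in the report; it is never called with a null count here.
        System.out.println(filter(counts, (regionName, count) -> count >= 2));
        // prints [europe=null, europe=2, asia=null]
    }
}
```

Applying the report's second filter (count != null) to that output would leave only europe=2. The benefit of emitting tombstones in this model is that downstream consumers of the changelog can learn that a key has left the filtered table, for example a region whose count falls back below 2 when a user moves away, which a plain record filter could not express.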

    // Aggregate the user counts by region
    KTable<String, Long> regionCounts = userRegions
        // Count by region
        // We do not need to specify any explicit serdes because the key and value types do not change
        .groupBy((userId, region) -> KeyValue.pair(region, region))
        .count("CountsByRegion")
        // discard any regions with only 1 user
        .filter((regionName, count) -> count >= 2);

    // Note: The following operations would NOT be needed for the actual users-per-region
    // computation, which would normally stop at the filter() above.  We use the operations
    // below only to "massage" the output data so it is easier to inspect on the console via
    // kafka-console-consumer.
    //
    KStream<String, Long> regionCountsForConsole = regionCounts
        // get rid of the underlying KTable by transforming the KTable to a KStream
        .toStream()
        // sanitize the output by removing null record values (again, we do this only so that the
        // output is easier to read via kafka-console-consumer combined with LongDeserializer
        // because LongDeserializer fails on null values, and even though we could configure
        // kafka-console-consumer to skip messages on error the output still wouldn't look pretty)
        .filter((regionName, count) -> count != null);

    // write to the result topic; we need to override the value serializer for type Long
    regionCountsForConsole.to(stringSerde, longSerde, "LargeRegions");




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)