Posted to users@kafka.apache.org by pravin kumar <pk...@gmail.com> on 2018/03/06 14:07:01 UTC

Producing more records than expected

I have run the wikifeed example. I have three topics:
wikifeedInputtopicDemo2 - 10 partitions
wikifeedOutputtopicDemo2 - 10 partitions
sumoutputeventopicDemo2 - 5 partitions

I have produced 100,000 records, but the input topic (wikifeedInputtopicDemo2)
has received more than 100,000 records: the per-partition end offsets below
sum to 133,999.
Can someone explain how this happens?

[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic wikifeedInputtopicDemo2 --time -1
wikifeedInputtopicDemo2:8:13400
wikifeedInputtopicDemo2:2:13401
wikifeedInputtopicDemo2:5:13400
wikifeedInputtopicDemo2:4:13400
wikifeedInputtopicDemo2:7:13399
wikifeedInputtopicDemo2:1:13399
wikifeedInputtopicDemo2:9:13400
wikifeedInputtopicDemo2:3:13400
wikifeedInputtopicDemo2:6:13400
wikifeedInputtopicDemo2:0:13400
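
For reference, the same total can be computed programmatically by summing each
partition's end offset. Below is a minimal sketch (the TopicRecordCount class
name is mine, and it assumes the same localhost:9092 broker and that nothing
has been deleted from the log, so end offsets equal record counts):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class TopicRecordCount {   // hypothetical helper, not part of the example

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Collect all partitions of the input topic, then sum their end offsets.
            List<TopicPartition> partitions = consumer.partitionsFor("wikifeedInputtopicDemo2").stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());

            long total = consumer.endOffsets(partitions).values().stream()
                    .mapToLong(Long::longValue)
                    .sum();

            System.out.println("Total records in wikifeedInputtopicDemo2: " + total);
        }
    }
}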

Here is my processor topology code:

//------------------------

public static KafkaStreams getWikifeed() {

    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, WIKIFEED_LAMBDA);
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, WikifeedSerde.class);
    properties.put(StreamsConfig.STATE_DIR_CONFIG, STAT_DIR);
    //properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    StreamsBuilder builder = new StreamsBuilder();

    // Read the input topic with the default serdes (String keys, Wikifeed values).
    KStream<String, Wikifeed> inputStream = builder.stream(WIKIFEED_INPUT);

    // Keep only new records, re-key by the record's name field, and count records per name.
    KTable<String, Long> kTable = inputStream
            .filter((key, value) -> value.isNew())
            .map((key, value) -> KeyValue.pair(value.getName(), value))
            .groupByKey()
            .count(Materialized.as(COUNT_STORE));

    // Write the running counts to the output topic.
    kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(builder.build(), properties);
    return streams;
}

--------->
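
Note that getWikifeed() only builds the KafkaStreams instance; the caller still
has to start it. A minimal sketch of the expected usage (assuming it runs from a
plain main method):

    KafkaStreams streams = getWikifeed();
    streams.start();

    // Close the Streams application cleanly on JVM shutdown.
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));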


My driver code is in the attached file.
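
For anyone who cannot open the attachment: in outline, the driver produces the
100,000 Wikifeed records with a plain KafkaProducer, roughly like the simplified
sketch below. The WikifeedDriver class name and the Wikifeed.random() factory are
placeholders, and it assumes WikifeedSerde implements Serde<Wikifeed> with a
no-arg constructor, as the Streams config above requires; the actual attached
code may differ.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class WikifeedDriver {   // placeholder class name

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Reuse the value serde from the topology; keys are the Wikifeed names.
        try (KafkaProducer<String, Wikifeed> producer = new KafkaProducer<>(
                props, new StringSerializer(), new WikifeedSerde().serializer())) {

            for (int i = 0; i < 100_000; i++) {
                Wikifeed feed = Wikifeed.random();   // placeholder for the real record construction
                producer.send(new ProducerRecord<>("wikifeedInputtopicDemo2", feed.getName(), feed));
            }
            producer.flush();
        }
    }
}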