You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by pravin kumar <pk...@gmail.com> on 2018/03/06 14:07:01 UTC
Producing more number of Records than expected
I have run wikifeed example. i have three topics:
wikifeedInputtopicDemo2-10 partitions
wikifeedOutputtopicDemo2-10 partitions
sumoutputeventopicDemo2-5 partitions
i have produced 100000 records.but in the
inputTopic(wikifeedInputtopicDemo2) it receives more than 100000
records.
can someone explain how this happens??
[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:9092 --topic wikifeedInputtopicDemo2 --time -1
wikifeedInputtopicDemo2:8:13400
wikifeedInputtopicDemo2:2:13401
wikifeedInputtopicDemo2:5:13400
wikifeedInputtopicDemo2:4:13400
wikifeedInputtopicDemo2:7:13399
wikifeedInputtopicDemo2:1:13399
wikifeedInputtopicDemo2:9:13400
wikifeedInputtopicDemo2:3:13400
wikifeedInputtopicDemo2:6:13400
wikifeedInputtopicDemo2:0:13400
here is my processorTopology code:
//------------------------
public static KafkaStreams getWikifeed(){
Properties properties=new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class);
properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
//properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder= new StreamsBuilder();
KStream<String,Wikifeed> inputStream=builder.stream(WIKIFEED_INPUT);
KTable<String,Long> kTable=inputStream
.filter((key, value) -> value.isNew())
.map(((key, value) -> KeyValue.pair(value.getName(),value)))
.groupByKey()
.count(Materialized.as(COUNT_STORE));
kTable.toStream().to(WIKIFEED_OUTPUT,
Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams= new KafkaStreams(builder.build(),properties);
return streams;
}
--------->
My driver code is in the attachment file.