Posted to dev@storm.apache.org by pramod niralakeri <pr...@gmail.com> on 2016/09/19 04:55:23 UTC

Trident Storm kafka spout reading repeated messages

Hi Storm Team, I have deployed a Storm topology on a cluster and it is working
fine, except that it keeps reading repeated (duplicate) messages.

The following is my Storm configuration. Hoping for a solution, thanks in advance.

Config conf = new Config();

BrokerHosts hosts = new ZkHosts("localhost:2181");
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topicName);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// ignore any offsets already committed to ZooKeeper
kafkaConfig.forceFromStart = true;
kafkaConfig.fetchSizeBytes = 1024 * 1024;
kafkaConfig.bufferSizeBytes = 1024 * 1024;
// start from the earliest offset available in the topic
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);
StormSubmitter.submitTopology(name, conf, buildTopology(spout));
.............
.............
.............
.............


/****STREAM****/
Stream stream = topology.newStream("spout", spout)
    .each(new Fields("str"), new TupleParser(), new Fields("parse"))
    .each(new Fields("parse"), new TupleFilter())
    .each(new Fields("parse"), new TupleTransformation(), new Fields("trans"))
    .each(new Fields("trans"), new DimensionUpdate(), new Fields(finalColumnList));



Regards
pramod

Re: Trident Storm kafka spout reading repeated messages

Posted by Manu Zhang <ow...@gmail.com>.
Hi,

You set startOffsetTime to EarliestTime() (together with forceFromStart =
true), so the spout ignores the offsets committed in ZooKeeper and always
reads from the beginning of the Kafka topic, re-consuming old messages every
time the topology is redeployed. You may try LatestTime() instead.
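A corrected configuration along those lines might look like the sketch below. It assumes the same old storm-kafka Trident API used in the original post, and reuses topicName from there; it is an illustration of the suggested change, not a tested setup.

```java
// Sketch, assuming the storm-kafka Trident API from the original post;
// topicName is defined elsewhere in the topology code.
BrokerHosts hosts = new ZkHosts("localhost:2181");
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topicName);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

// Do not force the spout past the offsets committed in ZooKeeper,
// so a redeployed topology resumes where it left off...
kafkaConfig.forceFromStart = false;
// ...and only fall back to the latest offset when no committed offset exists.
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(kafkaConfig);
```

With forceFromStart left at false, startOffsetTime only matters on the very first run (or after the committed offsets expire from the topic), which is usually the behavior you want in production.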

Thanks,
Manu Zhang
