You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by John Reilly <jr...@inconspicuous.org> on 2016/04/04 19:32:55 UTC

Re: storm-kafka KafkaSpout always consuming the entire Topic

It sounds like your bolt is not acking the tuples.

On Tue, Mar 29, 2016 at 7:10 PM I PVP <ip...@hotmail.com> wrote:

> Hi everyone,
>
> What needs to be done in order for a Topology to let the
> KafkaSpout/Zookeeper know  that messages were successfully
> consumed/processed and should not be sent to that Topology again ?
>
> Is it something to be controlled  automatically or manually  within the
> Bolt?
> or
> Is it set on the SpoutConfig ?
>
> I am new to Storm.
>
> The topologies I built so far are all  performing the work expected , but
> are always and continuously consuming all messages  there were ever sent to
> the Topic .
>
> On each Topology the SpoutConfig.zkRoot and SpoutConfig.id are:
>  SpoutConfig.zkRoot = / +<topic name> , SpoutConfig.id = <the TopologyName>
>
> The Bolt implements IRichBolt and the only code in it is some "saveToDb"
> work within the execute(Tuple tuple)method.
> MyTopology.java - main(String[] args) method:
> -----
> .....
> BrokerHosts hosts = new ZkHosts("<ip>:<port>");
> SpoutConfig spoutConfig = new SpoutConfig(hosts,<topic name>,"/" +<topic
> name>,<TopologyName>);
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
> TopologyBuilder builder = new TopologyBuilder();
> builder.setSpout("kafkaspout_" + <topic name>,kafkaSpout);
> builder.setBolt(<bolt name>, new MyBolt()).shuffleGrouping("kafkaspout_" +
> <topic name>);
> Config conf = new Config();
> conf.setDebug(true);
> StormSubmitter.submitTopology(<topology name>, conf,
> builder.createTopology());
> .....
> ——
>
> Thanks
>
> --
> IPVP
>
>