Posted to user@storm.apache.org by I PVP <ip...@hotmail.com> on 2016/03/30 04:10:28 UTC

storm-kafka KafkaSpout always consuming the entire Topic

Hi everyone,

What needs to be done in order for a Topology to let the KafkaSpout/Zookeeper know that messages were successfully consumed/processed and should not be sent to that Topology again?

Is it something to be controlled automatically or manually within the Bolt?
Or is it set on the SpoutConfig?

I am new to Storm.

The topologies I built so far all perform the work expected, but they always and continuously consume every message that was ever sent to the Topic.

On each Topology the SpoutConfig.zkRoot and SpoutConfig.id are set as: SpoutConfig.zkRoot = "/" + <topic name>, SpoutConfig.id = <the TopologyName>

The Bolt implements IRichBolt and the only code in it is some "saveToDb" work within the execute(Tuple tuple) method.
MyTopology.java - main(String[] args) method:
-----
.....
BrokerHosts hosts = new ZkHosts("<ip>:<port>");
SpoutConfig spoutConfig = new SpoutConfig(hosts, <topic name>, "/" + <topic name>, <TopologyName>);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaspout_" + <topic name>, kafkaSpout);
builder.setBolt(<bolt name>, new MyBolt()).shuffleGrouping("kafkaspout_" + <topic name>);
Config conf = new Config();
conf.setDebug(true);
StormSubmitter.submitTopology(<topology name>, conf, builder.createTopology());
.....
-----

Thanks

--
IPVP


Re: storm-kafka KafkaSpout always consuming the entire Topic

Posted by Aurelien Violette <au...@webgroup-limited.com>.
Hi,

First, it depends on your use case: do you want at-least-once, at-most-once, or exactly-once processing?

This is mostly determined by the way you anchor and ack tuples. The 
"exactly once" case is a bit trickier. See the documentation:
http://storm.apache.org/releases/0.10.0/Guaranteeing-message-processing.html
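
As an illustration of what that page calls anchoring, a bolt's execute() might look roughly like this (a sketch against the backtype.storm 0.10 API; `collector` is assumed to be the OutputCollector saved in prepare(), and the field access is made up for the example):

```java
public void execute(Tuple input) {
    // Anchored emit: the new tuple is tied to `input`, so a failure
    // downstream propagates back to the KafkaSpout and triggers a replay.
    collector.emit(input, new Values(input.getString(0)));

    // An unanchored emit (no `input` argument) would break the tuple
    // tree, and downstream failures would go unnoticed by the spout:
    // collector.emit(new Values(input.getString(0)));

    // Ack once this bolt's own work on the tuple is done.
    collector.ack(input);
}
```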

As for the KafkaSpout, it restarts from the last acknowledged offset. The 
thing is, if you allow more than one pending tuple in your topology, the 
KafkaSpout will consume as many tuples as possible until the topology has 
that number of tuples in flight.

If you allow 10 tuples in flight, it consumes 10 tuples. But say tuples 
1 and 2 ack, tuple 3 fails, tuples 4 and 5 ack, and the others are still 
in flight when the failure from tuple 3 arrives: the KafkaSpout will 
rewind to tuple 3's offset and replay from there. It will replay 3, 4, 
5, ..., leading to potential duplicates.

You can use the "exactly once" formalism to handle the issue, or very 
strictly limit the number of tuples in flight to 1. You'll then process 
tuples one at a time, though, so expect far from full speed.
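
Concretely, the in-flight limit is the topology's max-spout-pending setting, set on the Config before submitting (a sketch; the values are illustrative):

```java
Config conf = new Config();
// At most this many spout-emitted tuples may be un-acked at any time.
// 1 gives strict one-at-a-time processing and avoids replay-induced
// duplicates, at the cost of throughput.
conf.setMaxSpoutPending(1);
// Tuples not acked within this window are failed and replayed.
conf.setMessageTimeoutSecs(30);
```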



-- 
BR,
Aurelien Violette


Re: storm-kafka KafkaSpout always consuming the entire Topic

Posted by John Reilly <jr...@inconspicuous.org>.
It sounds like your bolt is not acking the tuples.
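
For example, with an IRichBolt the execute() method has to call ack explicitly once the database write succeeds; a sketch, where saveToDb stands in for the original poster's own persistence code and `collector` is the OutputCollector saved in prepare():

```java
@Override
public void execute(Tuple tuple) {
    try {
        saveToDb(tuple);       // the existing "saveToDb" work
        collector.ack(tuple);  // lets the KafkaSpout commit this offset
    } catch (Exception e) {
        collector.fail(tuple); // replay now instead of waiting for the
                               // tuple timeout to expire
    }
}
```

Alternatively, extending BaseBasicBolt instead of implementing IRichBolt gives you automatic acking after execute() returns.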
