Posted to user@storm.apache.org by I PVP <ip...@hotmail.com> on 2016/03/30 04:10:28 UTC
storm-kafka KafkaSpout always consuming the entire Topic
Hi everyone,
What needs to be done for a Topology to let the KafkaSpout/Zookeeper know that messages were successfully consumed/processed and should not be sent to that Topology again?
Is it something to be controlled automatically or manually within the Bolt?
Or is it set on the SpoutConfig?
I am new to Storm.
The topologies I built so far all perform the expected work, but they always and continuously re-consume every message that was ever sent to the Topic.
On each Topology the SpoutConfig.zkRoot and SpoutConfig.id are: SpoutConfig.zkRoot = "/" + <topic name>, SpoutConfig.id = <the TopologyName>.
The Bolt implements IRichBolt and the only code in it is some "saveToDb" work within the execute(Tuple tuple) method.
MyTopology.java - main(String[] args) method:
-----
.....
BrokerHosts hosts = new ZkHosts("<ip>:<port>");
SpoutConfig spoutConfig = new SpoutConfig(hosts, <topic name>, "/" + <topic name>, <TopologyName>);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaspout_" + <topic name>, kafkaSpout);
builder.setBolt(<bolt name>, new MyBolt()).shuffleGrouping("kafkaspout_" + <topic name>);

Config conf = new Config();
conf.setDebug(true);
StormSubmitter.submitTopology(<topology name>, conf, builder.createTopology());
.....
——
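For completeness, a simplified sketch of what MyBolt looks like (the actual saveToDb logic is omitted):

```java
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class MyBolt implements IRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // persists the message; details omitted
        saveToDb(tuple.getString(0));
        // no explicit ack/fail calls anywhere
    }

    @Override
    public void cleanup() {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public Map<String, Object> getComponentConfiguration() { return null; }

    private void saveToDb(String message) { /* ... */ }
}
```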
Thanks
--
IPVP
Re: storm-kafka KafkaSpout always consuming the entire Topic
Posted by Aurelien Violette <au...@webgroup-limited.com>.
Hi,
First, it depends on your use case: do you want at-least-once, at-most-once, or exactly-once processing?
This is mostly determined by the way you anchor and ack tuples. The "exactly once" case is a bit trickier. See the documentation:
http://storm.apache.org/releases/0.10.0/Guaranteeing-message-processing.html
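To make anchoring and acking concrete, here is a minimal sketch (class and field names are mine, and the uppercasing work is just a placeholder) of a bolt that anchors its emitted tuple to the input and then acks or fails it:

```java
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AckingBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // anchoring: passing the input tuple to emit() ties the new
            // tuple's fate to it, so a downstream failure propagates back
            collector.emit(input, new Values(input.getString(0).toUpperCase()));
            collector.ack(input);   // tells the spout this tuple is fully processed
        } catch (Exception e) {
            collector.fail(input);  // triggers a replay from the spout
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upper"));
    }
}
```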
As for KafkaSpout, it restarts from the last acknowledged offset. If you allow more than one pending tuple in your topology, KafkaSpout will keep consuming until that many tuples are in flight.
Say you allow 10 tuples in flight: the spout consumes 10 tuples; you ack tuples 1 and 2, tuple 3 fails, tuples 4 and 5 ack, and the rest are still in flight when the failure of tuple 3 comes back. The spout then rewinds to tuple 3 and replays from there, i.e. it replays 3, 4, 5, ..., leading to potential duplicates.
You can use the "exactly once" formalism to handle this, or strictly limit the number of tuples in flight to 1. In the latter case you process tuples one at a time, though, so far from full speed I guess.
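Limiting the number of in-flight tuples is a topology-level setting, roughly:

```java
import backtype.storm.Config;

Config conf = new Config();
// at most one tuple pending per spout task: no replayed batches,
// but throughput drops to one-tuple-at-a-time
conf.setMaxSpoutPending(1);
```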
On 30/03/2016 04:10, I PVP wrote:
> [...]
--
BR,
Aurelien Violette
Re: storm-kafka KafkaSpout always consuming the entire Topic
Posted by John Reilly <jr...@inconspicuous.org>.
It sounds like your bolt is not acking the tuples.
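One simple fix (a sketch, assuming the bolt emits nothing downstream) is to extend BaseBasicBolt instead of implementing IRichBolt directly; it acks each tuple automatically when execute() returns, and fails it if execute() throws a FailedException:

```java
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class MyBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // your existing work; the tuple is acked automatically on return
        saveToDb(tuple.getString(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    private void saveToDb(String message) { /* ... */ }
}
```

If you stay with IRichBolt, you must call collector.ack(tuple) yourself at the end of execute().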
On Tue, Mar 29, 2016 at 7:10 PM I PVP <ip...@hotmail.com> wrote:
> [...]