Posted to user@storm.apache.org by Filipa Moura <fi...@gmail.com> on 2014/10/20 20:47:18 UTC

KafkaSpout not receiving all the messages

Hi,
I am reading data from Kafka using the KafkaSpout. When I was running a
Trident topology, this seemed to work fine: the spout received hundreds of
messages per minute. However, ever since I changed my code to use “vanilla”
Storm, my spout receives only around 40 messages per minute, far fewer
than Kafka is sending.

Code before (with Trident):

    BrokerHosts brokerHosts = new ZkHosts("kafka1.xxx.com:2181");
    TridentKafkaConfig kafkaSpoutConfig = new TridentKafkaConfig(brokerHosts, "Budget");

    // additional settings
    kafkaSpoutConfig.socketTimeoutMs = 1000000;
    OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaSpoutConfig);

    TridentState curveballEvents = topology.newStream("spout", kafkaSpout)
        .each(kafkaSpout.getOutputFields(), new Mx3AvroDecoder(), new Fields("data"))
        [etc]

Code after (without Trident):

    BrokerHosts brokerHosts = new ZkHosts("kafka1.xxx.com:2181");

    SpoutConfig spoutConfig = new SpoutConfig(
        brokerHosts,                  // Kafka brokers, discovered through ZooKeeper
        "Budget",                     // Kafka topic to read from
        "/BudgetConsumer",            // root path in ZooKeeper for the spout to store consumer offsets
        "lowLatencyBudgetConsumer");  // ID for storing consumer offsets in ZooKeeper

    // additional settings
    spoutConfig.socketTimeoutMs = 1000000;

    topologyBuilder.setSpout(KAFKA_SPOUT, new KafkaSpout(spoutConfig), 2);

    topologyBuilder.setBolt(DECODER_BOLT, new DecoderBolt()).shuffleGrouping(KAFKA_SPOUT);

[etc]
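The last two SpoutConfig arguments (zkRoot and id) together determine where the spout keeps its committed offsets in ZooKeeper. The sketch below shows how those two values might combine into one node per partition. It rests on an assumption: the layout zkRoot/id/partition_N is how storm-kafka appears to store offsets, but it should be checked against the source of the version in use. The class `OffsetPath` and method `offsetPath` are hypothetical and not part of the library.

```java
// Hypothetical helper (not storm-kafka code). ASSUMPTION: committed offsets
// live at zkRoot + "/" + id + "/partition_<n>"; verify against your version.
final class OffsetPath {

    // Builds the assumed ZooKeeper node path for one partition's committed offset.
    static String offsetPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // zkRoot and id are the values passed to SpoutConfig in the post;
        // the topic has 2 partitions, so print one path for each.
        for (int partition = 0; partition < 2; partition++) {
            System.out.println(offsetPath("/BudgetConsumer", "lowLatencyBudgetConsumer", partition));
        }
    }
}
```

Under that assumption, running `main` prints /BudgetConsumer/lowLatencyBudgetConsumer/partition_0 and /BudgetConsumer/lowLatencyBudgetConsumer/partition_1.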

The "Budget" topic has 2 partitions in Kafka, so I’m using a spout parallelism of 2.
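With 2 partitions and a spout parallelism of 2, each KafkaSpout task should end up reading exactly one partition. The sketch below illustrates that arithmetic under an assumption: it uses a stride-based assignment (task i takes partitions i, i + tasks, i + 2*tasks, ...), which is how storm-kafka appears to spread partitions over spout tasks. The class `PartitionAssignment` and method `partitionsForTask` are hypothetical names, not the library's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration (not storm-kafka code): stride-based assignment of
// partitions to spout tasks, assumed to resemble how KafkaSpout spreads them.
final class PartitionAssignment {

    // Returns the partition indices read by task `taskIndex` out of `totalTasks`:
    // taskIndex, taskIndex + totalTasks, taskIndex + 2 * totalTasks, ...
    static List<Integer> partitionsForTask(int taskIndex, int totalTasks, int numPartitions) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("taskIndex must be in [0, totalTasks)");
        }
        List<Integer> assigned = new ArrayList<>();
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            assigned.add(p);
        }
        return assigned;
    }

    public static void main(String[] args) {
        int numPartitions = 2; // the topic in the post has 2 partitions
        // Compare fewer, equal and more tasks than partitions.
        for (int tasks : new int[] {1, 2, 3}) {
            System.out.println("tasks=" + tasks);
            for (int t = 0; t < tasks; t++) {
                System.out.println("  task " + t + " -> partitions "
                        + partitionsForTask(t, tasks, numPartitions));
            }
        }
    }
}
```

Under this assumed scheme, 2 tasks get [0] and [1], a single task reads [0, 1], and with 3 tasks the third gets an empty list and would sit idle, so a parallelism above the partition count adds no read capacity.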

Is there anything I’m doing terribly wrong here? I can’t find a reason for
this to be happening.

Thank you very much for your help,
Filipa