Posted to user@storm.apache.org by Filipa Moura <fi...@gmail.com> on 2014/10/20 20:47:18 UTC
KafkaSpout not receiving all the messages
Hi,
I am reading data from Kafka using the KafkaSpout. When I was running a
Trident topology, this worked fine, and the spout received hundreds of
messages per minute. However, ever since I changed my code to use “vanilla”
Storm, my spout only receives around 40 messages per minute (far fewer
messages than Kafka is sending).
Code before (with Trident):
BrokerHosts brokerHosts = new ZkHosts("kafka1.xxx.com:2181");
TridentKafkaConfig kafkaSpoutConfig = new TridentKafkaConfig(brokerHosts, "Budget");
// additional settings
kafkaSpoutConfig.socketTimeoutMs = 1000000;
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(kafkaSpoutConfig);
TridentState curveballEvents =
    topology.newStream("spout", kafkaSpout)
        .each(kafkaSpout.getOutputFields(), new Mx3AvroDecoder(), new Fields("data")) [etc]
Code after (without Trident):
BrokerHosts brokerHosts = new ZkHosts("kafka1.xxx.com:2181");
SpoutConfig spoutConfig = new SpoutConfig(
    brokerHosts,                  // Kafka broker hosts (looked up via ZooKeeper)
    "Budget",                     // Kafka topic to read from
    "/BudgetConsumer",            // Root path in ZooKeeper for the spout to store consumer offsets
    "lowLatencyBudgetConsumer");  // ID for storing consumer offsets in ZooKeeper
// additional settings
spoutConfig.socketTimeoutMs = 1000000;
topologyBuilder.setSpout(KAFKA_SPOUT, new KafkaSpout(spoutConfig), 2);
topologyBuilder.setBolt(DECODER_BOLT, new DecoderBolt()).shuffleGrouping(KAFKA_SPOUT);
[etc]
The Kafka topic has 2 partitions, so I’m using parallelism=2.
Is there anything I’m doing terribly wrong here? I can’t find a reason for
this to be happening.
Thank you very much for your help,
Filipa