Posted to user@storm.apache.org by Chitra Raveendran <ch...@flutura.com> on 2014/03/07 09:39:19 UTC
Storm-Kafka without forced offset time parameter not working!
I'm not able to run a normal storm-kafka topology without specifying the
forceStartOffsetTime parameter. Without this parameter, the topology
should start consuming from the last committed offset, right?
The Kafka message is consumed as a byte array; for this I just commented out
this line:
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
*Consuming from the last offset is critical, as I don't want to lose
data if some systems go down unexpectedly! (This is rare and may
never happen! Just being cautious :) )*
Here is a snippet of my code:

import java.util.ArrayList;
import java.util.List;

import storm.kafka.KafkaConfig.StaticHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class MainTopology {
    public static void main(String[] args) throws Exception {
        List<String> hosts = new ArrayList<String>();
        hosts.add("172.16.18.68");
        hosts.add("172.16.18.69");

        SpoutConfig spoutConfig = new SpoutConfig(
                StaticHosts.fromHostString(hosts, 2), "topic", "/TOPIC", "ID");
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", kafkaSpout, 2);
        builder.setBolt("parserBolt", new MessageParserBolt(), 2)
               .shuffleGrouping("kafka-spout");
    }
}
---------------
--
Regards,
*Chitra Raveendran*
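For context on the resume behaviour being asked about, here is a minimal sketch of how the spout could be expected to choose its start offset. This is an assumption about storm-kafka's logic, not its actual source: with no forceStartOffsetTime, the spout resumes from the offset committed to zookeeper, and only falls back to the latest offset when no commit exists.

```java
public class StartOffsetSketch {
    public static long chooseStartOffset(boolean forceFromStart, long startOffsetTime,
                                         Long committedOffset, long latestOffset) {
        if (forceFromStart) {
            return startOffsetTime;  // forced: ignore any committed state
        }
        if (committedOffset != null) {
            return committedOffset;  // normal restart: resume where we left off
        }
        return latestOffset;         // first run: start at the tail of the topic
    }

    public static void main(String[] args) {
        // a restart with offset 500 committed resumes at 500, not at the tail (900)
        System.out.println(chooseStartOffset(false, -2L, 500L, 900L));
    }
}
```

Under this reading, the behaviour the question describes (not resuming from the committed offset) would point at the committed state in zookeeper being absent or unreadable rather than at the parameter itself.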
Re: Storm-Kafka without forced offset time parameter not working!
Posted by Lin Zhao <li...@groupon.com>.
The KafkaSpout writes commit offsets to zookeeper, by default the storm
cluster's zookeeper. You can find the path in
PartitionManager.committedPath(). Perhaps you can take a look at what's in
zookeeper and see if something's off.
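To make the suggested check concrete, here is a small sketch that rebuilds the zookeeper path the way PartitionManager.committedPath() is believed to. The exact layout, including the "partition_N" suffix, is an assumption; verify it against the storm-kafka version you are running.

```java
public class CommittedPathSketch {
    // Believed layout of the commit path: zkRoot + "/" + spout id + "/" + partition id.
    // The "partition_" prefix is an assumption; check PartitionManager in your version.
    public static String committedPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // with the SpoutConfig from the question (zkRoot "/TOPIC", id "ID"),
        // partition 0's commit would live here:
        System.out.println(committedPath("/TOPIC", "ID", 0));
    }
}
```

Inspecting that node with zkCli.sh (e.g. get /TOPIC/ID/partition_0) should show a JSON blob containing the committed offset; if the node is missing or malformed, the spout has nothing to resume from.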
On Fri, Mar 7, 2014 at 12:39 AM, Chitra Raveendran <
chitra.raveendran@flutura.com> wrote:
> I'm not able to run a normal storm-kafka topology without specifying
> forceStartOffsetTime parameter. [...]
--
Lin Zhao
https://wiki.groupondev.com/Message_Bus
3101 Park Blvd, Palo Alto, CA 94306
Re: Storm-Kafka without forced offset time parameter not working!
Posted by Kashyap Mhaisekar <ka...@gmail.com>.
Hi Chitra,
Were you able to run the topology without getting a ClassNotFoundException
on IMetric? Can you please let me know which jars you used?
Regards,
Kashyap
On Fri, Mar 7, 2014 at 2:39 AM, Chitra Raveendran <
chitra.raveendran@flutura.com> wrote:
> I'm not able to run a normal storm-kafka topology without specifying
> forceStartOffsetTime parameter. [...]