You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Chitra Raveendran <ch...@flutura.com> on 2014/03/07 09:39:19 UTC

Storm-Kafka without forced offset time parameter, not working !

I'm not able to run a normal storm-kafka topology without specifying
  forceStartOffsetTime parameter. Without this parameter, the topology
should start consuming from the last message's offset, right?

The kafka message is consumed as byte array. For this I just commented out
this line.
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

*Consuming from the last message is critical, as I don't want to lose out
on the data if some systems go down unexpectedly! (This is rare and may
never happen! Just being cautious :) )*

Here is a snippet of my code:

import storm.kafka.KafkaConfig.StaticHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;


public class MainTopology {
    public static void main(String[] args) throws Exception {

        List<String> hosts = new ArrayList<String>();
        hosts.add("172.16.18.68");
        hosts.add("172.16.18.69");

        SpoutConfig spoutConfig = new
SpoutConfig(StaticHosts.fromHostString(hosts, 2), "topic", "/TOPIC", "ID");
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", kafkaSpout, 2);
        builder.setBolt("parserBolt", new MessageParserBolt(),
2).shuffleGrouping("kafka-spout");

---------------








-- 

Regards,

*Chitra Raveendran*

Re: Storm-Kafka without forced offset time parameter, not working !

Posted by Lin Zhao <li...@groupon.com>.
The KafkaSpout writes commit offsets to zookeeper, by default the storm
cluster's zookeeper. You can find the path in
PartitionManager.committedPath(). Perhaps you can take a look of what's in
zookeeper and see if something's off.


On Fri, Mar 7, 2014 at 12:39 AM, Chitra Raveendran <
chitra.raveendran@flutura.com> wrote:

> I'm not able to run a normal storm-kafka topology without specifying
>   forceStartOffsetTime parameter. Without this parameter, the topology
> should start consuming from the last message's offset, right?
>
> The kafka message is consumed as byte array. For this I just commented out
> this line.
> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>
> *Consuming from the last message is critical, as I don't want to lose out
> on the data if some systems go down unexpectedly! (This is rare and may
> never happen! Just being cautious :) )*
>
> Here is a snippet of my code:
>
> import storm.kafka.KafkaConfig.StaticHosts;
> import storm.kafka.KafkaSpout;
> import storm.kafka.SpoutConfig;
> import backtype.storm.Config;
> import backtype.storm.StormSubmitter;
> import backtype.storm.LocalCluster;
> import backtype.storm.topology.TopologyBuilder;
>
>
> public class MainTopology {
>     public static void main(String[] args) throws Exception {
>
>         List<String> hosts = new ArrayList<String>();
>         hosts.add("172.16.18.68");
>         hosts.add("172.16.18.69");
>
>         SpoutConfig spoutConfig = new
> SpoutConfig(StaticHosts.fromHostString(hosts, 2), "topic", "/TOPIC", "ID");
>         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("kafka-spout", kafkaSpout, 2);
>         builder.setBolt("parserBolt", new MessageParserBolt(),
> 2).shuffleGrouping("kafka-spout");
>
> ---------------
>
>
>
>
>
>
>
>
> --
>
> Regards,
>
> *Chitra Raveendran*
>
>
>


-- 
Lin Zhao

https://wiki.groupondev.com/Message_Bus
3101 Park Blvd, Palo Alto, CA 94306

Re: Storm-Kafka without forced offset time parameter, not working !

Posted by Kashyap Mhaisekar <ka...@gmail.com>.
HI Chitra,
Were you able to run the topology without getting a ClassNotFoundException
on IMetric? Can you please let me know what jars did you use?

Regards,
Kashyap


On Fri, Mar 7, 2014 at 2:39 AM, Chitra Raveendran <
chitra.raveendran@flutura.com> wrote:

> I'm not able to run a normal storm-kafka topology without specifying
>   forceStartOffsetTime parameter. Without this parameter, the topology
> should start consuming from the last message's offset, right?
>
> The kafka message is consumed as byte array. For this I just commented out
> this line.
> kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>
> *Consuming from the last message is critical, as I don't want to lose out
> on the data if some systems go down unexpectedly! (This is rare and may
> never happen! Just being cautious :) )*
>
> Here is a snippet of my code:
>
> import storm.kafka.KafkaConfig.StaticHosts;
> import storm.kafka.KafkaSpout;
> import storm.kafka.SpoutConfig;
> import backtype.storm.Config;
> import backtype.storm.StormSubmitter;
> import backtype.storm.LocalCluster;
> import backtype.storm.topology.TopologyBuilder;
>
>
> public class MainTopology {
>     public static void main(String[] args) throws Exception {
>
>         List<String> hosts = new ArrayList<String>();
>         hosts.add("172.16.18.68");
>         hosts.add("172.16.18.69");
>
>         SpoutConfig spoutConfig = new
> SpoutConfig(StaticHosts.fromHostString(hosts, 2), "topic", "/TOPIC", "ID");
>         KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
>         TopologyBuilder builder = new TopologyBuilder();
>         builder.setSpout("kafka-spout", kafkaSpout, 2);
>         builder.setBolt("parserBolt", new MessageParserBolt(),
> 2).shuffleGrouping("kafka-spout");
>
> ---------------
>
>
>
>
>
>
>
>
> --
>
> Regards,
>
> *Chitra Raveendran*
>
>
>