Posted to user@storm.apache.org by sujitha chinnu <ch...@gmail.com> on 2016/03/22 12:48:23 UTC

How to change default kafka SpoutConfig class

Hi,

         I am receiving a message stream of 3 MB from a Kafka topic, but the
default limit is 1 MB. I have changed the Kafka properties from 1 MB to
3 MB by adding the lines below to the Kafka consumer.properties and
server.properties files:

   fetch.message.max.bytes=2048576 ( consumer.properties )
   message.max.bytes=2048576 ( server.properties )
   replica.fetch.max.bytes=2048576 ( server.properties )

After adding these lines, the 3 MB messages are going into the Kafka data logs,
but Storm is unable to process that 3 MB data and still reads only the default
size, i.e. 1 MB. How do I change the configuration so Storm can read/process
the 3 MB data? Here is my topology class:

        String argument = args[0];
        Config conf = new Config();
        conf.put(JDBC_CONF, map);
        conf.setDebug(true);
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        // set the number of workers
        conf.setNumWorkers(3);

        TopologyBuilder builder = new TopologyBuilder();

        // Set up the Kafka spout
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        String topic = "year1234";
        String zkRoot = "";
        String consumerGroupId = "group1";
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        builder.setSpout("KafkaSpout", kafkaSpout, 1);
        builder.setBolt("user_details", new Parserspout(), 1).shuffleGrouping("KafkaSpout");
        builder.setBolt("bolts_user", new bolts_user(cp), 1).shuffleGrouping("user_details");

Re: How to change default kafka SpoutConfig class

Posted by "sy.pan" <sh...@gmail.com>.
Hi,

SpoutConfig extends KafkaConfig, which has public fields such as fetchSizeBytes, fetchMaxWait, and so on;
you can set these values directly in code.
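
For example, a minimal sketch against the spoutConfig from the topology above
(the 3 MB sizes and the bufferSizeBytes line are illustrative assumptions, not
something the original post specified):

        // Raise the spout's fetch and buffer limits so 3 MB messages fit.
        // fetchSizeBytes and bufferSizeBytes are public fields inherited from
        // KafkaConfig and both default to 1 MB; set them before constructing
        // the KafkaSpout. The 3 MB values below are examples.
        spoutConfig.fetchSizeBytes = 3 * 1024 * 1024;   // max bytes fetched per partition request
        spoutConfig.bufferSizeBytes = 3 * 1024 * 1024;  // consumer socket buffer size

The broker- and consumer-side limits (message.max.bytes, replica.fetch.max.bytes,
fetch.message.max.bytes) also need to be at least as large as the biggest message.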

