Posted to user@storm.apache.org by Sai Dilip Reddy Kiralam <dk...@aadhya-analytics.com> on 2016/03/22 17:23:37 UTC

How Storm take topic data which is more than the default value of 1MB.

I'm trying to fetch Facebook page data using the Graph API. Each post is
larger than 1MB, while Kafka's default fetch.message.max.bytes is 1MB. I
raised the Kafka limits from 1MB to 3MB by adding the lines below to Kafka's
consumer.properties and server.properties files.

fetch.message.max.bytes=3048576 (consumer.properties)
message.max.bytes=3048576 (server.properties)
replica.fetch.max.bytes=3048576 (server.properties)
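
A side note on the numbers, as a small sketch rather than anything stated in the thread: 3048576 is the 1MB default (1048576) plus 2,000,000, which is slightly less than 3 * 1024 * 1024 = 3145728. The class and method names below are made up for illustration, and the size check is simplified (it ignores any per-message overhead Kafka adds).

```java
// Hypothetical helper, not part of Kafka or Storm: compares the configured
// limit with an exact 3 MiB value.
final class MessageSizeCheck {
    static final int ONE_MIB = 1024 * 1024;        // 1048576 bytes, the 1MB default
    static final int CONFIGURED_LIMIT = 3048576;   // value used in the properties above
    static final int THREE_MIB = 3 * ONE_MIB;      // 3145728 bytes

    // Simplified check: true when a payload of messageBytes fits under limitBytes.
    static boolean fits(int messageBytes, int limitBytes) {
        return messageBytes <= limitBytes;
    }

    public static void main(String[] args) {
        System.out.println("gap to 3 MiB: " + (THREE_MIB - CONFIGURED_LIMIT) + " bytes");
        System.out.println("3,000,000-byte post fits: " + fits(3_000_000, CONFIGURED_LIMIT));
        System.out.println("3,100,000-byte post fits: " + fits(3_100_000, CONFIGURED_LIMIT));
    }
}
```

If posts can approach a full 3 MiB, the three properties would need a value of at least 3145728.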

After adding these properties, 3MB messages are written to the Kafka data
logs. But Storm cannot process that data; it reads only messages up to the
default size of 1MB. What parameters should I add to the Storm topology so
that it can read the 3MB messages from the Kafka topic? Do I need to increase
buffer.size in Storm? I don't have a clear idea about it.

Here is my topology code.

    String argument = args[0];
    Config conf = new Config();
    conf.put(JDBC_CONF, map);
    conf.setDebug(true);
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    // Set the number of workers
    conf.setNumWorkers(3);

    TopologyBuilder builder = new TopologyBuilder();
    // Set up the Kafka spout
    BrokerHosts hosts = new ZkHosts("localhost:2181");
    String topic = "year1234";
    String zkRoot = "";
    String consumerGroupId = "group1";
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    builder.setSpout("KafkaSpout", kafkaSpout, 1);
    builder.setBolt("user_details", new Parserspout(), 1).shuffleGrouping("KafkaSpout");
    builder.setBolt("bolts_user", new bolts_user(cp), 1).shuffleGrouping("user_details");

Thanks In Advance




*Best regards,*

*K.Sai Dilip Reddy.*

Re: How Storm take topic data which is more than the default value of 1MB.

Posted by Sai Dilip Reddy Kiralam <dk...@aadhya-analytics.com>.
Hi Sypan,

That works fine :) Thank you




*Best regards,*
*K.Sai Dilip Reddy.*

On Wed, Mar 23, 2016 at 2:27 PM, sy.pan <sh...@gmail.com> wrote:

> Hi, SpoutConfig extends KafkaConfig, which has public fields such as
> fetchSizeBytes and fetchMaxWait.
>
> You can set these values directly in code.

Re: How Storm take topic data which is more than the default value of 1MB.

Posted by "sy.pan" <sh...@gmail.com>.
Hi, SpoutConfig extends KafkaConfig, which has public fields such as fetchSizeBytes and fetchMaxWait.

You can set these values directly in code.
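
A minimal sketch of what setting the field in code could look like. To keep it runnable without storm-kafka on the classpath, SpoutConfigStandIn is a hypothetical stand-in that mirrors only the public fetchSizeBytes field (1MB by default); in a real topology you would drop it and assign the field on the SpoutConfig from the original post. LargeMessageFetch and raiseFetchSize are likewise made-up names, and 3048576 is taken from the Kafka properties in the question.

```java
final class LargeMessageFetch {
    // Hypothetical stand-in for storm-kafka's SpoutConfig / KafkaConfig.
    // Only the one field used here is modelled.
    static final class SpoutConfigStandIn {
        public int fetchSizeBytes = 1024 * 1024; // 1MB default
    }

    // Raise the fetch size so one fetch can hold the largest expected message.
    // Never lowers a value that is already large enough.
    static void raiseFetchSize(SpoutConfigStandIn spoutConfig, int maxMessageBytes) {
        if (maxMessageBytes > spoutConfig.fetchSizeBytes) {
            spoutConfig.fetchSizeBytes = maxMessageBytes;
        }
    }

    public static void main(String[] args) {
        SpoutConfigStandIn spoutConfig = new SpoutConfigStandIn();
        raiseFetchSize(spoutConfig, 3048576);
        // With the real class this would be: spoutConfig.fetchSizeBytes = 3048576;
        System.out.println("fetchSizeBytes = " + spoutConfig.fetchSizeBytes);
    }
}
```

With the real class the change is a single assignment, placed after the SpoutConfig is created and before it is passed to new KafkaSpout(spoutConfig).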

