Posted to user@storm.apache.org by trung kien <ki...@gmail.com> on 2015/09/02 09:05:25 UTC

How to have multiple storm workers consume a kafka topic in parallel

Hi Storm Users,

I am new to Storm and am using Trident for my applications.

My application needs to push a large number of messages into Kafka (in JSON format),
do some calculations, and save the results in Redis.

It seems that Storm always assigns only 1 worker to consume the Kafka
topic (even though I have .parallelismHint(5) and my Storm cluster has 10 workers).
Is there any way to have more than one worker consume a Kafka topic in
parallel?


Here is my topology code:

    topology.newStream("msg", kafkaSpout)
        .shuffle()
        .each(new Fields("str"), new JsonDecode(), new Fields("user_id", "user_name"))
        .parallelismHint(5);

Could someone please help me with this? Having only one worker is causing high
latency in my application.

-- 
Thanks
Kien
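One likely explanation for the single spout executor, not spelled out in the replies below, is how Trident applies parallelism hints. As we understand it, a hint applies to the operations since the last repartitioning operation, so a hint placed after .shuffle() sizes the .each(...) step but leaves the spout at a default of one executor, which would match the single spout executor Kien later reports in the Storm UI. The usual remedy would be to also call .parallelismHint(...) directly after topology.newStream(...), before .shuffle(). The toy model below is not Trident's planner; the class name HintPlacementModel and the "hint:N" step encoding are hypothetical, invented only to show where each hint lands in the topology code above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Trident's real planner: a topology is a list of steps, and a
// parallelism hint is assumed to apply to the segment of steps since the last
// repartitioning step (here only "shuffle").
class HintPlacementModel {

    // Returns one parallelism value per segment, in order. "hint:N" sets the
    // current segment's parallelism (the last hint in a segment wins in this toy);
    // segments without a hint keep the assumed default of 1.
    static List<Integer> segmentParallelism(List<String> steps) {
        List<Integer> result = new ArrayList<>();
        int current = 1;
        for (String step : steps) {
            if (step.equals("shuffle")) {
                result.add(current); // close the segment that ends at this repartition
                current = 1;         // the next segment starts without a hint
            } else if (step.startsWith("hint:")) {
                current = Integer.parseInt(step.substring("hint:".length()));
            }
            // any other step (newStream, each, ...) simply joins the current segment
        }
        result.add(current); // close the final segment
        return result;
    }

    public static void main(String[] args) {
        // Shape of the original topology: the hint follows shuffle(), so the
        // spout segment stays at 1 and only the each() segment gets 5.
        System.out.println(segmentParallelism(
                List.of("newStream", "shuffle", "each", "hint:5")));              // prints [1, 5]
        // Hint placed before shuffle() as well: both segments get 10.
        System.out.println(segmentParallelism(
                List.of("newStream", "hint:10", "shuffle", "each", "hint:10"))); // prints [10, 10]
    }
}
```

In this model the first segment stands for the spout, so the first list element is the number of spout executors the topology would ask for.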

RE: How to have multiple storm workers consume a kafka topic in parallel

Posted by "Ziemer, Tom" <to...@wirecard.com>.
Hi Kien,

I don’t see any immediate issue with the setup – some thoughts:

- Do you see messages in all Kafka partitions?
- How large are the messages (in KB)?
- Do you need exactly-once processing? If yes, use Trident; if not, use vanilla Storm (http://stackoverflow.com/questions/15520993/storm-vs-trident-when-not-to-use-trident).

Since you do not specify the partitions explicitly but use ZK instead, the spout should be able to pick them up from there.
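Tom's first question matters because each Kafka partition is read by a single spout task, so a partition that receives no messages leaves its task with nothing to do. Keying messages (for example by user_id) is one common way to spread them. The sketch below is a simplified model assuming partition = hash(key) mod numPartitions; Kafka's own default partitioners use different hash functions and treat unkeyed messages differently across client versions, so real placements will differ. KeyedPartitioningModel and the "user-N" keys are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model: partition = hash(key) mod numPartitions. Kafka's real
// partitioners use other hash functions, so actual placements will differ.
class KeyedPartitioningModel {

    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts how many keys land in each partition (partitions with no keys are absent).
    static Map<Integer, Integer> histogram(List<String> keys, int numPartitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String key : keys) {
            counts.merge(partitionFor(key, numPartitions), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> userIds = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            userIds.add("user-" + i); // hypothetical user_id values
        }
        // A partition missing from this map would receive no messages at all.
        System.out.println(histogram(userIds, 10));
    }
}
```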

Regards,
Tom

From: trung kien [mailto:kientt86@gmail.com]
Sent: Thursday, 3 September 2015 09:07
To: user@storm.apache.org
Subject: Re: How to have multiple storm workers consume a kafka topic in parallel


Hi Tom,

Yes, I have my topic partitioned.
I created the topic with --partitions 10
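With 10 partitions, the number of spout executors determines how many consumers read the topic in parallel. The sketch below models spreading partitions over spout tasks round-robin; that assignment scheme is an assumption for illustration (storm-kafka's exact logic may differ between versions), and PartitionAssignmentModel is a hypothetical name. It shows that a single spout task reads all 10 partitions alone, while more tasks than partitions leaves some tasks idle.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption for illustration, not storm-kafka's code):
// spout task i reads partitions i, i + numTasks, i + 2 * numTasks, ...
class PartitionAssignmentModel {

    // Returns, for each spout task index, the list of partitions it would read.
    static List<List<Integer>> assign(int numPartitions, int numTasks) {
        List<List<Integer>> byTask = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            List<Integer> partitions = new ArrayList<>();
            for (int p = task; p < numPartitions; p += numTasks) {
                partitions.add(p);
            }
            byTask.add(partitions);
        }
        return byTask;
    }

    public static void main(String[] args) {
        System.out.println(assign(10, 1));  // one task reads all 10 partitions
        System.out.println(assign(10, 5));  // each task reads 2 partitions
        System.out.println(assign(10, 12)); // tasks 10 and 11 get no partition
    }
}
```

Under this model, a spout parallelism of 10 gives each executor exactly one partition, and anything above 10 adds no read parallelism.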

Here is how I create my KafkaSpout:

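// ZkHosts looks up the topic's brokers and partitions in ZooKeeper
BrokerHosts zk = new ZkHosts("zkserver");
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(zk, "my_queue");
// emit each Kafka message as a single string field named "str"
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConfig);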

And I'm using this config as follows:

TridentTopology topology = new TridentTopology();

topology.newStream("myStream", kafkaSpout)
    .shuffle()
    .each(new Fields("str"), new JsonDecode(), new Fields("user_id", "action"))
    .parallelismHint(10);

With this setting it can only handle around 20k messages per second.

However, I want a lot more (~100k per second).

In the Storm UI I only see 1 executor for the spout.

Is there any config I can tune for greater performance here?
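Rough arithmetic with the figures above: if the single spout executor is the bottleneck at about 20k messages per second, and if throughput scaled linearly with the number of spout executors (a strong assumption; the JSON-decoding step, serialization, and the Redis writes can cap it first), reaching ~100k per second would need ceil(100000 / 20000) = 5 executors, within the limit of 10 set by the partition count. The helper below only encodes that estimate; SpoutCapacityEstimate is a hypothetical name.

```java
// Back-of-the-envelope estimate, assuming throughput scales linearly with the
// number of spout executors and that a Kafka spout cannot usefully run more
// executors than the topic has partitions.
class SpoutCapacityEstimate {

    // Returns the number of spout executors needed, or -1 if even one executor
    // per partition would not reach the target under the linear assumption.
    static int executorsNeeded(long targetPerSec, long perExecutorPerSec, int partitions) {
        long needed = (targetPerSec + perExecutorPerSec - 1) / perExecutorPerSec; // ceiling division
        return needed <= partitions ? (int) needed : -1;
    }

    public static void main(String[] args) {
        // Figures from the thread: ~20k msg/s with one spout executor,
        // ~100k msg/s wanted, topic created with 10 partitions.
        System.out.println(executorsNeeded(100_000, 20_000, 10)); // prints 5
    }
}
```

If measurements show the spout is not the bottleneck, this estimate does not apply and the downstream steps need their own parallelism.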

Sent from my HTC

----- Reply message -----
From: "Ziemer, Tom" <to...@wirecard.com>
To: "user@storm.apache.org" <us...@storm.apache.org>
Subject: How to have multiple storm workers consume a kafka topic in parallel
Date: Thu, Sep 3, 2015 12:59 PM

Hi,

Is your Kafka topic partitioned?

See: http://stackoverflow.com/questions/17205561/data-modeling-with-kafka-topics-and-partitions

How is the KafkaSpout configured?

Regards,
Tom

