You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Navin Ipe <na...@searchlighthealth.com> on 2016/04/19 06:32:59 UTC

How does one distribute database iteration across workers?

I've seen this:
http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
but it doesn't explain how workers coordinate with each other, so
requesting a bit of clarity.

I'm considering a situation where I have 2 million rows in MySQL or MongoDB.

1. I want to use a Spout to read the first 1000 rows and send the processed
output to a Bolt. This happens in Worker1.
2. I want a different instance of the same Spout class to read the next
1000 rows in parallel with the working of the Spout of 1, then send the
processed output to an instance of the same Bolt used in 1. This happens in
Worker2.
3. Same as 1 and 2, but it happens in Worker 3.
4. I might setup 10 workers like this.
5. When all the Bolts in the workers are finished, they send their outputs
to a single Bolt in Worker 11.
6. The Bolt in Worker 11 writes the processed value to a new MySQL table.

*My confusion here is in how to make the database iterations happen batch
by batch, parallelly*. Obviously the database connection would have to be
made in some static class outside the workers, but if workers are started
with just "conf.setNumWorkers(2);", then how do I tell the workers to
iterate different rows of the database? Assuming that the workers are
running in different machines.

-- 
Regards,
Navin

Re: How does one distribute database iteration across workers?

Posted by anshu shukla <an...@gmail.com>.
Hey ,

One way how I handle the similar problem - say if  only 1 worker slot is
there on 1 VM then based on hostname/host ip  I  will force to fetch rows
from the database .Another choice but with diff setup is using hdfs in
place of   MySQL.

eg.

if(InetAddress.getLocalHost().getHostName().compareTo("anshuStormSCsup1")==0)
   msgId= (long) (1*Math.pow(10,12)+r.nextInt(10));




On Tue, Apr 19, 2016 at 10:02 AM, Navin Ipe <navin.ipe@searchlighthealth.com
> wrote:

> I've seen this:
> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
> but it doesn't explain how workers coordinate with each other, so
> requesting a bit of clarity.
>
> I'm considering a situation where I have 2 million rows in MySQL or
> MongoDB.
>
> 1. I want to use a Spout to read the first 1000 rows and send the
> processed output to a Bolt. This happens in Worker1.
> 2. I want a different instance of the same Spout class to read the next
> 1000 rows in parallel with the working of the Spout of 1, then send the
> processed output to an instance of the same Bolt used in 1. This happens in
> Worker2.
> 3. Same as 1 and 2, but it happens in Worker 3.
> 4. I might setup 10 workers like this.
> 5. When all the Bolts in the workers are finished, they send their outputs
> to a single Bolt in Worker 11.
> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>
> *My confusion here is in how to make the database iterations happen batch
> by batch, parallelly*. Obviously the database connection would have to be
> made in some static class outside the workers, but if workers are started
> with just "conf.setNumWorkers(2);", then how do I tell the workers to
> iterate different rows of the database? Assuming that the workers are
> running in different machines.
>
> --
> Regards,
> Navin
>



-- 
Thanks & Regards,
Anshu Shukla

Re: How does one distribute database iteration across workers?

Posted by Alexander T <mi...@gmail.com>.
Hi Navin,

I'm not sure if this scenario is a perfect fit for Storm since you want
precice control of colocation. But If I understand your problem correctly
the following could be a viable approach:

1. Establish a total order of spout instances by utilizing Zookeeper. Your
spout instances will now have ids 0,1,2,3,4 etc.

2. Partition the keyspace of your db table using the spout instance id. So
if you have 5 instances instance 0 gets 0-1000 followed by 5000-6000 etc.

3. Emit from the spout with a field for the sequence number (0-1000 etc)
and one for the partition id.

4. Do a field grouping on the sequence number from the spout to your bolt.

You have now pinned each partition to  a  bolt instance and you can buffer
until the batch is complete. If you want one partition per bolt instance,
set bolt parallelism=spout parallelism.

Regards
Alexander
I've seen this:
http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
but it doesn't explain how workers coordinate with each other, so
requesting a bit of clarity.

I'm considering a situation where I have 2 million rows in MySQL or MongoDB.

1. I want to use a Spout to read the first 1000 rows and send the processed
output to a Bolt. This happens in Worker1.
2. I want a different instance of the same Spout class to read the next
1000 rows in parallel with the working of the Spout of 1, then send the
processed output to an instance of the same Bolt used in 1. This happens in
Worker2.
3. Same as 1 and 2, but it happens in Worker 3.
4. I might setup 10 workers like this.
5. When all the Bolts in the workers are finished, they send their outputs
to a single Bolt in Worker 11.
6. The Bolt in Worker 11 writes the processed value to a new MySQL table.

*My confusion here is in how to make the database iterations happen batch
by batch, parallelly*. Obviously the database connection would have to be
made in some static class outside the workers, but if workers are started
with just "conf.setNumWorkers(2);", then how do I tell the workers to
iterate different rows of the database? Assuming that the workers are
running in different machines.

-- 
Regards,
Navin

Re: How does one distribute database iteration across workers?

Posted by Jason Kusar <ja...@kusar.net>.
Navin,  my Zookeeper solution requires you to initiate your own connection
to zookeeper.  Storm includes zookeeper, but it doesn't expose it in any
pretty way to the topologies.  In my case, I just used the zookeeper client
library to initiate my own connection and store the information under my
own folder in zookeeper.

http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#_introduction


On Wed, Apr 20, 2016 at 7:33 AM Navin Ipe <na...@searchlighthealth.com>
wrote:

> In this case the values coming from the bolts can all be put() or updated
> into a single large hashmap in the Bolt in Worker11. So no need of
> aggregation.
> If there is no standard way in Storm for spouts and bolts to notify each
> other that there is nothing more for them to process, then I guess I'll
> just send tuples with "null" in them so that the Bolt in Worker11 will know
> the processing is over.
>
> I just hope what y'all said about tuples being automatically passed
> between workers, actually works without any problems :-)
> Thanks a lot for all your help!
>
> On Wed, Apr 20, 2016 at 12:06 PM, Jungtaek Lim <ka...@gmail.com> wrote:
>
>> Navin,
>>
>> I think this two lines are not cleared so I may have misunderstand.
>>
>> 5. When all the Bolts in the workers are finished, they send their
>> outputs to a single Bolt in Worker 11.
>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>>
>> If you don't need to aggregate (I mean join) the results from Bolt in
>> Worker1~10 to Bolt in Worker11, no need to use Trident or Windowing.
>>
>> 2016년 4월 20일 (수) 오후 3:28, Navin Ipe <na...@searchlighthealth.com>님이
>> 작성:
>>
>>> @Jungtaek: This person (
>>> http://stackoverflow.com/questions/21480773/can-twitter-storm-worker-get-only-parts-of-the-topology)
>>> claims that Storm would automatically manage the flow of data between
>>> spouts and blots on different workers. Can anyone confirm this? If this is
>>> the case, I won't have to bother using Trident.
>>>
>>> On Wed, Apr 20, 2016 at 8:59 AM, Navin Ipe <
>>> navin.ipe@searchlighthealth.com> wrote:
>>>
>>>> @Jason: Thanks. Tried searching for Storm code which starts Ephemeral
>>>> nodes, but couldn't find it. (am new to Hadoop and Storm, so perhaps I was
>>>> searching for the wrong thing)
>>>>
>>>> @Jungtaek: Will explore component tasks. Meanwhile, I had considered
>>>> Trident, but didn't go ahead because it was not clear how I could implement
>>>> multiple spouts in Trident, where each spout would iterate a certain number
>>>> of rows of a database. Any idea how that could happen.
>>>>
>>>> On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> There's other idea without relying on Zookeeper : use ordinal of task
>>>>> id between same components (spout)
>>>>>
>>>>> Task id is issued across all tasks including system tasks so you can't
>>>>> assume spout tasks are having task id sequentially, but whatever you can do
>>>>> the trick - check "ordinal" of this spout task's id around same spouts.
>>>>> Please refer GeneralTopologyContext.getComponentTasks(String
>>>>> componentId).
>>>>>
>>>>> Btw, Spout1 -> Bolt2 can be done with various ways but it would not be
>>>>> easy to aggregate the results of Bolt2 from Bolt3.
>>>>> You should consider windowing by processed time or Trident or maintain
>>>>> your own buffers.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>> Thanks,
>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>
>>>>> 2016년 4월 19일 (화) 오후 10:02, Jason Kusar <ja...@kusar.net>님이 작성:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I've done a similar thing before with the exception that I was
>>>>>> reading from Cassandra.  The concept is the same though.  Assuming you know
>>>>>> that you have 10,000 records and you want each spout to read 1,000 of them,
>>>>>> then you would launch 10 instances of the spouts.  The first thing they do
>>>>>> during init is to connect to zookeeper and create an ephemeral node (
>>>>>> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes)
>>>>>> starting with one called '0'.  If 0 already exists, you'll get an exception
>>>>>> which means you try to create '1' and so on until you successfully create a
>>>>>> node.  That tells you which batch of records that instance of the spout is
>>>>>> responsible for.  I.e., if you successfully created '3', then this spout
>>>>>> needs to set its offset to 3,000.
>>>>>>
>>>>>> The reason for using ephemeral nodes is that they are automatically
>>>>>> deleted if the zookeeper client disconnects.  That way if a spout crashes,
>>>>>> once Storm relaunches the spout, it will be able to re-claim that token and
>>>>>> resume work on that batch.  You'll obviously need to have some way to keep
>>>>>> track of which records you've already processed, but that's going to be
>>>>>> specific to your implementation.
>>>>>>
>>>>>> Hope that helps!
>>>>>> Jason
>>>>>>
>>>>>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <
>>>>>> navin.ipe@searchlighthealth.com> wrote:
>>>>>>
>>>>>>> Thanks guys.
>>>>>>> I didn't understand "*...spout instances by utilizing Zookeper.*".
>>>>>>> How does one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a
>>>>>>> Spout?
>>>>>>>
>>>>>>> As of now I've set
>>>>>>> config.setNumWorkers(2);
>>>>>>> and
>>>>>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>>>>>
>>>>>>> I'm able to get spoutID in open() using this.spoutId =
>>>>>>> context.getThisTaskId();
>>>>>>> Strangely, my spoutID always begins with 3 instead of 0.
>>>>>>>
>>>>>>> By partitionID I understand that's the fieldGrouping's id.
>>>>>>>
>>>>>>> Even if I do all this, will the spout's tasks actually be
>>>>>>> distributed across multiple workers? Won't I have to create separate spouts?
>>>>>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>>>>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>>>>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>>>>>> and so on?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <
>>>>>>> mittspamkonto@gmail.com> wrote:
>>>>>>>
>>>>>>>> Coreection - group on partition id
>>>>>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <
>>>>>>>> navin.ipe@searchlighthealth.com> wrote:
>>>>>>>>
>>>>>>>>> I've seen this:
>>>>>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>>>>>> requesting a bit of clarity.
>>>>>>>>>
>>>>>>>>> I'm considering a situation where I have 2 million rows in MySQL
>>>>>>>>> or MongoDB.
>>>>>>>>>
>>>>>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>>>>>> processed output to a Bolt. This happens in Worker1.
>>>>>>>>> 2. I want a different instance of the same Spout class to read the
>>>>>>>>> next 1000 rows in parallel with the working of the Spout of 1, then send
>>>>>>>>> the processed output to an instance of the same Bolt used in 1. This
>>>>>>>>> happens in Worker2.
>>>>>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>>>>>> 4. I might setup 10 workers like this.
>>>>>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>>>>>> outputs to a single Bolt in Worker 11.
>>>>>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>>>>>>> table.
>>>>>>>>>
>>>>>>>>> *My confusion here is in how to make the database iterations
>>>>>>>>> happen batch by batch, parallelly*. Obviously the database
>>>>>>>>> connection would have to be made in some static class outside the workers,
>>>>>>>>> but if workers are started with just "conf.setNumWorkers(2);",
>>>>>>>>> then how do I tell the workers to iterate different rows of the database?
>>>>>>>>> Assuming that the workers are running in different machines.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Regards,
>>>>>>>>> Navin
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Regards,
>>>>>>> Navin
>>>>>>>
>>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Navin
>>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>
>
>
> --
> Regards,
> Navin
>

Re: How does one distribute database iteration across workers?

Posted by Navin Ipe <na...@searchlighthealth.com>.
In this case the values coming from the bolts can all be put() or updated
into a single large hashmap in the Bolt in Worker11. So no need of
aggregation.
If there is no standard way in Storm for spouts and bolts to notify each
other that there is nothing more for them to process, then I guess I'll
just send tuples with "null" in them so that the Bolt in Worker11 will know
the processing is over.

I just hope what y'all said about tuples being automatically passed between
workers, actually works without any problems :-)
Thanks a lot for all your help!

On Wed, Apr 20, 2016 at 12:06 PM, Jungtaek Lim <ka...@gmail.com> wrote:

> Navin,
>
> I think this two lines are not cleared so I may have misunderstand.
>
> 5. When all the Bolts in the workers are finished, they send their outputs
> to a single Bolt in Worker 11.
> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>
> If you don't need to aggregate (I mean join) the results from Bolt in
> Worker1~10 to Bolt in Worker11, no need to use Trident or Windowing.
>
> 2016년 4월 20일 (수) 오후 3:28, Navin Ipe <na...@searchlighthealth.com>님이
> 작성:
>
>> @Jungtaek: This person (
>> http://stackoverflow.com/questions/21480773/can-twitter-storm-worker-get-only-parts-of-the-topology)
>> claims that Storm would automatically manage the flow of data between
>> spouts and blots on different workers. Can anyone confirm this? If this is
>> the case, I won't have to bother using Trident.
>>
>> On Wed, Apr 20, 2016 at 8:59 AM, Navin Ipe <
>> navin.ipe@searchlighthealth.com> wrote:
>>
>>> @Jason: Thanks. Tried searching for Storm code which starts Ephemeral
>>> nodes, but couldn't find it. (am new to Hadoop and Storm, so perhaps I was
>>> searching for the wrong thing)
>>>
>>> @Jungtaek: Will explore component tasks. Meanwhile, I had considered
>>> Trident, but didn't go ahead because it was not clear how I could implement
>>> multiple spouts in Trident, where each spout would iterate a certain number
>>> of rows of a database. Any idea how that could happen.
>>>
>>> On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <ka...@gmail.com> wrote:
>>>
>>>> There's other idea without relying on Zookeeper : use ordinal of task
>>>> id between same components (spout)
>>>>
>>>> Task id is issued across all tasks including system tasks so you can't
>>>> assume spout tasks are having task id sequentially, but whatever you can do
>>>> the trick - check "ordinal" of this spout task's id around same spouts.
>>>> Please refer GeneralTopologyContext.getComponentTasks(String
>>>> componentId).
>>>>
>>>> Btw, Spout1 -> Bolt2 can be done with various ways but it would not be
>>>> easy to aggregate the results of Bolt2 from Bolt3.
>>>> You should consider windowing by processed time or Trident or maintain
>>>> your own buffers.
>>>>
>>>> Hope this helps.
>>>>
>>>> Thanks,
>>>> Jungtaek Lim (HeartSaVioR)
>>>>
>>>> 2016년 4월 19일 (화) 오후 10:02, Jason Kusar <ja...@kusar.net>님이 작성:
>>>>
>>>>> Hi,
>>>>>
>>>>> I've done a similar thing before with the exception that I was reading
>>>>> from Cassandra.  The concept is the same though.  Assuming you know that
>>>>> you have 10,000 records and you want each spout to read 1,000 of them, then
>>>>> you would launch 10 instances of the spouts.  The first thing they do
>>>>> during init is to connect to zookeeper and create an ephemeral node (
>>>>> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes)
>>>>> starting with one called '0'.  If 0 already exists, you'll get an exception
>>>>> which means you try to create '1' and so on until you successfully create a
>>>>> node.  That tells you which batch of records that instance of the spout is
>>>>> responsible for.  I.e., if you successfully created '3', then this spout
>>>>> needs to set its offset to 3,000.
>>>>>
>>>>> The reason for using ephemeral nodes is that they are automatically
>>>>> deleted if the zookeeper client disconnects.  That way if a spout crashes,
>>>>> once Storm relaunches the spout, it will be able to re-claim that token and
>>>>> resume work on that batch.  You'll obviously need to have some way to keep
>>>>> track of which records you've already processed, but that's going to be
>>>>> specific to your implementation.
>>>>>
>>>>> Hope that helps!
>>>>> Jason
>>>>>
>>>>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <
>>>>> navin.ipe@searchlighthealth.com> wrote:
>>>>>
>>>>>> Thanks guys.
>>>>>> I didn't understand "*...spout instances by utilizing Zookeper.*".
>>>>>> How does one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a
>>>>>> Spout?
>>>>>>
>>>>>> As of now I've set
>>>>>> config.setNumWorkers(2);
>>>>>> and
>>>>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>>>>
>>>>>> I'm able to get spoutID in open() using this.spoutId =
>>>>>> context.getThisTaskId();
>>>>>> Strangely, my spoutID always begins with 3 instead of 0.
>>>>>>
>>>>>> By partitionID I understand that's the fieldGrouping's id.
>>>>>>
>>>>>> Even if I do all this, will the spout's tasks actually be distributed
>>>>>> across multiple workers? Won't I have to create separate spouts?
>>>>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>>>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>>>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>>>>> and so on?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <
>>>>>> mittspamkonto@gmail.com> wrote:
>>>>>>
>>>>>>> Coreection - group on partition id
>>>>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <
>>>>>>> navin.ipe@searchlighthealth.com> wrote:
>>>>>>>
>>>>>>>> I've seen this:
>>>>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>>>>> requesting a bit of clarity.
>>>>>>>>
>>>>>>>> I'm considering a situation where I have 2 million rows in MySQL or
>>>>>>>> MongoDB.
>>>>>>>>
>>>>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>>>>> processed output to a Bolt. This happens in Worker1.
>>>>>>>> 2. I want a different instance of the same Spout class to read the
>>>>>>>> next 1000 rows in parallel with the working of the Spout of 1, then send
>>>>>>>> the processed output to an instance of the same Bolt used in 1. This
>>>>>>>> happens in Worker2.
>>>>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>>>>> 4. I might setup 10 workers like this.
>>>>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>>>>> outputs to a single Bolt in Worker 11.
>>>>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>>>>>> table.
>>>>>>>>
>>>>>>>> *My confusion here is in how to make the database iterations happen
>>>>>>>> batch by batch, parallelly*. Obviously the database connection
>>>>>>>> would have to be made in some static class outside the workers, but if
>>>>>>>> workers are started with just "conf.setNumWorkers(2);", then how
>>>>>>>> do I tell the workers to iterate different rows of the database? Assuming
>>>>>>>> that the workers are running in different machines.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Regards,
>>>>>>>> Navin
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Regards,
>>>>>> Navin
>>>>>>
>>>>>
>>>
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>
>>
>>
>> --
>> Regards,
>> Navin
>>
>


-- 
Regards,
Navin

Re: How does one distribute database iteration across workers?

Posted by Jungtaek Lim <ka...@gmail.com>.
Navin,

I think this two lines are not cleared so I may have misunderstand.

5. When all the Bolts in the workers are finished, they send their outputs
to a single Bolt in Worker 11.
6. The Bolt in Worker 11 writes the processed value to a new MySQL table.

If you don't need to aggregate (I mean join) the results from Bolt in
Worker1~10 to Bolt in Worker11, no need to use Trident or Windowing.

2016년 4월 20일 (수) 오후 3:28, Navin Ipe <na...@searchlighthealth.com>님이 작성:

> @Jungtaek: This person (
> http://stackoverflow.com/questions/21480773/can-twitter-storm-worker-get-only-parts-of-the-topology)
> claims that Storm would automatically manage the flow of data between
> spouts and blots on different workers. Can anyone confirm this? If this is
> the case, I won't have to bother using Trident.
>
> On Wed, Apr 20, 2016 at 8:59 AM, Navin Ipe <
> navin.ipe@searchlighthealth.com> wrote:
>
>> @Jason: Thanks. Tried searching for Storm code which starts Ephemeral
>> nodes, but couldn't find it. (am new to Hadoop and Storm, so perhaps I was
>> searching for the wrong thing)
>>
>> @Jungtaek: Will explore component tasks. Meanwhile, I had considered
>> Trident, but didn't go ahead because it was not clear how I could implement
>> multiple spouts in Trident, where each spout would iterate a certain number
>> of rows of a database. Any idea how that could happen.
>>
>> On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <ka...@gmail.com> wrote:
>>
>>> There's other idea without relying on Zookeeper : use ordinal of task id
>>> between same components (spout)
>>>
>>> Task id is issued across all tasks including system tasks so you can't
>>> assume spout tasks are having task id sequentially, but whatever you can do
>>> the trick - check "ordinal" of this spout task's id around same spouts.
>>> Please refer GeneralTopologyContext.getComponentTasks(String
>>> componentId).
>>>
>>> Btw, Spout1 -> Bolt2 can be done with various ways but it would not be
>>> easy to aggregate the results of Bolt2 from Bolt3.
>>> You should consider windowing by processed time or Trident or maintain
>>> your own buffers.
>>>
>>> Hope this helps.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> 2016년 4월 19일 (화) 오후 10:02, Jason Kusar <ja...@kusar.net>님이 작성:
>>>
>>>> Hi,
>>>>
>>>> I've done a similar thing before with the exception that I was reading
>>>> from Cassandra.  The concept is the same though.  Assuming you know that
>>>> you have 10,000 records and you want each spout to read 1,000 of them, then
>>>> you would launch 10 instances of the spouts.  The first thing they do
>>>> during init is to connect to zookeeper and create an ephemeral node (
>>>> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes)
>>>> starting with one called '0'.  If 0 already exists, you'll get an exception
>>>> which means you try to create '1' and so on until you successfully create a
>>>> node.  That tells you which batch of records that instance of the spout is
>>>> responsible for.  I.e., if you successfully created '3', then this spout
>>>> needs to set its offset to 3,000.
>>>>
>>>> The reason for using ephemeral nodes is that they are automatically
>>>> deleted if the zookeeper client disconnects.  That way if a spout crashes,
>>>> once Storm relaunches the spout, it will be able to re-claim that token and
>>>> resume work on that batch.  You'll obviously need to have some way to keep
>>>> track of which records you've already processed, but that's going to be
>>>> specific to your implementation.
>>>>
>>>> Hope that helps!
>>>> Jason
>>>>
>>>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <
>>>> navin.ipe@searchlighthealth.com> wrote:
>>>>
>>>>> Thanks guys.
>>>>> I didn't understand "*...spout instances by utilizing Zookeper.*".
>>>>> How does one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a
>>>>> Spout?
>>>>>
>>>>> As of now I've set
>>>>> config.setNumWorkers(2);
>>>>> and
>>>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>>>
>>>>> I'm able to get spoutID in open() using this.spoutId =
>>>>> context.getThisTaskId();
>>>>> Strangely, my spoutID always begins with 3 instead of 0.
>>>>>
>>>>> By partitionID I understand that's the fieldGrouping's id.
>>>>>
>>>>> Even if I do all this, will the spout's tasks actually be distributed
>>>>> across multiple workers? Won't I have to create separate spouts?
>>>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>>>> and so on?
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <mittspamkonto@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> Coreection - group on partition id
>>>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <na...@searchlighthealth.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've seen this:
>>>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>>>> requesting a bit of clarity.
>>>>>>>
>>>>>>> I'm considering a situation where I have 2 million rows in MySQL or
>>>>>>> MongoDB.
>>>>>>>
>>>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>>>> processed output to a Bolt. This happens in Worker1.
>>>>>>> 2. I want a different instance of the same Spout class to read the
>>>>>>> next 1000 rows in parallel with the working of the Spout of 1, then send
>>>>>>> the processed output to an instance of the same Bolt used in 1. This
>>>>>>> happens in Worker2.
>>>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>>>> 4. I might setup 10 workers like this.
>>>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>>>> outputs to a single Bolt in Worker 11.
>>>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>>>>> table.
>>>>>>>
>>>>>>> *My confusion here is in how to make the database iterations happen
>>>>>>> batch by batch, parallelly*. Obviously the database connection
>>>>>>> would have to be made in some static class outside the workers, but if
>>>>>>> workers are started with just "conf.setNumWorkers(2);", then how do
>>>>>>> I tell the workers to iterate different rows of the database? Assuming that
>>>>>>> the workers are running in different machines.
>>>>>>>
>>>>>>> --
>>>>>>> Regards,
>>>>>>> Navin
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Navin
>>>>>
>>>>
>>
>>
>> --
>> Regards,
>> Navin
>>
>
>
>
> --
> Regards,
> Navin
>

Re: How does one distribute database iteration across workers?

Posted by Navin Ipe <na...@searchlighthealth.com>.
@Jungtaek: This person (
http://stackoverflow.com/questions/21480773/can-twitter-storm-worker-get-only-parts-of-the-topology)
claims that Storm would automatically manage the flow of data between
spouts and blots on different workers. Can anyone confirm this? If this is
the case, I won't have to bother using Trident.

On Wed, Apr 20, 2016 at 8:59 AM, Navin Ipe <na...@searchlighthealth.com>
wrote:

> @Jason: Thanks. Tried searching for Storm code which starts Ephemeral
> nodes, but couldn't find it. (am new to Hadoop and Storm, so perhaps I was
> searching for the wrong thing)
>
> @Jungtaek: Will explore component tasks. Meanwhile, I had considered
> Trident, but didn't go ahead because it was not clear how I could implement
> multiple spouts in Trident, where each spout would iterate a certain number
> of rows of a database. Any idea how that could happen.
>
> On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <ka...@gmail.com> wrote:
>
>> There's other idea without relying on Zookeeper : use ordinal of task id
>> between same components (spout)
>>
>> Task id is issued across all tasks including system tasks so you can't
>> assume spout tasks are having task id sequentially, but whatever you can do
>> the trick - check "ordinal" of this spout task's id around same spouts.
>> Please refer GeneralTopologyContext.getComponentTasks(String componentId).
>>
>> Btw, Spout1 -> Bolt2 can be done with various ways but it would not be
>> easy to aggregate the results of Bolt2 from Bolt3.
>> You should consider windowing by processed time or Trident or maintain
>> your own buffers.
>>
>> Hope this helps.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> 2016년 4월 19일 (화) 오후 10:02, Jason Kusar <ja...@kusar.net>님이 작성:
>>
>>> Hi,
>>>
>>> I've done a similar thing before with the exception that I was reading
>>> from Cassandra.  The concept is the same though.  Assuming you know that
>>> you have 10,000 records and you want each spout to read 1,000 of them, then
>>> you would launch 10 instances of the spouts.  The first thing they do
>>> during init is to connect to zookeeper and create an ephemeral node (
>>> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes)
>>> starting with one called '0'.  If 0 already exists, you'll get an exception
>>> which means you try to create '1' and so on until you successfully create a
>>> node.  That tells you which batch of records that instance of the spout is
>>> responsible for.  I.e., if you successfully created '3', then this spout
>>> needs to set its offset to 3,000.
>>>
>>> The reason for using ephemeral nodes is that they are automatically
>>> deleted if the zookeeper client disconnects.  That way if a spout crashes,
>>> once Storm relaunches the spout, it will be able to re-claim that token and
>>> resume work on that batch.  You'll obviously need to have some way to keep
>>> track of which records you've already processed, but that's going to be
>>> specific to your implementation.
>>>
>>> Hope that helps!
>>> Jason
>>>
>>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <
>>> navin.ipe@searchlighthealth.com> wrote:
>>>
>>>> Thanks guys.
>>>> I didn't understand "*...spout instances by utilizing Zookeper.*". How
>>>> does one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a Spout?
>>>>
>>>> As of now I've set
>>>> config.setNumWorkers(2);
>>>> and
>>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>>
>>>> I'm able to get spoutID in open() using this.spoutId =
>>>> context.getThisTaskId();
>>>> Strangely, my spoutID always begins with 3 instead of 0.
>>>>
>>>> By partitionID I understand that's the fieldGrouping's id.
>>>>
>>>> Even if I do all this, will the spout's tasks actually be distributed
>>>> across multiple workers? Won't I have to create separate spouts?
>>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>>> and so on?
>>>>
>>>>
>>>>
>>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <mi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Coreection - group on partition id
>>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <na...@searchlighthealth.com>
>>>>> wrote:
>>>>>
>>>>>> I've seen this:
>>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>>> requesting a bit of clarity.
>>>>>>
>>>>>> I'm considering a situation where I have 2 million rows in MySQL or
>>>>>> MongoDB.
>>>>>>
>>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>>> processed output to a Bolt. This happens in Worker1.
>>>>>> 2. I want a different instance of the same Spout class to read the
>>>>>> next 1000 rows in parallel with the working of the Spout of 1, then send
>>>>>> the processed output to an instance of the same Bolt used in 1. This
>>>>>> happens in Worker2.
>>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>>> 4. I might setup 10 workers like this.
>>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>>> outputs to a single Bolt in Worker 11.
>>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>>>> table.
>>>>>>
>>>>>> *My confusion here is in how to make the database iterations happen
>>>>>> batch by batch, parallelly*. Obviously the database connection would
>>>>>> have to be made in some static class outside the workers, but if workers
>>>>>> are started with just "conf.setNumWorkers(2);", then how do I tell
>>>>>> the workers to iterate different rows of the database? Assuming that the
>>>>>> workers are running in different machines.
>>>>>>
>>>>>> --
>>>>>> Regards,
>>>>>> Navin
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Navin
>>>>
>>>
>
>
> --
> Regards,
> Navin
>



-- 
Regards,
Navin

Re: How does one distribute database iteration across workers?

Posted by Navin Ipe <na...@searchlighthealth.com>.
@Jason: Thanks. Tried searching for Storm code which starts Ephemeral
nodes, but couldn't find it. (am new to Hadoop and Storm, so perhaps I was
searching for the wrong thing)

@Jungtaek: Will explore component tasks. Meanwhile, I had considered
Trident, but didn't go ahead because it was not clear how I could implement
multiple spouts in Trident, where each spout would iterate a certain number
of rows of a database. Any idea how that could happen.

On Tue, Apr 19, 2016 at 6:39 PM, Jungtaek Lim <ka...@gmail.com> wrote:

> There's other idea without relying on Zookeeper : use ordinal of task id
> between same components (spout)
>
> Task id is issued across all tasks including system tasks so you can't
> assume spout tasks are having task id sequentially, but whatever you can do
> the trick - check "ordinal" of this spout task's id around same spouts.
> Please refer GeneralTopologyContext.getComponentTasks(String componentId).
>
> Btw, Spout1 -> Bolt2 can be done with various ways but it would not be
> easy to aggregate the results of Bolt2 from Bolt3.
> You should consider windowing by processed time or Trident or maintain
> your own buffers.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 2016년 4월 19일 (화) 오후 10:02, Jason Kusar <ja...@kusar.net>님이 작성:
>
>> Hi,
>>
>> I've done a similar thing before with the exception that I was reading
>> from Cassandra.  The concept is the same though.  Assuming you know that
>> you have 10,000 records and you want each spout to read 1,000 of them, then
>> you would launch 10 instances of the spouts.  The first thing they do
>> during init is to connect to zookeeper and create an ephemeral node (
>> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes)
>> starting with one called '0'.  If 0 already exists, you'll get an exception
>> which means you try to create '1' and so on until you successfully create a
>> node.  That tells you which batch of records that instance of the spout is
>> responsible for.  I.e., if you successfully created '3', then this spout
>> needs to set its offset to 3,000.
>>
>> The reason for using ephemeral nodes is that they are automatically
>> deleted if the zookeeper client disconnects.  That way if a spout crashes,
>> once Storm relaunches the spout, it will be able to re-claim that token and
>> resume work on that batch.  You'll obviously need to have some way to keep
>> track of which records you've already processed, but that's going to be
>> specific to your implementation.
>>
>> Hope that helps!
>> Jason
>>
>> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <
>> navin.ipe@searchlighthealth.com> wrote:
>>
>>> Thanks guys.
>>> I didn't understand "*...spout instances by utilizing Zookeper.*". How
>>> does one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a Spout?
>>>
>>> As of now I've set
>>> config.setNumWorkers(2);
>>> and
>>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>>
>>> I'm able to get spoutID in open() using this.spoutId =
>>> context.getThisTaskId();
>>> Strangely, my spoutID always begins with 3 instead of 0.
>>>
>>> By partitionID I understand that's the fieldGrouping's id.
>>>
>>> Even if I do all this, will the spout's tasks actually be distributed
>>> across multiple workers? Won't I have to create separate spouts?
>>> builder.setSpout("mongoSpout1", new MongoSpout());
>>> builder.setSpout("mongoSpout2", new MongoSpout());
>>> builder.setSpout("mongoSpout3", new MongoSpout());
>>> and so on?
>>>
>>>
>>>
>>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <mi...@gmail.com>
>>> wrote:
>>>
>>>> Coreection - group on partition id
>>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <na...@searchlighthealth.com>
>>>> wrote:
>>>>
>>>>> I've seen this:
>>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>>> but it doesn't explain how workers coordinate with each other, so
>>>>> requesting a bit of clarity.
>>>>>
>>>>> I'm considering a situation where I have 2 million rows in MySQL or
>>>>> MongoDB.
>>>>>
>>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>>> processed output to a Bolt. This happens in Worker1.
>>>>> 2. I want a different instance of the same Spout class to read the
>>>>> next 1000 rows in parallel with the working of the Spout of 1, then send
>>>>> the processed output to an instance of the same Bolt used in 1. This
>>>>> happens in Worker2.
>>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>>> 4. I might setup 10 workers like this.
>>>>> 5. When all the Bolts in the workers are finished, they send their
>>>>> outputs to a single Bolt in Worker 11.
>>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>>> table.
>>>>>
>>>>> *My confusion here is in how to make the database iterations happen
>>>>> batch by batch, parallelly*. Obviously the database connection would
>>>>> have to be made in some static class outside the workers, but if workers
>>>>> are started with just "conf.setNumWorkers(2);", then how do I tell
>>>>> the workers to iterate different rows of the database? Assuming that the
>>>>> workers are running in different machines.
>>>>>
>>>>> --
>>>>> Regards,
>>>>> Navin
>>>>>
>>>>
>>>
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>


-- 
Regards,
Navin

Re: How does one distribute database iteration across workers?

Posted by Jungtaek Lim <ka...@gmail.com>.
There's other idea without relying on Zookeeper : use ordinal of task id
between same components (spout)

Task id is issued across all tasks including system tasks so you can't
assume spout tasks are having task id sequentially, but whatever you can do
the trick - check "ordinal" of this spout task's id around same spouts.
Please refer GeneralTopologyContext.getComponentTasks(String componentId).

Btw, Spout1 -> Bolt2 can be done with various ways but it would not be easy
to aggregate the results of Bolt2 from Bolt3.
You should consider windowing by processed time or Trident or maintain your
own buffers.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

2016년 4월 19일 (화) 오후 10:02, Jason Kusar <ja...@kusar.net>님이 작성:

> Hi,
>
> I've done a similar thing before with the exception that I was reading
> from Cassandra.  The concept is the same though.  Assuming you know that
> you have 10,000 records and you want each spout to read 1,000 of them, then
> you would launch 10 instances of the spouts.  The first thing they do
> during init is to connect to zookeeper and create an ephemeral node (
> http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes)
> starting with one called '0'.  If 0 already exists, you'll get an exception
> which means you try to create '1' and so on until you successfully create a
> node.  That tells you which batch of records that instance of the spout is
> responsible for.  I.e., if you successfully created '3', then this spout
> needs to set its offset to 3,000.
>
> The reason for using ephemeral nodes is that they are automatically
> deleted if the zookeeper client disconnects.  That way if a spout crashes,
> once Storm relaunches the spout, it will be able to re-claim that token and
> resume work on that batch.  You'll obviously need to have some way to keep
> track of which records you've already processed, but that's going to be
> specific to your implementation.
>
> Hope that helps!
> Jason
>
> On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <na...@searchlighthealth.com>
> wrote:
>
>> Thanks guys.
>> I didn't understand "*...spout instances by utilizing Zookeper.*". How
>> does one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a Spout?
>>
>> As of now I've set
>> config.setNumWorkers(2);
>> and
>> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>>
>> I'm able to get spoutID in open() using this.spoutId =
>> context.getThisTaskId();
>> Strangely, my spoutID always begins with 3 instead of 0.
>>
>> By partitionID I understand that's the fieldGrouping's id.
>>
>> Even if I do all this, will the spout's tasks actually be distributed
>> across multiple workers? Won't I have to create separate spouts?
>> builder.setSpout("mongoSpout1", new MongoSpout());
>> builder.setSpout("mongoSpout2", new MongoSpout());
>> builder.setSpout("mongoSpout3", new MongoSpout());
>> and so on?
>>
>>
>>
>> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <mi...@gmail.com>
>> wrote:
>>
>>> Coreection - group on partition id
>>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <na...@searchlighthealth.com>
>>> wrote:
>>>
>>>> I've seen this:
>>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>>> but it doesn't explain how workers coordinate with each other, so
>>>> requesting a bit of clarity.
>>>>
>>>> I'm considering a situation where I have 2 million rows in MySQL or
>>>> MongoDB.
>>>>
>>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>>> processed output to a Bolt. This happens in Worker1.
>>>> 2. I want a different instance of the same Spout class to read the next
>>>> 1000 rows in parallel with the working of the Spout of 1, then send the
>>>> processed output to an instance of the same Bolt used in 1. This happens in
>>>> Worker2.
>>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>>> 4. I might setup 10 workers like this.
>>>> 5. When all the Bolts in the workers are finished, they send their
>>>> outputs to a single Bolt in Worker 11.
>>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL
>>>> table.
>>>>
>>>> *My confusion here is in how to make the database iterations happen
>>>> batch by batch, parallelly*. Obviously the database connection would
>>>> have to be made in some static class outside the workers, but if workers
>>>> are started with just "conf.setNumWorkers(2);", then how do I tell the
>>>> workers to iterate different rows of the database? Assuming that the
>>>> workers are running in different machines.
>>>>
>>>> --
>>>> Regards,
>>>> Navin
>>>>
>>>
>>
>>
>> --
>> Regards,
>> Navin
>>
>

Re: How does one distribute database iteration across workers?

Posted by Jason Kusar <ja...@kusar.net>.
Hi,

I've done a similar thing before with the exception that I was reading from
Cassandra.  The concept is the same though.  Assuming you know that you
have 10,000 records and you want each spout to read 1,000 of them, then you
would launch 10 instances of the spouts.  The first thing they do during
init is to connect to zookeeper and create an ephemeral node (
http://zookeeper.apache.org/doc/r3.2.1/zookeeperProgrammers.html#Ephemeral+Nodes)
starting with one called '0'.  If 0 already exists, you'll get an exception
which means you try to create '1' and so on until you successfully create a
node.  That tells you which batch of records that instance of the spout is
responsible for.  I.e., if you successfully created '3', then this spout
needs to set its offset to 3,000.

The reason for using ephemeral nodes is that they are automatically deleted
if the zookeeper client disconnects.  That way if a spout crashes, once
Storm relaunches the spout, it will be able to re-claim that token and
resume work on that batch.  You'll obviously need to have some way to keep
track of which records you've already processed, but that's going to be
specific to your implementation.

Hope that helps!
Jason

On Tue, Apr 19, 2016 at 4:43 AM Navin Ipe <na...@searchlighthealth.com>
wrote:

> Thanks guys.
> I didn't understand "*...spout instances by utilizing Zookeper.*". How
> does one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a Spout?
>
> As of now I've set
> config.setNumWorkers(2);
> and
> builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);
>
> I'm able to get spoutID in open() using this.spoutId =
> context.getThisTaskId();
> Strangely, my spoutID always begins with 3 instead of 0.
>
> By partitionID I understand that's the fieldGrouping's id.
>
> Even if I do all this, will the spout's tasks actually be distributed
> across multiple workers? Won't I have to create separate spouts?
> builder.setSpout("mongoSpout1", new MongoSpout());
> builder.setSpout("mongoSpout2", new MongoSpout());
> builder.setSpout("mongoSpout3", new MongoSpout());
> and so on?
>
>
>
> On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <mi...@gmail.com>
> wrote:
>
>> Coreection - group on partition id
>> On Apr 19, 2016 6:33 AM, "Navin Ipe" <na...@searchlighthealth.com>
>> wrote:
>>
>>> I've seen this:
>>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>>> but it doesn't explain how workers coordinate with each other, so
>>> requesting a bit of clarity.
>>>
>>> I'm considering a situation where I have 2 million rows in MySQL or
>>> MongoDB.
>>>
>>> 1. I want to use a Spout to read the first 1000 rows and send the
>>> processed output to a Bolt. This happens in Worker1.
>>> 2. I want a different instance of the same Spout class to read the next
>>> 1000 rows in parallel with the working of the Spout of 1, then send the
>>> processed output to an instance of the same Bolt used in 1. This happens in
>>> Worker2.
>>> 3. Same as 1 and 2, but it happens in Worker 3.
>>> 4. I might setup 10 workers like this.
>>> 5. When all the Bolts in the workers are finished, they send their
>>> outputs to a single Bolt in Worker 11.
>>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>>>
>>> *My confusion here is in how to make the database iterations happen
>>> batch by batch, parallelly*. Obviously the database connection would
>>> have to be made in some static class outside the workers, but if workers
>>> are started with just "conf.setNumWorkers(2);", then how do I tell the
>>> workers to iterate different rows of the database? Assuming that the
>>> workers are running in different machines.
>>>
>>> --
>>> Regards,
>>> Navin
>>>
>>
>
>
> --
> Regards,
> Navin
>

Re: How does one distribute database iteration across workers?

Posted by Navin Ipe <na...@searchlighthealth.com>.
Thanks guys.
I didn't understand "*...spout instances by utilizing Zookeper.*". How does
one utilize Zookeper? Is it the same as ".setNumTasks(10)" for a Spout?

As of now I've set
config.setNumWorkers(2);
and
builder.setSpout("mongoSpout", new MongoSpout()).setNumTasks(2);

I'm able to get spoutID in open() using this.spoutId =
context.getThisTaskId();
Strangely, my spoutID always begins with 3 instead of 0.

By partitionID I understand that's the fieldGrouping's id.

Even if I do all this, will the spout's tasks actually be distributed
across multiple workers? Won't I have to create separate spouts?
builder.setSpout("mongoSpout1", new MongoSpout());
builder.setSpout("mongoSpout2", new MongoSpout());
builder.setSpout("mongoSpout3", new MongoSpout());
and so on?



On Tue, Apr 19, 2016 at 11:51 AM, Alexander T <mi...@gmail.com>
wrote:

> Coreection - group on partition id
> On Apr 19, 2016 6:33 AM, "Navin Ipe" <na...@searchlighthealth.com>
> wrote:
>
>> I've seen this:
>> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
>> but it doesn't explain how workers coordinate with each other, so
>> requesting a bit of clarity.
>>
>> I'm considering a situation where I have 2 million rows in MySQL or
>> MongoDB.
>>
>> 1. I want to use a Spout to read the first 1000 rows and send the
>> processed output to a Bolt. This happens in Worker1.
>> 2. I want a different instance of the same Spout class to read the next
>> 1000 rows in parallel with the working of the Spout of 1, then send the
>> processed output to an instance of the same Bolt used in 1. This happens in
>> Worker2.
>> 3. Same as 1 and 2, but it happens in Worker 3.
>> 4. I might setup 10 workers like this.
>> 5. When all the Bolts in the workers are finished, they send their
>> outputs to a single Bolt in Worker 11.
>> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>>
>> *My confusion here is in how to make the database iterations happen batch
>> by batch, parallelly*. Obviously the database connection would have to
>> be made in some static class outside the workers, but if workers are
>> started with just "conf.setNumWorkers(2);", then how do I tell the
>> workers to iterate different rows of the database? Assuming that the
>> workers are running in different machines.
>>
>> --
>> Regards,
>> Navin
>>
>


-- 
Regards,
Navin

Re: How does one distribute database iteration across workers?

Posted by Alexander T <mi...@gmail.com>.
Coreection - group on partition id
On Apr 19, 2016 6:33 AM, "Navin Ipe" <na...@searchlighthealth.com>
wrote:

> I've seen this:
> http://storm.apache.org/releases/0.10.0/Understanding-the-parallelism-of-a-Storm-topology.html
> but it doesn't explain how workers coordinate with each other, so
> requesting a bit of clarity.
>
> I'm considering a situation where I have 2 million rows in MySQL or
> MongoDB.
>
> 1. I want to use a Spout to read the first 1000 rows and send the
> processed output to a Bolt. This happens in Worker1.
> 2. I want a different instance of the same Spout class to read the next
> 1000 rows in parallel with the working of the Spout of 1, then send the
> processed output to an instance of the same Bolt used in 1. This happens in
> Worker2.
> 3. Same as 1 and 2, but it happens in Worker 3.
> 4. I might setup 10 workers like this.
> 5. When all the Bolts in the workers are finished, they send their outputs
> to a single Bolt in Worker 11.
> 6. The Bolt in Worker 11 writes the processed value to a new MySQL table.
>
> *My confusion here is in how to make the database iterations happen batch
> by batch, parallelly*. Obviously the database connection would have to be
> made in some static class outside the workers, but if workers are started
> with just "conf.setNumWorkers(2);", then how do I tell the workers to
> iterate different rows of the database? Assuming that the workers are
> running in different machines.
>
> --
> Regards,
> Navin
>