You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by Tim Head <th...@cern.ch> on 2015/06/23 09:55:46 UTC

Storm for LHCb at CERN

Hello Storm devs,

I work for one of the big four experiments at CERN. We are currently
thinking about "the future of our real time data processing". I tweeted
about it <https://twitter.com/betatim/status/611503830161862657> and
@ptgoetz replied saying "post to dev/usr". This is that post.

Below some brief introduction to what LHCb is, how we process data, and
some constraints/context. I tried to think of all the relevant info and
invariably will have forgotten some...

We are currently surveying the landscape of real time stream processing.
The question we are trying to answer is: "What toolkit could we use as
starting point for the LHCb experiment's real time data processing if we
started from scratch today?"

Storm looks attractive, however I know nobody who has real world experience
with it. So my first question would be: is it worth investigating Storm
further for this kind of use-case?

LHCb is one of the four big experiments at CERN <
http://home.web.cern.ch/about>, the home of the Large Hadron Collider (you
might have heard of us when we discovered the Higgs boson).

A brief outline of what LHCb does when processing data:

The LHCb trigger reduces/filters the stream of messages (or events) from
the experimental hardware. It runs on a large farm of about 27000 physical
cores with about 1-2GB of RAM per core. Its task is to decide which
messages are worth recording to disk and which to reject. Most
messages/events are rejected.

Each message contains low level features (which pixels in the silicon
sensor were hit, how much energy was deposited in read out channel X, etc).
Part of the decision process is to construct higher level features from
these. Usually the more high level a feature the more expensive it is to
compute and the more discriminative it is. We routinely use machine
learning algorithms like random forests/gradient boosted decision trees/NNs
as part of the decision making.

In the current setup each message is delivered to exactly one computing
node, which processes it in full. If the message is accepted it is sent
onwards for storing to disk.

One avenue we are investigating is if it is feasible to instead pass a
message around several "specialised" nodes that each do part of the
processing. This would be attractive as it means you could easily integrate
GPUs into the flow, or spin up more instances for a certain part of the
processing if it is taking too long. Such a scheme would need extremely
cheap serialisation.

A good (longer) introduction written up for a NIPS workshop [pdf]:

<https://www.dropbox.com/s/fcq3v28oxdvp6iw/RealTimeAnalysisLHC.pdf?dl=0>

Some boundary conditions/context:

- input rate of ~40million messages per second
- 100kB per message, without higher level features
- ~O(60) cores per box (extrapolating the tick-tock of Intel's roadmap to
2020)
- ~O(2)GB RAM per core (very hard to tell where this will be in 2020)
- network design is throughput driven, bound to the connectivity
requirements. Events are distributed for filtering with a one-to-one
communication, usually implemented with a "cheap" CLOS-topology like
networking. The topology doesn't matter that much as long as it is
non-blocking, that is, all nodes can send to the next stage of the pipeline
guaranteeing a certain required throughput.
- every messages has to be processed, no duplication (at the output of the
processing chain) of messages
- most messages are rejected, the output rate is (much) lower than the
input rate
- fault tolerance, when the LHC is delivering collisions to the experiment
we have to be ready, otherwise valuable beam time is wasted, there is no
pausing or going back
- need to be able to re-run the processing pipeline on a single machine
after the fact for debugging

If this isn't the right place for discussing this or some other medium is
more efficient, let me know. Did I miss some obvious constraint/bit of
context?

Thanks a lot for your time, you'd think LHCb has a lot of experts on this
topic but I suspect there is even more (real world, modern) expertise out
there :-)

T
ps. I CC'ed some of the other guys from LHCb who are interested in this.

Re: Storm for LHCb at CERN

Posted by Vladimir Gligorov <Vl...@cern.ch>.
Hi Nathan,

yes that is about right; the detector readout is designed to handle up to 40 Tbits/second, so the rest of the system needs to be able to cope with this as well. Our tolerance for data loss in the network system is basically at the couple of permille level; our total data loss due to dead time and other problems throughout the system is (needs to be) 1-2% in stable operation.

Hope that's useful.

Thanks,

Vava

On Jun 23, 2015, at 3:34 PM, Nathan Leung <nc...@gmail.com>
 wrote:

> Forgive me if my math is off, but are you pushing 4TB of raw data every second?  I can't say whether storm would work or not, but I'm pretty sure that if it does the configuration will look very different from what most people are using.  What are your tolerances for data loss?
> 
> On Jun 23, 2015 3:56 AM, "Tim Head" <th...@cern.ch> wrote:
> Hello Storm devs,
> 
> I work for one of the big four experiments at CERN. We are currently
> thinking about "the future of our real time data processing". I tweeted
> about it <https://twitter.com/betatim/status/611503830161862657> and
> @ptgoetz replied saying "post to dev/usr". This is that post.
> 
> Below some brief introduction to what LHCb is, how we process data, and
> some constraints/context. I tried to think of all the relevant info and
> invariably will have forgotten some...
> 
> We are currently surveying the landscape of real time stream processing.
> The question we are trying to answer is: "What toolkit could we use as
> starting point for the LHCb experiment's real time data processing if we
> started from scratch today?"
> 
> Storm looks attractive, however I know nobody who has real world experience
> with it. So my first question would be: is it worth investigating Storm
> further for this kind of use-case?
> 
> LHCb is one of the four big experiments at CERN <
> http://home.web.cern.ch/about>, the home of the Large Hadron Collider (you
> might have heard of us when we discovered the Higgs boson).
> 
> A brief outline of what LHCb does when processing data:
> 
> The LHCb trigger reduces/filters the stream of messages (or events) from
> the experimental hardware. It runs on a large farm of about 27000 physical
> cores with about 1-2GB of RAM per core. Its task is to decide which
> messages are worth recording to disk and which to reject. Most
> messages/events are rejected.
> 
> Each message contains low level features (which pixels in the silicon
> sensor were hit, how much energy was deposited in read out channel X, etc).
> Part of the decision process is to construct higher level features from
> these. Usually the more high level a feature the more expensive it is to
> compute and the more discriminative it is. We routinely use machine
> learning algorithms like random forests/gradient boosted decision trees/NNs
> as part of the decision making.
> 
> In the current setup each message is delivered to exactly one computing
> node, which processes it in full. If the message is accepted it is sent
> onwards for storing to disk.
> 
> One avenue we are investigating is if it is feasible to instead pass a
> message around several "specialised" nodes that each do part of the
> processing. This would be attractive as it means you could easily integrate
> GPUs into the flow, or spin up more instances for a certain part of the
> processing if it is taking too long. Such a scheme would need extremely
> cheap serialisation.
> 
> A good (longer) introduction written up for a NIPS workshop [pdf]:
> 
> <https://www.dropbox.com/s/fcq3v28oxdvp6iw/RealTimeAnalysisLHC.pdf?dl=0>
> 
> Some boundary conditions/context:
> 
> - input rate of ~40million messages per second
> - 100kB per message, without higher level features
> - ~O(60) cores per box (extrapolating the tick-tock of Intel's roadmap to
> 2020)
> - ~O(2)GB RAM per core (very hard to tell where this will be in 2020)
> - network design is throughput driven, bound to the connectivity
> requirements. Events are distributed for filtering with a one-to-one
> communication, usually implemented with a "cheap" CLOS-topology like
> networking. The topology doesn't matter that much as long as it is
> non-blocking, that is, all nodes can send to the next stage of the pipeline
> guaranteeing a certain required throughput.
> - every messages has to be processed, no duplication (at the output of the
> processing chain) of messages
> - most messages are rejected, the output rate is (much) lower than the
> input rate
> - fault tolerance, when the LHC is delivering collisions to the experiment
> we have to be ready, otherwise valuable beam time is wasted, there is no
> pausing or going back
> - need to be able to re-run the processing pipeline on a single machine
> after the fact for debugging
> 
> If this isn't the right place for discussing this or some other medium is
> more efficient, let me know. Did I miss some obvious constraint/bit of
> context?
> 
> Thanks a lot for your time, you'd think LHCb has a lot of experts on this
> topic but I suspect there is even more (real world, modern) expertise out
> there :-)
> 
> T
> ps. I CC'ed some of the other guys from LHCb who are interested in this.


Re: Storm for LHCb at CERN

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
Great to hear it.  If you do want to reach out directly the best e-mail to reach me is bobby@apache.org.
 - Bobby
 


     On Friday, June 26, 2015 3:29 AM, Tim Head <th...@cern.ch> wrote:
   

 Hi

On Tue, Jun 23, 2015 at 4:42 PM Bobby Evans <ev...@yahoo-inc.com.invalid>
wrote:

>
> Long story short your goals and our road-map seem to align quite well.  I
> would think we could totally handle your processing requirements by 2020.
> The biggest issue is around how we would support elasticity, and your use
> case is common enough that it should be an official part of Storm.  I would
> love to work with you on it if you do decide to go with Storm.
>

Thanks a lot for sharing your experience. Based on what we heard here we
will setup a small prototype for LHCb with Storm and gain some experience.

The final setup is only needed in 2020, before that we need to show that
what we propose will work and can be built/operated by the collaboration
running the experiment. Especially as it would be quite a big change to how
things are done currently. We will probably be back with more questions.

Yes, it would be extremely cool to work together on this!

T


  

Re: Storm for LHCb at CERN

Posted by Tim Head <th...@cern.ch>.
Hi

On Tue, Jun 23, 2015 at 4:42 PM Bobby Evans <ev...@yahoo-inc.com.invalid>
wrote:

>
> Long story short your goals and our road-map seem to align quite well.  I
> would think we could totally handle your processing requirements by 2020.
> The biggest issue is around how we would support elasticity, and your use
> case is common enough that it should be an official part of Storm.  I would
> love to work with you on it if you do decide to go with Storm.
>

Thanks a lot for sharing your experience. Based on what we heard here we
will setup a small prototype for LHCb with Storm and gain some experience.

The final setup is only needed in 2020, before that we need to show that
what we propose will work and can be built/operated by the collaboration
running the experiment. Especially as it would be quite a big change to how
things are done currently. We will probably be back with more questions.

Yes, it would be extremely cool to work together on this!

T

Re: Storm for LHCb at CERN

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
Your requirements are very interesting and are definitely the direction that my team at Yahoo and I want to take storm.  Storm currently cannot meet your scale requirements http://www.slideshare.net/RobertEvans26/scaling-apache-storm-hadoop-summit-2015 is a talk I gave at Hadoop summit on where storm is at, and where we are taking it in terms of scale.  Just so you know 4000 nodes is approximately 96000 cores, but they would not all be dedicated to a single topology.  By the time we are done storm should be able to handle the volume of data that you are talking about and the number of machines/cores too.  I expect to have most of the work done by the end of next quarter and everything except possibly some polish by the end of the year so 2020 should totally be doable.  With resource aware scheduling we should also be able to handle a heterogeneous cluster where some nodes have GPUs and others do not.  We are hoping it will be generic enough that you can add in your own resources.  Also the serialization of tuples is done by Kryo and is totally pluggable so you could always make it lighter weight if you needed to.

There are a few issues that we would need to tackle as a community before handling this type of thing. The first is around the exactly once semantics that you want or at least you seem to want "every messages has to be processed, no duplication (at the output of the processing chain) of messages".  Storm does exactly once through the trident API, which is actually not exactly once, it is at least once with filtering of duplicates when committing state (Output of the processing chain).  So in theory it should work but it relies on the spout, which ingests the data, to have replay capabilities.  This is often done through Kafka which persists the data to disk, and negates the entire purpose of filtering the data before persisting it to disk (my swag would be around 50,000 spindles without replication and I know Kafka cannot currently handle that).  We could do it through an in-memory system like the redis pub-sub which should scale fairly well, but we don't have a trident spout for that yet.
We can probably get this to work with under 1-2% data loss without trident.  We do that already in Yahoo without replay, just best effort delivery.  In the common case we have no data loss, but occasionally there are issues usually hardware related, we drop too much data, and alarms go off.
 The second issue I see is around elasticity in your topology.  We have plans to support dynamic scaling of topologies and there are some great proofs of concept out there already to do this.  The issue is that in its current form there is almost definitely going to be data loss when this happens, unless you have trident to get the replay and the exactly once semantics.  At Yahoo when storm is acting mostly as a master/worker like setup then we run multiple smaller topologies all of whom are consuming from the same source.  If we need more processing we can launch more topologies.  If we need less we can deactivate some of them, let the data they are processing drain and then kill them.  This does not give you the scaling you want per level of your topology, where you could give more resources to a particular part of your topology. We should be able to accomplish this without data loss with a little bit of work, and probably only for specific types of groupings (shuffle being the most obvious).

Long story short your goals and our road-map seem to align quite well.  I would think we could totally handle your processing requirements by 2020.  The biggest issue is around how we would support elasticity, and your use case is common enough that it should be an official part of Storm.  I would love to work with you on it if you do decide to go with Storm.

- Bobby
 


     On Tuesday, June 23, 2015 8:35 AM, Nathan Leung <nc...@gmail.com> wrote:
   

 Forgive me if my math is off, but are you pushing 4TB of raw data every
second?  I can't say whether storm would work or not, but I'm pretty sure
that if it does the configuration will look very different from what most
people are using.  What are your tolerances for data loss?
On Jun 23, 2015 3:56 AM, "Tim Head" <th...@cern.ch> wrote:

> Hello Storm devs,
>
> I work for one of the big four experiments at CERN. We are currently
> thinking about "the future of our real time data processing". I tweeted
> about it <https://twitter.com/betatim/status/611503830161862657> and
> @ptgoetz replied saying "post to dev/usr". This is that post.
>
> Below some brief introduction to what LHCb is, how we process data, and
> some constraints/context. I tried to think of all the relevant info and
> invariably will have forgotten some...
>
> We are currently surveying the landscape of real time stream processing.
> The question we are trying to answer is: "What toolkit could we use as
> starting point for the LHCb experiment's real time data processing if we
> started from scratch today?"
>
> Storm looks attractive, however I know nobody who has real world experience
> with it. So my first question would be: is it worth investigating Storm
> further for this kind of use-case?
>
> LHCb is one of the four big experiments at CERN <
> http://home.web.cern.ch/about>, the home of the Large Hadron Collider (you
> might have heard of us when we discovered the Higgs boson).
>
> A brief outline of what LHCb does when processing data:
>
> The LHCb trigger reduces/filters the stream of messages (or events) from
> the experimental hardware. It runs on a large farm of about 27000 physical
> cores with about 1-2GB of RAM per core. Its task is to decide which
> messages are worth recording to disk and which to reject. Most
> messages/events are rejected.
>
> Each message contains low level features (which pixels in the silicon
> sensor were hit, how much energy was deposited in read out channel X, etc).
> Part of the decision process is to construct higher level features from
> these. Usually the more high level a feature the more expensive it is to
> compute and the more discriminative it is. We routinely use machine
> learning algorithms like random forests/gradient boosted decision trees/NNs
> as part of the decision making.
>
> In the current setup each message is delivered to exactly one computing
> node, which processes it in full. If the message is accepted it is sent
> onwards for storing to disk.
>
> One avenue we are investigating is if it is feasible to instead pass a
> message around several "specialised" nodes that each do part of the
> processing. This would be attractive as it means you could easily integrate
> GPUs into the flow, or spin up more instances for a certain part of the
> processing if it is taking too long. Such a scheme would need extremely
> cheap serialisation.
>
> A good (longer) introduction written up for a NIPS workshop [pdf]:
>
> <https://www.dropbox.com/s/fcq3v28oxdvp6iw/RealTimeAnalysisLHC.pdf?dl=0>
>
> Some boundary conditions/context:
>
> - input rate of ~40million messages per second
> - 100kB per message, without higher level features
> - ~O(60) cores per box (extrapolating the tick-tock of Intel's roadmap to
> 2020)
> - ~O(2)GB RAM per core (very hard to tell where this will be in 2020)
> - network design is throughput driven, bound to the connectivity
> requirements. Events are distributed for filtering with a one-to-one
> communication, usually implemented with a "cheap" CLOS-topology like
> networking. The topology doesn't matter that much as long as it is
> non-blocking, that is, all nodes can send to the next stage of the pipeline
> guaranteeing a certain required throughput.
> - every messages has to be processed, no duplication (at the output of the
> processing chain) of messages
> - most messages are rejected, the output rate is (much) lower than the
> input rate
> - fault tolerance, when the LHC is delivering collisions to the experiment
> we have to be ready, otherwise valuable beam time is wasted, there is no
> pausing or going back
> - need to be able to re-run the processing pipeline on a single machine
> after the fact for debugging
>
> If this isn't the right place for discussing this or some other medium is
> more efficient, let me know. Did I miss some obvious constraint/bit of
> context?
>
> Thanks a lot for your time, you'd think LHCb has a lot of experts on this
> topic but I suspect there is even more (real world, modern) expertise out
> there :-)
>
> T
> ps. I CC'ed some of the other guys from LHCb who are interested in this.
>


  

Re: Storm for LHCb at CERN

Posted by Nathan Leung <nc...@gmail.com>.
Forgive me if my math is off, but are you pushing 4TB of raw data every
second?  I can't say whether storm would work or not, but I'm pretty sure
that if it does the configuration will look very different from what most
people are using.  What are your tolerances for data loss?
On Jun 23, 2015 3:56 AM, "Tim Head" <th...@cern.ch> wrote:

> Hello Storm devs,
>
> I work for one of the big four experiments at CERN. We are currently
> thinking about "the future of our real time data processing". I tweeted
> about it <https://twitter.com/betatim/status/611503830161862657> and
> @ptgoetz replied saying "post to dev/usr". This is that post.
>
> Below some brief introduction to what LHCb is, how we process data, and
> some constraints/context. I tried to think of all the relevant info and
> invariably will have forgotten some...
>
> We are currently surveying the landscape of real time stream processing.
> The question we are trying to answer is: "What toolkit could we use as
> starting point for the LHCb experiment's real time data processing if we
> started from scratch today?"
>
> Storm looks attractive, however I know nobody who has real world experience
> with it. So my first question would be: is it worth investigating Storm
> further for this kind of use-case?
>
> LHCb is one of the four big experiments at CERN <
> http://home.web.cern.ch/about>, the home of the Large Hadron Collider (you
> might have heard of us when we discovered the Higgs boson).
>
> A brief outline of what LHCb does when processing data:
>
> The LHCb trigger reduces/filters the stream of messages (or events) from
> the experimental hardware. It runs on a large farm of about 27000 physical
> cores with about 1-2GB of RAM per core. Its task is to decide which
> messages are worth recording to disk and which to reject. Most
> messages/events are rejected.
>
> Each message contains low level features (which pixels in the silicon
> sensor were hit, how much energy was deposited in read out channel X, etc).
> Part of the decision process is to construct higher level features from
> these. Usually the more high level a feature the more expensive it is to
> compute and the more discriminative it is. We routinely use machine
> learning algorithms like random forests/gradient boosted decision trees/NNs
> as part of the decision making.
>
> In the current setup each message is delivered to exactly one computing
> node, which processes it in full. If the message is accepted it is sent
> onwards for storing to disk.
>
> One avenue we are investigating is if it is feasible to instead pass a
> message around several "specialised" nodes that each do part of the
> processing. This would be attractive as it means you could easily integrate
> GPUs into the flow, or spin up more instances for a certain part of the
> processing if it is taking too long. Such a scheme would need extremely
> cheap serialisation.
>
> A good (longer) introduction written up for a NIPS workshop [pdf]:
>
> <https://www.dropbox.com/s/fcq3v28oxdvp6iw/RealTimeAnalysisLHC.pdf?dl=0>
>
> Some boundary conditions/context:
>
> - input rate of ~40million messages per second
> - 100kB per message, without higher level features
> - ~O(60) cores per box (extrapolating the tick-tock of Intel's roadmap to
> 2020)
> - ~O(2)GB RAM per core (very hard to tell where this will be in 2020)
> - network design is throughput driven, bound to the connectivity
> requirements. Events are distributed for filtering with a one-to-one
> communication, usually implemented with a "cheap" CLOS-topology like
> networking. The topology doesn't matter that much as long as it is
> non-blocking, that is, all nodes can send to the next stage of the pipeline
> guaranteeing a certain required throughput.
> - every messages has to be processed, no duplication (at the output of the
> processing chain) of messages
> - most messages are rejected, the output rate is (much) lower than the
> input rate
> - fault tolerance, when the LHC is delivering collisions to the experiment
> we have to be ready, otherwise valuable beam time is wasted, there is no
> pausing or going back
> - need to be able to re-run the processing pipeline on a single machine
> after the fact for debugging
>
> If this isn't the right place for discussing this or some other medium is
> more efficient, let me know. Did I miss some obvious constraint/bit of
> context?
>
> Thanks a lot for your time, you'd think LHCb has a lot of experts on this
> topic but I suspect there is even more (real world, modern) expertise out
> there :-)
>
> T
> ps. I CC'ed some of the other guys from LHCb who are interested in this.
>