You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Jakes John <ja...@gmail.com> on 2017/08/08 00:46:34 UTC

Fields grouping

Hi,

   I need to have a streaming pipeline  Kafka->storm-> ElasticSearch.  The
volume of message produced to Kafka is in order of  millions. Hence, I need
to have maximum throughput in Elasticsearch writes.  Each message has an id
which is mapped to a Elasticsearch index.  The number of possible message
ids possible are less than 50(which means, max number of created ES
indices). I would like to batch the ES writes where messages are grouped by
index *as much as possible*.
The problem is that message counts per id are dynamic and certain ids can
have very huge message inflows when compared to other.  Largest message id
can have > 10x message inflow than the smallest. Hence, shuffle grouping on
ids doesn't work here.  Partial key grouping also won't work as I need more
number of output streams for largest message ids.

eg:  i have 10 tasks that write to ES


Say all my messages are spread across 2 message ids - ID1, ID2 which I have
to write to 2 separate index in ES

Say ID1 has 4 times more messages than ID2 at one instant

So, the best possible output would be,
First 8 tasks writes messages with ID1 to ES
Last 2 tasks writes messages with ID2 to ES


Say at a different instant,  ID1 has same number of messages as ID2

So, the best possible output would be,
First 5 tasks writes messages with ID1 to ES
Last 5 tasks writes messages with ID2 to ES


My grouping requirement is just an optimization but it is not a
requirement.   What is the best way where I can group messages *dynamically*
on input streams with hugely varying message counts* in the best way
possible*?  Also, I have the control over creating message ids if it helps
the data distribution

Re: Fields grouping

Posted by Jakes John <ja...@gmail.com>.
" Using direct grouping will let the bolt upstream of the ES writing bolt
decide which ES bolt receives a given message. So you could have spouts ->
sorter bolts -> ES bolts, where sorter bolts use direct grouping to
partition the stream by index id in whatever way you need. "
           What is the best way to use direct grouping in a dynamic way?
For eg: The distribution of index ids will be different across time. I
might need more threads for a index during one point while lesser threads
during the other

On Tue, Aug 8, 2017 at 1:35 AM, Stig Rohde Døssing <sr...@apache.org> wrote:

> You can implement your own grouping by using direct grouping (from
> http://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html): "*Direct
> grouping*: This is a special kind of grouping. A stream grouped this way
> means that the *producer* of the tuple decides which task of the consumer
> will receive this tuple. Direct groupings can only be declared on streams
> that have been declared as direct streams. Tuples emitted to a direct
> stream must be emitted using one of the [emitDirect](javadocs/org/
> apache/storm/task/OutputCollector.html#emitDirect(int, int,
> java.util.List) methods. A bolt can get the task ids of its consumers by
> either using the provided TopologyContext
> <http://storm.apache.org/releases/2.0.0-SNAPSHOT/javadocs/org/apache/storm/task/TopologyContext.html>
> or by keeping track of the output of the emit method in OutputCollector
> <http://storm.apache.org/releases/2.0.0-SNAPSHOT/javadocs/org/apache/storm/task/OutputCollector.html>
> (which returns the task ids that the tuple was sent to)."
>
> Using direct grouping will let the bolt upstream of the ES writing bolt
> decide which ES bolt receives a given message. So you could have spouts ->
> sorter bolts -> ES bolts, where sorter bolts use direct grouping to
> partition the stream by index id in whatever way you need.
>
> On another note, I want to say that ES' bulk API supports writing to
> multiple indices in one go, so if you haven't already you should benchmark
> to see what the performance penalty of mixing indices in one bulk API call
> would be. If the penalty isn't much, you might be fine with shuffle
> grouping still.
>
> 2017-08-08 2:46 GMT+02:00 Jakes John <ja...@gmail.com>:
>
>> Hi,
>>
>>    I need to have a streaming pipeline  Kafka->storm-> ElasticSearch.
>> The volume of message produced to Kafka is in order of  millions. Hence, I
>> need to have maximum throughput in Elasticsearch writes.  Each message has
>> an id which is mapped to a Elasticsearch index.  The number of possible
>> message ids possible are less than 50(which means, max number of created ES
>> indices). I would like to batch the ES writes where messages are grouped by
>> index *as much as possible*.
>> The problem is that message counts per id are dynamic and certain ids can
>> have very huge message inflows when compared to other.  Largest message id
>> can have > 10x message inflow than the smallest. Hence, shuffle grouping on
>> ids doesn't work here.  Partial key grouping also won't work as I need more
>> number of output streams for largest message ids.
>>
>> eg:  i have 10 tasks that write to ES
>>
>>
>> Say all my messages are spread across 2 message ids - ID1, ID2 which I
>> have to write to 2 separate index in ES
>>
>> Say ID1 has 4 times more messages than ID2 at one instant
>>
>> So, the best possible output would be,
>> First 8 tasks writes messages with ID1 to ES
>> Last 2 tasks writes messages with ID2 to ES
>>
>>
>> Say at a different instant,  ID1 has same number of messages as ID2
>>
>> So, the best possible output would be,
>> First 5 tasks writes messages with ID1 to ES
>> Last 5 tasks writes messages with ID2 to ES
>>
>>
>> My grouping requirement is just an optimization but it is not a
>> requirement.   What is the best way where I can group messages
>> *dynamically* on input streams with hugely varying message counts* in
>> the best way possible*?  Also, I have the control over creating message
>> ids if it helps the data distribution
>>
>
>

Re: Fields grouping

Posted by Stig Rohde Døssing <sr...@apache.org>.
You can implement your own grouping by using direct grouping (from
http://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html): "*Direct
grouping*: This is a special kind of grouping. A stream grouped this way
means that the *producer* of the tuple decides which task of the consumer
will receive this tuple. Direct groupings can only be declared on streams
that have been declared as direct streams. Tuples emitted to a direct
stream must be emitted using one of the
[emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect(int,
int, java.util.List) methods. A bolt can get the task ids of its consumers
by either using the provided TopologyContext
<http://storm.apache.org/releases/2.0.0-SNAPSHOT/javadocs/org/apache/storm/task/TopologyContext.html>
or by keeping track of the output of the emit method in OutputCollector
<http://storm.apache.org/releases/2.0.0-SNAPSHOT/javadocs/org/apache/storm/task/OutputCollector.html>
(which returns the task ids that the tuple was sent to)."

Using direct grouping will let the bolt upstream of the ES writing bolt
decide which ES bolt receives a given message. So you could have spouts ->
sorter bolts -> ES bolts, where sorter bolts use direct grouping to
partition the stream by index id in whatever way you need.

On another note, I want to say that ES' bulk API supports writing to
multiple indices in one go, so if you haven't already you should benchmark
to see what the performance penalty of mixing indices in one bulk API call
would be. If the penalty isn't much, you might be fine with shuffle
grouping still.

2017-08-08 2:46 GMT+02:00 Jakes John <ja...@gmail.com>:

> Hi,
>
>    I need to have a streaming pipeline  Kafka->storm-> ElasticSearch.  The
> volume of message produced to Kafka is in order of  millions. Hence, I need
> to have maximum throughput in Elasticsearch writes.  Each message has an id
> which is mapped to a Elasticsearch index.  The number of possible message
> ids possible are less than 50(which means, max number of created ES
> indices). I would like to batch the ES writes where messages are grouped by
> index *as much as possible*.
> The problem is that message counts per id are dynamic and certain ids can
> have very huge message inflows when compared to other.  Largest message id
> can have > 10x message inflow than the smallest. Hence, shuffle grouping on
> ids doesn't work here.  Partial key grouping also won't work as I need more
> number of output streams for largest message ids.
>
> eg:  i have 10 tasks that write to ES
>
>
> Say all my messages are spread across 2 message ids - ID1, ID2 which I
> have to write to 2 separate index in ES
>
> Say ID1 has 4 times more messages than ID2 at one instant
>
> So, the best possible output would be,
> First 8 tasks writes messages with ID1 to ES
> Last 2 tasks writes messages with ID2 to ES
>
>
> Say at a different instant,  ID1 has same number of messages as ID2
>
> So, the best possible output would be,
> First 5 tasks writes messages with ID1 to ES
> Last 5 tasks writes messages with ID2 to ES
>
>
> My grouping requirement is just an optimization but it is not a
> requirement.   What is the best way where I can group messages
> *dynamically* on input streams with hugely varying message counts* in the
> best way possible*?  Also, I have the control over creating message ids
> if it helps the data distribution
>