Posted to users@apex.apache.org by Sunil Parmar <sp...@threatmetrix.com> on 2016/10/14 20:17:31 UTC

balanced of Stream Codec

We're using a stream codec to distribute data consistently across the operator partitions for parallel processing. Our requirement is to serialize the processing of data based on a particular tuple attribute; let's call it 'category_name'. To process different category names in parallel, we've written our stream codec as follows.


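   import com.datatorrent.lib.codec.KryoSerializableStreamCodec;

   public class CatagoryStreamCodec extends KryoSerializableStreamCodec<Object> {

       private static final long serialVersionUID = -687991492884005033L;

       @Override
       public int getPartition(Object in) {
           // Partition on the category name so that all tuples of one
           // category are routed to the same operator partition.
           InputTuple tuple = (InputTuple) in;
           String partitionKey = tuple.getName();
           if (partitionKey != null) {
               return partitionKey.hashCode();
           }
           // The posted snippet is cut off here (no catch block or fallback);
           // returning 0 for tuples without a name is an assumption.
           return 0;
       }
   }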

It works as expected, but we observed unbalanced partitions when we ran it in our production environment with 20 partitions of the operator that follows the codec in the DAG.

  *   Some operator instances didn't process any data
  *   Some operator instances processed as many tuples as all the others combined

Questions:

  *   Is the getPartition method supposed to return the actual partition, or just a value whose lower bits are used to decide the partition?
  *   The number of partitions is set in the application properties and can vary between deployments or environments. Is it best practice to use that property in the stream codec?
  *   Is there a recommended hash function that gives consistent variation in the lower bits when the data has little variety? We have ~100+ categories and I'm thinking of having 10+ operator partitions.

Thanks,
Sunil

Re: balanced of Stream Codec

Posted by Vlad Rozov <v....@datatorrent.com>.
Using a different hash function will help only if the data is equally
distributed across categories. In many cases data is skewed and some
categories occur more frequently than others. In that case a generic hash
function will not help. Can you sample the data and see whether it
is equally distributed across categories?
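
A minimal sketch of such a sampling check, assuming the sample is available as a list of category names. The class and method names (CategorySkewSampler, countByCategory, largestShare) are hypothetical and not part of Apex. It counts tuples per category and reports the share taken by the most frequent one; a large share points to skew that no hash function can remove.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CategorySkewSampler {

    /** Counts how often each category name occurs in the sample. */
    static Map<String, Integer> countByCategory(List<String> sample) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String name : sample) {
            counts.merge(name, 1, Integer::sum);
        }
        return counts;
    }

    /** Fraction of the sample (0.0 to 1.0) taken by the most frequent category. */
    static double largestShare(Map<String, Integer> counts) {
        int total = 0;
        int max = 0;
        for (int c : counts.values()) {
            total += c;
            max = Math.max(max, c);
        }
        return total == 0 ? 0.0 : (double) max / total;
    }

    public static void main(String[] args) {
        // Made-up sample; in practice this would be category names taken from real tuples.
        List<String> sample = Arrays.asList("a", "a", "a", "b", "c", "a");
        Map<String, Integer> counts = countByCategory(sample);
        System.out.println(counts + " largest share = " + largestShare(counts));
    }
}
```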

Vlad


On 10/16/16 10:40, Pramod Immaneni wrote:
> Have you tried an alternate hashing function other than java hashcode
> that might provide a more uniform distribution of your data?

Re: balanced of Stream Codec

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Hi Sunil,

Have you tried a hashing function other than Java's hashCode that
might provide a more uniform distribution of your data? The Google Guava
library provides a set of hashing strategies, such as murmur hash, that are
reported to produce fewer hash collisions in various cases. Below is a link
explaining these from their website:

https://github.com/google/guava/wiki/HashingExplained

Here is a link where someone has done a comparative study of different
hashing functions
http://programmers.stackexchange.com/questions/49550/which-hashing-algorithm-is-best-for-uniqueness-and-speed

If you end up choosing a hashing function from the Google Guava library,
make sure you use the documentation for Guava version 11.0, as this version
of Guava is already included in the Hadoop classpath.
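
A minimal sketch of the bit-mixing idea, assuming the mixed value would be returned from getPartition. To stay self-contained it does not call Guava; instead it applies the 32-bit finalization step of MurmurHash3 (often called fmix32) to String.hashCode(). The class and method names are hypothetical.

```java
final class MixedHash {

    /** MurmurHash3 32-bit finalizer: spreads the input bits across the whole int. */
    static int fmix32(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Mixed hash of a partition key; a null key maps to 0 (an assumption). */
    static int partitionHash(String key) {
        return key == null ? 0 : fmix32(key.hashCode());
    }

    public static void main(String[] args) {
        // Compare the lower 4 bits before and after mixing for a few made-up keys.
        for (String key : new String[] {"alpha", "beta", "gamma"}) {
            System.out.println(key + ": raw=" + (key.hashCode() & 0xF)
                    + " mixed=" + (partitionHash(key) & 0xF));
        }
    }
}
```

Mixing only changes how distinct keys spread over the lower bits; it cannot balance the load if a few keys carry most of the tuples.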

Thanks

Re: balanced of Stream Codec

Posted by Ashwin Chandra Putta <as...@gmail.com>.
Sunil,

For key-based partitioning, the getPartition method is supposed to return a
consistent integer representing the partitioning key, typically the Java
hashCode of the key. The tuples are then routed by applying a mask to that
integer, so that only its lower bits are considered; the number of bits in
the mask depends on the number of downstream partitions.

A partition key with only 100 possible values may not load balance properly
and will most likely introduce skew, as in your case. It is recommended to
use a key that generates a wide range of return values.
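
A simplified sketch of mask-based routing as described above, not the actual engine code. It assumes the mask is the smallest value of the form 2^k - 1 that covers the partition count, and it only counts how many keys land in each masked bucket. How the engine assigns buckets to partitions when the count is not a power of two is not covered. All names are hypothetical.

```java
import java.util.Arrays;

final class MaskRouting {

    /** Smallest mask of the form 2^k - 1 with at least `partitions` distinct values. */
    static int maskFor(int partitions) {
        int mask = 0;
        while (mask + 1 < partitions) {
            mask = (mask << 1) | 1;
        }
        return mask;
    }

    /** Counts how many of the given keys fall into each masked bucket. */
    static int[] bucketCounts(String[] keys, int mask) {
        int[] counts = new int[mask + 1];
        for (String key : keys) {
            // Only the lower bits of the hash code decide the bucket.
            counts[key.hashCode() & mask]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 100 made-up category names, bucketed for 16 partitions.
        String[] keys = new String[100];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = "category-" + i;
        }
        System.out.println(Arrays.toString(bucketCounts(keys, maskFor(16))));
    }
}
```

With only about 100 distinct keys, each bucket receives a handful of keys at best, so one busy category can dominate a partition.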

Regards,
Ashwin.

Re: balanced of Stream Codec

Posted by Thomas Weise <th...@apache.org>.
Without knowing the operations that follow the nondeterministic partitioning,
assume that you cannot have exactly-once results, because processing won't
be idempotent. If there are only stateless operations, then it should be
OK. If there are stateful operations (windowing with any form of
aggregation, etc.), then key-based partitioning is needed.

Thomas


On Sat, Oct 15, 2016 at 8:39 PM, Amol Kekre <am...@datatorrent.com> wrote:

> Round robin in an internal operator could be used in exactly once writes
> to external system for certain operations. [...]

Re: balanced of Stream Codec

Posted by Amol Kekre <am...@datatorrent.com>.
Sunil,
Round robin in an internal operator can be used with exactly-once writes to
an external system for certain operations. I do not know what your business
logic is, but if it can be split into partitions and then unified (for
example, aggregates), you can use round robin and then unify the result.
The result can then be fed into an output operator that handles exactly-once
semantics with an external system like Cassandra. The Apex engine guarantees
that all the tuples in a window are the same for the logical operator, so
for the operator that you partition by round robin, the post-unifier result
should be identical (that is, effectively idempotent) even if each partition
is not idempotent. Often, requiring that each partition be idempotent is not
useful for internal operators. The case where this does not work is when the
order of tuples based on category_name is important.
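
A minimal sketch of the round-robin-then-unify argument, assuming a simple sum as the aggregate. It is plain Java, not Apex operator code, and all names are hypothetical. The same window of values is distributed round robin from two different starting offsets; the per-partition partial sums differ, but the unified total is the same.

```java
import java.util.Arrays;

final class RoundRobinUnify {

    /** Distributes values round robin, starting at `offset`, and returns per-partition sums. */
    static long[] partialSums(int[] values, int partitions, int offset) {
        long[] sums = new long[partitions];
        for (int i = 0; i < values.length; i++) {
            sums[(i + offset) % partitions] += values[i];
        }
        return sums;
    }

    /** Unifier step: merges the per-partition partial sums into one result. */
    static long unify(long[] partials) {
        long total = 0;
        for (long p : partials) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        int[] window = {1, 2, 3, 4, 5};
        long[] first = partialSums(window, 2, 0);
        long[] replay = partialSums(window, 2, 1);
        System.out.println(Arrays.toString(first) + " -> " + unify(first));
        System.out.println(Arrays.toString(replay) + " -> " + unify(replay));
    }
}
```

This holds only for aggregates that can be merged in any grouping; it does not help when per-category ordering matters.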

Thks
Amol


On Sat, Oct 15, 2016 at 6:03 PM, Sandesh Hegde <sa...@datatorrent.com>
wrote:

> Round robin is not idempotent, so you can't have exactly once.

Re: balanced of Stream Codec

Posted by Sandesh Hegde <sa...@datatorrent.com>.
Round robin is not idempotent, so you can't have exactly once.
On Sat, Oct 15, 2016 at 4:49 PM Munagala Ramanath <ra...@datatorrent.com>
wrote:

> If you want round-robin distribution which will give you uniform load
> across all partitions you can use a StreamCodec like this (provided the
> number of partitions is known and static): [...]

Re: balanced of Stream Codec

Posted by Munagala Ramanath <ra...@datatorrent.com>.
If you want round-robin distribution, which will give you uniform load
across all partitions, you can use a StreamCodec like this (provided the
number of partitions is known and static):

public class CatagoryStreamCodec extends KryoSerializableStreamCodec<Object> {
  // Number of partitions; 20 is only an example value.
  private int nPartitions = 20;
  private int n = 0;

  @Override
  public int getPartition(Object in) {
    int partition = n;
    n = (n + 1) % nPartitions;  // wrap here so n never overflows to a negative value
    return partition;
  }
}

If you want certain category names to go to certain partitions, you can
create that mapping within the StreamCodec (map category names to integers
in the range 0..nPartitions-1) and, for each tuple, look up the category
name in the map and return the corresponding value.
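
A minimal sketch of such a mapping, assuming the category names are known up front. The class name, the round-robin assignment of names to partitions, and the hash fallback for unknown names are all assumptions for illustration; the lookup result would be what getPartition returns.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CategoryPartitionMap {

    private final Map<String, Integer> assignment = new HashMap<>();
    private final int nPartitions;

    /** Assigns the known categories to partitions 0..nPartitions-1 in turn. */
    CategoryPartitionMap(List<String> categories, int nPartitions) {
        this.nPartitions = nPartitions;
        int next = 0;
        for (String category : categories) {
            if (!assignment.containsKey(category)) {
                assignment.put(category, next);
                next = (next + 1) % nPartitions;
            }
        }
    }

    /** Returns the mapped partition; unknown names fall back to a non-negative hash. */
    int partitionFor(String category) {
        if (category == null) {
            return 0;
        }
        Integer mapped = assignment.get(category);
        return mapped != null ? mapped : Math.floorMod(category.hashCode(), nPartitions);
    }

    public static void main(String[] args) {
        CategoryPartitionMap map = new CategoryPartitionMap(Arrays.asList("a", "b", "c"), 2);
        System.out.println(map.partitionFor("a") + " " + map.partitionFor("b") + " " + map.partitionFor("c"));
    }
}
```

An explicit map also makes it possible to place known heavy categories on their own partitions by hand.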

Ram