Posted to dev@apex.apache.org by Shubham Pathak <sh...@datatorrent.com> on 2016/02/08 11:12:45 UTC

Writing Custom Partitioner

Hi,

I need some suggestions / pointers related to defining a custom partitioner.

The operators in my application process a custom tuple class (let's call it
TUPLE). This data type has a single field, an ArrayList, so each tuple
represents a list of values.

For a typical word count problem, my DAG would be

WordGenerator -> <STRING> -> Tokenizer -> <TUPLE> -> Counter ->  <TUPLE> ->
Console

and if I were to use TUPLE, the Tokenizer would emit a TUPLE that contains an
ArrayList with the contents <word,count>.

Now I wish to partition Counter so that each instance receives all tuples
containing the same word.

I know that by default the hashCode() method of the custom tuple class would
be used, but in my case the custom tuple class is an ArrayList and I wish to
specify that the hash code must be computed on just the first field in the
ArrayList. In the generic case it could also be computed on multiple fields in
the ArrayList.

Do we have any examples that I could refer to?

Also, can this be done at the application level by setting an attribute?

Thanks,
Shubham
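
To make the requirement above concrete, here is a minimal stand-alone sketch in
plain Java. It is not Apex code: the class FieldHash and the method hashOf are
hypothetical names introduced only for illustration. It shows that the hash of
a whole list changes when the count changes, while a hash computed on the word
alone stays the same.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Apex: hashes only chosen positions of a list-backed tuple.
final class FieldHash
{
  private FieldHash() {}

  // Combines the hash codes of the elements at the given indexes, in order.
  static int hashOf(List<?> fields, int[] indexes)
  {
    int hash = 1;
    for (int idx : indexes) {
      Object field = fields.get(idx);
      hash = 31 * hash + (field == null ? 0 : field.hashCode());
    }
    return hash;
  }

  public static void main(String[] args)
  {
    List<Object> a = Arrays.<Object>asList("apple", 1);
    List<Object> b = Arrays.<Object>asList("apple", 7);
    int[] wordOnly = {0};
    // Same word, different count: the key hash is identical (prints true).
    System.out.println(hashOf(a, wordOnly) == hashOf(b, wordOnly));
    // The full-list hash differs because the count takes part in it (prints false).
    System.out.println(a.hashCode() == b.hashCode());
  }
}
```

Passing more than one index, for example new int[] {0, 1}, covers the generic
multi-field case mentioned in the question.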

Re: Writing Custom Partitioner

Posted by Bhupesh Chawda <bh...@datatorrent.com>.
Same for me!
Does anyone know whether this is expected, or could it be a bug?

Thanks.
-Bhupesh

On Thu, Feb 18, 2016 at 6:38 PM, Shubham Pathak <sh...@datatorrent.com>
wrote:

> Extending custom stream codec from DefaultKryoStreamCodec worked for my
> case !
>
> Thanks,
> Shubham

Re: Writing Custom Partitioner

Posted by Shubham Pathak <sh...@datatorrent.com>.
Extending my custom stream codec from DefaultKryoStreamCodec worked in my
case!

Thanks,
Shubham

On Thu, Feb 18, 2016 at 12:48 AM, Tushar Gosavi <tu...@datatorrent.com>
wrote:

> Hi Bhupesh,
>
> I also did some experiments and found out that stream codec extended
> from DefaultStatefulStreamCodec does not work. Extending custom stream
> codec from DefaultKryoStreamCodec worked.
>
> Can anyone explain, what is the difference between
> DefaultStatefulStreamCodec and DefaultKryoStreamCodec? and does platform
> allow writing custom streamcodec extended from DefaultStatefulStreamCodec?
> In above case it did not work. If it is expected to work then I will open
> an Jira for the issue.
>
> Regards,
> -Tushar.

Re: Writing Custom Partitioner

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi Bhupesh,

I also did some experiments and found that a stream codec extended from
DefaultStatefulStreamCodec does not work. Extending the custom stream codec
from DefaultKryoStreamCodec worked.

Can anyone explain the difference between DefaultStatefulStreamCodec and
DefaultKryoStreamCodec? And does the platform allow writing a custom stream
codec that extends DefaultStatefulStreamCodec? In the above case it did not
work. If it is expected to work, I will open a Jira for the issue.

Regards,
-Tushar.



On Wed, Feb 17, 2016 at 6:32 PM, Bhupesh Chawda <bh...@datatorrent.com>
wrote:

> Hi,
>
> The above code fragment is not working in my case as well.
>
> Basically I have a Tuple class which has a "key" component.
>
> Here is my dag:
> A -> B
>
> I need to be able to define Key Based stream multiplexing (same key goes to
> same downstream partition) when the down stream operator B is partitioned.
> I tried implementing this by defining a custom stream codec extending
> DefaultStatefulStreamCodec where I define a getPartition() function which
> returns tuple.key().hashcode(). However, I get the same distribution
> irrespective of what I return in the getPartition() function. I am using
> the StatelessPartitioner for partitioning the downstream operator.
>
> Any ideas on what I am doing wrong?
>
> Thanks.
>
> -Bhupesh

Re: Writing Custom Partitioner

Posted by Bhupesh Chawda <bh...@datatorrent.com>.
Hi,

The above code fragment is not working in my case either.

Basically, I have a Tuple class which has a "key" component.

Here is my DAG:
A -> B

I need to be able to define key-based stream multiplexing (the same key goes
to the same downstream partition) when the downstream operator B is
partitioned. I tried implementing this by defining a custom stream codec
extending DefaultStatefulStreamCodec, where I define a getPartition() function
that returns tuple.key().hashCode(). However, I get the same distribution
irrespective of what I return from getPartition(). I am using the
StatelessPartitioner to partition the downstream operator.

Any ideas on what I am doing wrong?

Thanks.

-Bhupesh
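
One way to narrow this down is to check the partition function on its own,
outside the platform. The sketch below is plain Java and uses only hypothetical
names (ColocationCheck, sameKeySamePartition). It assumes a tuple is a
{key, payload} pair and that a partition is chosen by masking the low bits of
the returned value. If the function passes such a check but the running
application still shows an unchanged distribution, the codec is probably not
being invoked at all.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.ToIntFunction;

// Hypothetical test helper, not an Apex API: checks key-to-partition consistency offline.
final class ColocationCheck
{
  private ColocationCheck() {}

  // Returns true if every key (tuple[0]) is always sent to one and the same partition.
  static boolean sameKeySamePartition(String[][] tuples, ToIntFunction<String[]> partitionFn, int mask)
  {
    Map<String, Set<Integer>> seen = new HashMap<>();
    for (String[] tuple : tuples) {
      int partition = partitionFn.applyAsInt(tuple) & mask;
      seen.computeIfAbsent(tuple[0], k -> new HashSet<>()).add(partition);
    }
    for (Set<Integer> partitions : seen.values()) {
      if (partitions.size() != 1) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    // Each tuple is {key, payload}.
    String[][] tuples = {{"k1", "a"}, {"k2", "c"}, {"k1", "b"}, {"k2", "d"}};
    // Keyed on the key field: each key stays in one partition (prints true).
    System.out.println(sameKeySamePartition(tuples, t -> t[0].hashCode(), 1));
    // Keyed on the payload: k1 is split across both partitions (prints false).
    System.out.println(sameKeySamePartition(tuples, t -> t[1].hashCode(), 1));
  }
}
```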

On Wed, Feb 10, 2016 at 6:38 PM, Shubham Pathak <sh...@datatorrent.com>
wrote:

> Thanks Tushar.
>
> I implemented the codec as per your suggestion.
>
> To test it i did the following :
>
> 1. Partitioned Counter operator to have 2 instances  using
>
> <configuration><property><name>dt.application.APPNAME.operator.counter.attr.PARTITIONER</name><value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value></property></configuration>
>
> 2. The tokenizer emits <word,1> . So if i were to group the stream by
> second field ( index 1 ) , all words would go to same instance of counter.
>
>   dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC,
>       new TupleStreamCodec(new int[] { 1 }));
>
> 3. Override getStreamCodec() method in counter operator's input port to
> return new TupleStreamCodec(new int[] { 1 });
>
> But i couldn't observe the desired results. I also added few debug
> statements in TupleStreamCodec class, but surprisingly, i couldn't see them
> in logs.
>
> Am i missing something ?
>
> Is the com.datatorrent.common.partitioner.StatelessPartitioner:2 overriding
> the behavior of TupleStreamCodec ?
> But if i remove that how would i specify how many partitions i need ?
>
> I did take look at FraudDetectionDemo and YahooFinance demo . I noticed
> that StreamCodec is being used by some operators the same way i am using,
> but no where i could see number of partitions being defined for them ( in
> application.java or properties.xml )
>
> Could you guide me more on this.
>
> Thanks,
> Shubham

Re: Writing Custom Partitioner

Posted by Shubham Pathak <sh...@datatorrent.com>.
Thanks Tushar.

I implemented the codec as per your suggestion.

To test it I did the following:

1. Partitioned the Counter operator to have 2 instances using

<configuration>
  <property>
    <name>dt.application.APPNAME.operator.counter.attr.PARTITIONER</name>
    <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
  </property>
</configuration>

2. The tokenizer emits <word,1>, so if I were to group the stream by the
second field (index 1), all words would go to the same instance of Counter.

  dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC,
      new TupleStreamCodec(new int[] { 1 }));

3. Overrode the getStreamCodec() method in the Counter operator's input port
to return new TupleStreamCodec(new int[] { 1 });

But I couldn't observe the desired results. I also added a few debug
statements in the TupleStreamCodec class but, surprisingly, I couldn't see
them in the logs.

Am I missing something?

Is com.datatorrent.common.partitioner.StatelessPartitioner:2 overriding the
behavior of TupleStreamCodec?
But if I remove that, how would I specify how many partitions I need?

I did take a look at the FraudDetectionDemo and YahooFinance demos. I noticed
that StreamCodec is used by some operators the same way I am using it, but
nowhere could I see the number of partitions being defined for them (in
application.java or properties.xml).

Could you guide me more on this?

Thanks,
Shubham
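
The expectation in step 2 can be checked without a cluster. The following is a
stand-alone simulation in plain Java, not Apex code; RoutingDemo, keyHash and
partitionOf are hypothetical names. It assumes that a tuple is routed by
keeping the low bits of the value returned by the partition function, which is
a simplification for two partitions. Keyed on the count, every <word,1> tuple
lands in partition 1, because Integer 1 hashes to 1.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone simulation; none of these names are Apex APIs.
final class RoutingDemo
{
  private RoutingDemo() {}

  // Hash of a single field of a list-backed tuple.
  static int keyHash(List<?> tuple, int index)
  {
    return tuple.get(index).hashCode();
  }

  // Assumed routing rule: keep the low bits of the hash.
  // partitionCount must be a power of two for the mask to be valid.
  static int partitionOf(int hash, int partitionCount)
  {
    return hash & (partitionCount - 1);
  }

  public static void main(String[] args)
  {
    List<List<Object>> tuples = Arrays.asList(
        Arrays.<Object>asList("apple", 1),
        Arrays.<Object>asList("banana", 1),
        Arrays.<Object>asList("cherry", 1),
        Arrays.<Object>asList("apple", 1));
    for (List<Object> t : tuples) {
      System.out.println(t
          + " by count -> partition " + partitionOf(keyHash(t, 1), 2)
          + ", by word -> partition " + partitionOf(keyHash(t, 0), 2));
    }
  }
}
```

If the real application does not show this pattern when keyed on index 1, that
would be consistent with the codec's getPartition() never being called, which
would also explain the missing debug statements.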



On Mon, Feb 8, 2016 at 4:36 PM, Tushar Gosavi <tu...@datatorrent.com>
wrote:

> Hi Shubham,
>
> You can implement a custom stream codec to define your own hashCode logic,
> A simple StateFulStreamCodec
> is as below. You specify indexes on which you want to compute the hashCode
> in constructor of the stream codec,
> and while computing hashCode you only use elements at those indexes.
>
>
>   public static class TupleStreamCodec extends
> DefaultStatefulStreamCodec<Tuple>
>   {
>     private int[] indexes;
>
>     public TupleStreamCodec(int[] indexes) {
>     this.indexes = indexes;
>     }
>
>     @Override
>     public int getPartition(Tuple tuple)
>     {
>       int hashCode = 1;
>       for (int idx : indexes) {
>         hashCode = 31 * hashCode + tuple.get(idx).hashCode();
>       }
>       return hashCode;
>     }
>
>   }
>
> and you can set this stream codec at input port of the Counter operator.
>     dag.setInputPortAttribute(counter.inport,
> Context.PortContext.STREAM_CODEC, new TupleStreamCodec(new int[] {0} ));
>  // partition based on first element
>
>   Note : This is sample implementation, does not handle out of index and
> other errors :)
>
>   - Tushar.

Re: Writing Custom Partitioner

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi Shubham,

You can implement a custom stream codec to define your own hashCode logic.
A simple StatefulStreamCodec is shown below. You specify the indexes on which
you want to compute the hashCode in the constructor of the stream codec, and
while computing the hashCode you use only the elements at those indexes.


  public static class TupleStreamCodec extends DefaultStatefulStreamCodec<Tuple>
  {
    // Indexes of the tuple fields that make up the partitioning key.
    private int[] indexes;

    public TupleStreamCodec(int[] indexes)
    {
      this.indexes = indexes;
    }

    // Combine the hash codes of the selected fields only.
    @Override
    public int getPartition(Tuple tuple)
    {
      int hashCode = 1;
      for (int idx : indexes) {
        hashCode = 31 * hashCode + tuple.get(idx).hashCode();
      }
      return hashCode;
    }
  }

You can then set this stream codec on the input port of the Counter operator:

    // partition based on the first element
    dag.setInputPortAttribute(counter.inport, Context.PortContext.STREAM_CODEC,
        new TupleStreamCodec(new int[] {0}));

  Note: This is a sample implementation; it does not handle out-of-range
indexes and other errors :)

  - Tushar.
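
As a follow-up to the note about error handling, here is one possible way to
harden the key-hash logic. This is a plain-Java sketch with hypothetical names
(SafeKeyHash, hash) that works on a java.util.List rather than on the Tuple
class from the thread. It is not an Apex API and would still need to be wired
into getPartition() by hand.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not an Apex class: the key-hash logic with basic error handling.
final class SafeKeyHash
{
  private final int[] indexes;

  SafeKeyHash(int... indexes)
  {
    if (indexes == null || indexes.length == 0) {
      throw new IllegalArgumentException("at least one key index is required");
    }
    for (int idx : indexes) {
      if (idx < 0) {
        throw new IllegalArgumentException("negative key index: " + idx);
      }
    }
    // Defensive copy so later changes to the caller's array cannot alter the key.
    this.indexes = indexes.clone();
  }

  int hash(List<?> fields)
  {
    int hash = 1;
    for (int idx : indexes) {
      if (idx >= fields.size()) {
        throw new IllegalArgumentException(
            "key index " + idx + " is out of range for a tuple of size " + fields.size());
      }
      Object field = fields.get(idx);
      // A null field contributes 0 instead of causing a NullPointerException.
      hash = 31 * hash + (field == null ? 0 : field.hashCode());
    }
    return hash;
  }

  public static void main(String[] args)
  {
    SafeKeyHash byWord = new SafeKeyHash(0);
    System.out.println(byWord.hash(Arrays.asList("apple", 1)));
    try {
      new SafeKeyHash(2).hash(Arrays.asList("apple", 1));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Validating the indexes in the constructor reports a bad configuration when the
application is built, while the size check in hash() catches tuples that are
shorter than expected at run time.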


On Mon, Feb 8, 2016 at 3:42 PM, Shubham Pathak <sh...@datatorrent.com>
wrote:

> Hi,
>
> I need some suggestions / pointers related to defining a custom
> partitioner.
>
> The operators in my application process a custom tuple class ( lets call it
> TUPLE) . This data type has a single field ArrayList.. So each tuple
> represents a list of values.
>
> For a typical word count problem, my dag would be
>
> WordGenerator -> <STRING> -> Tokenizer -> <TUPLE> -> Counter ->  <TUPLE> ->
> Console
>
> and  if i were to use  TUPLE, tokenizer will emit TUPLE that contains array
> list with contents <word,count>
>
> Now i wish to partition Counter and each instance should receive all tuples
> containing same word.
>
> I know that by default , hashCode()  method of custom tuple class would be
> used , but in my case custom tuple class is an arrayList  and i wish to
> specify that hashCode must be done on just the first field in ArrayList. In
> a generic case it could also be on multiple fields in array list.
>
> Do we have any examples that i could refer to ?
>
> Also can this be done at application level by setting an attribute ?
>
> Thanks,
> Shubham
>