Posted to dev@apex.apache.org by Shunxin Lu <lu...@gmail.com> on 2016/08/12 18:03:55 UTC

Join Support

Hello there,

I am planning to add join support to the Windowed Operator, but need some
advice on how to start.
Currently I am thinking of adding a new subclass that inherits from
AbstractWindowedOperator and doing all the necessary work in that class
(adding more input ports, doing join accumulation, etc.), but I am running
into some difficulties doing so. Or should I change the code in
AbstractWindowedOperator directly?

Thanks,
Shunxin

Re: Join Support

Posted by Shunxin Lu <lu...@gmail.com>.
Thanks David. That's very helpful! I will continue to work on that and let
you know once I encounter more problems.

Re: Join Support

Posted by David Yan <da...@datatorrent.com>.
Hi Shunxin,

How about declaring the JoinWindowedOperator interface something like this:

public interface JoinWindowedOperator<InputT1, InputT2, InputT3, InputT4, InputT5>
    extends WindowedOperator<InputT1>
{
  // Accumulation hooks for the additional join inputs (2 through 5)
  void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple);
  void accumulateTuple3(Tuple.WindowedTuple<InputT3> tuple);
  void accumulateTuple4(Tuple.WindowedTuple<InputT4> tuple);
  void accumulateTuple5(Tuple.WindowedTuple<InputT5> tuple);

  // Watermark hooks, one per additional input
  void processWatermark2(ControlTuple.Watermark watermark);
  void processWatermark3(ControlTuple.Watermark watermark);
  void processWatermark4(ControlTuple.Watermark watermark);
  void processWatermark5(ControlTuple.Watermark watermark);
}

Then have AbstractJoinWindowedOperator declared like this:

public abstract class AbstractJoinWindowedOperator<
    InputT1, InputT2, InputT3, InputT4, InputT5, OutputT,
    DataStorageT extends WindowedStorage,
    RetractionStorageT extends WindowedStorage,
    AccumulationT extends JoinAccumulation>
    extends AbstractWindowedOperator<InputT1, OutputT, DataStorageT, RetractionStorageT, AccumulationT>
    implements JoinWindowedOperator<InputT1, InputT2, InputT3, InputT4, InputT5>
{
  ...
}
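
To show why one method per port gets around the "every input port has its
own type" problem, here is a minimal, self-contained sketch reduced to two
inputs. TwoInputJoin and CrossJoinSketch are hypothetical names made up for
this illustration; they are not part of Malhar and leave out windowing,
storage and watermarks entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed interface, reduced to two inputs.
// Each port gets its own method, so each method can take its own tuple type.
interface TwoInputJoin<InputT1, InputT2>
{
  void accumulateTuple(InputT1 tuple);

  void accumulateTuple2(InputT2 tuple);
}

// Illustration only: pairs every left value with every right value seen so far.
final class CrossJoinSketch implements TwoInputJoin<String, Integer>
{
  private final List<String> left = new ArrayList<>();
  private final List<Integer> right = new ArrayList<>();

  @Override
  public void accumulateTuple(String tuple)
  {
    left.add(tuple);
  }

  @Override
  public void accumulateTuple2(Integer tuple)
  {
    right.add(tuple);
  }

  // Builds the joined output from everything accumulated so far.
  List<String> output()
  {
    List<String> result = new ArrayList<>();
    for (String l : left) {
      for (Integer r : right) {
        result.add(l + ":" + r);
      }
    }
    return result;
  }
}
```

The compiler checks each port's type separately, so no single method has to
accept all input types.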

David

Re: Join Support

Posted by Shunxin Lu <lu...@gmail.com>.
Hi David,

Thanks for the reply. I think I will need to reconsider the whole approach
based on your input.
The main problem I had was that every input port has its own type, so how
can I write methods that handle all of them?

Thanks,
Shunxin

Re: Join Support

Posted by David Yan <da...@datatorrent.com>.
Also, regarding the difficulties you mentioned about a new subclass
inheriting AbstractWindowedOperator, what specifically are they?

David

Re: Join Support

Posted by David Yan <da...@datatorrent.com>.
Hi Shunxin,

One problem with join support using WindowedOperator is that an Apex operator
does not support a variable number of ports, so we might have to limit the
join operator to, say, 5 input ports. Implementing join support for
WindowedOperator should not be difficult, but it might be a little messy
because we will need a watermark control port for each regular input port,
making 10 input ports in total if we support a maximum of 5 join inputs.

Please take a look at the JoinAccumulation template interface. It was put
there for the future join support I planned to add.

Also, pay attention to how you process watermarks from each input, and let
me know if you need help.
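
As a rough illustration of the watermark point, one common policy in
streaming systems is to take the minimum of the latest watermarks across all
inputs as the operator's effective watermark. Whether that is the right
policy here is an assumption, and MinWatermarkTracker below is a hypothetical
helper, not an existing Malhar class. It works on plain long timestamps
rather than ControlTuple.Watermark so that it stays self-contained.

```java
import java.util.Arrays;

// Hypothetical helper: remembers the latest watermark timestamp seen on each
// join input and exposes the minimum as the combined watermark.
final class MinWatermarkTracker
{
  private final long[] latest;

  MinWatermarkTracker(int numInputs)
  {
    latest = new long[numInputs];
    // Long.MIN_VALUE marks an input that has not sent a watermark yet.
    Arrays.fill(latest, Long.MIN_VALUE);
  }

  // Records a watermark for one input and returns the combined watermark.
  long update(int inputIndex, long timestamp)
  {
    // A watermark on a single input is not allowed to move backwards.
    latest[inputIndex] = Math.max(latest[inputIndex], timestamp);
    return current();
  }

  // The combined watermark is bounded by the slowest input.
  long current()
  {
    long min = Long.MAX_VALUE;
    for (long t : latest) {
      min = Math.min(min, t);
    }
    return min;
  }
}
```

With this policy, a window would only be treated as complete once every
input's watermark has passed it.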

David
