You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by Shunxin Lu <lu...@gmail.com> on 2016/08/12 18:03:55 UTC
Join Support
Hello there,
I am planning to add join support in Windowed Operator, but need some
advice on how to start.
Currently I am thinking to add a new subclass inheriting
AbstractWindowedOperator and do all the work we need in that class (add
more input ports, do join accumulation, etc.), but I am experiencing some
difficulties doing so. Or should I directly change the codes in
AbstractWindowedOperator?
Thanks,
Shunxin
Re: Join Support
Posted by Shunxin Lu <lu...@gmail.com>.
Thanks David. That's very helpful! I will continue to work on that and let
you know once I encounter more problems.
On Tue, Aug 16, 2016 at 2:02 PM, David Yan <da...@datatorrent.com> wrote:
> Hi Shunxin,
>
> How about declaring the JoinWindowedOperator interface something like this:
>
> public interface JoinWindowedOperator<InputT1, InputT2, InputT3, InputT4,
> InputT5>
> extends WindowedOperator<InputT1>
> {
> void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple);
> void accumulateTuple3(Tuple.WindowedTuple<InputT3> tuple);
> void accumulateTuple4(Tuple.WindowedTuple<InputT4> tuple);
> void accumulateTuple5(Tuple.WindowedTuple<InputT5> tuple);
>
> void processWatermark2(ControlTuple.Watermark watermark);
>
> void processWatermark3(ControlTuple.Watermark watermark);
>
> void processWatermark4(ControlTuple.Watermark watermark);
>
> void processWatermark5(ControlTuple.Watermark watermark);
>
> }
>
> then have the AbstractJoinWindowedOperator clared like this:
>
> public abstract class AbstractJoinWindowedOperator<InputT1, InputT2,
> InputT3, InputT4, InputT5, OutputT, DataStorageT extends WindowedStorage,
> RetractionStorageT extends WindowedStorage, AccumulationT extends
> JoinAccumulation>
> extends AbstractWindowedOperator<InputT1, OutputT, DataStorageT,
> RetractionStorageT, AccumulationT>
> implements JoinWindowedOperator<InputT1, InputT2, InputT3, InputT4,
> InputT5>
> {
> ...
> }
>
> David
>
> On Tue, Aug 16, 2016 at 1:19 PM, Shunxin Lu <lu...@gmail.com> wrote:
>
> > Hi David,
> >
> > Thanks for the reply. I think I will need to reconsider the whole
> situation
> > again base on your input.
> > The main problem that I had was, every input port has its own type, how
> can
> > I write methods that can handle all of them?
> >
> > Thanks,
> > Shunxin
> >
> > On Tue, Aug 16, 2016 at 12:49 PM, David Yan <da...@datatorrent.com>
> wrote:
> >
> > > Also, regarding the difficulties you mentioned about a new subclass
> > > inheriting AbstractWindowedOperator, what specifically are they?
> > >
> > > David
> > >
> > > On Tue, Aug 16, 2016 at 12:31 PM, David Yan <da...@datatorrent.com>
> > wrote:
> > >
> > > > Hi Shunxin,
> > > >
> > > > One problem with join support using WindowedOperator is that Apex
> > > operator
> > > > does not support variable number of ports so we might have to limit
> the
> > > > join operator to, say, 5 input ports. Implementing join support for
> > > > WindowedOperator should not be difficult, but might be a little messy
> > > > because we will need to have a watermark control port for each
> regular
> > > > input port, making it 10 total input ports if we support a maximum
> of 5
> > > > join inputs.
> > > >
> > > > Please take a look at the JoinAccumulation template interface. That
> was
> > > > there for the future join support I planned to add.
> > > >
> > > > Also, pay a bit of attention on how you process watermarks from each
> > > > input, and let me know if you need help.
> > > >
> > > > David
> > > >
> > > > On Fri, Aug 12, 2016 at 11:03 AM, Shunxin Lu <lu...@gmail.com>
> > > wrote:
> > > >
> > > >> Hello there,
> > > >>
> > > >> I am planning to add join support in Windowed Operator, but need
> some
> > > >> advice on how to start.
> > > >> Currently I am thinking to add a new subclass inheriting
> > > >> AbstractWindowedOperator and do all the work we need in that class
> > (add
> > > >> more input ports, do join accumulation, etc.), but I am experiencing
> > > some
> > > >> difficulties doing so. Or should I directly change the codes in
> > > >> AbstractWindowedOperator?
> > > >>
> > > >> Thanks,
> > > >> Shunxin
> > > >>
> > > >
> > > >
> > >
> >
>
Re: Join Support
Posted by David Yan <da...@datatorrent.com>.
Hi Shunxin,
How about declaring the JoinWindowedOperator interface something like this:
public interface JoinWindowedOperator<InputT1, InputT2, InputT3, InputT4,
InputT5>
extends WindowedOperator<InputT1>
{
void accumulateTuple2(Tuple.WindowedTuple<InputT2> tuple);
void accumulateTuple3(Tuple.WindowedTuple<InputT3> tuple);
void accumulateTuple4(Tuple.WindowedTuple<InputT4> tuple);
void accumulateTuple5(Tuple.WindowedTuple<InputT5> tuple);
void processWatermark2(ControlTuple.Watermark watermark);
void processWatermark3(ControlTuple.Watermark watermark);
void processWatermark4(ControlTuple.Watermark watermark);
void processWatermark5(ControlTuple.Watermark watermark);
}
then have the AbstractJoinWindowedOperator clared like this:
public abstract class AbstractJoinWindowedOperator<InputT1, InputT2,
InputT3, InputT4, InputT5, OutputT, DataStorageT extends WindowedStorage,
RetractionStorageT extends WindowedStorage, AccumulationT extends
JoinAccumulation>
extends AbstractWindowedOperator<InputT1, OutputT, DataStorageT,
RetractionStorageT, AccumulationT>
implements JoinWindowedOperator<InputT1, InputT2, InputT3, InputT4,
InputT5>
{
...
}
David
On Tue, Aug 16, 2016 at 1:19 PM, Shunxin Lu <lu...@gmail.com> wrote:
> Hi David,
>
> Thanks for the reply. I think I will need to reconsider the whole situation
> again base on your input.
> The main problem that I had was, every input port has its own type, how can
> I write methods that can handle all of them?
>
> Thanks,
> Shunxin
>
> On Tue, Aug 16, 2016 at 12:49 PM, David Yan <da...@datatorrent.com> wrote:
>
> > Also, regarding the difficulties you mentioned about a new subclass
> > inheriting AbstractWindowedOperator, what specifically are they?
> >
> > David
> >
> > On Tue, Aug 16, 2016 at 12:31 PM, David Yan <da...@datatorrent.com>
> wrote:
> >
> > > Hi Shunxin,
> > >
> > > One problem with join support using WindowedOperator is that Apex
> > operator
> > > does not support variable number of ports so we might have to limit the
> > > join operator to, say, 5 input ports. Implementing join support for
> > > WindowedOperator should not be difficult, but might be a little messy
> > > because we will need to have a watermark control port for each regular
> > > input port, making it 10 total input ports if we support a maximum of 5
> > > join inputs.
> > >
> > > Please take a look at the JoinAccumulation template interface. That was
> > > there for the future join support I planned to add.
> > >
> > > Also, pay a bit of attention on how you process watermarks from each
> > > input, and let me know if you need help.
> > >
> > > David
> > >
> > > On Fri, Aug 12, 2016 at 11:03 AM, Shunxin Lu <lu...@gmail.com>
> > wrote:
> > >
> > >> Hello there,
> > >>
> > >> I am planning to add join support in Windowed Operator, but need some
> > >> advice on how to start.
> > >> Currently I am thinking to add a new subclass inheriting
> > >> AbstractWindowedOperator and do all the work we need in that class
> (add
> > >> more input ports, do join accumulation, etc.), but I am experiencing
> > some
> > >> difficulties doing so. Or should I directly change the codes in
> > >> AbstractWindowedOperator?
> > >>
> > >> Thanks,
> > >> Shunxin
> > >>
> > >
> > >
> >
>
Re: Join Support
Posted by Shunxin Lu <lu...@gmail.com>.
Hi David,
Thanks for the reply. I think I will need to reconsider the whole situation
again base on your input.
The main problem that I had was, every input port has its own type, how can
I write methods that can handle all of them?
Thanks,
Shunxin
On Tue, Aug 16, 2016 at 12:49 PM, David Yan <da...@datatorrent.com> wrote:
> Also, regarding the difficulties you mentioned about a new subclass
> inheriting AbstractWindowedOperator, what specifically are they?
>
> David
>
> On Tue, Aug 16, 2016 at 12:31 PM, David Yan <da...@datatorrent.com> wrote:
>
> > Hi Shunxin,
> >
> > One problem with join support using WindowedOperator is that Apex
> operator
> > does not support variable number of ports so we might have to limit the
> > join operator to, say, 5 input ports. Implementing join support for
> > WindowedOperator should not be difficult, but might be a little messy
> > because we will need to have a watermark control port for each regular
> > input port, making it 10 total input ports if we support a maximum of 5
> > join inputs.
> >
> > Please take a look at the JoinAccumulation template interface. That was
> > there for the future join support I planned to add.
> >
> > Also, pay a bit of attention on how you process watermarks from each
> > input, and let me know if you need help.
> >
> > David
> >
> > On Fri, Aug 12, 2016 at 11:03 AM, Shunxin Lu <lu...@gmail.com>
> wrote:
> >
> >> Hello there,
> >>
> >> I am planning to add join support in Windowed Operator, but need some
> >> advice on how to start.
> >> Currently I am thinking to add a new subclass inheriting
> >> AbstractWindowedOperator and do all the work we need in that class (add
> >> more input ports, do join accumulation, etc.), but I am experiencing
> some
> >> difficulties doing so. Or should I directly change the codes in
> >> AbstractWindowedOperator?
> >>
> >> Thanks,
> >> Shunxin
> >>
> >
> >
>
Re: Join Support
Posted by David Yan <da...@datatorrent.com>.
Also, regarding the difficulties you mentioned about a new subclass
inheriting AbstractWindowedOperator, what specifically are they?
David
On Tue, Aug 16, 2016 at 12:31 PM, David Yan <da...@datatorrent.com> wrote:
> Hi Shunxin,
>
> One problem with join support using WindowedOperator is that Apex operator
> does not support variable number of ports so we might have to limit the
> join operator to, say, 5 input ports. Implementing join support for
> WindowedOperator should not be difficult, but might be a little messy
> because we will need to have a watermark control port for each regular
> input port, making it 10 total input ports if we support a maximum of 5
> join inputs.
>
> Please take a look at the JoinAccumulation template interface. That was
> there for the future join support I planned to add.
>
> Also, pay a bit of attention on how you process watermarks from each
> input, and let me know if you need help.
>
> David
>
> On Fri, Aug 12, 2016 at 11:03 AM, Shunxin Lu <lu...@gmail.com> wrote:
>
>> Hello there,
>>
>> I am planning to add join support in Windowed Operator, but need some
>> advice on how to start.
>> Currently I am thinking to add a new subclass inheriting
>> AbstractWindowedOperator and do all the work we need in that class (add
>> more input ports, do join accumulation, etc.), but I am experiencing some
>> difficulties doing so. Or should I directly change the codes in
>> AbstractWindowedOperator?
>>
>> Thanks,
>> Shunxin
>>
>
>
Re: Join Support
Posted by David Yan <da...@datatorrent.com>.
Hi Shunxin,
One problem with join support using WindowedOperator is that Apex operator
does not support variable number of ports so we might have to limit the
join operator to, say, 5 input ports. Implementing join support for
WindowedOperator should not be difficult, but might be a little messy
because we will need to have a watermark control port for each regular
input port, making it 10 total input ports if we support a maximum of 5
join inputs.
Please take a look at the JoinAccumulation template interface. That was
there for the future join support I planned to add.
Also, pay a bit of attention on how you process watermarks from each input,
and let me know if you need help.
David
On Fri, Aug 12, 2016 at 11:03 AM, Shunxin Lu <lu...@gmail.com> wrote:
> Hello there,
>
> I am planning to add join support in Windowed Operator, but need some
> advice on how to start.
> Currently I am thinking to add a new subclass inheriting
> AbstractWindowedOperator and do all the work we need in that class (add
> more input ports, do join accumulation, etc.), but I am experiencing some
> difficulties doing so. Or should I directly change the codes in
> AbstractWindowedOperator?
>
> Thanks,
> Shunxin
>