Posted to dev@flink.apache.org by Vinay Patil <vi...@gmail.com> on 2016/06/29 12:14:35 UTC

Re: [Discussion] Query regarding Join and Windows

Hi,

Ok.
Inside the checkAndGetNextWatermark(lastElement, extractedTimestamp) method,
both of these parameters are coming in with the same timestamp value; I was
expecting the last element's timestamp value in the 1st param when I extract
it.
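
For reference, a minimal sketch of the kind of assigner I mean (the DTO type
and its getEventTimestamp() accessor here are simplified stand-ins, not the
actual client code):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class DtoWatermarkAssigner
        implements AssignerWithPunctuatedWatermarks<Tuple2<String, DTO>> {

    @Override
    public long extractTimestamp(Tuple2<String, DTO> element, long previousElementTimestamp) {
        // the timestamp comes from a DTO field that is guaranteed to be in order
        return element.f1.getEventTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, DTO> lastElement, long extractedTimestamp) {
        // lastElement is the element itself; extractedTimestamp is the value
        // that extractTimestamp() returned for this same element
        return new Watermark(extractedTimestamp);
    }
}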

Let's say I decide to use IngestionTime (since I am getting accurate results
with it for now); if the joining elements of both streams are assigned to
different windows, then in that case I will not get the match, right?

However, in the case of event time they are guaranteed to be in the same
window since we are assigning the timestamps; correct me if I am wrong here.

According to the documentation:
*Ingestion Time programs cannot handle any out-of-order events or late data*

In this context, what do we mean by out-of-order events? How does it know
that the events are out of order, i.e. on which parameter does it decide
that the events are out-of-order? In the case of event time we can say that
the received timestamps are out of order.

Late data: is there a threshold after which it does not accept late data?


Regards,
Vinay Patil

On Wed, Jun 29, 2016 at 5:15 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> the element will be kept around indefinitely if no new watermark arrives.
>
> I think the same problem will persist for AssignerWithPunctuatedWatermarks
> since there you also might not get the required "last watermark" to trigger
> processing of the last window.
>
> Cheers,
> Aljoscha
>
> On Wed, 29 Jun 2016 at 13:18 Vinay Patil <vi...@gmail.com> wrote:
>
> > Hi Aljoscha,
> >
> > This clears a lot of doubts now.
> > So now let's say the stream paused for a while or stops completely on
> > Friday; let us assume that the last message did not get processed and is
> > kept in the internal buffers.
> >
> > So when the stream starts again on Monday, will it consider the last
> > element that is in the internal buffer for processing?
> > How long can the internal buffer hold the data, or will it flush the
> > data after a threshold ?
> >
> > I have tried using AssignerWithPunctuatedWatermarks and generated the
> > watermark for each event, but I am still getting one record less.
> >
> >
> > Regards,
> > Vinay Patil
> >
> > On Wed, Jun 29, 2016 at 2:21 PM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > Hi,
> > > the reason why the last element might never be emitted is the way the
> > > ascending timestamp extractor works. I'll try and explain with an
> > > example.
> > >
> > > Let's say we have a window size of 2 milliseconds, elements arrive
> > > starting with timestamp 0, window begin timestamp is inclusive, end
> > > timestamp is exclusive:
> > >
> > > Element 0, Timestamp 0 (at this point the watermark is -1)
> > > Element 1, Timestamp 1 (at this point the watermark is 0)
> > > Element 2, Timestamp 1 (at this point the watermark is still 0)
> > > Element 3, Timestamp 2 (at this point the watermark is 1)
> > >
> > > now we can process the window (0, 2) because we know from the watermark
> > > that no elements can arrive for that window anymore. The window contains
> > > elements 0,1,2
> > >
> > > Element 4, Timestamp 3 (at this point the watermark is 2)
> > > Element 5, Timestamp 4 (at this point the watermark is 3)
> > >
> > > now we can process window (2, 4). The window contains elements 3,4.
> > >
> > > At this point, we have Element 5 sitting in internal buffers for window
> > > (4, 6) but if we don't receive further elements the watermark will never
> > > advance and we will never process that window.
> > >
> > > If, however, we get new elements at some point the watermark advances
> > > and we don't have a problem. That's what I meant when I said that you
> > > shouldn't have a problem if data keeps continuously arriving.
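> > >
> > > In code, a self-contained sketch of exactly this scenario (element
> > > values double as event timestamps; the source stays open like an idle
> > > Kafka source so that no final watermark is emitted; imports omitted):
> > >
> > > StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > >
> > > env.addSource(new SourceFunction<Long>() {
> > >     @Override
> > >     public void run(SourceContext<Long> ctx) throws Exception {
> > >         for (long v : new long[] {0, 1, 1, 2, 3, 4}) {
> > >             ctx.collect(v);
> > >         }
> > >         Thread.sleep(Long.MAX_VALUE); // keep running without new elements
> > >     }
> > >
> > >     @Override
> > >     public void cancel() {
> > >     }
> > > })
> > > .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Long>() {
> > >     @Override
> > >     public long extractAscendingTimestamp(Long element) {
> > >         return element; // the emitted watermark is "last timestamp - 1"
> > >     }
> > > })
> > > .timeWindowAll(Time.milliseconds(2))
> > > .apply(new AllWindowFunction<Long, String, TimeWindow>() {
> > >     @Override
> > >     public void apply(TimeWindow window, Iterable<Long> values,
> > >             Collector<String> out) {
> > >         int count = 0;
> > >         for (Long ignored : values) {
> > >             count++;
> > >         }
> > >         out.collect("window [" + window.getStart() + ", "
> > >                 + window.getEnd() + ") with " + count + " elements");
> > >     }
> > > })
> > > .print();
> > > // only windows (0, 2) and (2, 4) are printed; the element with timestamp
> > > // 4 stays buffered for window (4, 6) because the watermark never advances
> > > env.execute();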
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > >
> > > On Tue, 28 Jun 2016 at 17:14 Vinay Patil <vi...@gmail.com>
> > wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > Thanks a lot for your inputs.
> > > >
> > > > I still did not get you when you say you will not face this issue in
> > > > the case of a continuous stream; let's consider the following example:
> > > > Assume that the stream runs continuously from Monday to Friday, and on
> > > > Friday it stops after 5.00 PM; will I still face this issue ?
> > > >
> > > > I am actually not able to understand how it will differ in real-time
> > > > streams.
> > > >
> > > > Regards,
> > > > Vinay Patil
> > > >
> > > > On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek <
> aljoscha@apache.org
> > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > ingestion time can only be used if you don't care about the timestamp
> > > > > in the elements. So if you have those you should probably use event
> > > > > time.
> > > > >
> > > > > If your timestamps really are strictly increasing then the ascending
> > > > > extractor is good. And if you have a continuous stream of incoming
> > > > > elements you will not see the behavior of not getting the last elements.
> > > > >
> > > > > By the way, when using Kafka you can also embed the timestamp
> > > > > extractor directly in the Kafka consumer. This is described here:
> > > > >
> > > > > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
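> > > > >
> > > > > For example, a minimal sketch of that (the Kafka 0.9 consumer is used
> > > > > here; the topic, properties, and MyPunctuatedAssigner are placeholders):
> > > > >
> > > > > Properties props = new Properties();
> > > > > props.setProperty("bootstrap.servers", "localhost:9092");
> > > > > props.setProperty("group.id", "test");
> > > > >
> > > > > FlinkKafkaConsumer09<String> consumer =
> > > > >     new FlinkKafkaConsumer09<>("topic", new SimpleStringSchema(), props);
> > > > > // the assigner runs inside the consumer, so watermarks are generated per
> > > > > // Kafka partition before the partitions are merged into one stream
> > > > > consumer.assignTimestampsAndWatermarks(new MyPunctuatedAssigner());
> > > > >
> > > > > DataStream<String> stream = env.addSource(consumer);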
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Tue, 28 Jun 2016 at 11:44 Vinay Patil <vi...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi Aljoscha,
> > > > > >
> > > > > > Thank you for your response.
> > > > > > So do you suggest using a different approach for extracting the
> > > > > > timestamp (as given in the documentation) instead of the
> > > > > > AscendingTimestampExtractor ?
> > > > > > Is that the reason I am seeing this unexpected behaviour ? In the case
> > > > > > of a continuous stream I would not see any data loss ?
> > > > > >
> > > > > > Also assuming that the records are always going to be in order, which
> > > > > > is the best approach: Ingestion Time or Event Time ?
> > > > > >
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > > Vinay Patil
> > > > > >
> > > > > > On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <
> > > aljoscha@apache.org
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > first regarding tumbling windows: even if you have 5 minute
> > > > > > > windows it can happen that elements that are only seconds apart go
> > > > > > > into different windows (e.g. elements at 11:04:59 and 11:05:01 fall
> > > > > > > into [11:00, 11:05) and [11:05, 11:10) respectively). Consider the
> > > > > > > following case:
> > > > > > >
> > > > > > > |                x | x                 |
> > > > > > >
> > > > > > > These are two 5-minute windows and the two elements are only
> > > > > > > seconds apart but go into different windows because windows are
> > > > > > > aligned to the epoch.
> > > > > > >
> > > > > > > Now, for the ascending timestamp extractor. The reason this can
> > > > > > > behave in unexpected ways is that it emits a watermark that is
> > > > > > > "last timestamp - 1", i.e. if it has seen timestamp t it can only
> > > > > > > emit watermark t-1 because there might be other elements with
> > > > > > > timestamp t arriving. If you have a continuous stream of elements
> > > > > > > you wouldn't notice this. Only in this constructed example does it
> > > > > > > become visible.
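> > > > > > >
> > > > > > > A sketch of that rule (illustrative pseudocode, not the actual
> > > > > > > Flink source):
> > > > > > >
> > > > > > > // on each element:
> > > > > > > long t = extractAscendingTimestamp(element); // user-supplied
> > > > > > > currentTimestamp = Math.max(currentTimestamp, t);
> > > > > > >
> > > > > > > // on each periodic watermark emission:
> > > > > > > return new Watermark(currentTimestamp - 1); // "last timestamp - 1"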
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Aljoscha
> > > > > > >
> > > > > > > On Tue, 28 Jun 2016 at 06:04 Vinay Patil <
> > vinay18.patil@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Following is the timestamp I am getting from DTO, here is the
> > > > > timestamp
> > > > > > > > difference between the two records :
> > > > > > > > 1466115892162154279
> > > > > > > > 1466116026233613409
> > > > > > > >
> > > > > > > > So the time difference is roughly 3 min; even if I apply a window
> > > > > > > > of 5 min, I am not getting the last record (last timestamp value
> > > > > > > > above), using the ascending timestamp extractor for generating the
> > > > > > > > timestamps (assuming that the timestamps are always in order).
> > > > > > > >
> > > > > > > > I was at least expecting the data to reach the co-group function.
> > > > > > > > What could be the reason for the data loss? The data we are getting
> > > > > > > > is critical, hence we cannot afford to lose any data.
> > > > > > > >
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Vinay Patil
> > > > > > > >
> > > > > > > > On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <
> > > > > vinay18.patil@gmail.com
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Just an update: when I keep IngestionTime and remove the
> > > > > > > > > timestamp I am generating, I am getting all the records, but for
> > > > > > > > > Event Time I am getting one less record. I checked the time
> > > > > > > > > difference between two records, it is 3 min; I tried keeping the
> > > > > > > > > window time at 5 mins, but even that did not work.
> > > > > > > > >
> > > > > > > > > Even when I try assigning a timestamp for IngestionTime, I get
> > > > > > > > > one record less, so should I safely use Ingestion Time or is it
> > > > > > > > > always advisable to use EventTime ?
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Vinay Patil
> > > > > > > > >
> > > > > > > > > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <
> > > > > > vinay18.patil@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi ,
> > > > > > > > >>
> > > > > > > > >> Actually I am only publishing 5 messages each to two different
> > > > > > > > >> kafka topics (using JUnit); even if I keep the window at 500
> > > > > > > > >> seconds the result is the same.
> > > > > > > > >>
> > > > > > > > >> I am not understanding why it is not sending the 5th element to
> > > > > > > > >> the co-group operator even when the keys are the same.
> > > > > > > > >>
> > > > > > > > >> I actually cannot share the actual client code,
> > > > > > > > >> but this is what the streams look like:
> > > > > > > > >> sourceStream.coGroup(destStream)
> > > > > > > > >> Here sourceStream and destStream are actually Tuple2<String,DTO>,
> > > > > > > > >> and the ElementSelector returns tuple.f0 which is the key.
> > > > > > > > >>
> > > > > > > > >> I am generating the timestamp based on a field from the DTO
> > > > > > > > >> which is guaranteed to be in order.
> > > > > > > > >>
> > > > > > > > >> Will using the triggers help here ?
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> Regards,
> > > > > > > > >> Vinay Patil
> > > > > > > > >>
> > > > > > > > >> *+91-800-728-4749*
> > > > > > > > >>
> > > > > > > > >> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek <
> > > > > > > aljoscha@apache.org>
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >>> Hi,
> > > > > > > > >>> what timestamps are you assigning? Is it guaranteed that all
> > > > > > > > >>> of them would fall into the same 30 second window?
> > > > > > > > >>>
> > > > > > > > >>> The issue with duplicate printing in the ElementSelector is
> > > > > > > > >>> strange. Could you post a more complete code example so that I
> > > > > > > > >>> can reproduce the problem?
> > > > > > > > >>>
> > > > > > > > >>> Cheers,
> > > > > > > > >>> Aljoscha
> > > > > > > > >>>
> > > > > > > > >>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <
> > > > > vinay18.patil@gmail.com>
> > > > > > > > >>> wrote:
> > > > > > > > >>>
> > > > > > > > >>> > Hi ,
> > > > > > > > >>> >
> > > > > > > > >>> > I am able to get the matching and non-matching
> elements.
> > > > > > > > >>> >
> > > > > > > > >>> > However when I am unit testing the code, I am getting one
> > > > > > > > >>> > record less inside the overridden coGroup function.
> > > > > > > > >>> > Testing the following way :
> > > > > > > > >>> >
> > > > > > > > >>> > 1) Insert 5 messages into local kafka topic (test1)
> > > > > > > > >>> > 2) Insert different 5 messages into local kafka topic
> > > (test2)
> > > > > > > > >>> > 3) Consume 1) and 2) and I have two different kafka
> > > streams
> > > > > > > > >>> > 4) Generate an ascending timestamp (using Event Time) for
> > > > > > > > >>> > both streams and create the key (String)
> > > > > > > > >>> >
> > > > > > > > >>> > Now till 4) I am able to get all the records (checked by
> > > > > > > > >>> > printing the stream to a text file).
> > > > > > > > >>> >
> > > > > > > > >>> > However when I send the stream to the co-group operator, I
> > > > > > > > >>> > am receiving one less record, using the following syntax:
> > > > > > > > >>> >
> > > > > > > > >>> > sourceStream.coGroup(destStream)
> > > > > > > > >>> > .where(new ElementSelector())
> > > > > > > > >>> > .equalTo(new ElementSelector())
> > > > > > > > >>> > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > > >>> > .apply(new JoinStreams);
> > > > > > > > >>> >
> > > > > > > > >>> > Also in the ElementSelector I have inserted a sysout; I am
> > > > > > > > >>> > getting 20 sysouts instead of 10 (10 sysouts for the source
> > > > > > > > >>> > stream and 10 for the dest stream).
> > > > > > > > >>> >
> > > > > > > > >>> > Unable to understand why one record less is coming to
> > > > > > > > >>> > co-group.
> > > > > > > > >>> >
> > > > > > > > >>> >
> > > > > > > > >>> >
> > > > > > > > >>> > Regards,
> > > > > > > > >>> > Vinay Patil
> > > > > > > > >>> >
> > > > > > > > >>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske <
> > > > > > fhueske@gmail.com>
> > > > > > > > >>> wrote:
> > > > > > > > >>> >
> > > > > > > > >>> > > Can you add a flag to each element emitted by the
> > > > > > CoGroupFunction
> > > > > > > > >>> that
> > > > > > > > >>> > > indicates whether it was joined or not?
> > > > > > > > >>> > > Then you can use split to distinguish between both
> > cases
> > > > and
> > > > > > > handle
> > > > > > > > >>> both
> > > > > > > > >>> > > streams differently.
> > > > > > > > >>> > >
> > > > > > > > >>> > > Best, Fabian
> > > > > > > > >>> > >
> > > > > > > > >>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <
> > > > > vinay18.patil@gmail.com
> > > > > > >:
> > > > > > > > >>> > >
> > > > > > > > >>> > > > Hi Jark,
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > I am able to get the non-matching elements in a stream.
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > Of course we can collect the matching elements in the
> > > > > > > > >>> > > > same stream as well; however I want to perform additional
> > > > > > > > >>> > > > operations on the joined stream before writing it to S3,
> > > > > > > > >>> > > > so I would have to include a separate join operator for
> > > > > > > > >>> > > > the same two streams, right ?
> > > > > > > > >>> > > > Correct me if I am wrong.
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > I have pasted the dummy code which collects the
> > > > > > > > >>> > > > non-matching records (I have to perform this on the
> > > > > > > > >>> > > > actual data; correct me if I am doing it wrong).
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > sourceStream.coGroup(destStream)
> > > > > > > > >>> > > >     .where(new ElementSelector())
> > > > > > > > >>> > > >     .equalTo(new ElementSelector())
> > > > > > > > >>> > > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > > >>> > > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > > > > > > > >>> > > >
> > > > > > > > >>> > > >         private static final long serialVersionUID = 6408179761497497475L;
> > > > > > > > >>> > > >
> > > > > > > > >>> > > >         @Override
> > > > > > > > >>> > > >         public void coGroup(Iterable<Integer> paramIterable,
> > > > > > > > >>> > > >                 Iterable<Integer> paramIterable1,
> > > > > > > > >>> > > >                 Collector<Integer> paramCollector) throws Exception {
> > > > > > > > >>> > > >             long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
> > > > > > > > >>> > > >             long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
> > > > > > > > >>> > > >             if (exactSizeIfKnown == 0) {
> > > > > > > > >>> > > >                 paramCollector.collect(paramIterable1.iterator().next());
> > > > > > > > >>> > > >             } else if (exactSizeIfKnown2 == 0) {
> > > > > > > > >>> > > >                 paramCollector.collect(paramIterable.iterator().next());
> > > > > > > > >>> > > >             }
> > > > > > > > >>> > > >         }
> > > > > > > > >>> > > >     }).print();
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > Regards,
> > > > > > > > >>> > > > Vinay Patil
> > > > > > > > >>> > > >
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay Patil <
> > > > > > > > >>> vinay18.patil@gmail.com>
> > > > > > > > >>> > > > wrote:
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > > You are right, debugged it for all elements , I
> can
> > > do
> > > > > that
> > > > > > > > now.
> > > > > > > > >>> > > > > Thanks a lot.
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > > Regards,
> > > > > > > > >>> > > > > Vinay Patil
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark Wu <
> > > > > > > > >>> > wuchong.wc@alibaba-inc.com>
> > > > > > > > >>> > > > > wrote:
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer> iter2,
> > > > > > > > >>> > > > >> Collector<Integer> out)`, when both iter1 and iter2 are
> > > > > > > > >>> > > > >> non-empty, it means they are matched elements from both
> > > > > > > > >>> > > > >> streams. When one of iter1 and iter2 is empty, it means
> > > > > > > > >>> > > > >> that they are unmatched.
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> - Jark Wu (wuchong)
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> > On Jun 14, 2016, at 12:46 PM, Vinay Patil <vinay18.patil@gmail.com> wrote:
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > Hi Matthias ,
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > I did not get you, even if we use Co-Group we
> > have
> > > > to
> > > > > > > apply
> > > > > > > > >>> it on
> > > > > > > > >>> > a
> > > > > > > > >>> > > > key
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > sourceStream.coGroup(destStream)
> > > > > > > > >>> > > > >> >     .where(new ElementSelector())
> > > > > > > > >>> > > > >> >     .equalTo(new ElementSelector())
> > > > > > > > >>> > > > >> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > > >>> > > > >> >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > > > > > > > >>> > > > >> >         private static final long serialVersionUID = 6408179761497497475L;
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> >         @Override
> > > > > > > > >>> > > > >> >         public void coGroup(Iterable<Integer> paramIterable,
> > > > > > > > >>> > > > >> >                 Iterable<Integer> paramIterable1,
> > > > > > > > >>> > > > >> >                 Collector<Integer> paramCollector) throws Exception {
> > > > > > > > >>> > > > >> >             Iterator<Integer> iterator = paramIterable.iterator();
> > > > > > > > >>> > > > >> >             while (iterator.hasNext()) {
> > > > > > > > >>> > > > >> >             }
> > > > > > > > >>> > > > >> >         }
> > > > > > > > >>> > > > >> >     });
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > When I debug this, only the matched elements from
> > > > > > > > >>> > > > >> > both streams come into the coGroup function.
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > What I want is how do I check for unmatched elements
> > > > > > > > >>> > > > >> > from both streams and write them to the sink.
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > Regards,
> > > > > > > > >>> > > > >> > Vinay Patil
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > *+91-800-728-4749*
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM, Matthias J.
> > Sax <
> > > > > > > > >>> > mjsax@apache.org>
> > > > > > > > >>> > > > >> wrote:
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> >> You need to do an outer-join. However, there is no
> > > > > > > > >>> > > > >> >> built-in support for outer-joins yet.
> > > > > > > > >>> > > > >> >>
> > > > > > > > >>> > > > >> >> You can use Window-CoGroup to implement the
> > > > > > > > >>> > > > >> >> outer-join as your own operator.
> > > > > > > > >>> > > > >> >>
> > > > > > > > >>> > > > >> >>
> > > > > > > > >>> > > > >> >> -Matthias
> > > > > > > > >>> > > > >> >>
> > > > > > > > >>> > > > >> >> On 06/13/2016 06:53 PM, Vinay Patil wrote:
> > > > > > > > >>> > > > >> >>> Hi,
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> I have a question regarding the join
> > operation,
> > > > > > consider
> > > > > > > > the
> > > > > > > > >>> > > > following
> > > > > > > > >>> > > > >> >>> dummy example:
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > > > > >>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > > > > > > > >>> > > > >> >>> DataStreamSource<Integer> sourceStream = env.fromElements(10,20,23,25,30,33,102,18);
> > > > > > > > >>> > > > >> >>> DataStreamSource<Integer> destStream = env.fromElements(20,30,40,50,60,10);
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> sourceStream.join(destStream)
> > > > > > > > >>> > > > >> >>>     .where(new ElementSelector())
> > > > > > > > >>> > > > >> >>>     .equalTo(new ElementSelector())
> > > > > > > > >>> > > > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > > > > > > > >>> > > > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>>         private static final long serialVersionUID = 1L;
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>>         @Override
> > > > > > > > >>> > > > >> >>>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> > > > > > > > >>> > > > >> >>>             return paramIN1;
> > > > > > > > >>> > > > >> >>>         }
> > > > > > > > >>> > > > >> >>>     }).print();
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> I perfectly get the elements that are matching in
> > > > > > > > >>> > > > >> >>> both the streams; however my requirement is to
> > > > > > > > >>> > > > >> >>> write these matched elements and also the unmatched
> > > > > > > > >>> > > > >> >>> elements to the sink (S3).
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> How do I get the unmatched elements from each
> > > > > > > > >>> > > > >> >>> stream ?
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>> Regards,
> > > > > > > > >>> > > > >> >>> Vinay Patil
> > > > > > > > >>> > > > >> >>>
> > > > > > > > >>> > > > >> >>
> > > > > > > > >>> > > > >> >>
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > >
> > > > > > > > >>> > >
> > > > > > > > >>> >
> > > > > > > > >>>
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [Discussion] Query regarding Join and Windows

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
yes, the window operator is stateful, which means that in case of a failure it
will restore and pick up where it left off.
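
A minimal sketch of the precondition for that (checkpointing must be enabled,
otherwise there is no checkpointed state to restore from; the interval is just
an example value):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint all operator state every 5 seconds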

You're right about the graph, chained operators are shown as one box.

Cheers,
Aljoscha

On Fri, 1 Jul 2016 at 04:52 Vinay Patil <vi...@gmail.com> wrote:

> Hi,
>
> Just watched the video on Robust Stream Processing .
> So when we say the Window is a stateful operator, does it mean that even if
> the task manager doing the window operation fails, it will pick up from the
> state left earlier when it comes back up ? (Have not read more on state for
> now)
>
>
> Also in one of our projects, when we deploy on a cluster and check the Job
> Graph, everything is shown in one box; why does this happen ? Is it because
> of chaining of streams ?
> So the box here represents the function flow, right ?
>
>
>
> Regards,
> Vinay Patil
>
> On Thu, Jun 30, 2016 at 7:29 PM, Vinay Patil <vi...@gmail.com>
> wrote:
>
> > Hi Aljoscha,
> >
> > Just wanted to check if it works with it.
> > Anyway, to solve the problem what we have thought of is to push a heartbeat
> > message to Kafka at a certain interval, so that we always get a continuous
> > stream and that edge case will never occur, right ?
> >
> > One more question I have regarding the failover case:
> > Let's say I have a window of 10 secs, and in it there are elements e0 to en;
> > what if during this time the node goes down ?
> > When the node comes up, will it resume from the same state or will it
> > resume from the last checkpointed state ?
> >
> > Can we explicitly checkpoint inside the window, maybe at the start of the
> > window or before we apply the window ?
> >
> >
> > Regards,
> > Vinay Patil
> >
> > On Thu, Jun 30, 2016 at 2:11 PM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> >> Hi,
> >> I think the problem is that the DeltaFunction needs to have this
> >> signature:
> >>
> >> DeltaFunction<CoGroupedStreams.TaggedUnion<Tuple2<String,DTO>,
> >> Tuple2<String,DTO>>>
> >>
> >> because the Trigger will see elements from both input streams, which are
> >> represented as a TaggedUnion that can contain an element from either side.
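> >>
> >> A sketch of what that would look like (assuming TaggedUnion's
> >> isOne()/getOne()/getTwo() accessors; the delta computation itself is a
> >> placeholder):
> >>
> >> new DeltaFunction<CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>>>() {
> >>     @Override
> >>     public double getDelta(
> >>             CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>> oldDataPoint,
> >>             CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>> newDataPoint) {
> >>         // unwrap whichever input stream each element came from
> >>         Tuple2<String, DTO> oldT = oldDataPoint.isOne() ? oldDataPoint.getOne() : oldDataPoint.getTwo();
> >>         Tuple2<String, DTO> newT = newDataPoint.isOne() ? newDataPoint.getOne() : newDataPoint.getTwo();
> >>         return 0.0; // placeholder: compute the actual delta from oldT and newT
> >>     }
> >> }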
> >>
> >> May I ask why you want to use the DeltaTrigger?
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Wed, 29 Jun 2016 at 19:06 Vinay Patil <vi...@gmail.com>
> wrote:
> >>
> >> > Hi,
> >> >
> >> > Yes , now I am getting clear with the concepts here.
> >> > One last thing I want to try before going for a custom trigger: I want
> >> > to try the DeltaTrigger but I am not able to get the syntax right. This
> >> > is how I am trying it:
> >> >
> >> > TypeInformation<Tuple2<String, DTO>> typeInfo = TypeInformation.of(
> >> >         new TypeHint<Tuple2<String, DTO>>() {});
> >> > // sourceStream and destStream are Tuple2<String, DTO>
> >> > sourceStream.coGroup(destStream)
> >> >     .where(new ElementSelector())
> >> >     .equalTo(new ElementSelector())
> >> >     .window(TumblingEventTimeWindows.of(Time.seconds(10)))
> >> >     .trigger(DeltaTrigger.of(triggerMeters,
> >> >         new DeltaFunction<Tuple2<String, DTO>>() {
> >> >             private static final long serialVersionUID = 1L;
> >> >
> >> >             @Override
> >> >             public double getDelta(
> >> >                     Tuple2<String, DTO> oldDataPoint,
> >> >                     Tuple2<String, DTO> newDataPoint) {
> >> >                 return <some_val>;
> >> >             }
> >> >         }, typeInfo.createSerializer(env.getConfig())))
> >> >     .apply(new JoinStreams());
> >> >
> >> > I am getting the error "cannot convert from DeltaTrigger to Trigger<?
> >> > super CoGroupedStreams...".
> >> > What am I doing wrong here? I have referred to the sample example.
> >> >
> >> > Regards,
> >> > Vinay Patil
> >> >
> >> > On Wed, Jun 29, 2016 at 7:15 PM, Aljoscha Krettek <
> aljoscha@apache.org>
> >> > wrote:
> >> >
> >> > > Hi,
> >> > > you can use ingestion time if you don't care about the timestamps in
> >> > > your events, yes. If elements from the two streams happen to arrive at
> >> > > such times that they are not put into the same window then you won't
> >> > > get a match, correct.
> >> > >
> >> > > Regarding ingestion time and out-of-order events: I think this section
> >> > > just reiterates that when using ingestion time the inherent timestamps
> >> > > in your events will not be considered and their order will not be
> >> > > respected.
> >> > >
> >> > > Regarding late data: right now, Flink always processes late data and
> >> > > it is up to the Trigger to decide what to do with late data. You can
> >> > > implement your custom trigger based on EventTimeTrigger that would
> >> > > immediately purge a window when an element arrives that is later than
> >> > > an allowed amount of lateness. In Flink 1.1 we will introduce a setting
> >> > > for windows that allows to specify an allowed lateness. With this, late
> >> > > elements will be dropped automatically. This feature is already
> >> > > available in the master, by the way.
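> >> > >
> >> > > As a rough sketch (written against the Flink 1.1 Trigger API, not
> >> > > tested; the allowedLateness handling is only an illustration):
> >> > >
> >> > > public class LatenessAwareTrigger extends Trigger<Object, TimeWindow> {
> >> > >
> >> > >     private final long allowedLateness; // milliseconds; illustrative only
> >> > >
> >> > >     public LatenessAwareTrigger(long allowedLateness) {
> >> > >         this.allowedLateness = allowedLateness;
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public TriggerResult onElement(Object element, long timestamp,
> >> > >             TimeWindow window, TriggerContext ctx) throws Exception {
> >> > >         if (ctx.getCurrentWatermark() > window.maxTimestamp() + allowedLateness) {
> >> > >             // the element is too late: discard the window's contents
> >> > >             return TriggerResult.PURGE;
> >> > >         }
> >> > >         ctx.registerEventTimeTimer(window.maxTimestamp());
> >> > >         return TriggerResult.CONTINUE;
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public TriggerResult onEventTime(long time, TimeWindow window,
> >> > >             TriggerContext ctx) {
> >> > >         return time == window.maxTimestamp()
> >> > >                 ? TriggerResult.FIRE : TriggerResult.CONTINUE;
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public TriggerResult onProcessingTime(long time, TimeWindow window,
> >> > >             TriggerContext ctx) {
> >> > >         return TriggerResult.CONTINUE;
> >> > >     }
> >> > >
> >> > >     @Override
> >> > >     public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
> >> > >         ctx.deleteEventTimeTimer(window.maxTimestamp());
> >> > >     }
> >> > > }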
> >> > >
> >> > > Cheers,
> >> > > Aljoscha
> >> > > > > > > > > > as
> >> > > > > > > > > > > > an
> >> > > > > > > > > > > > >>> own
> >> > > > > > > > > > > > >>> > > > >> operator.
> >> > > > > > > > > > > > >>> > > > >> >>
> >> > > > > > > > > > > > >>> > > > >> >>
> >> > > > > > > > > > > > >>> > > > >> >> -Matthias
> >> > > > > > > > > > > > >>> > > > >> >>
> >> > > > > > > > > > > > >>> > > > >> >> On 06/13/2016 06:53 PM, Vinay
> Patil
> >> > > wrote:
> >> > > > > > > > > > > > >>> > > > >> >>> Hi,
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> I have a question regarding the
> >> join
> >> > > > > > operation,
> >> > > > > > > > > > consider
> >> > > > > > > > > > > > the
> >> > > > > > > > > > > > >>> > > > following
> >> > > > > > > > > > > > >>> > > > >> >>> dummy example:
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> StreamExecutionEnvironment env =
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > StreamExecutionEnvironment.getExecutionEnvironment();
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > >
> >> > > > >
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> >> > > > > > > > > > > > >>> > > > >> >>> DataStreamSource<Integer>
> >> > sourceStream =
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > env.fromElements(10,20,23,25,30,33,102,18);
> >> > > > > > > > > > > > >>> > > > >> >>> DataStreamSource<Integer>
> >> destStream =
> >> > > > > > > > > > > > >>> > > > >> >>
> env.fromElements(20,30,40,50,60,10);
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> sourceStream.join(destStream)
> >> > > > > > > > > > > > >>> > > > >> >>> .where(new ElementSelector())
> >> > > > > > > > > > > > >>> > > > >> >>> .equalTo(new ElementSelector())
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > >
> >> > > .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> >> > > > > > > > > > > > >>> > > > >> >>> .apply(new JoinFunction<Integer,
> >> > > Integer,
> >> > > > > > > > > Integer>() {
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> private static final long
> >> > > > serialVersionUID =
> >> > > > > > 1L;
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> @Override
> >> > > > > > > > > > > > >>> > > > >> >>> public Integer join(Integer
> >> paramIN1,
> >> > > > > Integer
> >> > > > > > > > > > paramIN2)
> >> > > > > > > > > > > > >>> throws
> >> > > > > > > > > > > > >>> > > > >> Exception
> >> > > > > > > > > > > > >>> > > > >> >> {
> >> > > > > > > > > > > > >>> > > > >> >>> return paramIN1;
> >> > > > > > > > > > > > >>> > > > >> >>> }
> >> > > > > > > > > > > > >>> > > > >> >>> }).print();
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> I perfectly get the elements that
> >> are
> >> > > > > matching
> >> > > > > > > in
> >> > > > > > > > > both
> >> > > > > > > > > > > the
> >> > > > > > > > > > > > >>> > > streams,
> >> > > > > > > > > > > > >>> > > > >> >> however
> >> > > > > > > > > > > > >>> > > > >> >>> my requirement is to write these
> >> > matched
> >> > > > > > > elements
> >> > > > > > > > > and
> >> > > > > > > > > > > also
> >> > > > > > > > > > > > >>> the
> >> > > > > > > > > > > > >>> > > > >> unmatched
> >> > > > > > > > > > > > >>> > > > >> >>> elements to sink(S3)
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> How do I get the unmatched
> elements
> >> > from
> >> > > > > each
> >> > > > > > > > > stream ?
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>> Regards,
> >> > > > > > > > > > > > >>> > > > >> >>> Vinay Patil
> >> > > > > > > > > > > > >>> > > > >> >>>
> >> > > > > > > > > > > > >>> > > > >> >>
> >> > > > > > > > > > > > >>> > > > >> >>
> >> > > > > > > > > > > > >>> > > > >>
> >> > > > > > > > > > > > >>> > > > >>
> >> > > > > > > > > > > > >>> > > > >
> >> > > > > > > > > > > > >>> > > >
> >> > > > > > > > > > > > >>> > >
> >> > > > > > > > > > > > >>> >
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>

Re: [Discussion] Query regarding Join and Windows

Posted by Vinay Patil <vi...@gmail.com>.
Hi,

Just watched the video on Robust Stream Processing.
So when we say a window is a stateful operator, does it mean that even if
the task manager performing the window operation fails, it will pick up from
the state it left earlier when it comes back up? (I have not read more on
state for now.)


Also, in one of our projects, when we deploy on a cluster and check the Job
Graph, everything is shown in one box. Why does this happen? Is it because
of the chaining of streams?
So each box here represents the flow of functions, right?
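[For reference, a minimal sketch of how operator chaining could be controlled, since chained operators are what make the Job Graph show a single box; the stream and MyMapper names are illustrative:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Disable chaining for the whole job: every operator then gets its own box.
env.disableOperatorChaining();
// Or break the chain at one operator only:
stream.map(new MyMapper()).disableChaining();
]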



Regards,
Vinay Patil

On Thu, Jun 30, 2016 at 7:29 PM, Vinay Patil <vi...@gmail.com>
wrote:

> Hi Aljoscha,
>
> Just wanted to check whether it works with it.
> Anyway, to solve the problem, what we have thought of is to push a heartbeat
> message to Kafka at a certain interval, so that we always get a continuous
> stream and that edge case will never occur, right?
>
> One more question I have regarding the failover case:
> Let's say I have a window of 10 secs, and in that there are elements e0 to
> en; what if the node goes down during this time?
> When the node comes up, will it resume from the same state or will it
> resume from the last checkpointed state?
>
> Can we explicitly checkpoint inside the window, maybe at the start of
> the window or before we apply the window?
>
>
> Regards,
> Vinay Patil
>
> On Thu, Jun 30, 2016 at 2:11 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>> I think the problem is that the DeltaFunction needs to have this
>> signature:
>>
>> DeltaFunction<CoGroupedStreams.TaggedUnion<Tuple2<String,DTO>,
>> Tuple2<String,DTO>>>
>>
>> because the Trigger will see elements from both input streams which are
>> represented as a TaggedUnion that can contain an element from either side.
>>
>> May I ask why you want to use the DeltaTrigger?
>>
>> Cheers,
>> Aljoscha
>>
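[For reference, a minimal sketch of a DeltaFunction with the signature described above; DTO is the thread's own type and the returned 0.0 is only a placeholder:

new DeltaFunction<CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>>>() {
    @Override
    public double getDelta(
            CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>> oldDataPoint,
            CoGroupedStreams.TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>> newDataPoint) {
        // compute a numeric distance between the two points here
        return 0.0;
    }
}
]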
>> On Wed, 29 Jun 2016 at 19:06 Vinay Patil <vi...@gmail.com> wrote:
>>
>> > Hi,
>> >
>> > Yes, the concepts are getting much clearer now.
>> > One last thing I want to try before going for a custom trigger: I want to
>> > try the DeltaTrigger, but I am not able to get the syntax right. This is
>> > how I am trying it:
>> >
>> > TypeInformation<Tuple2<String, DTO>> typeInfo = TypeInformation.of(new TypeHint<Tuple2<String, DTO>>() {
>> > });
>> > // source and destStream : Tuple2<String,DTO>
>> > sourceStream.coGroup(destStream).where(new ElementSelector()).equalTo(new ElementSelector())
>> > .window(TumblingTimeEventWindows.of(Time.seconds(10)))
>> > .trigger(DeltaTrigger.of(triggerMeters,
>> > new DeltaFunction<Tuple2<String,DTO>>() {
>> > private static final long serialVersionUID = 1L;
>> >
>> > @Override
>> > public double getDelta(
>> > Tuple2<String,DTO> oldDataPoint,
>> > Tuple2<String,DTO> newDataPoint) {
>> > return <some_val>;
>> > }
>> > }, typeInfo.createSerializer(env.getConfig()).apply(new JoinStreams());
>> >
>> > I am getting the error "cannot convert from DeltaTrigger to Trigger<? super
>> > CoGroupedStreams..."
>> > What am I doing wrong here? I have referred to the sample example.
>> >
>> > Regards,
>> > Vinay Patil
>> >
>> > On Wed, Jun 29, 2016 at 7:15 PM, Aljoscha Krettek <al...@apache.org>
>> > wrote:
>> >
>> > > Hi,
>> > > you can use ingestion time if you don't care about the timestamps in
>> > > your events, yes. If elements from the two streams happen to arrive at
>> > > such times that they are not put into the same window then you won't
>> > > get a match, correct.
>> > >
>> > > Regarding ingestion time and out-of-order events. I think this section
>> > > just reiterates that when using ingestion time the inherent timestamps
>> > > in your events will not be considered and their order will not be
>> > > respected.
>> > >
>> > > Regarding late data: right now, Flink always processes late data and it
>> > > is up to the Trigger to decide what to do with late data. You can
>> > > implement your custom trigger based on EventTimeTrigger that would
>> > > immediately purge a window when an element arrives that is later than
>> > > an allowed amount of lateness. In Flink 1.1 we will introduce a setting
>> > > for windows that allows to specify an allowed lateness. With this, late
>> > > elements will be dropped automatically. This feature is already
>> > > available in the master, by the way.
>> > >
>> > > Cheers,
>> > > Aljoscha
>> > >
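[For reference, a minimal sketch of the allowed-lateness setting mentioned above, as it appears on a keyed window in Flink 1.1; the window function name is illustrative:

keyedStream
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .allowedLateness(Time.seconds(10)) // elements up to 10s late still update the window
    .apply(new MyWindowFunction());
]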
>> > > On Wed, 29 Jun 2016 at 14:14 Vinay Patil <vi...@gmail.com>
>> > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > Ok.
>> > > > Inside the checkAndGetNextWatermark(lastElement, extractedTimestamp)
>> > > method
>> > > > both these parameters are coming same (timestamp value) , I was
>> > expecting
>> > > > last element timestamp value in the 1st param when I extract it.
>> > > >
>> > > > Lets say I decide to use IngestionTime (since I am getting accurate
>> > > results
>> > > > here for now), if the joining element of both streams are assigned
>> to
>> > > > different windows , then it that case I will not get the match ,
>> right
>> > ?
>> > > >
>> > > > However in case of event time this guarantees to be in the same
>> window
>> > > > since we are assigning the timestamp, correct me here.
>> > > >
>> > > >  According to documentation :
>> > > > * Ingestion Time programs cannot handle any out-of-order events or
>> late
>> > > > data*
>> > > >
>> > > > In this context What do we mean by out-of-order events How does it
>> know
>> > > > that the events are out of order, I mean on which parameter does it
>> > > decide
>> > > > that the events are out-of-order  ? As in case of event time we can
>> say
>> > > the
>> > > > timestamps received are out of order.
>> > > >
>> > > > Late Data : does it have a threshold after which it does not accept
>> > late
>> > > > data ?
>> > > >
>> > > >
>> > > > Regards,
>> > > > Vinay Patil
>> > > >
>> > > > On Wed, Jun 29, 2016 at 5:15 PM, Aljoscha Krettek <
>> aljoscha@apache.org
>> > >
>> > > > wrote:
>> > > >
>> > > > > Hi,
>> > > > > the element will be kept around indefinitely if no new watermark
>> > > arrives.
>> > > > >
>> > > > > I think the same problem will persist for
>> > > > AssignerWithPunctuatedWatermarks
>> > > > > since there you also might not get the required "last watermark"
>> to
>> > > > trigger
>> > > > > processing of the last window.
>> > > > >
>> > > > > Cheers,
>> > > > > Aljoscha
>> > > > >
>> > > > > On Wed, 29 Jun 2016 at 13:18 Vinay Patil <vinay18.patil@gmail.com
>> >
>> > > > wrote:
>> > > > >
>> > > > > > Hi Aljoscha,
>> > > > > >
>> > > > > > This clears a lot of doubts now.
>> > > > > > So now lets say the stream paused for a while or it stops
>> > completely
>> > > on
>> > > > > > Friday , let us assume that the last message did not get
>> processed
>> > > and
>> > > > is
>> > > > > > kept in the internal buffers.
>> > > > > >
>> > > > > > So when the stream starts again on Monday , will it consider the
>> > last
>> > > > > > element that is in the internal buffer for processing ?
>> > > > > >  How much time the internal buffer can hold the data or will it
>> > flush
>> > > > the
>> > > > > > data after a threshold ?
>> > > > > >
>> > > > > > I have tried using AssignerWithPunctuatedWatermarks and
>> generated
>> > the
>> > > > > > watermark for each event, still getting one record less.
>> > > > > >
>> > > > > >
>> > > > > > Regards,
>> > > > > > Vinay Patil
>> > > > > >
>> > > > > > On Wed, Jun 29, 2016 at 2:21 PM, Aljoscha Krettek <
>> > > aljoscha@apache.org
>> > > > >
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Hi,
>> > > > > > > the reason why the last element might never be emitted is the
>> way
>> > > the
>> > > > > > > ascending timestamp extractor works. I'll try and explain
>> with an
>> > > > > > example.
>> > > > > > >
>> > > > > > > Let's say we have a window size of 2 milliseconds, elements
>> > arrive
>> > > > > > starting
>> > > > > > > with timestamp 0, window begin timestamp is inclusive, end
>> > > timestamp
>> > > > is
>> > > > > > > exclusive:
>> > > > > > >
>> > > > > > > Element 0, Timestamp 0 (at this point the watermark is -1)
>> > > > > > > Element 1, Timestamp 1 (at this point the watermark is 0)
>> > > > > > > Element 2, Timestamp 1 (at this point the watermark is still
>> 0)
>> > > > > > > Element 3, Timestamp 2 (at this point the watermark is 1)
>> > > > > > >
>> > > > > > > now we can process the window (0, 2) because we know from the
>> > > > watermark
>> > > > > > > that no elements can arrive for that window anymore. The
>> window
>> > > > > contains
>> > > > > > > elements 0,1,2
>> > > > > > >
>> > > > > > > Element 4, Timestamp 3 (at this point the watermark is 2)
>> > > > > > > Element 5, Timestamp 4 (at this point the watermark is 3)
>> > > > > > >
>> > > > > > > now we can process window (2, 4). The window contains elements
>> > 3,4.
>> > > > > > >
>> > > > > > > At this point, we have Element 5 sitting in internal buffers for
>> > > > > > > window (4, 6) but if we don't receive further elements the watermark
>> > > > > > > will never advance and we will never process that window.
>> > > > > > >
>> > > > > > > If, however, we get new elements at some point the watermark advances
>> > > > > > > and we don't have a problem. That's what I meant when I said that you
>> > > > > > > shouldn't have a problem if data keeps continuously arriving.
>> > > > > > >
>> > > > > > > Cheers,
>> > > > > > > Aljoscha
>> > > > > > >
>> > > > > > >
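[For reference, a minimal sketch of the extractor whose watermark behaviour is walked through above; the Event type and getEventTimestamp() field are illustrative, and the signature is the one-argument form of AscendingTimestampExtractor:

DataStream<Event> withTimestamps = stream.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<Event>() {
        @Override
        public long extractAscendingTimestamp(Event element) {
            // for a seen timestamp t the emitted watermark is t - 1, so the
            // last window only fires once a strictly later element arrives
            return element.getEventTimestamp();
        }
    });
]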
>> > > > > > > On Tue, 28 Jun 2016 at 17:14 Vinay Patil <
>> > vinay18.patil@gmail.com>
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hi Aljoscha,
>> > > > > > > >
>> > > > > > > > Thanks a lot for your inputs.
>> > > > > > > >
>> > > > > > > > I still did not get what you mean when you say I will not face this
>> > > > > > > > issue in the case of a continuous stream. Let's consider the following
>> > > > > > > > example: assume that the stream runs continuously from Monday to
>> > > > > > > > Friday, and on Friday it stops after 5.00 PM. Will I still face this
>> > > > > > > > issue?
>> > > > > > > >
>> > > > > > > > I am actually not able to understand how it would differ for real-time
>> > > > > > > > streams.
>> > > > > > > >
>> > > > > > > > Regards,
>> > > > > > > > Vinay Patil
>> > > > > > > >
>> > > > > > > > On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek <
>> > > > > aljoscha@apache.org
>> > > > > > >
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hi,
>> > > > > > > > > ingestion time can only be used if you don't care about the
>> > > > > > > > > timestamp in the elements. So if you have those you should probably
>> > > > > > > > > use event time.
>> > > > > > > > >
>> > > > > > > > > If your timestamps really are strictly increasing then the ascending
>> > > > > > > > > extractor is good. And if you have a continuous stream of incoming
>> > > > > > > > > elements you will not see the behavior of not getting the last
>> > > > > > > > > elements.
>> > > > > > > > >
>> > > > > > > > > By the way, when using Kafka you can also embed the timestamp
>> > > > > > > > > extractor directly in the Kafka consumer. This is described here:
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>> > > > > > > > >
>> > > > > > > > > Cheers,
>> > > > > > > > > Aljoscha
>> > > > > > > > >
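[For reference, a minimal sketch of embedding the extractor in the consumer as described at the link above; the topic, schema, properties and event field are illustrative, and the consumer class matches the Kafka 0.9 connector of that era:

FlinkKafkaConsumer09<MyEvent> consumer =
    new FlinkKafkaConsumer09<>("topic", new MyEventSchema(), properties);
// Timestamps and watermarks are then generated inside the source itself.
consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
    @Override
    public long extractAscendingTimestamp(MyEvent element) {
        return element.getEventTimestamp();
    }
});
DataStream<MyEvent> stream = env.addSource(consumer);
]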
>> > > > > > > > > On Tue, 28 Jun 2016 at 11:44 Vinay Patil <
>> > > > vinay18.patil@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Aljoscha,
>> > > > > > > > > >
>> > > > > > > > > > Thank you for your response.
>> > > > > > > > > > So do you suggest using a different approach for extracting the
>> > > > > > > > > > timestamp (as given in the documentation) instead of the
>> > > > > > > > > > AscendingTimestampExtractor?
>> > > > > > > > > > Is that the reason I am seeing this unexpected behaviour? In the
>> > > > > > > > > > case of a continuous stream I would not see any data loss?
>> > > > > > > > > >
>> > > > > > > > > > Also, assuming that the records are always going to be in order,
>> > > > > > > > > > which is the best approach: Ingestion Time or Event Time?
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Regards,
>> > > > > > > > > > Vinay Patil
>> > > > > > > > > >
>> > > > > > > > > > On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <
>> > > > > > > aljoscha@apache.org
>> > > > > > > > >
>> > > > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hi,
>> > > > > > > > > > > first regarding tumbling windows: even if you have 5 minute
>> > > > > > > > > > > windows it can happen that elements that are only seconds apart
>> > > > > > > > > > > go into different windows.
>> > > > > > > > > > > Consider the following case:
>> > > > > > > > > > >
>> > > > > > > > > > > |                x | x                 |
>> > > > > > > > > > >
>> > > > > > > > > > > These are two 5-minute windows and the two elements are only
>> > > > > > > > > > > seconds apart but go into different windows because windows are
>> > > > > > > > > > > aligned to epoch.
>> > > > > > > > > > >
>> > > > > > > > > > > Now, for the ascending timestamp extractor. The reason this can
>> > > > > > > > > > > behave in unexpected ways is that it emits a watermark that is
>> > > > > > > > > > > "last timestamp - 1", i.e. if it has seen timestamp t it can only
>> > > > > > > > > > > emit watermark t-1 because there might be other elements with
>> > > > > > > > > > > timestamp t arriving. If you have a continuous stream of elements
>> > > > > > > > > > > you wouldn't notice this. Only in this constructed example does
>> > > > > > > > > > > it become visible.
>> > > > > > > > > > >
>> > > > > > > > > > > Cheers,
>> > > > > > > > > > > Aljoscha
>> > > > > > > > > > >
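[For reference, the epoch alignment described above comes down to this arithmetic; the values are illustrative:

long windowSizeMs = 5 * 60 * 1000L;                         // a 5-minute tumbling window
long windowStart  = timestamp - (timestamp % windowSizeMs); // aligned to epoch, not to the data
long windowEnd    = windowStart + windowSizeMs;             // end timestamp is exclusive
// Two timestamps only seconds apart can still straddle such a boundary.
]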
>> > > > > > > > > > > On Tue, 28 Jun 2016 at 06:04 Vinay Patil <
>> > > > > > vinay18.patil@gmail.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Following is the timestamp I am getting from the DTO; here is
>> > > > > > > > > > > > the timestamp difference between the two records:
>> > > > > > > > > > > > 1466115892162154279
>> > > > > > > > > > > > 1466116026233613409
>> > > > > > > > > > > >
>> > > > > > > > > > > > So the time difference is roughly 3 min. Even if I apply a
>> > > > > > > > > > > > window of 5 min, I am not getting the last record (the last
>> > > > > > > > > > > > timestamp value above), using the ascending timestamp extractor
>> > > > > > > > > > > > for generating the timestamp (assuming that the timestamps are
>> > > > > > > > > > > > always in order).
>> > > > > > > > > > > >
>> > > > > > > > > > > > I was at least expecting the data to reach the co-group
>> > > > > > > > > > > > function.
>> > > > > > > > > > > > What could be the reason for the data loss? The data we are
>> > > > > > > > > > > > getting is critical, hence we cannot afford to lose any data.
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > Regards,
>> > > > > > > > > > > > Vinay Patil
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <
>> > > > > > > > > vinay18.patil@gmail.com
>> > > > > > > > > > >
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Just an update: when I keep IngestionTime and remove the
>> > > > > > > > > > > > > timestamp I am generating, I am getting all the records, but
>> > > > > > > > > > > > > for Event Time I am getting one record less. I checked the
>> > > > > > > > > > > > > time difference between the two records; it is 3 min. I tried
>> > > > > > > > > > > > > keeping the window time at 5 mins, but even that did not work.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Even when I try assigning the timestamp for IngestionTime, I
>> > > > > > > > > > > > > get one record less, so should I safely use Ingestion Time, or
>> > > > > > > > > > > > > is it always advisable to use EventTime?
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > > Vinay Patil
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <
>> > > > > > > > > > vinay18.patil@gmail.com>
>> > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >> Hi ,
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >> Actually I am only publishing 5 messages each to two
>> > > > > > > > > > > > >> different kafka topics (using JUnit); even if I increase the
>> > > > > > > > > > > > >> window to 500 seconds the result is the same.
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >> I do not understand why the 5th element is not being sent to
>> > > > > > > > > > > > >> the co-group operator even when the keys are the same.
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >> I actually cannot share the actual client code.
>> > > > > > > > > > > > >> But this is what the streams look like:
>> > > > > > > > > > > > >> sourceStream.coGroup(destStream)
>> > > > > > > > > > > > >> Here the sourceStream and destStream are actually
>> > > > > > > > > > > > >> Tuple2<String,DTO>, and the ElementSelector returns tuple.f0,
>> > > > > > > > > > > > >> which is the key.
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >> I am generating the timestamp based on a field from the DTO
>> > > > > > > > > > > > >> which is guaranteed to be in order.
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >> Will using triggers help here?
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >> Regards,
>> > > > > > > > > > > > >> Vinay Patil
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >> *+91-800-728-4749*
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha
>> Krettek <
>> > > > > > > > > > > aljoscha@apache.org>
>> > > > > > > > > > > > >> wrote:
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >>> Hi,
>> > > > > > > > > > > > >>> what timestamps are you assigning? Is it guaranteed that all
>> > > > > > > > > > > > >>> of them would fall into the same 30-second window?
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>> The issue with the duplicate printing in the ElementSelector
>> > > > > > > > > > > > >>> is strange. Could you post a more complete code example so
>> > > > > > > > > > > > >>> that I can reproduce the problem?
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>> Cheers,
>> > > > > > > > > > > > >>> Aljoscha
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <
>> > > > > > > > > vinay18.patil@gmail.com>
>> > > > > > > > > > > > >>> wrote:
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>> > Hi ,
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> > I am able to get the matching and non-matching elements.
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> > However, when I am unit testing the code, I am getting one
>> > > > > > > > > > > > >>> > record less inside the overridden coGroup function.
>> > > > > > > > > > > > >>> > Testing the following way:
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> > 1) Insert 5 messages into local kafka topic (test1)
>> > > > > > > > > > > > >>> > 2) Insert different 5 messages into local kafka topic (test2)
>> > > > > > > > > > > > >>> > 3) Consume 1) and 2) and I have two different kafka streams
>> > > > > > > > > > > > >>> > 4) Generate ascending timestamp (using Event Time) for both
>> > > > > > > > > > > > >>> > streams and create key (String)
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> > Now till 4) I am able to get all the records (checked by
>> > > > > > > > > > > > >>> > printing the stream to a text file).
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> > However when I send the stream to the co-group operator, I
>> > > > > > > > > > > > >>> > am receiving one record less, using the following syntax:
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> > sourceStream.coGroup(destStream)
>> > > > > > > > > > > > >>> > .where(new ElementSelector())
>> > > > > > > > > > > > >>> > .equalTo(new ElementSelector())
>> > > > > > > > > > > > >>> > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>> > > > > > > > > > > > >>> > .apply(new JoinStreams);
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> > Also in the ElementSelector I have inserted a sysout; I am
>> > > > > > > > > > > > >>> > getting 20 sysouts instead of 10 (10 sysouts for the source
>> > > > > > > > > > > > >>> > and 10 for the dest stream).
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> > I am unable to understand why one record is coming less to
>> > > > > > > > > > > > >>> > the co-group.
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> > Regards,
>> > > > > > > > > > > > >>> > Vinay Patil
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian
>> Hueske <
>> > > > > > > > > > fhueske@gmail.com>
>> > > > > > > > > > > > >>> wrote:
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>> > > Can you add a flag to each element emitted by the
>> > > > > > > > > > > > >>> > > CoGroupFunction that indicates whether it was joined or not?
>> > > > > > > > > > > > >>> > > Then you can use split to distinguish between both cases and
>> > > > > > > > > > > > >>> > > handle both streams differently.
>> > > > > > > > > > > > >>> > >
>> > > > > > > > > > > > >>> > > Best, Fabian
>> > > > > > > > > > > > >>> > >
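[For reference, a minimal sketch of the flag-and-split idea suggested above; it assumes the CoGroupFunction emits Tuple2.of(true, element) for joined and Tuple2.of(false, element) for unjoined elements into a stream called tagged:

SplitStream<Tuple2<Boolean, Integer>> parts = tagged.split(
    (OutputSelector<Tuple2<Boolean, Integer>>) value ->
        Collections.singletonList(value.f0 ? "joined" : "unjoined"));
DataStream<Tuple2<Boolean, Integer>> joined   = parts.select("joined");
DataStream<Tuple2<Boolean, Integer>> unjoined = parts.select("unjoined");
]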
>> > > > > > > > > > > > >>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <
>> > > > > > > > > vinay18.patil@gmail.com
>> > > > > > > > > > >:
>> > > > > > > > > > > > >>> > >
>> > > > > > > > > > > > >>> > > > Hi Jark,
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > > > I am able to get the non-matching elements in a stream.
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > > > Of course we can collect the matching elements in the same
>> > > > > > > > > > > > >>> > > > stream as well; however, I want to perform additional
>> > > > > > > > > > > > >>> > > > operations on the joined stream before writing it to S3, so
>> > > > > > > > > > > > >>> > > > I would have to include a separate join operator for the
>> > > > > > > > > > > > >>> > > > same two streams, right?
>> > > > > > > > > > > > >>> > > > Correct me if I am wrong.
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > > > I have pasted the dummy code which collects the
>> > > > > > > > > > > > >>> > > > non-matching records (I have to perform this on the actual
>> > > > > > > > > > > > >>> > > > data; correct me if I am doing it wrong).
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > > > sourceStream.coGroup(destStream).where(new ElementSelector()).equalTo(new ElementSelector())
>> > > > > > > > > > > > >>> > > > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>> > > > > > > > > > > > >>> > > > .apply(new CoGroupFunction<Integer, Integer, Integer>() {
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > > > private static final long serialVersionUID = 6408179761497497475L;
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > > > @Override
>> > > > > > > > > > > > >>> > > > public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
>> > > > > > > > > > > > >>> > > > Collector<Integer> paramCollector) throws Exception {
>> > > > > > > > > > > > >>> > > > long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
>> > > > > > > > > > > > >>> > > > long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
>> > > > > > > > > > > > >>> > > > if (exactSizeIfKnown == 0) {
>> > > > > > > > > > > > >>> > > > paramCollector.collect(paramIterable1.iterator().next());
>> > > > > > > > > > > > >>> > > > } else if (exactSizeIfKnown2 == 0) {
>> > > > > > > > > > > > >>> > > > paramCollector.collect(paramIterable.iterator().next());
>> > > > > > > > > > > > >>> > > > }
>> > > > > > > > > > > > >>> > > > }
>> > > > > > > > > > > > >>> > > > }).print();
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > > > Regards,
>> > > > > > > > > > > > >>> > > > Vinay Patil
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay
>> Patil
>> > <
>> > > > > > > > > > > > >>> vinay18.patil@gmail.com>
>> > > > > > > > > > > > >>> > > > wrote:
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > > > > You are right; I debugged it for all elements, and I can
>> > > > > > > > > > > > >>> > > > > do that now.
>> > > > > > > > > > > > >>> > > > > Thanks a lot.
>> > > > > > > > > > > > >>> > > > >
>> > > > > > > > > > > > >>> > > > > Regards,
>> > > > > > > > > > > > >>> > > > > Vinay Patil
>> > > > > > > > > > > > >>> > > > >
>> > > > > > > > > > > > >>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark
>> Wu <
>> > > > > > > > > > > > >>> > wuchong.wc@alibaba-inc.com>
>> > > > > > > > > > > > >>> > > > > wrote:
>> > > > > > > > > > > > >>> > > > >
>> > > > > > > > > > > > >>> > > > >> In `coGroup(Iterable<Integer> iter1, Iterable<Integer>
>> > > > > > > > > > > > >>> > > > >> iter2, Collector<Integer> out)`, when both iter1 and iter2
>> > > > > > > > > > > > >>> > > > >> are non-empty, it means they are matched elements from both
>> > > > > > > > > > > > >>> > > > >> streams.
>> > > > > > > > > > > > >>> > > > >> When one of iter1 and iter2 is empty, it means that they
>> > > > > > > > > > > > >>> > > > >> are unmatched.
>> > > > > > > > > > > > >>> > > > >>
>> > > > > > > > > > > > >>> > > > >>
>> > > > > > > > > > > > >>> > > > >> - Jark Wu (wuchong)
>> > > > > > > > > > > > >>> > > > >>
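[For reference, a minimal sketch of a full outer join along the lines described above, with Integer elements as in the thread's example:

public void coGroup(Iterable<Integer> left, Iterable<Integer> right,
        Collector<Integer> out) {
    boolean leftEmpty = !left.iterator().hasNext();
    boolean rightEmpty = !right.iterator().hasNext();
    if (leftEmpty) {
        right.forEach(out::collect);   // unmatched elements from the right side
    } else if (rightEmpty) {
        left.forEach(out::collect);    // unmatched elements from the left side
    } else {
        left.forEach(out::collect);    // matched: both sides are non-empty
    }
}
]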
>> > > > > > > > > > > > >>> > > > >> > On Jun 14, 2016, at 12:46 PM, Vinay Patil <vinay18.patil@gmail.com> wrote:
>> > > > > > > > > > > > >>> > > > >> >
>> > > > > > > > > > > > >>> > > > >> > Hi Matthias ,
>> > > > > > > > > > > > >>> > > > >> >
>> > > > > > > > > > > > >>> > > > >> > I did not get you; even if we use Co-Group we have to
>> > > > > > > > > > > > >>> > > > >> > apply it on a key:
>> > > > > > > > > > > > >>> > > > >> >
>> > > > > > > > > > > > >>> > > > >> > sourceStream.coGroup(destStream)
>> > > > > > > > > > > > >>> > > > >> > .where(new ElementSelector())
>> > > > > > > > > > > > >>> > > > >> > .equalTo(new ElementSelector())
>> > > > > > > > > > > > >>> > > > >> > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>> > > > > > > > > > > > >>> > > > >> > .apply(new CoGroupFunction<Integer, Integer, Integer>() {
>> > > > > > > > > > > > >>> > > > >> > private static final long serialVersionUID = 6408179761497497475L;
>> > > > > > > > > > > > >>> > > > >> >
>> > > > > > > > > > > > >>> > > > >> > @Override
>> > > > > > > > > > > > >>> > > > >> > public void coGroup(Iterable<Integer> paramIterable, Iterable<Integer> paramIterable1,
>> > > > > > > > > > > > >>> > > > >> > Collector<Integer> paramCollector) throws Exception {
>> > > > > > > > > > > > >>> > > > >> > Iterator<Integer> iterator = paramIterable.iterator();
>> > > > > > > > > > > > >>> > > > >> > while(iterator.hasNext()) {
>> > > > > > > > > > > > >>> > > > >> > }
>> > > > > > > > > > > > >>> > > > >> > }
>> > > > > > > > > > > > >>> > > > >> > });
>> > > > > > > > > > > > >>> > > > >> >
>> > > > > > > > > > > > >>> > > > >> > When I debug this, only the matched elements from both
>> > > > > > > > > > > > >>> > > > >> > streams will come into the coGroup function.
>> > > > > > > > > > > > >>> > > > >> >
>> > > > > > > > > > > > >>> > > > >> > What I want is how do I check for unmatched elements
>> > > > > > > > > > > > >>> > > > >> > from both streams and write them to the sink.
>> > > > > > > > > > > > >>> > > > >> >
>> > > > > > > > > > > > >>> > > > >> > Regards,
>> > > > > > > > > > > > >>> > > > >> > Vinay Patil
>> > > > > > > > > > > > >>> > > > >> >
>> > > > > > > > > > > > >>> > > > >> > *+91-800-728-4749*
>> > > > > > > > > > > > >>> > > > >> >
>> > > > > > > > > > > > >>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM,
>> > Matthias
>> > > J.
>> > > > > > Sax <
>> > > > > > > > > > > > >>> > mjsax@apache.org>
>> > > > > > > > > > > > >>> > > > >> wrote:
>> > > > > > > > > > > > >>> > > > >> >
>> > > > > > > > > > > > >>> > > > >> >> You need to do an outer-join. However, there is no
>> > > > > > > > > > > > >>> > > > >> >> built-in support for outer-joins yet.
>> > > > > > > > > > > > >>> > > > >> >>
>> > > > > > > > > > > > >>> > > > >> >> You can use Window-CoGroup to implement the outer-join
>> > > > > > > > > > > > >>> > > > >> >> as an own operator.
>> > > > > > > > > > > > >>> > > > >> >>
>> > > > > > > > > > > > >>> > > > >> >>
>> > > > > > > > > > > > >>> > > > >> >> -Matthias
>> > > > > > > > > > > > >>> > > > >> >>
>> > > > > > > > > > > > >>> > > > >> >> On 06/13/2016 06:53 PM, Vinay Patil
>> > > wrote:
>> > > > > > > > > > > > >>> > > > >> >>> Hi,
>> > > > > > > > > > > > >>> > > > >> >>>
>> > > > > > > > > > > > >>> > > > >> >>> I have a question regarding the join operation;
>> > > > > > > > > > > > >>> > > > >> >>> consider the following dummy example:
>> > > > > > > > > > > > >>> > > > >> >>>
>> > > > > > > > > > > > >>> > > > >> >>> StreamExecutionEnvironment env =
>> > > > > > > > > > > > >>> > > > >> >>> StreamExecutionEnvironment.getExecutionEnvironment();
>> > > > > > > > > > > > >>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>> > > > > > > > > > > > >>> > > > >> >>> DataStreamSource<Integer> sourceStream =
>> > > > > > > > > > > > >>> > > > >> >>> env.fromElements(10,20,23,25,30,33,102,18);
>> > > > > > > > > > > > >>> > > > >> >>> DataStreamSource<Integer> destStream =
>> > > > > > > > > > > > >>> > > > >> >>> env.fromElements(20,30,40,50,60,10);
>> > > > > > > > > > > > >>> > > > >> >>>
>> > > > > > > > > > > > >>> > > > >> >>> sourceStream.join(destStream)
>> > > > > > > > > > > > >>> > > > >> >>> .where(new ElementSelector())
>> > > > > > > > > > > > >>> > > > >> >>> .equalTo(new ElementSelector())
>> > > > > > > > > > > > >>> > > > >> >>> .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
>> > > > > > > > > > > > >>> > > > >> >>> .apply(new JoinFunction<Integer, Integer, Integer>() {
>> > > > > > > > > > > > >>> > > > >> >>>
>> > > > > > > > > > > > >>> > > > >> >>> private static final long serialVersionUID = 1L;
>> > > > > > > > > > > > >>> > > > >> >>>
>> > > > > > > > > > > > >>> > > > >> >>> @Override
>> > > > > > > > > > > > >>> > > > >> >>> public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
>> > > > > > > > > > > > >>> > > > >> >>> return paramIN1;
>> > > > > > > > > > > > >>> > > > >> >>> }
>> > > > > > > > > > > > >>> > > > >> >>> }).print();
>> > > > > > > > > > > > >>> > > > >> >>>
>> > > > > > > > > > > > >>> > > > >> >>> I perfectly get the elements that are matching in both
>> > > > > > > > > > > > >>> > > > >> >>> the streams; however my requirement is to write these
>> > > > > > > > > > > > >>> > > > >> >>> matched elements and also the unmatched elements to the
>> > > > > > > > > > > > >>> > > > >> >>> sink (S3).
>> > > > > > > > > > > > >>> > > > >> >>>
>> > > > > > > > > > > > >>> > > > >> >>> How do I get the unmatched elements from each stream?
>> > > > > > > > > > > > >>> > > > >> >>>
>> > > > > > > > > > > > >>> > > > >> >>> Regards,
>> > > > > > > > > > > > >>> > > > >> >>> Vinay Patil
>> > > > > > > > > > > > >>> > > > >> >>>
>> > > > > > > > > > > > >>> > > > >> >>
>> > > > > > > > > > > > >>> > > > >> >>
>> > > > > > > > > > > > >>> > > > >>
>> > > > > > > > > > > > >>> > > > >>
>> > > > > > > > > > > > >>> > > > >
>> > > > > > > > > > > > >>> > > >
>> > > > > > > > > > > > >>> > >
>> > > > > > > > > > > > >>> >
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Re: [Discussion] Query regarding Join and Windows

Posted by Vinay Patil <vi...@gmail.com>.
Hi Aljoscha,

Just wanted to check whether it works with it.
Anyway, to solve the problem, what we have thought of is to push a heartbeat
message to Kafka at a certain interval, so that we always get a continuous
stream and that edge case will never occur, right?

One more question I have regarding the failover case:
Let's say I have a window of 10 secs, and in that there are elements e0 to
en; what if the node goes down during this time?
When the node comes up, will it resume from the same state or will it resume
from the last checkpointed state?

Can we explicitly checkpoint inside the window, maybe at the start of the
window or before we apply the window?
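[For reference, a minimal sketch of enabling checkpointing; window contents are operator state, so after a failure the job rolls back to the last completed checkpoint rather than the exact in-flight state:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Snapshot all operator state (including window buffers) every 10 seconds.
env.enableCheckpointing(10000);
]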


Regards,
Vinay Patil

On Thu, Jun 30, 2016 at 2:11 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> I think the problem is that the DeltaFunction needs to have this signature:
>
> DeltaFunction<CoGroupedStreams.TaggedUnion<Tuple2<String,DTO>,
> Tuple2<String,DTO>>>
>
> because the Trigger will see elements from both input streams which are
> represented as a TaggedUnion that can contain an element from either side.
>
> May I ask why you want to use the DeltaTrigger?
>
> Cheers,
> Aljoscha
>
> On Wed, 29 Jun 2016 at 19:06 Vinay Patil <vi...@gmail.com> wrote:
>
> > Hi,
> >
> > Yes , now I am getting clear with the concepts here.
> > One last thing I want to try before going for custom trigger, I want to
> try
> > Delta Trigger but I am not able to get the syntax right , this is how I
> am
> > trying it :
> >
> > TypeInformation<Tuple2<String, DTO>> typeInfo = TypeInformation.of(new
> > TypeHint<Tuple2<String, DTO>>() {
> > });
> > // source and destStream : Tuple2<String,DTO>
> > sourceStream.coGroup(destStream).where(new ElementSelector()).equalTo(new
> > ElementSelector())
> > .window(TumblingTimeEventWindows.of(Time.seconds(10)))
> > .trigger(DeltaTrigger.of(triggerMeters,
> > new DeltaFunction<Tuple2<String,DTO>>() {
> > private static final long serialVersionUID = 1L;
> >
> > @Override
> > public double getDelta(
> > Tuple2<String,DTO> oldDataPoint,
> > Tuple2<String,DTO> newDataPoint) {
> > return <some_val>;
> > }
> > }, typeInfo.createSerializer(env.getConfig()).apply(new JoinStreams());
> >
> > I am getting error cannot convert from DeltaTrigger to Trigger<? super
> > CoGroupedStreams...
> > What am I doing wrong here, I have referred the sample example.
> >
> > Regards,
> > Vinay Patil
> >
> > On Wed, Jun 29, 2016 at 7:15 PM, Aljoscha Krettek <al...@apache.org>
> > wrote:
> >
> > > Hi,
> > > you can use ingestion time if you don't care about the timestamps in
> your
> > > events, yes. If elements from the two streams happen to arrive at such
> > > times that they are not put into the same window then you won't get a
> > > match, correct.
> > >
> > > Regarding ingestion time and out-of-order events. I think this section
> > just
> > > reiterates that when using ingestion time the inherent timestamps in
> your
> > > events will not be considered and their order will not be respected.
> > >
> > > Regarding late data: right now, Flink always processes late data and it
> > is
> > > up to the Trigger to decide what to do with late data. You can
> implement
> > > your custom trigger based on EventTimeTrigger that would immediately
> > purge
> > > a window when an element arrives that is later than an allowed amount
> of
> > > lateness. In Flink 1.1 we will introduce a setting for windows that
> > allows
> > > to specify an allowed lateness. With this, late elements will be
> dropped
> > > automatically. This feature is already available in the master, by the
> > way.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Wed, 29 Jun 2016 at 14:14 Vinay Patil <vi...@gmail.com>
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > Ok.
> > > > Inside the checkAndGetNextWatermark(lastElement, extractedTimestamp)
> > > method
> > > > both these parameters are coming same (timestamp value) , I was
> > expecting
> > > > last element timestamp value in the 1st param when I extract it.
> > > >
> > > > Lets say I decide to use IngestionTime (since I am getting accurate
> > > results
> > > > here for now), if the joining element of both streams are assigned to
> > > > different windows , then it that case I will not get the match ,
> right
> > ?
> > > >
> > > > However in case of event time this guarantees to be in the same
> window
> > > > since we are assigning the timestamp, correct me here.
> > > >
> > > >  According to documentation :
> > > > * Ingestion Time programs cannot handle any out-of-order events or
> late
> > > > data*
> > > >
> > > > In this context What do we mean by out-of-order events How does it
> know
> > > > that the events are out of order, I mean on which parameter does it
> > > decide
> > > > that the events are out-of-order  ? As in case of event time we can
> say
> > > the
> > > > timestamps received are out of order.
> > > >
> > > > Late Data : does it have a threshold after which it does not accept
> > late
> > > > data ?
> > > >
> > > >
> > > > Regards,
> > > > Vinay Patil
> > > >
> > > > On Wed, Jun 29, 2016 at 5:15 PM, Aljoscha Krettek <
> aljoscha@apache.org
> > >
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > the element will be kept around indefinitely if no new watermark
> > > arrives.
> > > > >
> > > > > I think the same problem will persist for
> > > > AssignerWithPunctuatedWatermarks
> > > > > since there you also might not get the required "last watermark" to
> > > > trigger
> > > > > processing of the last window.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Wed, 29 Jun 2016 at 13:18 Vinay Patil <vi...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi Aljoscha,
> > > > > >
> > > > > > This clears a lot of doubts now.
> > > > > > So now lets say the stream paused for a while or it stops
> > completely
> > > on
> > > > > > Friday , let us assume that the last message did not get
> processed
> > > and
> > > > is
> > > > > > kept in the internal buffers.
> > > > > >
> > > > > > So when the stream starts again on Monday , will it consider the
> > last
> > > > > > element that is in the internal buffer for processing ?
> > > > > >  How much time the internal buffer can hold the data or will it
> > flush
> > > > the
> > > > > > data after a threshold ?
> > > > > >
> > > > > > I have tried using AssignerWithPunctuatedWatermarks and generated
> > the
> > > > > > watermark for each event, still getting one record less.
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > > Vinay Patil
> > > > > >
> > > > > > On Wed, Jun 29, 2016 at 2:21 PM, Aljoscha Krettek <
> > > aljoscha@apache.org
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > the reason why the last element might never be emitted is the
> way
> > > the
> > > > > > > ascending timestamp extractor works. I'll try and explain with
> an
> > > > > > example.
> > > > > > >
> > > > > > > Let's say we have a window size of 2 milliseconds, elements
> > arrive
> > > > > > starting
> > > > > > > with timestamp 0, window begin timestamp is inclusive, end
> > > timestamp
> > > > is
> > > > > > > exclusive:
> > > > > > >
> > > > > > > Element 0, Timestamp 0 (at this point the watermark is -1)
> > > > > > > Element 1, Timestamp 1 (at this point the watermark is 0)
> > > > > > > Element 2, Timestamp 1 (at this point the watermark is still 0)
> > > > > > > Element 3, Timestamp 2 (at this point the watermark is 1)
> > > > > > >
> > > > > > > now we can process the window (0, 2) because we know from the
> > > > watermark
> > > > > > > that no elements can arrive for that window anymore. The window
> > > > > contains
> > > > > > > elements 0,1,2
> > > > > > >
> > > > > > > Element 4, Timestamp 3 (at this point the watermark is 2)
> > > > > > > Element 5, Timestamp 4 (at this point the watermark is 3)
> > > > > > >
> > > > > > > now we can process window (2, 4). The window contains elements
> > 3,4.
> > > > > > >
> > > > > > > At this point, we have Element 5 sitting in internal buffers
> for
> > > > window
> > > > > > (4,
> > > > > > > 6) but if we don't receive further elements the watermark will
> > > never
> > > > > > > advance and we will never process that window.
> > > > > > >
> > > > > > > If, however, we get new elements at some point the watermark
> > > advances
> > > > > and
> > > > > > > we don't have a problem. That's what I meant when I said that
> you
> > > > > > shouldn't
> > > > > > > have a problem if data keeps continuously arriving.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Aljoscha
> > > > > > >
> > > > > > >
> > > > > > > On Tue, 28 Jun 2016 at 17:14 Vinay Patil <
> > vinay18.patil@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Aljoscha,
> > > > > > > >
> > > > > > > > Thanks a lot for your inputs.
> > > > > > > >
> > > > > > > > I still did not get you when you say you will not face this
> > issue
> > > > in
> > > > > > case
> > > > > > > > of continuous stream, lets consider the following example :
> > > > > > > > Assume that the stream runs continuously from Monday  to
> > Friday,
> > > > and
> > > > > on
> > > > > > > > Friday it stops after 5.00 PM , will I still face this issue
> ?
> > > > > > > >
> > > > > > > > I am actually not able to understand how it will differ in
> real
> > > > time
> > > > > > > > streams.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Vinay Patil
> > > > > > > >
> > > > > > > > On Tue, Jun 28, 2016 at 5:07 PM, Aljoscha Krettek <
> > > > > aljoscha@apache.org
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > > ingestion time can only be used if you don't care about the
> > > > > timestamp
> > > > > > > in
> > > > > > > > > the elements. So if you have those you should probably use
> > > event
> > > > > > time.
> > > > > > > > >
> > > > > > > > > If your timestamps really are strictly increasing then the
> > > > > ascending
> > > > > > > > > extractor is good. And if you have a continuous stream of
> > > > incoming
> > > > > > > > elements
> > > > > > > > > you will not see the behavior of not getting the last
> > elements.
> > > > > > > > >
> > > > > > > > > By the way, when using Kafka you can also embed the
> timestamp
> > > > > > extractor
> > > > > > > > > directly in the Kafka consumer. This is described here:
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Aljoscha
> > > > > > > > >
> > > > > > > > > On Tue, 28 Jun 2016 at 11:44 Vinay Patil <
> > > > vinay18.patil@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Aljoscha,
> > > > > > > > > >
> > > > > > > > > > Thank you for your response.
> > > > > > > > > > So do you suggest to use different approach for
> extracting
> > > > > > timestamp
> > > > > > > > (as
> > > > > > > > > > given in document) instead of AscendingTimeStamp
> Extractor
> > ?
> > > > > > > > > > Is that the reason I am seeing this unexpected behaviour
> ?
> > in
> > > > > case
> > > > > > of
> > > > > > > > > > continuous stream I would not see any data loss ?
> > > > > > > > > >
> > > > > > > > > > Also assuming that the records are always going to be in
> > > order
> > > > ,
> > > > > > > which
> > > > > > > > is
> > > > > > > > > > the best approach : Ingestion Time or Event Time ?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Vinay Patil
> > > > > > > > > >
> > > > > > > > > > On Tue, Jun 28, 2016 at 2:41 PM, Aljoscha Krettek <
> > > > > > > aljoscha@apache.org
> > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > > first regarding tumbling windows: even if you have 5
> > > > > > > > > > > minute windows it can happen that elements that are only
> > > > > > > > > > > seconds apart go into different windows. Consider the
> > > > > > > > > > > following case:
> > > > > > > > > > >
> > > > > > > > > > > |                x | x                 |
> > > > > > > > > > >
> > > > > > > > > > > These are two 5-minute windows and the two elements are
> > > > > > > > > > > only seconds apart but go into different windows because
> > > > > > > > > > > windows are aligned to epoch.
> > > > > > > > > > >
> > > > > > > > > > > Now, for the ascending timestamp extractor. The reason
> > > > > > > > > > > this can behave in unexpected ways is that it emits a
> > > > > > > > > > > watermark that is "last timestamp - 1", i.e. if it has
> > > > > > > > > > > seen timestamp t it can only emit watermark t-1 because
> > > > > > > > > > > there might be other elements with timestamp t arriving.
> > > > > > > > > > > If you have a continuous stream of elements you wouldn't
> > > > > > > > > > > notice this. Only in this constructed example does it
> > > > > > > > > > > become visible.
> > > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > > Aljoscha
> > > > > > > > > > >
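A small sketch of the epoch alignment described above (the two timestamps are made up; the start formula is the usual modulo bucketing for epoch-aligned tumbling windows):

    long windowSize = 5 * 60 * 1000L;   // 5 minutes, in milliseconds

    // Two hypothetical event timestamps only 2 seconds apart.
    long t1 = 1466115899000L;
    long t2 = 1466115901000L;

    // A tumbling window containing ts spans
    // [ts - (ts % windowSize), start + windowSize).
    long start1 = t1 - (t1 % windowSize);   // 1466115600000
    long start2 = t2 - (t2 % windowSize);   // 1466115900000

    // start1 != start2: a window boundary falls between t1 and t2, so the two
    // elements land in different 5-minute windows despite being seconds apart.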
> > > > > > > > > > > On Tue, 28 Jun 2016 at 06:04 Vinay Patil <
> > > > > > vinay18.patil@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi,
> > > > > > > > > > > >
> > > > > > > > > > > > Following is the timestamp I am getting from the DTO;
> > > > > > > > > > > > here is the timestamp difference between the two records:
> > > > > > > > > > > > 1466115892162154279
> > > > > > > > > > > > 1466116026233613409
> > > > > > > > > > > >
> > > > > > > > > > > > So the time difference is roughly 3 min. Even if I apply
> > > > > > > > > > > > a window of 5 min, I am not getting the last record (last
> > > > > > > > > > > > timestamp value above), using the ascending timestamp
> > > > > > > > > > > > extractor for generating the timestamp (assuming that
> > > > > > > > > > > > the timestamps are always in order).
> > > > > > > > > > > >
> > > > > > > > > > > > I was at least expecting the data to reach the co-group
> > > > > > > > > > > > function. What could be the reason for the data loss?
> > > > > > > > > > > > The data we are getting is critical, hence we cannot
> > > > > > > > > > > > afford to lose any data.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > Vinay Patil
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Jun 27, 2016 at 11:31 PM, Vinay Patil <
> > > > > > > > > vinay18.patil@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Just an update: when I keep IngestionTime and remove
> > > > > > > > > > > > > the timestamp I am generating, I get all the records,
> > > > > > > > > > > > > but for Event Time I get one record less. I checked
> > > > > > > > > > > > > the time difference between the two records; it is 3
> > > > > > > > > > > > > min. I tried keeping the window time at 5 mins, but
> > > > > > > > > > > > > even that did not work.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Even when I try assigning a timestamp for
> > > > > > > > > > > > > IngestionTime, I get one record less. So should I
> > > > > > > > > > > > > safely use Ingestion Time, or is it always advisable
> > > > > > > > > > > > > to use EventTime?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regards,
> > > > > > > > > > > > > Vinay Patil
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Jun 27, 2016 at 8:16 PM, Vinay Patil <
> > > > > > > > > > vinay18.patil@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > >> Hi ,
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Actually I am only publishing 5 messages each to two
> > > > > > > > > > > > >> different kafka topics (using JUnit); even if I keep
> > > > > > > > > > > > >> the window at 500 seconds the result is the same.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> I do not understand why it is not sending the 5th
> > > > > > > > > > > > >> element to the co-group operator even when the keys
> > > > > > > > > > > > >> are the same.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> I actually cannot share the actual client code.
> > > > > > > > > > > > >> But this is what the streams look like:
> > > > > > > > > > > > >> sourceStream.coGroup(destStream)
> > > > > > > > > > > > >> Here the sourceStream and destStream are actually
> > > > > > > > > > > > >> Tuple2<String,DTO>, and the ElementSelector returns
> > > > > > > > > > > > >> tuple.f0, which is the key.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> I am generating the timestamp based on a field from
> > > > > > > > > > > > >> the DTO which is guaranteed to be in order.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Will using the triggers help here?
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> Regards,
> > > > > > > > > > > > >> Vinay Patil
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> *+91-800-728-4749*
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> On Mon, Jun 27, 2016 at 7:05 PM, Aljoscha Krettek
> <
> > > > > > > > > > > aljoscha@apache.org>
> > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>> Hi,
> > > > > > > > > > > > >>> what timestamps are you assigning? Is it
> guaranteed
> > > > that
> > > > > > all
> > > > > > > of
> > > > > > > > > > them
> > > > > > > > > > > > >>> would
> > > > > > > > > > > > >>> fall into the same 30 second window?
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> The issue with duplicate printing in the
> > > > > > > > > > > > >>> ElementSelector is strange. Could you post a more
> > > > > > > > > > > > >>> complete code example so that I can reproduce the
> > > > > > > > > > > > >>> problem?
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> Cheers,
> > > > > > > > > > > > >>> Aljoscha
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> On Mon, 27 Jun 2016 at 13:21 Vinay Patil <
> > > > > > > > > vinay18.patil@gmail.com>
> > > > > > > > > > > > >>> wrote:
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>> > Hi ,
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> > I am able to get the matching and non-matching
> > > > > elements.
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> > However, when I am unit testing the code, I am
> > > > > > > > > > > > >>> > getting one record less inside the overridden
> > > > > > > > > > > > >>> > cogroup function.
> > > > > > > > > > > > >>> > Testing the following way :
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> > 1) Insert 5 messages into local kafka topic
> > (test1)
> > > > > > > > > > > > >>> > 2) Insert different 5 messages into local kafka
> > > topic
> > > > > > > (test2)
> > > > > > > > > > > > >>> > 3) Consume 1) and 2) and I have two different
> > kafka
> > > > > > > streams
> > > > > > > > > > > > >>> > 4) Generate ascending timestamp(using Event
> Time)
> > > for
> > > > > > both
> > > > > > > > > > streams
> > > > > > > > > > > > and
> > > > > > > > > > > > >>> > create key(String)
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> > Now till 4) I am able to get all the records
> > > (checked
> > > > > by
> > > > > > > > > printing
> > > > > > > > > > > the
> > > > > > > > > > > > >>> > stream in text file)
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> > However when I send the stream to co-group
> > > operator,
> > > > I
> > > > > am
> > > > > > > > > > receiving
> > > > > > > > > > > > one
> > > > > > > > > > > > >>> > less record, using the following syntax:
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> > sourceStream.coGroup(destStream)
> > > > > > > > > > > > >>> >     .where(new ElementSelector())
> > > > > > > > > > > > >>> >     .equalTo(new ElementSelector())
> > > > > > > > > > > > >>> >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > > > > > > >>> >     .apply(new JoinStreams());
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> > Also, in the ElementSelector I have inserted a
> > > > > > > > > > > > >>> > sysout, and I am getting 20 sysouts instead of 10
> > > > > > > > > > > > >>> > (10 sysouts for the source and 10 for the dest
> > > > > > > > > > > > >>> > stream).
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> > Unable to understand why one record fewer is
> > > > > > > > > > > > >>> > reaching the co-group.
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> > Regards,
> > > > > > > > > > > > >>> > Vinay Patil
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> > On Wed, Jun 15, 2016 at 1:39 PM, Fabian Hueske
> <
> > > > > > > > > > fhueske@gmail.com>
> > > > > > > > > > > > >>> wrote:
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>> > > Can you add a flag to each element emitted by
> > the
> > > > > > > > > > CoGroupFunction
> > > > > > > > > > > > >>> that
> > > > > > > > > > > > >>> > > indicates whether it was joined or not?
> > > > > > > > > > > > >>> > > Then you can use split to distinguish between
> > > both
> > > > > > cases
> > > > > > > > and
> > > > > > > > > > > handle
> > > > > > > > > > > > >>> both
> > > > > > > > > > > > >>> > > streams differently.
> > > > > > > > > > > > >>> > >
> > > > > > > > > > > > >>> > > Best, Fabian
> > > > > > > > > > > > >>> > >
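A sketch of this flag-and-split approach (illustrative only, not from the thread; it reuses the Integer element type and the ElementSelector from the dummy example quoted further down, and flags each emitted element with whether its key was present on both sides):

    import java.util.Collections;
    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.collector.selector.OutputSelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SplitStream;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    DataStream<Tuple2<Boolean, Integer>> flagged = sourceStream.coGroup(destStream)
        .where(new ElementSelector())
        .equalTo(new ElementSelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(30)))
        .apply(new CoGroupFunction<Integer, Integer, Tuple2<Boolean, Integer>>() {
            @Override
            public void coGroup(Iterable<Integer> left, Iterable<Integer> right,
                    Collector<Tuple2<Boolean, Integer>> out) {
                // f0 flags whether this key had elements on both sides.
                boolean matched = left.iterator().hasNext() && right.iterator().hasNext();
                for (Integer e : left) { out.collect(Tuple2.of(matched, e)); }
                for (Integer e : right) { out.collect(Tuple2.of(matched, e)); }
            }
        });

    SplitStream<Tuple2<Boolean, Integer>> split =
        flagged.split(new OutputSelector<Tuple2<Boolean, Integer>>() {
            @Override
            public Iterable<String> select(Tuple2<Boolean, Integer> value) {
                return Collections.singletonList(value.f0 ? "matched" : "unmatched");
            }
        });

    DataStream<Tuple2<Boolean, Integer>> matched = split.select("matched");
    DataStream<Tuple2<Boolean, Integer>> unmatched = split.select("unmatched");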
> > > > > > > > > > > > >>> > > 2016-06-15 6:45 GMT+02:00 Vinay Patil <
> > > > > > > > > vinay18.patil@gmail.com
> > > > > > > > > > >:
> > > > > > > > > > > > >>> > >
> > > > > > > > > > > > >>> > > > Hi Jark,
> > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > >>> > > > I am able to get the non-matching elements in
> > > > > > > > > > > > >>> > > > a stream.
> > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > >>> > > > Of-course we can collect the matching
> > elements
> > > in
> > > > > the
> > > > > > > > same
> > > > > > > > > > > stream
> > > > > > > > > > > > >>> as
> > > > > > > > > > > > >>> > > well,
> > > > > > > > > > > > >>> > > > however I want to perform additional
> > operations
> > > > on
> > > > > > the
> > > > > > > > > joined
> > > > > > > > > > > > >>> stream
> > > > > > > > > > > > >>> > > before
> > > > > > > > > > > > >>> > > > writing it to S3, so I would have to
> include
> > a
> > > > > > separate
> > > > > > > > > join
> > > > > > > > > > > > >>> operator
> > > > > > > > > > > > >>> > for
> > > > > > > > > > > > >>> > > > the same two streams, right ?
> > > > > > > > > > > > >>> > > > Correct me if I am wrong.
> > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > >>> > > > I have pasted the dummy code which collects
> > > > > > > > > > > > >>> > > > the non-matching records (I have to perform
> > > > > > > > > > > > >>> > > > this on the actual data; correct me if I am
> > > > > > > > > > > > >>> > > > doing it wrong).
> > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > >>> > > > sourceStream.coGroup(destStream)
> > > > > > > > > > > > >>> > > >     .where(new ElementSelector())
> > > > > > > > > > > > >>> > > >     .equalTo(new ElementSelector())
> > > > > > > > > > > > >>> > > >     .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > > > > > > >>> > > >     .apply(new CoGroupFunction<Integer, Integer, Integer>() {
> > > > > > > > > > > > >>> > > >         private static final long serialVersionUID = 6408179761497497475L;
> > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > >>> > > >         @Override
> > > > > > > > > > > > >>> > > >         public void coGroup(Iterable<Integer> paramIterable,
> > > > > > > > > > > > >>> > > >                 Iterable<Integer> paramIterable1,
> > > > > > > > > > > > >>> > > >                 Collector<Integer> paramCollector) throws Exception {
> > > > > > > > > > > > >>> > > >             long exactSizeIfKnown = paramIterable.spliterator().getExactSizeIfKnown();
> > > > > > > > > > > > >>> > > >             long exactSizeIfKnown2 = paramIterable1.spliterator().getExactSizeIfKnown();
> > > > > > > > > > > > >>> > > >             if (exactSizeIfKnown == 0) {
> > > > > > > > > > > > >>> > > >                 paramCollector.collect(paramIterable1.iterator().next());
> > > > > > > > > > > > >>> > > >             } else if (exactSizeIfKnown2 == 0) {
> > > > > > > > > > > > >>> > > >                 paramCollector.collect(paramIterable.iterator().next());
> > > > > > > > > > > > >>> > > >             }
> > > > > > > > > > > > >>> > > >         }
> > > > > > > > > > > > >>> > > >     }).print();
> > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > >>> > > > Regards,
> > > > > > > > > > > > >>> > > > Vinay Patil
> > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > >>> > > > On Tue, Jun 14, 2016 at 1:37 PM, Vinay
> Patil
> > <
> > > > > > > > > > > > >>> vinay18.patil@gmail.com>
> > > > > > > > > > > > >>> > > > wrote:
> > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > >>> > > > > You are right, debugged it for all
> elements
> > > , I
> > > > > can
> > > > > > > do
> > > > > > > > > that
> > > > > > > > > > > > now.
> > > > > > > > > > > > >>> > > > > Thanks a lot.
> > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > >>> > > > > Regards,
> > > > > > > > > > > > >>> > > > > Vinay Patil
> > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > >>> > > > > On Tue, Jun 14, 2016 at 11:56 AM, Jark
> Wu <
> > > > > > > > > > > > >>> > wuchong.wc@alibaba-inc.com>
> > > > > > > > > > > > >>> > > > > wrote:
> > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > >>> > > > >> In `coGroup(Iterable<Integer> iter1,
> > > > > > > Iterable<Integer>
> > > > > > > > > > > iter2,
> > > > > > > > > > > > >>> > > > >> Collector<Integer> out)` ,   when both
> > iter1
> > > > and
> > > > > > > iter2
> > > > > > > > > are
> > > > > > > > > > > not
> > > > > > > > > > > > >>> > empty,
> > > > > > > > > > > > >>> > > it
> > > > > > > > > > > > >>> > > > >> means they are matched elements from
> both
> > > > > stream.
> > > > > > > > > > > > >>> > > > >> When one of iter1 and iter2 is empty ,
> it
> > > > means
> > > > > > that
> > > > > > > > > they
> > > > > > > > > > > are
> > > > > > > > > > > > >>> > > unmatched.
> > > > > > > > > > > > >>> > > > >>
> > > > > > > > > > > > >>> > > > >>
> > > > > > > > > > > > >>> > > > >> - Jark Wu (wuchong)
> > > > > > > > > > > > >>> > > > >>
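In code, that check can be a simple hasNext() probe on each side (a sketch, not from the thread; note that Spliterator#getExactSizeIfKnown, used elsewhere in this thread, may return -1 for sources that don't know their size, so hasNext() is the safer emptiness test):

    @Override
    public void coGroup(Iterable<Integer> left, Iterable<Integer> right,
            Collector<Integer> out) throws Exception {
        boolean leftEmpty = !left.iterator().hasNext();
        boolean rightEmpty = !right.iterator().hasNext();

        if (!leftEmpty && !rightEmpty) {
            // Matched: the key occurred in both streams within this window.
        } else if (leftEmpty) {
            // Unmatched elements that only occurred in the second stream.
            for (Integer e : right) { out.collect(e); }
        } else {
            // Unmatched elements that only occurred in the first stream.
            for (Integer e : left) { out.collect(e); }
        }
    }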
> > > > > > > > > > > > >>> > > > >> > On 14 June 2016, at 12:46 PM, Vinay Patil
> > > > > > > > > > > > >>> > > > >> > <vinay18.patil@gmail.com> wrote:
> > > > > > > > > > > > >>> > > > >> >
> > > > > > > > > > > > >>> > > > >> > Hi Matthias ,
> > > > > > > > > > > > >>> > > > >> >
> > > > > > > > > > > > >>> > > > >> > I did not get you, even if we use
> > Co-Group
> > > > we
> > > > > > have
> > > > > > > > to
> > > > > > > > > > > apply
> > > > > > > > > > > > >>> it on
> > > > > > > > > > > > >>> > a
> > > > > > > > > > > > >>> > > > key
> > > > > > > > > > > > >>> > > > >> >
> > > > > > > > > > > > >>> > > > >> > sourceStream.coGroup(destStream)
> > > > > > > > > > > > >>> > > > >> > .where(new ElementSelector())
> > > > > > > > > > > > >>> > > > >> > .equalTo(new ElementSelector())
> > > > > > > > > > > > >>> > > > >> >
> > > > > > > > .window(TumblingEventTimeWindows.of(Time.seconds(30)))
> > > > > > > > > > > > >>> > > > >> > .apply(new CoGroupFunction<Integer,
> > > Integer,
> > > > > > > > > Integer>()
> > > > > > > > > > {
> > > > > > > > > > > > >>> > > > >> > private static final long
> > > serialVersionUID =
> > > > > > > > > > > > >>> 6408179761497497475L;
> > > > > > > > > > > > >>> > > > >> >
> > > > > > > > > > > > >>> > > > >> > @Override
> > > > > > > > > > > > >>> > > > >> > public void coGroup(Iterable<Integer>
> > > > > > > paramIterable,
> > > > > > > > > > > > >>> > > Iterable<Integer>
> > > > > > > > > > > > >>> > > > >> > paramIterable1,
> > > > > > > > > > > > >>> > > > >> > Collector<Integer> paramCollector)
> > throws
> > > > > > > Exception
> > > > > > > > {
> > > > > > > > > > > > >>> > > > >> > Iterator<Integer> iterator =
> > > > > > > > paramIterable.iterator();
> > > > > > > > > > > > >>> > > > >> > while(iterator.hasNext()) {
> > > > > > > > > > > > >>> > > > >> > }
> > > > > > > > > > > > >>> > > > >> > }
> > > > > > > > > > > > >>> > > > >> > });
> > > > > > > > > > > > >>> > > > >> >
> > > > > > > > > > > > >>> > > > >> > When I debug this, only the matched
> > > > > > > > > > > > >>> > > > >> > elements from both streams come into the
> > > > > > > > > > > > >>> > > > >> > coGroup function.
> > > > > > > > > > > > >>> > > > >> >
> > > > > > > > > > > > >>> > > > >> > What I want is: how do I check for
> > > > > > > > > > > > >>> > > > >> > unmatched elements from both streams and
> > > > > > > > > > > > >>> > > > >> > write them to the sink?
> > > > > > > > > > > > >>> > > > >> >
> > > > > > > > > > > > >>> > > > >> > Regards,
> > > > > > > > > > > > >>> > > > >> > Vinay Patil
> > > > > > > > > > > > >>> > > > >> >
> > > > > > > > > > > > >>> > > > >> > *+91-800-728-4749*
> > > > > > > > > > > > >>> > > > >> >
> > > > > > > > > > > > >>> > > > >> > On Tue, Jun 14, 2016 at 2:07 AM,
> > Matthias
> > > J.
> > > > > > Sax <
> > > > > > > > > > > > >>> > mjsax@apache.org>
> > > > > > > > > > > > >>> > > > >> wrote:
> > > > > > > > > > > > >>> > > > >> >
> > > > > > > > > > > > >>> > > > >> >> You need to do an outer-join. However,
> > > > > > > > > > > > >>> > > > >> >> there is no built-in support for
> > > > > > > > > > > > >>> > > > >> >> outer-joins yet.
> > > > > > > > > > > > >>> > > > >> >>
> > > > > > > > > > > > >>> > > > >> >> You can use Window-CoGroup to
> implement
> > > the
> > > > > > > > > outer-join
> > > > > > > > > > as
> > > > > > > > > > > > an
> > > > > > > > > > > > >>> own
> > > > > > > > > > > > >>> > > > >> operator.
> > > > > > > > > > > > >>> > > > >> >>
> > > > > > > > > > > > >>> > > > >> >>
> > > > > > > > > > > > >>> > > > >> >> -Matthias
> > > > > > > > > > > > >>> > > > >> >>
> > > > > > > > > > > > >>> > > > >> >> On 06/13/2016 06:53 PM, Vinay Patil
> > > wrote:
> > > > > > > > > > > > >>> > > > >> >>> Hi,
> > > > > > > > > > > > >>> > > > >> >>>
> > > > > > > > > > > > >>> > > > >> >>> I have a question regarding the join
> > > > > > operation,
> > > > > > > > > > consider
> > > > > > > > > > > > the
> > > > > > > > > > > > >>> > > > following
> > > > > > > > > > > > >>> > > > >> >>> dummy example:
> > > > > > > > > > > > >>> > > > >> >>>
> > > > > > > > > > > > >>> > > > >> >>> StreamExecutionEnvironment env =
> > > > > > > > > > > > >>> > > > >> >>>     StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > > > > > > > > >>> > > > >> >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> > > > > > > > > > > > >>> > > > >> >>> DataStreamSource<Integer> sourceStream =
> > > > > > > > > > > > >>> > > > >> >>>     env.fromElements(10, 20, 23, 25, 30, 33, 102, 18);
> > > > > > > > > > > > >>> > > > >> >>> DataStreamSource<Integer> destStream =
> > > > > > > > > > > > >>> > > > >> >>>     env.fromElements(20, 30, 40, 50, 60, 10);
> > > > > > > > > > > > >>> > > > >> >>>
> > > > > > > > > > > > >>> > > > >> >>> sourceStream.join(destStream)
> > > > > > > > > > > > >>> > > > >> >>>     .where(new ElementSelector())
> > > > > > > > > > > > >>> > > > >> >>>     .equalTo(new ElementSelector())
> > > > > > > > > > > > >>> > > > >> >>>     .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
> > > > > > > > > > > > >>> > > > >> >>>     .apply(new JoinFunction<Integer, Integer, Integer>() {
> > > > > > > > > > > > >>> > > > >> >>>         private static final long serialVersionUID = 1L;
> > > > > > > > > > > > >>> > > > >> >>>
> > > > > > > > > > > > >>> > > > >> >>>         @Override
> > > > > > > > > > > > >>> > > > >> >>>         public Integer join(Integer paramIN1, Integer paramIN2) throws Exception {
> > > > > > > > > > > > >>> > > > >> >>>             return paramIN1;
> > > > > > > > > > > > >>> > > > >> >>>         }
> > > > > > > > > > > > >>> > > > >> >>>     }).print();
> > > > > > > > > > > > >>> > > > >> >>>
> > > > > > > > > > > > >>> > > > >> >>> I perfectly get the elements that are
> > > > > > > > > > > > >>> > > > >> >>> matching in both streams; however, my
> > > > > > > > > > > > >>> > > > >> >>> requirement is to write these matched
> > > > > > > > > > > > >>> > > > >> >>> elements and also the unmatched elements
> > > > > > > > > > > > >>> > > > >> >>> to the sink (S3).
> > > > > > > > > > > > >>> > > > >> >>>
> > > > > > > > > > > > >>> > > > >> >>> How do I get the unmatched elements from
> > > > > > > > > > > > >>> > > > >> >>> each stream?
> > > > > > > > > > > > >>> > > > >> >>> Regards,
> > > > > > > > > > > > >>> > > > >> >>> Vinay Patil
> > > > > > > > > > > > >>> > > > >> >>>
> > > > > > > > > > > > >>> > > > >> >>
> > > > > > > > > > > > >>> > > > >> >>
> > > > > > > > > > > > >>> > > > >>
> > > > > > > > > > > > >>> > > > >>
> > > > > > > > > > > > >>> > > > >
> > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > >>> > >
> > > > > > > > > > > > >>> >
> > > > > > > > > > > > >>>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >>
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: [Discussion] Query regarding Join and Windows

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think the problem is that the DeltaFunction needs to have this signature:

DeltaFunction<CoGroupedStreams.TaggedUnion<Tuple2<String,DTO>,
Tuple2<String,DTO>>>

because the Trigger will see elements from both input streams which are
represented as a TaggedUnion that can contain an element from either side.

May I ask why you want to use the DeltaTrigger?

Cheers,
Aljoscha

On Wed, 29 Jun 2016 at 19:06 Vinay Patil <vi...@gmail.com> wrote:

> Hi,
>
> Yes, the concepts are getting clear to me now.
> One last thing I want to try before going for a custom trigger is the
> DeltaTrigger, but I am not able to get the syntax right. This is how I am
> trying it:
>
> TypeInformation<Tuple2<String, DTO>> typeInfo = TypeInformation.of(new
>     TypeHint<Tuple2<String, DTO>>() {});
> // source and destStream : Tuple2<String,DTO>
> sourceStream.coGroup(destStream)
>     .where(new ElementSelector())
>     .equalTo(new ElementSelector())
>     .window(TumblingEventTimeWindows.of(Time.seconds(10)))
>     .trigger(DeltaTrigger.of(triggerMeters,
>         new DeltaFunction<Tuple2<String, DTO>>() {
>             private static final long serialVersionUID = 1L;
>
>             @Override
>             public double getDelta(
>                     Tuple2<String, DTO> oldDataPoint,
>                     Tuple2<String, DTO> newDataPoint) {
>                 return <some_val>;
>             }
>         }, typeInfo.createSerializer(env.getConfig())))
>     .apply(new JoinStreams());
>
> I am getting the error: cannot convert from DeltaTrigger to Trigger<? super
> CoGroupedStreams...
> What am I doing wrong here? I have referred to the sample example.
>
> Regards,
> Vinay Patil
>
> On Wed, Jun 29, 2016 at 7:15 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
> > Hi,
> > you can use ingestion time if you don't care about the timestamps in your
> > events, yes. If elements from the two streams happen to arrive at such
> > times that they are not put into the same window then you won't get a
> > match, correct.
> >
> > Regarding ingestion time and out-of-order events. I think this section
> > just reiterates that when using ingestion time the inherent timestamps in
> > your events will not be considered and their order will not be respected.
> >
> > Regarding late data: right now, Flink always processes late data and it
> > is up to the Trigger to decide what to do with late data. You can
> > implement your custom trigger based on EventTimeTrigger that would
> > immediately purge a window when an element arrives that is later than an
> > allowed amount of lateness. In Flink 1.1 we will introduce a setting for
> > windows that allows to specify an allowed lateness. With this, late
> > elements will be dropped automatically. This feature is already available
> > in the master, by the way.
> >
> > Cheers,
> > Aljoscha
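A rough sketch of such a lateness-purging trigger (not the thread's code; written against the Flink 1.x Trigger API and assuming a TriggerContext that exposes the current watermark, which newer Flink versions provide):

    import org.apache.flink.streaming.api.windowing.triggers.Trigger;
    import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

    // An EventTimeTrigger variant that drops a window's state when elements
    // arrive more than 'allowedLateness' ms behind the watermark.
    public class LatenessAwareTrigger implements Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
        private final long allowedLateness;

        public LatenessAwareTrigger(long allowedLateness) {
            this.allowedLateness = allowedLateness;
        }

        @Override
        public TriggerResult onElement(Object element, long timestamp,
                TimeWindow window, TriggerContext ctx) throws Exception {
            if (window.maxTimestamp() + allowedLateness <= ctx.getCurrentWatermark()) {
                // The window is already later than we tolerate: purge its state.
                return TriggerResult.PURGE;
            }
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window,
                TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE
                                                 : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window,
                TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
    }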

Re: [Discussion] Query regarding Join and Windows

Posted by Vinay Patil <vi...@gmail.com>.
Hi,

Yes, the concepts are getting clear to me now.
One last thing I want to try before going for a custom trigger is the
DeltaTrigger, but I am not able to get the syntax right. This is how I am
trying it:

TypeInformation<Tuple2<String, DTO>> typeInfo = TypeInformation.of(
        new TypeHint<Tuple2<String, DTO>>() {});

// sourceStream and destStream are both of type Tuple2<String, DTO>
sourceStream.coGroup(destStream)
    .where(new ElementSelector())
    .equalTo(new ElementSelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .trigger(DeltaTrigger.of(triggerMeters,
        new DeltaFunction<Tuple2<String, DTO>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public double getDelta(
                    Tuple2<String, DTO> oldDataPoint,
                    Tuple2<String, DTO> newDataPoint) {
                return <some_val>;
            }
        }, typeInfo.createSerializer(env.getConfig())))
    .apply(new JoinStreams());

I am getting the error: cannot convert from DeltaTrigger to Trigger<? super
CoGroupedStreams...
What am I doing wrong here? I have referred to the sample example.

Regards,
Vinay Patil
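
For reference, the conversion error is a type mismatch: the trigger of a
windowed coGroup must be a Trigger<? super CoGroupedStreams.TaggedUnion<
Tuple2<String, DTO>, Tuple2<String, DTO>>, TimeWindow>, because coGroup wraps
the elements of both inputs in a TaggedUnion before windowing. Below is a
minimal sketch of a DeltaFunction written over that union type. It assumes
the public isOne()/getOne()/getTwo() accessors of CoGroupedStreams.TaggedUnion;
someDelta() is a hypothetical helper standing in for the <some_val>
computation above, and the serializer passed to DeltaTrigger.of would likewise
have to be for the union type.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;

public class UnionDeltaFunction
        implements DeltaFunction<TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>>> {

    private static final long serialVersionUID = 1L;

    @Override
    public double getDelta(
            TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>> oldDataPoint,
            TaggedUnion<Tuple2<String, DTO>, Tuple2<String, DTO>> newDataPoint) {
        // Unwrap whichever of the two input streams each element came from.
        Tuple2<String, DTO> oldElem = oldDataPoint.isOne()
                ? oldDataPoint.getOne() : oldDataPoint.getTwo();
        Tuple2<String, DTO> newElem = newDataPoint.isOne()
                ? newDataPoint.getOne() : newDataPoint.getTwo();
        return someDelta(oldElem, newElem);
    }

    // Hypothetical helper, standing in for the <some_val> computation.
    private double someDelta(Tuple2<String, DTO> oldElem,
            Tuple2<String, DTO> newElem) {
        return 0.0; // placeholder
    }
}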


Re: [Discussion] Query regarding Join and Windows

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
you can use ingestion time if you don't care about the timestamps in your
events, yes. If elements from the two streams happen to arrive at such
times that they are not put into the same window then you won't get a
match, correct.

Regarding ingestion time and out-of-order events: I think this section just
reiterates that when using ingestion time the inherent timestamps in your
events will not be considered and their order will not be respected.
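
To make that concrete: under event time, an element counts as out of order
when its timestamp is at or behind the current watermark, which is itself
derived from the timestamps of earlier elements. A minimal periodic-assigner
sketch (the getEventTimestamp() accessor on the DTO and the 3500 ms slack are
illustrative assumptions):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class SlackedWatermarkAssigner
        implements AssignerWithPeriodicWatermarks<Tuple2<String, DTO>> {

    private static final long serialVersionUID = 1L;
    private static final long SLACK_MS = 3500; // tolerated out-of-orderness

    // Highest timestamp seen so far; offset so the initial watermark is sane.
    private long maxSeenTimestamp = Long.MIN_VALUE + SLACK_MS;

    @Override
    public long extractTimestamp(Tuple2<String, DTO> element,
            long previousElementTimestamp) {
        long ts = element.f1.getEventTimestamp(); // hypothetical accessor
        maxSeenTimestamp = Math.max(maxSeenTimestamp, ts);
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Any element with a timestamp at or below this value is out of order.
        return new Watermark(maxSeenTimestamp - SLACK_MS);
    }
}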

Regarding late data: right now, Flink always processes late data and it is
up to the Trigger to decide what to do with it. You can implement your own
trigger, based on EventTimeTrigger, that immediately purges a window when an
element arrives later than an allowed amount of lateness. In Flink 1.1 we
will introduce a setting for windows that allows you to specify an allowed
lateness. With this, late elements will be dropped
automatically. This feature is already available in the master, by the way.
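
A minimal sketch of such a trigger, modeled on EventTimeTrigger and written
against the 1.1-era Trigger API (ctx.getCurrentWatermark(), the clear() hook,
and ctx.deleteEventTimeTimer() are assumed to be available, as in the current
master; the allowedLatenessMs parameter is user-chosen):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class LatenessPurgingTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;
    private final long allowedLatenessMs;

    public LatenessPurgingTrigger(long allowedLatenessMs) {
        this.allowedLatenessMs = allowedLatenessMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp,
            TimeWindow window, TriggerContext ctx) throws Exception {
        long watermark = ctx.getCurrentWatermark();
        if (window.maxTimestamp() + allowedLatenessMs <= watermark) {
            // The window is already past the allowed lateness: drop its state.
            return TriggerResult.PURGE;
        } else if (window.maxTimestamp() <= watermark) {
            // Late, but still within the allowed lateness: fire again.
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
            TriggerContext ctx) {
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
            TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

Declared on <Object, TimeWindow>, such a trigger also fits a windowed coGroup,
since Object is a supertype of the TaggedUnion element type mentioned earlier
in the thread.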

Cheers,
Aljoscha
