Posted to dev@apex.apache.org by Bhupesh Chawda <bh...@datatorrent.com> on 2017/03/17 08:52:53 UTC

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

Created a JIRA to track this:
https://issues.apache.org/jira/browse/APEXMALHAR-2449

~ Bhupesh


_______________________________________________________

Bhupesh Chawda

E: bhupesh@datatorrent.com | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Tue, Feb 28, 2017 at 10:59 PM, Bhupesh Chawda <bh...@datatorrent.com>
wrote:

> Hi Thomas,
>
> Even though the windowing operator is not just "event time", it seems it
> is too much dependent on the "time" attribute of the incoming tuple. This
> is the reason we had to model the file index as a timestamp to solve the
> batch case for files.
> Perhaps we should work on increasing the scope of the windowed operator to
> consider other types of windows as well. The Sequence option suggested by
> David seems to be something in that direction.
>
> ~ Bhupesh
>
>
> On Tue, Feb 28, 2017 at 10:48 PM, Thomas Weise <th...@apache.org> wrote:
>
>> That's correct, we are looking at a generalized approach for state
>> management vs. a series of special cases.
>>
>> And to be clear, windowing does not imply event time, otherwise it would
>> be "EventTimeOperator" :-)
>>
>> Thomas
>>
>> On Tue, Feb 28, 2017 at 9:11 AM, Bhupesh Chawda <bh...@datatorrent.com>
>> wrote:
>>
>> > Hi David,
>> >
>> > I went through the discussion, but it seems like it is more on the event
>> > time watermark handling as opposed to batches. What we are trying to do
>> > is have watermarks serve the purpose of demarcating batches using control
>> > tuples. Since each batch is separate from others, we would like to have
>> > stateful processing within a batch, but not across batches.
>> > At the same time, we would like to do this in a manner which is
>> > consistent with the windowing mechanism provided by the windowed
>> > operator. This will allow us to treat a single batch as a (bounded)
>> > stream and apply all the event time windowing concepts in that time span.
>> >
>> > For example, let's say I need to process data for a day (24 hours) as a
>> > single batch. The application is still streaming in nature: it would end
>> > the batch after a day and start a new batch the next day. At the same
>> > time, I would be able to have early trigger firings every minute as well
>> > as drop any data which is, say, 5 minutes late. All this within a single
>> > day.
>> >
>> > ~ Bhupesh
>> >
>> >
>> >
>> > On Tue, Feb 28, 2017 at 9:27 PM, David Yan <da...@gmail.com> wrote:
>> >
>> > > There is a discussion in the Flink mailing list about key-based
>> > > watermarks. I think it's relevant to our use case here.
>> > > https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
>> > >
>> > > David
>> > >
>> > > On Tue, Feb 28, 2017 at 2:13 AM, Bhupesh Chawda <bhupesh@datatorrent.com> wrote:
>> > >
>> > > > Hi David,
>> > > >
>> > > > If using a time window does not seem appropriate, we can have another
>> > > > class which is more suited for such sequential and distinct windows.
>> > > > Perhaps a CustomWindow option can be introduced which takes in a
>> > > > window id. The purpose of this window option could be to translate
>> > > > the window id into appropriate timestamps.
>> > > >
>> > > > Another option would be to go with a custom timestampExtractor for
>> > > > such tuples, which translates each unique file name to a distinct
>> > > > timestamp while using time windows in the windowed operator.
>> > > >
>> > > > ~ Bhupesh
>> > > >
>> > > >
>> > > > On Tue, Feb 28, 2017 at 12:28 AM, David Yan <da...@gmail.com> wrote:
>> > > >
>> > > > > I now see your rationale on putting the filename in the window.
>> > > > > As far as I understand, the reasons why the filename is not part
>> > > > > of the key and the Global Window is not used are:
>> > > > >
>> > > > > 1) The files are processed in sequence, not in parallel
>> > > > > 2) The windowed operator should not keep the state associated
>> > > > > with the file when the processing of the file is done
>> > > > > 3) The trigger should be fired for the file when a file is done
>> > > > > processing.
>> > > > >
>> > > > > However, if the file is just a sequence that has nothing to do
>> > > > > with a timestamp, assigning a timestamp to a file is not an
>> > > > > intuitive thing to do and would just create confusion for users,
>> > > > > especially when it's used as an example for new users.
>> > > > >
>> > > > > How about having a separate class called SequenceWindow? And
>> > > > > perhaps TimeWindow can inherit from it?
>> > > > >
>> > > > > David
>> > > > >
>> > > > > On Mon, Feb 27, 2017 at 8:58 AM, Thomas Weise <th...@apache.org>
>> > wrote:
>> > > > >
>> > > > > > On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda <
>> > > > bhupesh@datatorrent.com
>> > > > > >
>> > > > > > wrote:
>> > > > > >
>> > > > > > > I think my comments related to count based windows might be
>> > > > > > > causing confusion. Let's not discuss count based scenarios
>> > > > > > > for now.
>> > > > > > >
>> > > > > > > Just want to make sure we are on the same page wrt. the
>> > > > > > > "each file is a batch" use case. As mentioned by Thomas,
>> > > > > > > each tuple from the same file has the same timestamp (which
>> > > > > > > is just a sequence number) and that helps keep tuples from
>> > > > > > > each file in a separate window.
>> > > > > > >
>> > > > > >
>> > > > > > Yes, in this case it is a sequence number, but it could be a
>> > > > > > time stamp also, depending on the file naming convention. And
>> > > > > > if it was event time processing, the watermark would be
>> > > > > > derived from records within the file.
>> > > > > >
>> > > > > > Agreed, the source should have a mechanism to control the time
>> > > > > > stamp extraction along with everything else pertaining to the
>> > > > > > watermark generation.
>> > > > > >
>> > > > > >
>> > > > > > > We could also implement a "timestampExtractor" interface to
>> > > > > > > identify the timestamp (sequence number) for a file.
>> > > > > > >
>> > > > > > > ~ Bhupesh
>> > > > > > >
>> > > > > > >
>> > > > > > > On Mon, Feb 27, 2017 at 9:52 PM, Thomas Weise <thw@apache.org
>> >
>> > > > wrote:
>> > > > > > >
>> > > > > > > > I don't think this is a use case for count based window.
>> > > > > > > >
>> > > > > > > > We have multiple files that are retrieved in a sequence and
>> > > > > > > > there is no knowledge of the number of records per file.
>> > > > > > > > The requirement is to aggregate each file separately and
>> > > > > > > > emit the aggregate when the file is read fully. There is no
>> > > > > > > > concept of "end of something" for an individual key and
>> > > > > > > > global window isn't applicable.
>> > > > > > > >
>> > > > > > > > However, as already explained and implemented by Bhupesh,
>> > > > > > > > this can be solved using watermark and window (in this case
>> > > > > > > > the window timestamp isn't a timestamp, but a file
>> > > > > > > > sequence, but that doesn't matter).
>> > > > > > > >
>> > > > > > > > Thomas
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Mon, Feb 27, 2017 at 8:05 AM, David Yan <
>> davidyan@gmail.com
>> > >
>> > > > > wrote:
>> > > > > > > >
>> > > > > > > > > I don't think this is the way to go. Global Window only
>> means
>> > > the
>> > > > > > > > timestamp
>> > > > > > > > > does not matter (or that there is no timestamp). It does
>> not
>> > > > > > > necessarily
>> > > > > > > > > mean it's a large batch. Unless there is some notion of
>> event
>> > > > time
>> > > > > > for
>> > > > > > > > each
>> > > > > > > > > file, you don't want to embed the file into the window
>> > itself.
>> > > > > > > > >
>> > > > > > > > > If you want the result broken up by file name, and if the
>> > files
>> > > > are
>> > > > > > to
>> > > > > > > be
>> > > > > > > > > processed in parallel, I think making the file name be
>> part
>> > of
>> > > > the
>> > > > > > key
>> > > > > > > is
>> > > > > > > > > the way to go. I think it's very confusing if we somehow
>> make
>> > > the
>> > > > > > file
>> > > > > > > to
>> > > > > > > > > be part of the window.
>> > > > > > > > >
>> > > > > > > > > For count-based window, it's not implemented yet and
>> you're
>> > > > welcome
>> > > > > > to
>> > > > > > > > add
>> > > > > > > > > that feature. In case of count-based windows, there would
>> be
>> > no
>> > > > > > notion
>> > > > > > > of
>> > > > > > > > > time and you probably only trigger at the end of each
>> window.
>> > > In
>> > > > > the
>> > > > > > > case
>> > > > > > > > > of count-based windows, the watermark only matters for
>> batch
>> > > > since
>> > > > > > you
>> > > > > > > > need
>> > > > > > > > > a way to know when the batch has ended (if the count is
>> 10,
>> > the
>> > > > > > number
>> > > > > > > of
>> > > > > > > > > tuples in the batch is let's say 105, you need a way to
>> end
>> > the
>> > > > > last
>> > > > > > > > window
>> > > > > > > > > with 5 tuples).
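
The count-based case above (a count of 10 and a batch of 105 tuples) can be sketched as follows. This is a minimal illustration in plain Python, not the Malhar windowed operator API; the function name `count_windows` is hypothetical, and the end of the input iterable stands in for the end-of-batch watermark.

```python
def count_windows(tuples, size):
    """Yield windows of `size` tuples.

    The end of the input plays the role of the final watermark and
    closes the last, possibly partial, window.
    """
    window = []
    for t in tuples:
        window.append(t)
        if len(window) == size:
            yield window      # window is complete at the n-th tuple
            window = []
    if window:
        yield window          # flushed by the end-of-batch signal


sizes = [len(w) for w in count_windows(range(105), 10)]
print(sizes)
```

Without the final flush, the last 5 tuples would stay buffered forever, which is why the batch case needs some end-of-batch signal even for count-based windows.
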
>> > > > > > > > >
>> > > > > > > > > David
>> > > > > > > > >
>> > > > > > > > > On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <
>> > > > > > > bhupesh@datatorrent.com
>> > > > > > > > >
>> > > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi David,
>> > > > > > > > > >
>> > > > > > > > > > Thanks for your comments.
>> > > > > > > > > >
>> > > > > > > > > > The wordcount example that I created based on the
>> windowed
>> > > > > operator
>> > > > > > > > does
>> > > > > > > > > > processing of word counts per file (each file as a
>> separate
>> > > > > batch),
>> > > > > > > > i.e.
>> > > > > > > > > > process counts for each file and dump into separate
>> files.
>> > > > > > > > > > As I understand Global window is for one large batch;
>> i.e.
>> > > all
>> > > > > > > incoming
>> > > > > > > > > > data falls into the same batch. This could not be
>> processed
>> > > > using
>> > > > > > > > > > GlobalWindow option as we need more than one window. In
>> > this
>> > > > > > case, I
>> > > > > > > > > > configured the windowed operator to have time windows of
>> > 1ms
>> > > > each
>> > > > > > and
>> > > > > > > > > > passed data for each file with increasing timestamps:
>> > (file1,
>> > > > 1),
>> > > > > > > > (file2,
>> > > > > > > > > > 2) and so on. Is there a better way of handling this
>> > > scenario?
>> > > > > > > > > >
>> > > > > > > > > > Regarding (2 - count based windows), I think there is a
>> > > trigger
>> > > > > > > option
>> > > > > > > > to
>> > > > > > > > > > process count based windows. In case I want to process
>> > every
>> > > > 1000
>> > > > > > > > tuples
>> > > > > > > > > as
>> > > > > > > > > > a batch, I could set the Trigger option to CountTrigger
>> > with
>> > > > the
>> > > > > > > > > > accumulation set to Discarding. Is this correct?
>> > > > > > > > > >
>> > > > > > > > > > I agree that (4. Final Watermark) can be done using
>> Global
>> > > > > window.
>> > > > > > > > > >
>> > > > > > > > > > ~ Bhupesh
>> > > > > > > > > >
>> > > > > > > > > > On Mon, Feb 27, 2017 at 12:18 PM, David Yan <
>> > > > davidyan@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > I'm worried that we are making the watermark concept
>> too
>> > > > > > > complicated.
>> > > > > > > > > > >
>> > > > > > > > > > > Watermarks should simply just tell you what windows
>> can
>> > be
>> > > > > > > considered
>> > > > > > > > > > > complete.
>> > > > > > > > > > >
>> > > > > > > > > > > Point 2 is basically a count-based window. Watermarks
>> do
>> > > not
>> > > > > > play a
>> > > > > > > > > role
>> > > > > > > > > > > here because the window is always complete at the n-th
>> > > tuple.
>> > > > > > > > > > >
>> > > > > > > > > > > If I understand correctly, point 3 is for batch
>> > processing
>> > > of
>> > > > > > > files.
>> > > > > > > > > > Unless
>> > > > > > > > > > > the files contain timed events, it sounds to me that
>> this
>> > > can
>> > > > > be
>> > > > > > > > > achieved
>> > > > > > > > > > > with just a Global Window. For signaling EOF, a
>> watermark
>> > > > with
>> > > > > a
>> > > > > > > > > > +infinity
>> > > > > > > > > > > timestamp can be used so that triggers will be fired
>> upon
>> > > > > receipt
>> > > > > > > of
>> > > > > > > > > that
>> > > > > > > > > > > watermark.
>> > > > > > > > > > >
>> > > > > > > > > > > For point 4, just like what I mentioned above, can be
>> > > > achieved
>> > > > > > > with a
>> > > > > > > > > > > watermark with a +infinity timestamp.
>> > > > > > > > > > >
>> > > > > > > > > > > David
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <
>> > > > > > > > > bhupesh@datatorrent.com
>> > > > > > > > > > >
>> > > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi Thomas,
>> > > > > > > > > > > >
>> > > > > > > > > > > > For an input operator which is supposed to generate
>> > > > > watermarks
>> > > > > > > for
>> > > > > > > > > > > > downstream operators, I can think about the
>> following
>> > > > > > watermarks
>> > > > > > > > that
>> > > > > > > > > > the
>> > > > > > > > > > > > operator can emit:
>> > > > > > > > > > > > 1. Time based watermarks (the high watermark / low
>> > > > watermark)
>> > > > > > > > > > > > 2. Number of tuple based watermarks (Every n tuples)
>> > > > > > > > > > > > 3. File based watermarks (Start file, end file)
>> > > > > > > > > > > > 4. Final watermark
>> > > > > > > > > > > >
>> > > > > > > > > > > > File based watermarks seem to be applicable for
>> batch
>> > > (file
>> > > > > > > based)
>> > > > > > > > as
>> > > > > > > > > > > well,
>> > > > > > > > > > > > and hence I thought of looking at these first. Does
>> > this
>> > > > seem
>> > > > > > to
>> > > > > > > be
>> > > > > > > > > in
>> > > > > > > > > > > line
>> > > > > > > > > > > > with the thought process?
>> > > > > > > > > > > >
>> > > > > > > > > > > > ~ Bhupesh
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > _______________________________________________________
>> > > > > > > > > > > >
>> > > > > > > > > > > > Bhupesh Chawda
>> > > > > > > > > > > >
>> > > > > > > > > > > > Software Engineer
>> > > > > > > > > > > >
>> > > > > > > > > > > > E: bhupesh@datatorrent.com | Twitter: @bhupeshsc
>> > > > > > > > > > > >
>> > > > > > > > > > > > www.datatorrent.com  |  apex.apache.org
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <
>> > > > > thw@apache.org
>> > > > > > >
>> > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > I don't think this should be designed based on a
>> > > > simplistic
>> > > > > > > file
>> > > > > > > > > > > > > input-output scenario. It would be good to
>> include a
>> > > > > stateful
>> > > > > > > > > > > > > transformation based on event time.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > More complex pipelines contain stateful
>> > transformations
>> > > > > that
>> > > > > > > > depend
>> > > > > > > > > > on
>> > > > > > > > > > > > > windowing and watermarks. I think we need a
>> watermark
>> > > > > concept
>> > > > > > > > that
>> > > > > > > > > is
>> > > > > > > > > > > > based
>> > > > > > > > > > > > > on progress in event time (or other monotonic
>> > > increasing
>> > > > > > > > sequence)
>> > > > > > > > > > that
>> > > > > > > > > > > > > other operators can generically work with.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Note that even file input in many cases can
>> produce
>> > > time
>> > > > > > based
>> > > > > > > > > > > > watermarks,
>> > > > > > > > > > > > > for example when you read part files that are
>> bound
>> > by
>> > > > > event
>> > > > > > > > time.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > Thomas
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <
>> > > > > > > > > > > bhupesh@datatorrent.com
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > To better understand the use case for control tuples in
>> > > > > > > > > > > > > > batch, I am creating a prototype for a batch application
>> > > > > > > > > > > > > > using File Input and File Output operators.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > To enable basic batch processing for File IO
>> > > > operators, I
>> > > > > > am
>> > > > > > > > > > > proposing
>> > > > > > > > > > > > > the
>> > > > > > > > > > > > > > following changes to File input and output
>> > operators:
>> > > > > > > > > > > > > > 1. File Input operator emits a watermark each
>> time
>> > it
>> > > > > opens
>> > > > > > > and
>> > > > > > > > > > > closes
>> > > > > > > > > > > > a
>> > > > > > > > > > > > > > file. These can be "start file" and "end file"
>> > > > watermarks
>> > > > > > > which
>> > > > > > > > > > > include
>> > > > > > > > > > > > > the
>> > > > > > > > > > > > > > corresponding file names. The "start file" tuple
>> > > should
>> > > > > be
>> > > > > > > sent
>> > > > > > > > > > > before
>> > > > > > > > > > > > > any
>> > > > > > > > > > > > > > of the data from that file flows.
>> > > > > > > > > > > > > > 2. File Input operator can be configured to end
>> the
>> > > > > > > application
>> > > > > > > > > > > after a
>> > > > > > > > > > > > > > single or n scans of the directory (a batch).
>> This
>> > is
>> > > > > where
>> > > > > > > the
>> > > > > > > > > > > > operator
>> > > > > > > > > > > > > > emits the final watermark (the end of
>> application
>> > > > control
>> > > > > > > > tuple).
>> > > > > > > > > > > This
>> > > > > > > > > > > > > will
>> > > > > > > > > > > > > > also shutdown the application.
>> > > > > > > > > > > > > > 3. The File output operator handles these
>> control
>> > > > tuples.
>> > > > > > > > "Start
>> > > > > > > > > > > file"
>> > > > > > > > > > > > > > initializes the file name for the incoming
>> tuples.
>> > > "End
>> > > > > > file"
>> > > > > > > > > > > watermark
>> > > > > > > > > > > > > > forces a finalize on that file.
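
The output-side handling in point 3 can be sketched as below. This is a plain Python illustration under stated assumptions, not the Malhar file output operator: the class `FileOutputSketch`, its method names, and the control tuple kinds `"start_file"` / `"end_file"` are all hypothetical, and an in-memory dict stands in for finalized files.

```python
class FileOutputSketch:
    """Collects data tuples between "start file" and "end file" control tuples."""

    def __init__(self):
        self.current = None     # name of the file currently being written
        self.pending = []       # lines buffered for the current file
        self.finalized = {}     # file name -> finalized contents

    def on_control(self, kind, file_name):
        if kind == "start_file":
            # Initialize the file name for the tuples that follow.
            self.current, self.pending = file_name, []
        elif kind == "end_file" and file_name == self.current:
            # Force a finalize on that file.
            self.finalized[file_name] = list(self.pending)
            self.current, self.pending = None, []

    def on_data(self, line):
        if self.current is None:
            raise RuntimeError("data tuple arrived outside a batch")
        self.pending.append(line)


out = FileOutputSketch()
out.on_control("start_file", "file1")
out.on_data("x")
out.on_data("y")
out.on_control("end_file", "file1")
print(out.finalized)
```

The sketch assumes a single, unpartitioned input, so control tuples and data tuples arrive in order.
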
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The user would be able to enable the operators
>> to
>> > > send
>> > > > > only
>> > > > > > > > those
>> > > > > > > > > > > > > > watermarks that are needed in the application.
>> If
>> > > none
>> > > > of
>> > > > > > the
>> > > > > > > > > > options
>> > > > > > > > > > > > are
>> > > > > > > > > > > > > > configured, the operators behave as in a
>> streaming
>> > > > > > > application.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > There are a few challenges in the implementation
>> > > where
>> > > > > the
>> > > > > > > > input
>> > > > > > > > > > > > operator
>> > > > > > > > > > > > > > is partitioned. In this case, the correlation
>> > between
>> > > > the
>> > > > > > > > > start/end
>> > > > > > > > > > > > for a
>> > > > > > > > > > > > > > file and the data tuples for that file is lost.
>> > Hence
>> > > > we
>> > > > > > need
>> > > > > > > > to
>> > > > > > > > > > > > maintain
>> > > > > > > > > > > > > > the filename as part of each tuple in the
>> pipeline.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > The "start file" and "end file" control tuples
>> in
>> > > this
>> > > > > > > example
>> > > > > > > > > are
>> > > > > > > > > > > > > > temporary names for watermarks. We can have
>> generic
>> > > > > "start
>> > > > > > > > > batch" /
>> > > > > > > > > > > > "end
>> > > > > > > > > > > > > > batch" tuples which could be used for other use
>> > cases
>> > > > as
>> > > > > > > well.
>> > > > > > > > > The
>> > > > > > > > > > > > Final
>> > > > > > > > > > > > > > watermark is common and serves the same purpose
>> in
>> > > each
>> > > > > > case.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Please let me know your thoughts on this.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > ~ Bhupesh
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh
>> Chawda <
>> > > > > > > > > > > > > bhupesh@datatorrent.com>
>> > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Yes, this can be part of operator
>> configuration.
>> > > > Given
>> > > > > > > this,
>> > > > > > > > > for
>> > > > > > > > > > a
>> > > > > > > > > > > > user
>> > > > > > > > > > > > > > to
>> > > > > > > > > > > > > > > define a batch application, would mean
>> > configuring
>> > > > the
>> > > > > > > > > connectors
>> > > > > > > > > > > > > (mostly
>> > > > > > > > > > > > > > > the input operator) in the application for the
>> > > > desired
>> > > > > > > > > behavior.
>> > > > > > > > > > > > > > Similarly,
>> > > > > > > > > > > > > > > there can be other use cases that can be
>> achieved
>> > > > other
>> > > > > > > than
>> > > > > > > > > > batch.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > We may also need to take care of the
>> following:
>> > > > > > > > > > > > > > > 1. Make sure that the watermarks or control
>> > tuples
>> > > > are
>> > > > > > > > > consistent
>> > > > > > > > > > > > > across
>> > > > > > > > > > > > > > > sources. Meaning an HDFS sink should be able
>> to
>> > > > > interpret
>> > > > > > > the
>> > > > > > > > > > > > watermark
>> > > > > > > > > > > > > > > tuple sent out by, say, a JDBC source.
>> > > > > > > > > > > > > > > 2. In addition to I/O connectors, we should
>> also
>> > > look
>> > > > > at
>> > > > > > > the
>> > > > > > > > > need
>> > > > > > > > > > > for
>> > > > > > > > > > > > > > > processing operators to understand some of the
>> > > > control
>> > > > > > > > tuples /
>> > > > > > > > > > > > > > watermarks.
>> > > > > > > > > > > > > > > For example, we may want to reset the operator
>> > > > behavior
>> > > > > > on
>> > > > > > > > > > arrival
>> > > > > > > > > > > of
>> > > > > > > > > > > > > > some
>> > > > > > > > > > > > > > > watermark tuple.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > ~ Bhupesh
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise
>> <
>> > > > > > > > thw@apache.org>
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > >> The HDFS source can operate in two modes,
>> > bounded
>> > > or
>> > > > > > > > > unbounded.
>> > > > > > > > > > If
>> > > > > > > > > > > > you
>> > > > > > > > > > > > > > >> scan
>> > > > > > > > > > > > > > >> only once, then it should emit the final
>> > watermark
>> > > > > after
>> > > > > > > it
>> > > > > > > > is
>> > > > > > > > > > > done.
>> > > > > > > > > > > > > > >> Otherwise it would emit watermarks based on a
>> > > policy
>> > > > > > > (files
>> > > > > > > > > > names
>> > > > > > > > > > > > > etc.).
>> > > > > > > > > > > > > > >> The mechanism to generate the marks may
>> depend
>> > on
>> > > > the
>> > > > > > type
>> > > > > > > > of
>> > > > > > > > > > > source
>> > > > > > > > > > > > > and
>> > > > > > > > > > > > > > >> the user needs to be able to
>> influence/configure
>> > > it.
>> > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > >> Thomas
>> > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > >> On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh
>> Chawda
>> > <
>> > > > > > > > > > > > > > bhupesh@datatorrent.com>
>> > > > > > > > > > > > > > >> wrote:
>> > > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > >> > Hi Thomas,
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > I am not sure that I completely understand
>> > your
>> > > > > > > > suggestion.
>> > > > > > > > > > Are
>> > > > > > > > > > > > you
>> > > > > > > > > > > > > > >> > suggesting to broaden the scope of the
>> > proposal
>> > > to
>> > > > > > treat
>> > > > > > > > all
>> > > > > > > > > > > > sources
>> > > > > > > > > > > > > > as
>> > > > > > > > > > > > > > >> > bounded as well as unbounded?
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > In case of Apex, we treat all sources as
>> > > unbounded
>> > > > > > > > sources.
>> > > > > > > > > > Even
>> > > > > > > > > > > > > > bounded
>> > > > > > > > > > > > > > >> > sources like HDFS file source are treated as
>> > > > > unbounded
>> > > > > > by
>> > > > > > > > > means
>> > > > > > > > > > > of
>> > > > > > > > > > > > > > >> scanning
>> > > > > > > > > > > > > > >> > the input directory repeatedly.
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > Let's consider HDFS file source for
>> example:
>> > > > > > > > > > > > > > >> > In this case, if we treat it as a bounded
>> > > source,
>> > > > we
>> > > > > > can
>> > > > > > > > > > define
>> > > > > > > > > > > > > hooks
>> > > > > > > > > > > > > > >> which
>> > > > > > > > > > > > > > >> > allow us to detect the end of the file and
>> > send
>> > > > the
>> > > > > > > > "final
>> > > > > > > > > > > > > > watermark".
>> > > > > > > > > > > > > > >> We
>> > > > > > > > > > > > > > >> > could also consider HDFS file source as a
>> > > > streaming
>> > > > > > > source
>> > > > > > > > > and
>> > > > > > > > > > > > > define
>> > > > > > > > > > > > > > >> hooks
>> > > > > > > > > > > > > > >> > which send watermarks based on different
>> kinds
>> > > of
>> > > > > > > windows.
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > Please correct me if I misunderstand.
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > ~ Bhupesh
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > On Mon, Jan 16, 2017 at 9:23 PM, Thomas
>> Weise
>> > <
>> > > > > > > > > thw@apache.org
>> > > > > > > > > > >
>> > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > > Bhupesh,
>> > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > >> > > Please see how that can be solved in a
>> > unified
>> > > > way
>> > > > > > > using
>> > > > > > > > > > > windows
>> > > > > > > > > > > > > and
>> > > > > > > > > > > > > > >> > > watermarks. It is bounded data vs.
>> unbounded
>> > > > data.
>> > > > > > In
>> > > > > > > > Beam
>> > > > > > > > > > for
>> > > > > > > > > > > > > > >> example,
>> > > > > > > > > > > > > > >> > you
>> > > > > > > > > > > > > > >> > > can use the "global window" and the final
>> > > > > watermark
>> > > > > > to
>> > > > > > > > > > > > accomplish
>> > > > > > > > > > > > > > what
>> > > > > > > > > > > > > > >> > you
>> > > > > > > > > > > > > > >> > > are looking for. Batch is just a special
>> > case
>> > > of
>> > > > > > > > streaming
>> > > > > > > > > > > where
>> > > > > > > > > > > > > the
>> > > > > > > > > > > > > > >> > source
>> > > > > > > > > > > > > > >> > > emits the final watermark.
>> > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > >> > > Thanks,
>> > > > > > > > > > > > > > >> > > Thomas
>> > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > >> > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh
>> > > Chawda
>> > > > <
>> > > > > > > > > > > > > > >> bhupesh@datatorrent.com
>> > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > >> > > wrote:
>> > > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > >> > > > Yes, if the user needs to develop a
>> batch
>> > > > > > > application,
>> > > > > > > > > > then
>> > > > > > > > > > > > > batch
>> > > > > > > > > > > > > > >> aware
>> > > > > > > > > > > > > > >> > > > operators need to be used in the
>> > > application.
>> > > > > > > > > > > > > > >> > > > The nature of the application is mostly
>> > > > > controlled
>> > > > > > > by
>> > > > > > > > > the
>> > > > > > > > > > > > input
>> > > > > > > > > > > > > > and
>> > > > > > > > > > > > > > >> the
>> > > > > > > > > > > > > > >> > > > output operators used in the
>> application.
>> > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > >> > > > For example, consider an application
>> which
>> > > > needs
>> > > > > > to
>> > > > > > > > > filter
>> > > > > > > > > > > > > records
>> > > > > > > > > > > > > > >> in an
>> > > > > > > > > > > > > > >> > > > input file and store the filtered
>> records
>> > in
>> > > > > > another
>> > > > > > > > > file.
>> > > > > > > > > > > The
>> > > > > > > > > > > > > > >> nature
>> > > > > > > > > > > > > > >> > of
>> > > > > > > > > > > > > > >> > > > this app is to end once the entire
>> file is
>> > > > > > > processed.
>> > > > > > > > > > > > Following
>> > > > > > > > > > > > > > >> things
>> > > > > > > > > > > > > > >> > > are
>> > > > > > > > > > > > > > >> > > > expected of the application:
>> > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > >> > > >    1. Once the input data is over,
>> > finalize
>> > > > the
>> > > > > > > output
>> > > > > > > > > > file
>> > > > > > > > > > > > from
>> > > > > > > > > > > > > > >> .tmp
>> > > > > > > > > > > > > > >> > > >    files. - Responsibility of output
>> > > operator
>> > > > > > > > > > > > > > >> > > >    2. End the application, once the
>> data
>> > is
>> > > > read
>> > > > > > and
>> > > > > > > > > > > > processed -
>> > > > > > > > > > > > > > >> > > >    Responsibility of input operator
>> > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > >> > > > These functions are essential to allow
>> the
>> > > > user
>> > > > > to
>> > > > > > > do
>> > > > > > > > > > higher
>> > > > > > > > > > > > > level
>> > > > > > > > > > > > > > >> > > > operations like scheduling or running a
>> > > > workflow
>> > > > > > of
>> > > > > > > > > batch
>> > > > > > > > > > > > > > >> applications.
>> > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > >> > > > I am not sure about intermediate (processing) operators, as
>> > > > > > > > > > > > > > >> > > > there is no change in their functionality for batch use
>> > > > > > > > > > > > > > >> > > > cases. Perhaps allowing multiple batches to run in a single
>> > > > > > > > > > > > > > >> > > > application may require similar changes in processing
>> > > > > > > > > > > > > > >> > > > operators as well.
>> > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > >> > > > ~ Bhupesh
>> > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > >> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <priyag@apache.org>
>> > > > > > > > > > > > > > >> > > > wrote:
>> > > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > >> > > > > Will it give the user the impression that, if he has a
>> > > > > > > > > > > > > > >> > > > > batch use case, he has to use batch-aware operators only?
>> > > > > > > > > > > > > > >> > > > > If so, is that what we expect? I am not aware of how we
>> > > > > > > > > > > > > > >> > > > > implement the batch scenario, so this might be a basic
>> > > > > > > > > > > > > > >> > > > > question.
>> > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > >> > > > > -Priyanka
>> > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > >> > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
>> > > > > > > > > > > > > > >> > > > > wrote:
>> > > > > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > > > >> > > > > > Hi All,
>> > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > >> > > > > > While design / implementation for custom control tuples
>> > > > > > > > > > > > > > >> > > > > > is ongoing, I thought it would be a good idea to consider
>> > > > > > > > > > > > > > >> > > > > > its usefulness in one of the use cases - batch
>> > > > > > > > > > > > > > >> > > > > > applications.
>> > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > >> > > > > > This is a proposal to adapt / extend existing operators
>> > > > > > > > > > > > > > >> > > > > > in the Apache Apex Malhar library so that it is easy to
>> > > > > > > > > > > > > > >> > > > > > use them in batch use cases. Naturally, this would be
>> > > > > > > > > > > > > > >> > > > > > applicable for only a subset of operators like File, JDBC
>> > > > > > > > > > > > > > >> > > > > > and NoSQL databases.
>> > > > > > > > > > > > > > >> > > > > > For example, for a file-based store (say, an HDFS store),
>> > > > > > > > > > > > > > >> > > > > > we could have FileBatchInput and FileBatchOutput
>> > > > > > > > > > > > > > >> > > > > > operators which allow easy integration into a batch
>> > > > > > > > > > > > > > >> > > > > > application. These operators would be extended from their
>> > > > > > > > > > > > > > >> > > > > > existing implementations and would be "Batch Aware", in
>> > > > > > > > > > > > > > >> > > > > > that they may understand the meaning of some specific
>> > > > > > > > > > > > > > >> > > > > > control tuples that flow through the DAG. Start batch and
>> > > > > > > > > > > > > > >> > > > > > end batch seem to be the obvious candidates that come to
>> > > > > > > > > > > > > > >> > > > > > mind. On receipt of such control tuples, they may try to
>> > > > > > > > > > > > > > >> > > > > > modify the behavior of the operator - to reinitialize
>> > > > > > > > > > > > > > >> > > > > > some metrics or finalize an output file, for example.
>> > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > >> > > > > > We can discuss the potential control tuples and actions
>> > > > > > > > > > > > > > >> > > > > > in detail, but first I would like to understand the views
>> > > > > > > > > > > > > > >> > > > > > of the community for this proposal.
>> > > > > > > > > > > > > > >> > > > > >
>> > > > > > > > > > > > > > >> > > > > > ~ Bhupesh