Posted to dev@apex.apache.org by Bhupesh Chawda <bh...@datatorrent.com> on 2016/11/03 06:58:33 UTC

Re: Enhance batch support - batch demarcation

Hi All,

Starting with the implementation, we are planning to take care of a single
batch job first. We will take up the scheduling aspect later.

The first requirement is the following:


A batch job is an Apex application which picks up data from the source and
processes it. Once the data is completely processed, it should detect the
end of the batch and shut down the application. This will help separate the
scheduling aspect from the Apex batch job.


We have the following options to shut down the application:


   - First, throw a shutdown exception from the input operator.

      - End of batch can be detected by the input operator depending on
      source-specific details.
      - Once the end of batch is detected, the operator itself can shut down
      the application by throwing a shutdown signal.
      - The problem with this approach is that even though the batch has ended,
      the downstream (output) operators might have finalization pending, which
      is usually done in calls like committed(). Waiting for the committed()
      call in the input operator may not help either, since this call may
      happen for the output operator in a different window.
      - Another issue might arise with multiple input operators, where each
      may send the shutdown signal independently.
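
To make the first option concrete, here is a minimal, self-contained sketch.
The ShutdownException class and InputOperator interface below are local
stand-ins declared only for illustration, not the Apex types, and
ListBatchInput and runUntilShutdown are hypothetical names. It shows only the
detect-and-throw step and does nothing about finalization that may still be
pending downstream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class Option1Sketch {
  // Stand-in for the engine's shutdown signal (assumption: not the Apex class).
  static final class ShutdownException extends RuntimeException {
  }

  // Stand-in for an input operator contract (assumption: reduced to one method).
  interface InputOperator {
    void emitTuples();
  }

  // Hypothetical bounded input: emits from a fixed list, then asks for shutdown.
  static final class ListBatchInput implements InputOperator {
    private final Iterator<String> source;
    final List<String> emitted = new ArrayList<>();

    ListBatchInput(List<String> records) {
      this.source = records.iterator();
    }

    @Override
    public void emitTuples() {
      if (!source.hasNext()) {
        // End of batch detected from the source: signal shutdown.
        throw new ShutdownException();
      }
      emitted.add(source.next());
    }
  }

  // Hypothetical driver standing in for the engine's emit loop.
  // Returns the number of emitTuples() calls that completed before shutdown.
  static int runUntilShutdown(InputOperator op) {
    int calls = 0;
    try {
      while (true) {
        op.emitTuples();
        calls++;
      }
    } catch (ShutdownException e) {
      return calls;
    }
  }
}
```

With several such inputs in one application, each would throw on its own
schedule, which is the coordination issue mentioned in the last bullet.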


   - Second, allow the engine to identify whether the application is a
   batch application and, if so, poll ```isBatchDone()``` on the input
   operator until it returns true. Once it does, we can wait for the
   committed() call and end the application via the input operator. We can
   have an interface BatchInput which would be implemented by the input
   operator of a batch application.

```
public interface BatchInput extends InputOperator
{
  // Returns true once the input operator has detected the end of the batch.
  boolean isBatchDone();
}
```

The developer implements isBatchDone() to report when a batch has ended. The
engine calls it to find out whether the batch has ended and, if so, shuts
down the application.
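
A rough sketch of how an engine-side loop might use such an interface is
given here. Everything except the BatchInput shape is an assumption made for
illustration: InputOperator is a one-method stand-in rather than the Apex
interface, CountingInput and runBatch are hypothetical, and the committed
window is modeled as trailing the current window by a fixed lag.

```java
final class Option2Sketch {
  // Stand-in for the Apex InputOperator interface (assumption: one method only).
  interface InputOperator {
    void emitTuples();
  }

  // Same shape as the interface proposed in this mail.
  interface BatchInput extends InputOperator {
    boolean isBatchDone();
  }

  // Hypothetical bounded source: done once `total` tuples have been emitted.
  static final class CountingInput implements BatchInput {
    private final int total;
    private int emitted;

    CountingInput(int total) {
      this.total = total;
    }

    @Override
    public void emitTuples() {
      if (emitted < total) {
        emitted++;
      }
    }

    @Override
    public boolean isBatchDone() {
      return emitted >= total;
    }
  }

  // Hypothetical engine loop: one emitTuples() call per window, then a poll of
  // isBatchDone(). Once the batch is done, keep advancing windows until the
  // committed window reaches the window in which the batch ended.
  // Returns the window id at which the application could be ended.
  static long runBatch(BatchInput input, int commitLag) {
    long windowId = 0;
    long doneWindow = -1;
    while (true) {
      windowId++;
      if (doneWindow < 0) {
        input.emitTuples();
        if (input.isBatchDone()) {
          doneWindow = windowId;
        }
      }
      // Assumption: commits trail the current window by a fixed number of windows.
      long committedWindow = windowId - commitLag;
      if (doneWindow >= 0 && committedWindow >= doneWindow) {
        return windowId;
      }
    }
  }
}
```

For example, an input of three tuples with a commit lag of two windows is
done in window 3, and that window is committed by window 5.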



   - Third, use a shared Stats Listener to identify the end of a batch
   using some metric in the input operator and remove operators via a DAG
   change request. The DAG change request can be enabled by APEXCORE-408,
   which is in progress.
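
As a sketch of the third option, the listener below treats "no tuples
emitted for N consecutive windows" as the end-of-batch metric. That metric,
the OperatorStats class and the EndOfBatchListener class are all assumptions
for illustration; none of them is the Apex stats API, and the actual DAG
change request is left out.

```java
import java.util.HashMap;
import java.util.Map;

final class Option3Sketch {
  // Stand-in for per-window operator stats (assumption: not the Apex type).
  static final class OperatorStats {
    final String operatorName;
    final long tuplesEmittedInWindow;

    OperatorStats(String operatorName, long tuplesEmittedInWindow) {
      this.operatorName = operatorName;
      this.tuplesEmittedInWindow = tuplesEmittedInWindow;
    }
  }

  // Hypothetical listener shared by all input operators. The batch is treated
  // as finished once every registered input has reported `idleWindowsRequired`
  // consecutive windows without emitting a tuple.
  static final class EndOfBatchListener {
    private final int idleWindowsRequired;
    private final Map<String, Integer> idleCounts = new HashMap<>();

    EndOfBatchListener(int idleWindowsRequired, String... inputOperators) {
      this.idleWindowsRequired = idleWindowsRequired;
      for (String name : inputOperators) {
        idleCounts.put(name, 0);
      }
    }

    // Returns true when a DAG change request removing the operators could be issued.
    boolean processStats(OperatorStats stats) {
      if (idleCounts.containsKey(stats.operatorName)) {
        int idle = stats.tuplesEmittedInWindow == 0
            ? idleCounts.get(stats.operatorName) + 1
            : 0; // any emitted tuple resets the idle streak
        idleCounts.put(stats.operatorName, idle);
      }
      for (int idle : idleCounts.values()) {
        if (idle < idleWindowsRequired) {
          return false;
        }
      }
      return true;
    }
  }
}
```

Because the listener is shared, the decision is made once for all input
operators rather than independently by each of them.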


Please suggest.

~ Bhupesh




On Fri, Sep 16, 2016 at 2:10 PM, Bhupesh Chawda <bh...@datatorrent.com>
wrote:

> Hi All,
>
> Resuming the discussion.
>
> After some discussion, I have created a document which captures the
> requirements and a high level design for supporting batch applications. The
> document consolidates different threads of discussion and aspects which are
> relevant to batch support.
>
> This is in no way a design document, but just captures the high level
> steps. I have tried to keep it very brief and to the point. I will keep
> refining the document depending on the comments to ultimately convert it to
> a design document.
>
> Here is the link to the document:
> https://docs.google.com/document/d/1qlyQJP80dOlWZeHwICMFA3D3jGG_T2NLhMfzScbuTwQ/edit?usp=sharing
>
> Please provide your valuable feedback.
>
> ~ Bhupesh
>
> On Tue, Feb 23, 2016 at 7:24 AM, David Yan <da...@datatorrent.com> wrote:
>
>> For batch applications without checkpointing or iteration loops, what would
>> be the significance of streaming windows and application windows?
>>
>>
>> On Sun, Feb 14, 2016 at 10:33 PM, Thomas Weise <th...@datatorrent.com>
>> wrote:
>>
>> > Time to resume this discussion. I think it makes sense to look at the batch
>> > as execution of a DAG from setup to teardown for all its operators, as
>> > suggested by Bhupesh and Sandeep. The DAG comes into existence when the
>> > batch begins and terminates when it is done.
>> >
>> > We have also seen from customers that there is demand for having the
>> > scheduler function built in, when there is no external component already
>> > present. For example, a file or set of files could be identified as
>> > "batch". As the application is idle, there is only a scheduler operator
>> > which polls for files. Once work is ready, that operator would launch the
>> > DAG for processing (within same application, but not connected through
>> > stream). When processing is complete, that DAG terminates and returns the
>> > resources.
>> >
>> > As discussed, there is the need to be able to turn off checkpointing, which
>> > is different from setting a large checkpoint window. No checkpointing means
>> > no incremental recovery and hence no need to keep data in buffers.
>> >
>> > There is also the need to relay begin/end signal through the entire DAG.
>> > This is different from setup/shutdown. It is more like begin/endWindow, but
>> > there is only a single "window" in a batch.
>> >
>> >
>> > On Mon, Dec 28, 2015 at 10:36 PM, Chinmay Kolhatkar <chinmay@datatorrent.com>
>> > wrote:
>> >
>> > > Hi Thomas,
>> > >
>> > > A comment on the following from your previous mail:
>> > >
>> > >
>> > >
>> > > *An operator that identifies the batch boundary tells the engine about it
>> > > and corresponding control tuples are submitted through the stream, leading
>> > > to callbacks on downstream operators*
>> > >
>> > > This would mean there will be a single boundary definition of a batch in
>> > > the application DAG.
>> > > I think we should give individual operators the freedom to define what a
>> > > batch is and produce callbacks accordingly.
>> > >
>> > > With that in mind, here is a quick sketch/suggestion of how it can
>> > > be done:
>> > >
>> > > 1) The operator that needs to work on a batch can implement an interface,
>> > > let's say BatchListener.
>> > >
>> > > 2) This will have 4 methods:
>> > > *    startBatch*
>> > > *    endBatch*
>> > > *    configureBatch*
>> > > *    callAtApplicationWindowBoundary *(maybe some better name??)
>> > >
>> > > 3) *configureBatch* will tell what the boundary of a batch is.
>> > > This will be called right after setup OR activate, basically before the
>> > > beginning of the stream. The return value will be set with operator thread.
>> > >
>> > > 4) Based on configuration, the *startBatch* and *endBatch* will be called.
>> > >
>> > > 5) *callAtApplicationWindowBoundary* should return *true/false* based
>> > > on whether start/end batch calls should happen at the application window
>> > > boundary OR not. Here is where the user can choose whether the platform
>> > > takes care of checkpointing tuples within a window or whether the user
>> > > wants to do that on their own.
>> > >
>> > >
>> > > Thoughts?
>> > >
>> > >
>> > > -Chinmay.
>> > >
>> > >
>> > > ~ Chinmay.
>> > >
>> > > On Tue, Dec 29, 2015 at 11:35 AM, Thomas Weise <thomas@datatorrent.com>
>> > > wrote:
>> > >
>> > > > On Mon, Dec 28, 2015 at 7:01 AM, Sandeep Deshmukh <sandeep@datatorrent.com>
>> > > > wrote:
>> > > >
>> > > > > +1 for batch support in Apex. I would be interested to be part of this
>> > > > > work.
>> > > > >
>> > > > > I would like to start with basics and would like to know how one will
>> > > > > define "batch" in Apex context. Which of the following cases would be
>> > > > > supported under batch:
>> > > > >
>> > > > >    1. A program completes a task and automatically shuts itself down once
>> > > > >    the task is complete. E.g. the program needs to copy a set of files
>> > > > >    from source to destination.
>> > > > >    2. A program completes a task and then waits for a pre-defined time to
>> > > > >    poll for something more to work on. E.g. the program copies all the
>> > > > >    files from the source location and then periodically checks, say every
>> > > > >    1 hour, if there are new files at the source and copies them.
>> > > > >    3. A program completes a task and then polls every 1 hr as in case 2
>> > > > >    but releases resources during wait time.
>> > > > >
>> > > > >
>> > > > >
>> > > >
>> > > > Yes, both 1. and 2. are valid use cases. I would not make a further
>> > > > distinction between 2. and 3. at this point.
>> > > >
>> > > > Ability to run an application that expands and shrinks as a self-contained
>> > > > unit can be a benefit, as otherwise you need an external scheduler (such as
>> > > > Oozie) just to launch jobs. The associated extra integration work may be
>> > > > brittle and an unwanted barrier for certain use cases.
>> > > >
>> > > >
>> > > >
>> > > > > Needs for each of the above will vary. I am putting down some basic
>> > > > > requirements for each of them.
>> > > > >
>> > > > > 1. This case will need a mechanism to shut down automatically on
>> > > > > completion of the task.
>> > > > >
>> > > > > StartProgram()
>> > > > >     StartBatch()
>> > > > >         Streaming Application starts, runs and finishes
>> > > > >     EndBatch()
>> > > > > EndProgram()
>> > > > >
>> > > > > 2. This will simply need a construct to wait for some time (say 10
>> > > > > minutes) or till some time (till 1pm).
>> > > > >
>> > > > > StartProgram()
>> > > > > while(true)
>> > > > > {
>> > > > >     StartBatch()
>> > > > >         Streaming Application starts, runs and finishes
>> > > > >     EndBatch()
>> > > > >     WaitTill(time) or WaitFor(timeperiod)
>> > > > > }
>> > > > > EndProgram()
>> > > > >
>> > > > > 3. Apart from the wait construct, we also need support for releasing
>> > > > > resources.
>> > > > >
>> > > > > StartProgram()
>> > > > > while(true)
>> > > > > {
>> > > > >     RestartFromSavedState() // if any state is saved previously.
>> > > > >     StartBatch()
>> > > > >         Streaming Application starts, runs and finishes
>> > > > >     EndBatch()
>> > > > >     SaveState()
>> > > > >     ReleaseResources()
>> > > > >     WaitTill(time) or WaitFor(timeperiod)
>> > > > > }
>> > > > > EndProgram()
>> > > > >
>> > > > >
>> > > > > All the constructs: WaitTill()/WaitFor(), RestartFromSavedState(),
>> > > > > SaveState(), ReleaseResources() could very well be part of
>> > > > > StartBatch() or EndBatch(). I have put them separately for clear
>> > > > > understanding only.
>> > > > >
>> > > > > Another point to think about would be the scheduler. A batch job is
>> > > > > generally triggered as a cron job. Do we still see Apex jobs being
>> > > > > triggered by cron, or would we like to include a scheduler within Apex
>> > > > > that will trigger jobs based on time, on some external trigger, or even
>> > > > > by polling for events?
>> > > > >
>> > > > > Regards
>> > > > > Sandeep
>> > > > >
>> > > > > On Mon, Dec 28, 2015 at 5:11 PM, Bhupesh Chawda <bhupesh@datatorrent.com>
>> > > > > wrote:
>> > > > >
>> > > > > > +1
>> > > > > >
>> > > > > > I think in the batch case, application windows may be transparent to the
>> > > > > > user application / operator logic. A batch can be thought of as one
>> > > > > > instantiation of an Apex DAG, from setup() to teardown() for all operators.
>> > > > > > Maybe we need to define a higher level API which encapsulates a streaming
>> > > > > > application.
>> > > > > > Something like:
>> > > > > >
>> > > > > > StartBatch()
>> > > > > >   Streaming Application starts, runs and finishes
>> > > > > > EndBatch()
>> > > > > >
>> > > > > > The streaming application will run transparently with all the windowing /
>> > > > > > checkpointing logic that it currently does. Checkpointing large amounts of
>> > > > > > data may be avoided by either checkpointing at large intervals or even
>> > > > > > disabling checkpointing for the batch job.
>> > > > > > Additionally, the external trigger (existence of some file etc.) can be
>> > > > > > controlled by the StartBatch() and EndBatch() calls. In all the batch use
>> > > > > > cases, it is usually the case that once the input is processed completely,
>> > > > > > the batch is done. Example: in MapReduce, all splits processed means the
>> > > > > > batch job is done. Similar primitives can be supported by Apex in order to
>> > > > > > facilitate the control management in the StartBatch() and EndBatch()
>> > > > > > methods.
>> > > > > >
>> > > > > > -Bhupesh
>> > > > > >
>> > > > > > On Mon, Dec 28, 2015 at 1:34 PM, Thomas Weise <thomas@datatorrent.com>
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Following JIRA is open to enhance the support for batch:
>> > > > > > >
>> > > > > > > https://issues.apache.org/jira/browse/APEXCORE-235
>> > > > > > >
>> > > > > > > One of the challenges with batch on Apex today is that there isn't any
>> > > > > > > native support to identify begin/end of batch and associate actions to it.
>> > > > > > > For example, at the beginning we may want to fetch some data needed for all
>> > > > > > > subsequent processing and at the end perform some finalization action or
>> > > > > > > push to external system (add partition to Hive table or similar).
>> > > > > > >
>> > > > > > > Absent native support, the workaround is to add a bunch of ports and extra
>> > > > > > > operators for propagation and synchronization purposes, which makes
>> > > > > > > building the batch application with standard operators or development of
>> > > > > > > custom operators rather difficult and inefficient.
>> > > > > > >
>> > > > > > > The span of a batch can also be seen as a user defined window, with logic
>> > > > > > > for begin and end. The current "application window" support is limited to a
>> > > > > > > multiple of streaming window on a per operator basis. In the batch case,
>> > > > > > > the boundary needs to be more flexible - user code needs to be able to
>> > > > > > > determine begin/endWindow based on external data (existence of files etc.).
>> > > > > > >
>> > > > > > > There is another commonality with application window, and that's alignment
>> > > > > > > of checkpointing. For batches where it is more efficient to redo the
>> > > > > > > processing instead of checkpointing potentially large amounts of
>> > > > > > > intermediate state for incremental recovery, it would be nice to be able to
>> > > > > > > say "user window == checkpoint interval".
>> > > > > > >
>> > > > > > > This is to float the idea of having a window control that can be influenced
>> > > > > > > by user code. An operator that identifies the batch boundary tells the
>> > > > > > > engine about it and corresponding control tuples are submitted through the
>> > > > > > > stream, leading to callbacks on downstream operators. These control
>> > > > > > > tuples should be able to carry contextual information that can be used in
>> > > > > > > downstream operator logic (file names, schema information etc.)
>> > > > > > >
>> > > > > > > I don't expect the current beginWindow/endWindow can be augmented in a
>> > > > > > > backward compatible way to accommodate this, but a similar optional
>> > > > > > > interface could be supported to enable batch aware operators and
>> > > > > > > checkpointing optimization.
>> > > > > > >
>> > > > > > > Thoughts?
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Re: Enhance batch support - batch demarcation

Posted by Thomas Weise <th...@apache.org>.
I don't think it is necessary to do special things for "batch" here.

Input can be bounded or unbounded. When input is bounded, the input
operator should raise the shutdown exception, which will gracefully
terminate the downstream operators. Downstream operators may also want to
know about the end of input. IMO the correct way to implement this is
custom control tuple or watermark support.
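
A minimal sketch of the control tuple idea, assuming a stream element that is
either a data tuple or an end-of-input marker. The Element and
FinalizingOutput classes are hypothetical and only illustrate the callback
shape; they are not existing Apex APIs.

```java
import java.util.ArrayList;
import java.util.List;

final class EndOfInputSketch {
  // Hypothetical stream element: a data tuple or an end-of-input control tuple.
  static final class Element {
    final String data; // null for the control tuple
    final boolean endOfInput;

    private Element(String data, boolean endOfInput) {
      this.data = data;
      this.endOfInput = endOfInput;
    }

    static Element data(String value) {
      return new Element(value, false);
    }

    static Element endOfInput() {
      return new Element(null, true);
    }
  }

  // Hypothetical downstream operator: buffers data tuples and runs its
  // finalization step only when the end-of-input marker arrives.
  static final class FinalizingOutput {
    final List<String> pending = new ArrayList<>();
    final List<String> finalized = new ArrayList<>();
    boolean finished;

    void process(Element element) {
      if (element.endOfInput) {
        finalized.addAll(pending); // stand-in for the real finalization work
        pending.clear();
        finished = true;
      } else {
        pending.add(element.data);
      }
    }
  }
}
```

Since the marker travels in the stream behind the data, the downstream
operator sees it only after every earlier tuple has been processed.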

Regarding shutdown, have you looked at when the operators are actually
undeployed (see StreamingContainerManager.processEvents)?

