Posted to dev@apex.apache.org by Tushar Gosavi <tu...@datatorrent.com> on 2017/03/06 14:05:15 UTC

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Hi All,

I have started working on this again. The top level design document is
located at

https://docs.google.com/document/d/1OugxutMYI-JwB9Z7hxjTWwiAKQsqbYPujDmgwlpPOXM/edit?usp=sharing

The WIP branch is
https://github.com/tushargosavi/apex-core/tree/dag_schedule.redesign

@Thomas, can you please review the document and comment on the design approach?

Regards,
-Tushar.


On Tue, Jan 24, 2017 at 4:47 PM, Tushar Gosavi <tu...@datatorrent.com>
wrote:

> Hi All,
>
> I have updated the design document as per the review comments on the pull
> request
> (https://issues.apache.org/jira/secure/attachment/12849094/dag.docx).
> Please provide your suggestions.
>
> - Tushar.
>
>
> On Fri, Sep 16, 2016 at 6:40 PM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
> > Hi All,
> >
> > I have opened a review pull request for dynamic DAG modification
> > through the stats listener (https://github.com/apache/apex-core/pull/393).
> > Please review and provide
> > comments/suggestions.
> >
> > It provides the following functionality:
> > - StatsListener can access the operator name, making it easy to detect
> > which operator's stats are being processed.
> > - StatsListener can create an instance of an object through which it can
> > submit DAG modifications to the engine.
> > - StatsListener can return DAG changes as a response to the engine.
> > - PlanModifier is modified to take a DAG, apply it to the existing
> > running DAG, and deploy the changes.
> >
> > The following functionality is not working yet:
> >
> > - The new operator does not start from the correct windowId
> > (https://issues.apache.org/jira/browse/APEXCORE-532)
> > - A relaunched application fails to start if it was killed after a
> > dynamic DAG modification.
> > - There is no support for resuming operators from their previous state
> > after they were removed. This could be achieved by
> >   reading state from external storage on setup.
> > - Persist operator support is not present for newly added streams.
> >
> > The demo application using the feature is available at
> > https://github.com/tushargosavi/apex-dynamic-scheduling
> >
> > There are two variations of the WordCount application. The first variation
> > detects the presence of
> > new files and starts a disconnected DAG to process the data.
> > (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java)
> >
> > The second application
> > (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java)
> > starts with a reader operator and provides pendingFiles as an auto-metric
> > to the stats listener. On detecting pending files, it attaches the splitter,
> > counter and output
> > operators to the reader operator. Once the files are processed, the splitter,
> > counter and output operators are removed, and they are
> > added back again if new data files are added to the directory.
> >
> > Regards,
> > -Tushar.
> >
> >
> > On Mon, Aug 1, 2016 at 6:10 PM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
> >> Hi All,
> >>
> >> I was able to prototype a simple word count application, which
> >> starts with just a single file reader operator. The file reader operator
> >> emits pendingFiles as a metric to the StatsListener. The StatsListener
> >> changes the DAG once enough files are available: the listener returns a
> >> plan change that adds the word splitter, counter and console operators to the
> >> reader and completes the DAG for word count.
> >>
> >> After 120 windows of inactivity, the three operators are removed
> >> from the DAG again. When a new set of files is added, these operators are
> >> added back again.
> >>
> >> The high level proposal document:
> >> https://docs.google.com/document/d/1gpy7NFxd6te1lTXN1-2r8WHRjg83zEM8aqpPAXivFRQ/edit?usp=sharing
> >>
> >> The prototype code is at:
> >> https://github.com/tushargosavi/apex-dynamic-scheduling/
> >> The application files are
> >> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/FileStatListenerSameDag.java
> >> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java
> >>
> >> Please provide your feedback.
> >>
> >> Some challenges yet to be resolved are:
> >> - Restoring operator state from a previously removed operator.
> >> - Handling concurrent modifications to the DAG from multiple StatsListeners.
> >> - Making DAG changes persistent: the user should be able to restart the
> >> application if it was killed with a modified DAG.
> >>
> >> Thanks,
> >> -Tushar.
> >>
> >> On Fri, Jul 8, 2016 at 6:48 PM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
> >>> Hi All,
> >>>
> >>> I have done an initial prototype which allows a stats listener to
> >>> specify DAG changes; the DAG changes are applied asynchronously.
> >>>
> >>> The changes involved are
> >>> - Add a DAGChangeSet object which inherits from DAG and supports
> >>> methods to remove
> >>>   operators and streams.
> >>>
> >>> - The stats listener returns this object in the Response, and the platform
> >>> applies the changes specified in the response to the DAG.
> >>>
> >>>
> >>> The Apex changes
> >>> https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1
> >>>
> >>> The corresponding demo application, in which one operator monitors the
> >>> directory for files and launches the wordcount DAG in the
> >>> same application master when files are available.
> >>> https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular
> >>>
> >>> Example of a stats listener which monitors a metric and instructs the master
> >>> to start a DAG.
> >>>
> >>>   /** Look for more than 100 files in a directory before launching the DAG. */
> >>>   @Override
> >>>   public Response processStats(BatchedOperatorStats stats)
> >>>   {
> >>>     for (Stats.OperatorStats ws : stats.getLastWindowedStats()) {
> >>>       // pendingFiles is an auto-metric.
> >>>       Integer value = (Integer)ws.metrics.get("pendingFiles");
> >>>       LOG.info("stats received for {} pendingFiles {}", stats.getOperatorId(), value);
> >>>       if (value != null && value > 100 && !dagStarted) {
> >>>         dagStarted = true;
> >>>         Response resp = new Response();
> >>>         resp.dag = getWordCountDag((String)ws.metrics.get("directory"));
> >>>         counter = 0;
> >>>         return resp;
> >>>       }
> >>>     }
> >>>     return null;
> >>>   }
> >>>
> >>>   DAGChangeSet getWordCountDag(String dir)
> >>>   {
> >>>     DAGChangeSet dag = new DAGChangeSet();
> >>>     LineByLineFileInputOperator reader = dag.addOperator("Reader", new LineByLineFileInputOperator());
> >>>     List<StatsListener> listeners = new ArrayList<>();
> >>>     listeners.add(this);
> >>>     dag.getMeta(reader).getAttributes().put(Context.OperatorContext.STATS_LISTENERS, listeners);
> >>>     reader.setDirectory(dir);
> >>>     LineSplitter splitter = dag.addOperator("SplitteR", new LineSplitter());
> >>>     UniqueCounter<String> counter = dag.addOperator("Counter", new UniqueCounter<String>());
> >>>     ConsoleOutputOperator out = dag.addOperator("Output", new ConsoleOutputOperator());
> >>>     dag.addStream("s1", reader.output, splitter.input);
> >>>     dag.addStream("s2", splitter.words, counter.data);
> >>>     dag.addStream("s3", counter.count, out.input);
> >>>     return dag;
> >>>   }
> >>>
> >>> Let me know if this type of API is acceptable for launching the DAG.
> >>> This is an API to specify DAG changes. The scheduler functionality
> >>> will use
> >>> this API.
> >>>
> >>>
> >>> Regards,
> >>> -Tushar.
> >>>
> >>> On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <th...@datatorrent.com>
> wrote:
> >>>> I like the idea of keeping heavy lifting and custom code out of the master,
> >>>> if possible. You find that split in responsibilities even in the case of
> >>>> partitioning (Kafka connector for example). The change that requires
> >>>> partitioning may be detected as a byproduct of the regular processing in the
> >>>> container, the information relayed to the master, and the action taken
> >>>> there.
> >>>>
> >>>> We should separate all the different pieces and then decide where they run.
> >>>> There is detecting the need for a plan change, then effecting the change
> >>>> (which requires full DAG view and absolutely has to/should be in the
> >>>> master).
> >>>>
> >>>> Thomas
> >>>>
> >>>> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
> >>>> Chandni.Singh@capitalone.com> wrote:
> >>>>
> >>>>> We have a couple of components that already run in the master:
> >>>>> partitioners, stats listeners, metrics aggregators. The problem of
> >>>>> crashing the master is not specific to just the scheduler, is it?
> >>>>> ________________________________
> >>>>> From: Tushar Gosavi <tu...@datatorrent.com>
> >>>>> Sent: Wednesday, June 22, 2016 2:32:39 PM
> >>>>> To: dev@apex.apache.org
> >>>>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
> >>>>> application
> >>>>>
> >>>>> I was thinking about avoiding running user code in the master, as a crash
> >>>>> in the master takes down all containers with it. Hence I was going for the
> >>>>> scheduler as an operator: a crash in the scheduler won't kill the
> >>>>> application, the master can restart the scheduler, and it can resume
> >>>>> monitoring the job and change the DAG when required. But this
> >>>>> will require communication between the master and the scheduler for
> >>>>> monitoring of operator status/stats.
> >>>>>
> >>>>> It is considerably easier to put the scheduling functionality in the master, as
> >>>>> we have access to operator stats there and a communication channel is
> >>>>> already open between the master and the operators. A custom scheduler can
> >>>>> be written as a shared stats listener, with an additional API available to the
> >>>>> listener to add/remove/deploy/undeploy operators, etc.
> >>>>>
> >>>>> Regards,
> >>>>> - Tushar.
> >>>>>
> >>>>>
> >>>>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <
> thomas@datatorrent.com>
> >>>>> wrote:
> >>>>> > Right, if it runs in the app master and does not rely on unmanaged external
> >>>>> > processes, then these requirements can be met.
> >>>>> >
> >>>>> > This capability seeks to avoid users having to deal with external
> >>>>> > schedulers or workflows if all they want is to split a DAG that is
> >>>>> > logically one application into multiple stages for resource optimization.
> >>>>> > This is not very different from the need to have elasticity in terms of
> >>>>> > partitions depending on the availability of input, as you point out.
> >>>>> >
> >>>>> >
> >>>>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
> >>>>> > Chandni.Singh@capitalone.com> wrote:
> >>>>> >
> >>>>> >> Scheduling IMO belongs to the app master. Operators can influence it; for
> >>>>> >> example, the file splitter can indicate that there are no more files to process.
> >>>>> >>
> >>>>> >> I don’t understand how that cannot integrate with all the aspects:
> >>>>> >> operability, fault tolerance and security.
> >>>>> >>
> >>>>> >> Chandni
> >>>>> >>
> >>>>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <
> chinmay@datatorrent.com>
> >>>>> wrote:
> >>>>> >>
> >>>>> >> >I think it's a good idea to have a scheduling operator when you need to
> >>>>> >> >start a part of the DAG when some trigger happens (for example, FileSplitter
> >>>>> >> >identifying new files in the FS) and otherwise bring it down to save
> >>>>> >> >resources.
> >>>>> >> >
> >>>>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
> >>>>> >> >timothytiborfarkas@gmail.com> wrote:
> >>>>> >> >
> >>>>> >> >> I am in agreement with Chandni. Scheduling a batch job is an API completely
> >>>>> >> >> independent of a DAG or an operator. It could be used by a command-line tool
> >>>>> >> >> running on your laptop, a script, or it could happen to be used by an
> >>>>> >> >> Operator running in a DAG and a StatsListener.
> >>>>> >> >>
> >>>>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <
> >>>>> thomas@datatorrent.com>
> >>>>> >> >> wrote:
> >>>>> >> >>
> >>>>> >> >> > Scheduling can be independent, although we have use cases where the
> >>>>> >> >> > scheduling depends on completion of processing (multi-staged batch jobs
> >>>>> >> >> > where unused resources need to be freed).
> >>>>> >> >> >
> >>>>> >> >> > Both can be accomplished with a stats listener.
> >>>>> >> >> >
> >>>>> >> >> > There can be a "scheduling operator" that brings up and removes DAG
> >>>>> >> >> > fragments as needed.
> >>>>> >> >> >
> >>>>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
> >>>>> >> >><si...@gmail.com>
> >>>>> >> >> > wrote:
> >>>>> >> >> >
> >>>>> >> >> > > Hi,
> >>>>> >> >> > > IMO scheduling a job can be independent of any operator, while
> >>>>> >> >> > > StatsListeners are not. I understand that in a lot of cases input/output
> >>>>> >> >> > > operators will decide when the job ends, but there can be cases when
> >>>>> >> >> > > scheduling can be independent of them.
> >>>>> >> >> > >
> >>>>> >> >> > > Thanks,
> >>>>> >> >> > > Chandni
> >>>>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <
> thomas@datatorrent.com
> >>>>> >
> >>>>> >> >> wrote:
> >>>>> >> >> > >
> >>>>> >> >> > > > This looks like something that, coordination-wise, belongs in the master
> >>>>> >> >> > > > and can be done with a shared stats listener.
> >>>>> >> >> > > >
> >>>>> >> >> > > > The operator request/response protocol could be used to relay the data for
> >>>>> >> >> > > > the scheduling decisions.
> >>>>> >> >> > > >
> >>>>> >> >> > > > Thomas
> >>>>> >> >> > > >
> >>>>> >> >> > > >
> >>>>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> >>>>> >> >> > > > Chandni.Singh@capitalone.com> wrote:
> >>>>> >> >> > > >
> >>>>> >> >> > > > > Hi Tushar,
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > I have some questions about use case 2: Batch Support.
> >>>>> >> >> > > > > I don't understand the advantages of providing batch support by having an
> >>>>> >> >> > > > > operator as a scheduler.
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > An approach that seemed a little more straightforward to me was to expose
> >>>>> >> >> > > > > an API for the scheduler. If there is a scheduler set, then the master uses it and
> >>>>> >> >> > > > > schedules operators. By default there isn't any scheduler and the job is
> >>>>> >> >> > > > > run as it is now.
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > Maybe this is too simplistic, but can you please let me know why having an
> >>>>> >> >> > > > > operator as a scheduler is a better way?
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > Thanks,
> >>>>> >> >> > > > > Chandni
> >>>>> >> >> > > > >
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <
> >>>>> tushar@datatorrent.com>
> >>>>> >> >> > wrote:
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > >Hi All,
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >We have seen a few use cases in the field which require Apex application
> >>>>> >> >> > > > > >scheduling based on some condition. This has also come up as part of
> >>>>> >> >> > > > > >Batch Support in Apex previously
> >>>>> >> >> > > > > >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
> >>>>> >> >> > > > > >I am proposing the following functionality in Apex to help scheduling
> >>>>> >> >> > > > > >and better resource utilization for batch jobs. Please provide your
> >>>>> >> >> > > > > >comments.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Usecase 1 - Dynamic DAG modification.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Each operator in the DAG consumes YARN resources. Sometimes it is
> >>>>> >> >> > > > > >desirable to return the resources to YARN when no data is available
> >>>>> >> >> > > > > >for processing, and to deploy the whole DAG once data starts to appear. For
> >>>>> >> >> > > > > >this to happen automatically, we will need some data monitoring
> >>>>> >> >> > > > > >operators running in the DAG to trigger restart and shutdown of the
> >>>>> >> >> > > > > >operators in the DAG.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Apex already has such an API to dynamically change the running DAG
> >>>>> >> >> > > > > >through the CLI. We could provide a similar API to operators, which
> >>>>> >> >> > > > > >would trigger DAG modification at runtime. This information can be
> >>>>> >> >> > > > > >passed to the master using the heartbeat RPC, and the master will make
> >>>>> >> >> > > > > >the required changes to the DAG. Let me know what you think about it.
> >>>>> >> >> > > > > >Something like below:
> >>>>> >> >> > > > > >context.beginDagChange();
> >>>>> >> >> > > > > >context.addOperator("o1"); <== launch operator from previous check-pointed state.
> >>>>> >> >> > > > > >context.addOperator("o2", new Operator2()); <== create new operator
> >>>>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
> >>>>> >> >> > > > > >context.shutdown("o3"); <== delete this and downstream operators from the DAG.
> >>>>> >> >> > > > > >context.apply();  <== DAG changes will be sent to the master, and the master
> >>>>> >> >> > > > > >will apply these changes.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Similarly, an API for other functionality such as locality settings
> >>>>> >> >> > > > > >needs to be provided.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Provide an API callable from an operator to launch a DAG. The operator
> >>>>> >> >> > > > > >will prepare a DAG object and submit it to YARN; the DAG will be
> >>>>> >> >> > > > > >scheduled as a new application. This way complex schedulers can be
> >>>>> >> >> > > > > >written as operators.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >public class SchedulerOperator implements Operator {
> >>>>> >> >> > > > > >   void handleIdleTime() {
> >>>>> >> >> > > > > >     // check conditions to start a job (for example, enough files
> >>>>> >> >> > > > > >     // available, enough items available in Kafka, or a time has been reached)
> >>>>> >> >> > > > > >     Dag dag = context.createDAG();
> >>>>> >> >> > > > > >     dag.addOperator();
> >>>>> >> >> > > > > >     dag.addOperator();
> >>>>> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
> >>>>> >> >> > > > > >     lOptions.oldId = ""; // start from this checkpoint.
> >>>>> >> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
> >>>>> >> >> > > > > >   }
> >>>>> >> >> > > > > >}
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >DagHandler will have methods to monitor the final state of the
> >>>>> >> >> > > > > >application, or to kill the DAG
> >>>>> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
> >>>>> >> >> > > > > >dagHandler.status()  <== get the status of application.
> >>>>> >> >> > > > > >dagHandler.kill() <== kill the running application.
> >>>>> >> >> > > > > >dagHandler.shutdown() <== shutdown the application.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >More complex scheduler operators could be written to manage
> >>>>> >> >> > > > > >workflows, i.e. a DAG of DAGs, using these APIs.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Regards,
> >>>>> >> >> > > > > >-Tushar.
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > ______________________________
> __________________________
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > The information contained in this e-mail is
> confidential
> >>>>> and/or
> >>>>> >> >> > > > > proprietary to Capital One and/or its affiliates and
> may
> >>>>> only be
> >>>>> >> >> used
> >>>>> >> >> > > > > solely in performance of work or services for Capital
> One.
> >>>>> The
> >>>>> >> >> > > > information
> >>>>> >> >> > > > > transmitted herewith is intended only for use by the
> >>>>> individual
> >>>>> >> >>or
> >>>>> >> >> > > entity
> >>>>> >> >> > > > > to which it is addressed. If the reader of this
> message is
> >>>>> not
> >>>>> >> >>the
> >>>>> >> >> > > > intended
> >>>>> >> >> > > > > recipient, you are hereby notified that any review,
> >>>>> >> >>retransmission,
> >>>>> >> >> > > > > dissemination, distribution, copying or other use of,
> or
> >>>>> taking
> >>>>> >> >>of
> >>>>> >> >> > any
> >>>>> >> >> > > > > action in reliance upon this information is strictly
> >>>>> >> >>prohibited. If
> >>>>> >> >> > you
> >>>>> >> >> > > > > have received this communication in error, please
> contact the
> >>>>> >> >> sender
> >>>>> >> >> > > and
> >>>>> >> >> > > > > delete the material from your computer.
> >>>>> >> >> > > > >
> >>>>> >> >> > > > >
> >>>>> >> >> > > >
> >>>>> >> >> > >
> >>>>> >> >> >
> >>>>> >> >>
> >>>>> >>
> >>>>> >>
> >>>>>
>
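
Note on the thread above: the attach/detach cycle described in the Aug 1, 2016
message (start with only the reader, add the downstream operators once enough
files are pending, remove them again after 120 windows of inactivity) can be
sketched as a small state machine. The sketch below is a self-contained Java
model and does not use the Apex API. The names DagTriggerSketch,
PendingFilesTrigger, Action and onWindow are hypothetical, the threshold of 100
is borrowed from the processStats example, and treating a window with zero
pending files as "inactive" is an assumption.

```java
// Hypothetical model of the trigger logic only; no Apex classes are involved.
class DagTriggerSketch {

  /** What the listener would ask the master to do after one window of stats. */
  enum Action { NONE, ATTACH_DOWNSTREAM, DETACH_DOWNSTREAM }

  /** Decides when the splitter, counter and output operators are attached or detached. */
  static class PendingFilesTrigger {
    private final int minPendingFiles; // assumed threshold, e.g. 100
    private final int maxIdleWindows;  // e.g. 120 windows of inactivity
    private boolean attached = false;
    private int idleWindows = 0;

    PendingFilesTrigger(int minPendingFiles, int maxIdleWindows) {
      this.minPendingFiles = minPendingFiles;
      this.maxIdleWindows = maxIdleWindows;
    }

    /** Called once per window with the pendingFiles metric (null if not reported). */
    Action onWindow(Integer pendingFiles) {
      int pending = (pendingFiles == null) ? 0 : pendingFiles;
      if (!attached) {
        if (pending > minPendingFiles) {
          attached = true;
          idleWindows = 0;
          return Action.ATTACH_DOWNSTREAM;
        }
        return Action.NONE;
      }
      if (pending > 0) {
        idleWindows = 0; // still busy, reset the inactivity counter
        return Action.NONE;
      }
      idleWindows++;
      if (idleWindows >= maxIdleWindows) {
        attached = false;
        idleWindows = 0;
        return Action.DETACH_DOWNSTREAM;
      }
      return Action.NONE;
    }
  }

  public static void main(String[] args) {
    PendingFilesTrigger trigger = new PendingFilesTrigger(100, 120);
    System.out.println(trigger.onWindow(150)); // enough files: attach
    for (int i = 0; i < 120; i++) {
      Action a = trigger.onWindow(0);          // idle windows accumulate
      if (a != Action.NONE) {
        System.out.println("window " + (i + 1) + ": " + a);
      }
    }
  }
}
```

In a real StatsListener the returned action would be translated into a DAG
change. That translation is left out here because the API for it was still
under review in this thread.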

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Tushar Gosavi <tu...@datatorrent.com>.
@Pramod and @Thomas, I have updated the document with the latest progress and
approach. The simple application which runs one DAG after another is
located at

https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/lindag/DagSchedulingApp.java

The apex-core changes are
https://github.com/apache/apex-core/compare/master...tushargosavi:APEXCORE-408
.
This also contains changes from pull request
https://github.com/apache/apex-core/pull/476. The code needs a lot of cleanup,
as it is just an early attempt to implement the functionality. Can you go
over it and see whether the overall approach is fine? I will continue working on it
accordingly.

Thanks,
- Tushar.


On Mon, Mar 6, 2017 at 7:35 PM, Tushar Gosavi <tu...@datatorrent.com>
wrote:

> Hi All,
>
> I have started working on this again. The top level design document is
> located at
>
> https://docs.google.com/document/d/1OugxutMYI-JwB9Z7hxjTWwiAKQsqbYPujDmgwlpPOXM/edit?usp=sharing
>
> The WIP branch is
> https://github.com/tushargosavi/apex-core/tree/dag_schedule.redesign
>
> @Thomas, can you please review the document and comment on the design approach?
>
> Regards,
> -Tushar.
>
>
> On Tue, Jan 24, 2017 at 4:47 PM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
>
>> Hi All,
>>
>> I have updated the design document as per the review comments on the pull
>> request
>> (https://issues.apache.org/jira/secure/attachment/12849094/dag.docx).
>> Please provide your suggestions.
>>
>> - Tushar.
>>
>>
>> On Fri, Sep 16, 2016 at 6:40 PM, Tushar Gosavi <tu...@datatorrent.com>
>> wrote:
>> > Hi All,
>> >
>> > I have opened a review pull request for dynamic DAG modification
>> > through the stats listener (https://github.com/apache/apex-core/pull/393).
>> > Please review and provide
>> > comments/suggestions.
>> >
>> > It provides the following functionality:
>> > - StatsListener can access the operator name, making it easy to detect
>> > which operator's stats are being processed.
>> > - StatsListener can create an instance of an object through which it can
>> > submit DAG modifications to the engine.
>> > - StatsListener can return DAG changes as a response to the engine.
>> > - PlanModifier is modified to take a DAG, apply it to the existing
>> > running DAG, and deploy the changes.
>> >
>> > The following functionality is not working yet:
>> >
>> > - The new operator does not start from the correct windowId
>> > (https://issues.apache.org/jira/browse/APEXCORE-532)
>> > - A relaunched application fails to start if it was killed after a
>> > dynamic DAG modification.
>> > - There is no support for resuming operators from their previous state
>> > after they were removed. This could be achieved by
>> >   reading state from external storage on setup.
>> > - Persist operator support is not present for newly added streams.
>> >
>> > The demo application using the feature is available at
>> > https://github.com/tushargosavi/apex-dynamic-scheduling
>> >
>> > There are two variations of the WordCount application. The first variation
>> > detects the presence of
>> > new files and starts a disconnected DAG to process the data.
>> > (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java)
>> >
>> > The second application
>> > (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java)
>> > starts with a reader operator and provides pendingFiles as an auto-metric
>> > to the stats listener. On detecting pending files, it attaches the splitter,
>> > counter and output
>> > operators to the reader operator. Once the files are processed, the splitter,
>> > counter and output operators are removed, and they are
>> > added back again if new data files are added to the directory.
>> >
>> > Regards,
>> > -Tushar.
>> >
>> >
>> > On Mon, Aug 1, 2016 at 6:10 PM, Tushar Gosavi <tu...@datatorrent.com>
>> wrote:
>> >> Hi All,
>> >>
>> >> I was able to prototype a simple word count application, which
>> >> starts with just a single file reader operator. The file reader operator
>> >> emits pendingFiles as a metric to the StatsListener. The StatsListener
>> >> changes the DAG once enough files are available: the listener returns a
>> >> plan change that adds the word splitter, counter and console operators to the
>> >> reader and completes the DAG for word count.
>> >>
>> >> After 120 windows of inactivity, the three operators are removed
>> >> from the DAG again. When a new set of files is added, these operators are
>> >> added back again.
>> >>
>> >> The high level proposal document:
>> >> https://docs.google.com/document/d/1gpy7NFxd6te1lTXN1-2r8WHRjg83zEM8aqpPAXivFRQ/edit?usp=sharing
>> >>
>> >> The prototype code is at:
>> >> https://github.com/tushargosavi/apex-dynamic-scheduling/
>> >> The application files are
>> >> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/FileStatListenerSameDag.java
>> >> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java
>> >>
>> >> Please provide your feedback.
>> >>
>> >> Some challenges yet to be resolved are:
>> >> - Restoring operator state from a previously removed operator.
>> >> - Handling concurrent modifications to the DAG from multiple StatsListeners.
>> >> - Making DAG changes persistent: the user should be able to restart the
>> >> application if it was killed with a modified DAG.
>> >>
>> >> Thanks,
>> >> -Tushar.
>> >>
>> >> On Fri, Jul 8, 2016 at 6:48 PM, Tushar Gosavi <tu...@datatorrent.com>
>> wrote:
>> >>> Hi All,
>> >>>
>> >>> I have done an initial prototype which allows a stats listener to
>> >>> specify DAG changes; the DAG changes are applied asynchronously.
>> >>>
>> >>> The changes involved are
>> >>> - Add a DAGChangeSet object which inherits from DAG and supports
>> >>> methods to remove
>> >>>   operators and streams.
>> >>>
>> >>> - The stats listener returns this object in the Response, and the platform
>> >>> applies the changes specified in the response to the DAG.
>> >>>
>> >>>
>> >>> The Apex changes
>> >>> https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1
>> >>>
>> >>> The corresponding demo application, in which one operator monitors the
>> >>> directory for files and launches the wordcount DAG in the
>> >>> same application master when files are available.
>> >>> https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular
>> >>>
>> >>> Example of a stats listener which monitors a metric and instructs the master
>> >>> to start a DAG.
>> >>>
>> >>>  /** look for more than 100 files in a directory, before lauching the
>> DAG */
>> >>> @Override
>> >>>   public Response processStats(BatchedOperatorStats stats)
>> >>>   {
>> >>>     for(Stats.OperatorStats ws: stats.getLastWindowedStats()) {
>> >>>       // pendingFiles is autometric.
>> >>>       Integer value = (Integer)ws.metrics.get("pendingFiles");
>> >>>       LOG.info("stats recevied for {} pendingFiles {}",
>> >>> stats.getOperatorId(), value);
>> >>>       if (value != null  && value > 100 && !dagStarted) {
>> >>>         dagStarted = true;
>> >>>         Response resp = new Response();
>> >>>         resp.dag = getWordCountDag((String)ws.met
>> rics.get("directory"));
>> >>>         counter = 0;
>> >>>         return resp;
>> >>>       }
>> >>>     }
>> >>>     return null;
>> >>>   }
>> >>>
>> >>> DAGChangeSet getWordCountDag(String dir)
>> >>> {
>> >>>   DAGChangeSet dag = new DAGChangeSet();
>> >>>   LineByLineFileInputOperator reader = dag.addOperator("Reader", new LineByLineFileInputOperator());
>> >>>   // Attach this listener to the new reader as well.
>> >>>   List<StatsListener> listeners = new ArrayList<>();
>> >>>   listeners.add(this);
>> >>>   dag.getMeta(reader).getAttributes().put(Context.OperatorContext.STATS_LISTENERS, listeners);
>> >>>   reader.setDirectory(dir);
>> >>>   LineSplitter splitter = dag.addOperator("SplitteR", new LineSplitter());
>> >>>   UniqueCounter<String> counter = dag.addOperator("Counter", new UniqueCounter<String>());
>> >>>   ConsoleOutputOperator out = dag.addOperator("Output", new ConsoleOutputOperator());
>> >>>   dag.addStream("s1", reader.output, splitter.input);
>> >>>   dag.addStream("s2", splitter.words, counter.data);
>> >>>   dag.addStream("s3", counter.count, out.input);
>> >>>   return dag;
>> >>> }
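
The one-shot trigger logic in the quoted listener can be tried out without a cluster. The following is only a rough sketch under stated assumptions: OneShotTrigger, onStats and the Map-based metrics are hypothetical stand-ins, not the Apex StatsListener API. Only the metric names "pendingFiles" and "directory" come from the example above.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a stats listener; this is NOT the Apex API.
class OneShotTrigger {
  private final int threshold;
  private boolean fired = false;

  OneShotTrigger(int threshold) {
    this.threshold = threshold;
  }

  // Returns the directory to process the first time "pendingFiles" exceeds
  // the threshold; returns Optional.empty() on every other call.
  Optional<String> onStats(Map<String, Object> metrics) {
    Object pending = metrics.get("pendingFiles");
    if (fired || !(pending instanceof Integer) || (Integer) pending <= threshold) {
      return Optional.empty();
    }
    fired = true;
    return Optional.ofNullable((String) metrics.get("directory"));
  }
}
```

The fired flag plays the role of dagStarted in the quoted code: once the threshold has been crossed, later stats batches do not request the DAG again.
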
>> >>>
>> >>> Let me know if this type of API is acceptable for launching the DAG.
>> >>> This is an API to specify DAG changes. The scheduler functionality
>> >>> will use
>> >>> this API.
>> >>>
>> >>>
>> >>> Regards,
>> >>> -Tushar.
>> >>>
>> >>> On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <th...@datatorrent.com>
>> wrote:
>> >>>> I like the idea of keeping heavy lifting and custom code out of the
>> >>>> master, if possible. You find that split in responsibilities even in the
>> >>>> case of partitioning (Kafka connector for example). The change that
>> >>>> requires partitioning may be detected as a byproduct of the regular
>> >>>> processing in the container, the information relayed to the master, and
>> >>>> the action taken there.
>> >>>>
>> >>>> We should separate all the different pieces and then decide where they
>> >>>> run. There is detecting the need for a plan change, then effecting the
>> >>>> change (which requires full DAG view and absolutely has to/should be in
>> >>>> the master).
>> >>>>
>> >>>> Thomas
>> >>>>
>> >>>> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
>> >>>> Chandni.Singh@capitalone.com> wrote:
>> >>>>
>> >>>>> We have a couple of components that already run in the master:
>> >>>>> partitioners, stats listeners, and metrics aggregators. The problem of
>> >>>>> crashing the master is not specific to the scheduler, is it?
>> >>>>> ________________________________
>> >>>>> From: Tushar Gosavi <tu...@datatorrent.com>
>> >>>>> Sent: Wednesday, June 22, 2016 2:32:39 PM
>> >>>>> To: dev@apex.apache.org
>> >>>>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
>> >>>>> application
>> >>>>>
>> >>>>> I was thinking about avoiding running user code in the master, as a
>> >>>>> crash in the master takes down all containers with it. Hence I was going
>> >>>>> for the scheduler as an operator: a crash in the scheduler won't kill the
>> >>>>> application, and the master can restart the scheduler so that it resumes
>> >>>>> monitoring the job and changes the DAG when required. But this will
>> >>>>> require communication between the master and the scheduler for
>> >>>>> monitoring operator status/stats.
>> >>>>>
>> >>>>> It is considerably easier to put the scheduling functionality in the
>> >>>>> master, as we have access to operator stats there and a communication
>> >>>>> channel is already open between the master and the operators. A custom
>> >>>>> scheduler can then be written as a shared stats listener, with an
>> >>>>> additional API available to the listener to add/remove/deploy/undeploy
>> >>>>> operators.
>> >>>>>
>> >>>>> Regards,
>> >>>>> - Tushar.
>> >>>>>
>> >>>>>
>> >>>>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <
>> thomas@datatorrent.com>
>> >>>>> wrote:
>> >>>>> > Right, if it runs in the app master and does not rely on unmanaged
>> >>>>> > external processes, then these requirements can be met.
>> >>>>> >
>> >>>>> > This capability seeks to avoid users having to deal with external
>> >>>>> > schedulers or workflows if all they want is to split a DAG that is
>> >>>>> > logically one application into multiple stages for resource
>> >>>>> > optimization. This is not very different from the need to have
>> >>>>> > elasticity in terms of partitions depending on the availability of
>> >>>>> > input, as you point out.
>> >>>>> >
>> >>>>> >
>> >>>>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
>> >>>>> > Chandni.Singh@capitalone.com> wrote:
>> >>>>> >
>> >>>>> >> Scheduling IMO belongs to the app master. Operators can influence it;
>> >>>>> >> e.g. a file splitter can indicate that there are no more files to
>> >>>>> >> process.
>> >>>>> >>
>> >>>>> >> I don’t understand how that cannot integrate with all the aspects:
>> >>>>> >> operability, fault tolerance and security.
>> >>>>> >>
>> >>>>> >> Chandni
>> >>>>> >>
>> >>>>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <
>> chinmay@datatorrent.com>
>> >>>>> wrote:
>> >>>>> >>
>> >>>>> >> >I think it's a good idea to have a scheduling operator when you need to
>> >>>>> >> >start a part of the DAG when some trigger happens (e.g. FileSplitter
>> >>>>> >> >identifying new files in the FS) and otherwise bring it down to save
>> >>>>> >> >resources.
>> >>>>> >> >
>> >>>>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>> >>>>> >> >timothytiborfarkas@gmail.com> wrote:
>> >>>>> >> >
>> >>>>> >> >> I am in agreement with Chandni. Scheduling a batch job is an API
>> >>>>> >> >> completely independent of a DAG or an operator. It could be used by a
>> >>>>> >> >> commandline tool running on your laptop, a script, or it could happen
>> >>>>> >> >> to be used by an Operator running in a DAG and a StatsListener.
>> >>>>> >> >>
>> >>>>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <
>> >>>>> thomas@datatorrent.com>
>> >>>>> >> >> wrote:
>> >>>>> >> >>
>> >>>>> >> >> > Scheduling can be independent, although we have use cases where the
>> >>>>> >> >> > scheduling depends on completion of processing (multi-staged batch
>> >>>>> >> >> > jobs where unused resources need to be freed).
>> >>>>> >> >> >
>> >>>>> >> >> > Both can be accomplished with a stats listener.
>> >>>>> >> >> >
>> >>>>> >> >> > There can be a "scheduling operator" that brings up and removes DAG
>> >>>>> >> >> > fragments as needed.
>> >>>>> >> >> >
>> >>>>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>> >>>>> >> >><si...@gmail.com>
>> >>>>> >> >> > wrote:
>> >>>>> >> >> >
>> >>>>> >> >> > > Hi,
>> >>>>> >> >> > > IMO scheduling a job can be independent of any operator, while
>> >>>>> >> >> > > StatsListeners are not. I understand that in a lot of cases
>> >>>>> >> >> > > input/output operators will decide when the job ends, but there can
>> >>>>> >> >> > > be cases when scheduling is independent of them.
>> >>>>> >> >> > >
>> >>>>> >> >> > > Thanks,
>> >>>>> >> >> > > Chandni
>> >>>>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <
>> thomas@datatorrent.com
>> >>>>> >
>> >>>>> >> >> wrote:
>> >>>>> >> >> > >
>> >>>>> >> >> > > > This looks like something that, coordination-wise, belongs in the
>> >>>>> >> >> > > > master and can be done with a shared stats listener.
>> >>>>> >> >> > > >
>> >>>>> >> >> > > > The operator request/response protocol could be used to relay the
>> >>>>> >> >> > > > data for the scheduling decisions.
>> >>>>> >> >> > > >
>> >>>>> >> >> > > > Thomas
>> >>>>> >> >> > > >
>> >>>>> >> >> > > >
>> >>>>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>> >>>>> >> >> > > > Chandni.Singh@capitalone.com> wrote:
>> >>>>> >> >> > > >
>> >>>>> >> >> > > > > Hi Tushar,
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > I have some questions about use case 2: Batch Support.
>> >>>>> >> >> > > > > I don't understand the advantages of providing batch support by
>> >>>>> >> >> > > > > having an operator as a scheduler.
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > An approach that seemed a little more straightforward to me was to
>> >>>>> >> >> > > > > expose an API for the scheduler. If a scheduler is set, the master
>> >>>>> >> >> > > > > uses it to schedule operators. By default there isn't any scheduler
>> >>>>> >> >> > > > > and the job is run as it is now.
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > Maybe this is too simplistic, but can you please let me know why
>> >>>>> >> >> > > > > having an operator as a scheduler is a better way?
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > Thanks,
>> >>>>> >> >> > > > > Chandni
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <
>> >>>>> tushar@datatorrent.com>
>> >>>>> >> >> > wrote:
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > >Hi All,
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >We have seen a few use cases in the field that require Apex
>> >>>>> >> >> > > > > >application scheduling based on some condition. This has also come
>> >>>>> >> >> > > > > >up as part of Batch Support in Apex previously
>> >>>>> >> >> > > > > >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
>> >>>>> >> >> > > > > >I am proposing the following functionality in Apex to help with
>> >>>>> >> >> > > > > >scheduling and better resource utilization for batch jobs. Please
>> >>>>> >> >> > > > > >provide your comments.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Usecase 1 - Dynamic Dag modification.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Each operator in the DAG consumes YARN resources. Sometimes it is
>> >>>>> >> >> > > > > >desirable to return the resources to YARN when no data is available
>> >>>>> >> >> > > > > >for processing, and to deploy the whole DAG once data starts to
>> >>>>> >> >> > > > > >appear. For this to happen automatically, we will need some
>> >>>>> >> >> > > > > >data-monitoring operators running in the DAG to trigger restart and
>> >>>>> >> >> > > > > >shutdown of the operators in the DAG.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Apex already has such an API to dynamically change the running DAG
>> >>>>> >> >> > > > > >through the CLI. We could make a similar API available to operators,
>> >>>>> >> >> > > > > >which would trigger DAG modification at runtime. This information
>> >>>>> >> >> > > > > >can be passed to the master using the heartbeat RPC, and the master
>> >>>>> >> >> > > > > >will make the required changes to the DAG. Let me know what you
>> >>>>> >> >> > > > > >think about it. Something like below:
>> >>>>> >> >> > > > > >context.beginDagChange();
>> >>>>> >> >> > > > > >context.addOperator("o1"); <== launch operator from previous
>> >>>>> >> >> > > > > >check-pointed state.
>> >>>>> >> >> > > > > >context.addOperator("o2", new Operator2()); <== create new operator
>> >>>>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>> >>>>> >> >> > > > > >context.shutdown("o3"); <== delete this and downstream operators
>> >>>>> >> >> > > > > >from the DAG.
>> >>>>> >> >> > > > > >context.apply(); <== DAG changes will be sent to the master, and the
>> >>>>> >> >> > > > > >master will apply these changes.
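
The begin/apply pattern proposed above (queue up edits, then hand them over in one step) can be sketched in isolation. This is a toy model under stated assumptions, not Apex code: ToyPlan and DagChangeBuffer are hypothetical names, operators are plain strings, and apply() runs locally instead of going through the heartbeat RPC.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Toy logical plan: operator names plus directed edges between them.
class ToyPlan {
  final Set<String> operators = new LinkedHashSet<>();
  final Map<String, List<String>> downstream = new HashMap<>();

  void addOperator(String name) {
    operators.add(name);
  }

  void addStream(String from, String to) {
    downstream.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
  }

  // Removes the named operator and everything reachable downstream of it.
  void shutdown(String name) {
    Deque<String> pending = new ArrayDeque<>();
    pending.push(name);
    while (!pending.isEmpty()) {
      String op = pending.pop();
      if (operators.remove(op)) {
        List<String> next = downstream.remove(op);
        if (next != null) {
          next.forEach(pending::push);
        }
        // Drop edges from surviving operators into the removed one.
        downstream.values().forEach(targets -> targets.remove(op));
      }
    }
  }
}

// Hypothetical change buffer: edits are queued and only touch the plan in apply().
class DagChangeBuffer {
  private final List<Consumer<ToyPlan>> changes = new ArrayList<>();

  DagChangeBuffer addOperator(String name) {
    changes.add(p -> p.addOperator(name));
    return this;
  }

  DagChangeBuffer addStream(String from, String to) {
    changes.add(p -> p.addStream(from, to));
    return this;
  }

  DagChangeBuffer shutdown(String name) {
    changes.add(p -> p.shutdown(name));
    return this;
  }

  // Replays the queued edits in order against the plan, then clears the queue.
  void apply(ToyPlan plan) {
    changes.forEach(c -> c.accept(plan));
    changes.clear();
  }
}
```

Nothing changes in the plan until apply() is called, which mirrors the idea that the operator only describes the change and the master is the one that carries it out.
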
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Similarly, APIs for other functionality such as locality settings
>> >>>>> >> >> > > > > >need to be provided.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Provide an API callable from an operator to launch a DAG. The
>> >>>>> >> >> > > > > >operator will prepare a DAG object and submit it to YARN, and the
>> >>>>> >> >> > > > > >DAG will be scheduled as a new application. This way complex
>> >>>>> >> >> > > > > >schedulers can be written as operators.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >public class SchedulerOperator implements Operator {
>> >>>>> >> >> > > > > >   void handleIdleTime() {
>> >>>>> >> >> > > > > >     // Check the conditions to start a job (for example, enough
>> >>>>> >> >> > > > > >     // files are available, enough items are available in Kafka,
>> >>>>> >> >> > > > > >     // or the scheduled time has been reached).
>> >>>>> >> >> > > > > >     Dag dag = context.createDAG();
>> >>>>> >> >> > > > > >     dag.addOperator();
>> >>>>> >> >> > > > > >     dag.addOperator();
>> >>>>> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>> >>>>> >> >> > > > > >     lOptions.oldId = ""; // start from this checkpoint.
>> >>>>> >> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
>> >>>>> >> >> > > > > >   }
>> >>>>> >> >> > > > > >}
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >DagHandler will have methods to monitor the final state of the
>> >>>>> >> >> > > > > >application, or to kill the DAG:
>> >>>>> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates.
>> >>>>> >> >> > > > > >dagHandler.status() <== get the status of the application.
>> >>>>> >> >> > > > > >dagHandler.kill() <== kill the running application.
>> >>>>> >> >> > > > > >dagHandler.shutdown() <== shut down the application.
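
One possible shape for such a handle, written as a minimal sketch. The four method names come from the list above; everything else is an assumption made for illustration: the DagStatus values, the InMemoryDagHandler class, and the choice that the first of kill() or shutdown() decides the final state.

```java
import java.util.concurrent.CompletableFuture;

// Assumed status values; the proposal does not list them.
enum DagStatus { RUNNING, FINISHED, KILLED }

// Hypothetical interface; method names follow the proposal.
interface DagHandler {
  DagStatus status();
  DagStatus waitForCompletion();
  void kill();
  void shutdown();
}

// In-memory stand-in so the interface can be exercised without a cluster.
class InMemoryDagHandler implements DagHandler {
  private final CompletableFuture<DagStatus> done = new CompletableFuture<>();

  @Override
  public DagStatus status() {
    return done.isDone() ? done.join() : DagStatus.RUNNING;
  }

  // Blocks the caller until kill() or shutdown() has been called.
  @Override
  public DagStatus waitForCompletion() {
    return done.join();
  }

  // Abrupt stop; has no effect if the DAG already reached a final state.
  @Override
  public void kill() {
    done.complete(DagStatus.KILLED);
  }

  // Orderly stop; has no effect if the DAG already reached a final state.
  @Override
  public void shutdown() {
    done.complete(DagStatus.FINISHED);
  }
}
```

A scheduler managing a DAG of DAGs would hold one such handle per launched DAG and start the next stage when waitForCompletion() returns.
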
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >More complex scheduler operators could be written to manage
>> >>>>> >> >> > > > > >workflows, i.e. a DAG of DAGs, using these APIs.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Regards,
>> >>>>> >> >> > > > > >-Tushar.
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > ______________________________
>> __________________________
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > The information contained in this e-mail is
>> confidential
>> >>>>> and/or
>> >>>>> >> >> > > > > proprietary to Capital One and/or its affiliates and
>> may
>> >>>>> only be
>> >>>>> >> >> used
>> >>>>> >> >> > > > > solely in performance of work or services for Capital
>> One.
>> >>>>> The
>> >>>>> >> >> > > > information
>> >>>>> >> >> > > > > transmitted herewith is intended only for use by the
>> >>>>> individual
>> >>>>> >> >>or
>> >>>>> >> >> > > entity
>> >>>>> >> >> > > > > to which it is addressed. If the reader of this
>> message is
>> >>>>> not
>> >>>>> >> >>the
>> >>>>> >> >> > > > intended
>> >>>>> >> >> > > > > recipient, you are hereby notified that any review,
>> >>>>> >> >>retransmission,
>> >>>>> >> >> > > > > dissemination, distribution, copying or other use of,
>> or
>> >>>>> taking
>> >>>>> >> >>of
>> >>>>> >> >> > any
>> >>>>> >> >> > > > > action in reliance upon this information is strictly
>> >>>>> >> >>prohibited. If
>> >>>>> >> >> > you
>> >>>>> >> >> > > > > have received this communication in error, please
>> contact the
>> >>>>> >> >> sender
>> >>>>> >> >> > > and
>> >>>>> >> >> > > > > delete the material from your computer.
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > >
>> >>>>> >> >> > >
>> >>>>> >> >> >
>> >>>>> >> >>
>> >>>>> >>
>> >>>>> >>
>> >>>>>
>>
>
>