You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by Tushar Gosavi <tu...@datatorrent.com> on 2016/06/21 18:09:04 UTC

APEXCORE-408 : Ability to schedule Sub-DAG from running application

Hi All,

We have seen few use cases in field which require Apex application
scheduling based on some condition. This has also came up as part of
Batch Support in Apex previously
(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E)
. I am proposing following functionality in Apex to help scheduling
and better resource utilization for batch jobs. Please provide your
comments.

Usecase 1 - Dynamic Dag modification.

Each operator in DAG consumes yarn resources, sometimes it is
desirable to return the resources to yarn when no data is available
for processing, and deploy whole DAG once data starts to appear. For
this to happen automatically, we will need some data monitoring
operators running in the DAG to trigger restart and shutdown of the
operators in the DAG.

Apex already have such api to dynamically change the running dag
through cli. We could provide similar API available to operators which
will trigger dag modification at runtime. This information can be
passed to master using heartbeat RPC and master will make
required changed to the DAG. let me know what do you think about it..
something like below.
Context.beginDagChange();
context.addOperator("o1") <== launch operator from previous check-pointed state.
context.addOperator("o2", new Operator2()) <== create new operator
context.addStream("s1", "reader.output", "o1.input");
context.shutdown("o3"); <== delete this and downstream operators from the DAG.
context.apply();  <== dag changes will be send to master, and master
will apply these changes.

Similarly API for other functionalities such as locality settings
needs to be provided.


Usecase 2 - Classic Batch Scheduling.

Provide an API callable from operator to launch a DAG. The operator
will prepare an dag object and submit it to the yarn, the DAG will be
scheduled as a new application. This way complex schedulers can be
written as operators.

public SchedulerOperator implements Operator {
   void handleIdleTime() {
      // check of conditions to start a job (for example enough files
available, enough items are available in kafa, or time has reached
     Dag dag = context.createDAG();
     dag.addOperator();
     dag.addOperator();
     LaunchOptions lOptions = new LaunchOptions();
     lOptions.oldId = ""; // start for this checkpoint.
     DagHandler dagHandler = context.submit(dag, lOptions);
   }
}

DagHandler will have methods to monitor the final state of
application, or to kill the DAG
dagHandler.waitForCompletion() <== wait till the DAG terminates
dagHandler.status()  <== get the status of application.
dagHandler.kill() <== kill the running application.
dagHandler.shutdown() <== shutdown the application.

The more complex Scheduler operators could be written to manage the
workflows, i.e DAG of DAGs. using these APIs.

Regards,
-Tushar.

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Tushar Gosavi <tu...@datatorrent.com>.
@Pramod and @Thomas​, I have updated the document with latest progress and
approach. The simple application which runs one dag after another is
located at.

https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/lindag/DagSchedulingApp.java

The apex-core chagnes are
https://github.com/apache/apex-core/compare/master...tushargosavi:APEXCORE-408
.
This also contains changes from pull request
https://github.com/apache/apex-core/pull/476. The code needs lot of cleanup
but it is just an early attempt to implement the functionality. Can you go
over and see if overall approach is fine or not. I will start working on it
accordingly.

Thanks,
- Tushar.


On Mon, Mar 6, 2017 at 7:35 PM, Tushar Gosavi <tu...@datatorrent.com>
wrote:

> Hi All,
>
> I have started working on this again. The top level design document is
> located at
>
> https://docs.google.com/document/d/1OugxutMYI-
> JwB9Z7hxjTWwiAKQsqbYPujDmgwlpPOXM/edit?usp=sharing
>
> The WIP branch is
> https://github.com/tushargosavi/apex-core/tree/dag_schedule.redesign
>
> @Thomas can you please review the document and comment on design approach.
>
> Regards,
> -Tushar.
>
>
> On Tue, Jan 24, 2017 at 4:47 PM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
>
>> Hi All,
>>
>> I have updated the design document as per review comments on pull
>> request (https://issues.apache.org/jira/secure/attachment/12849094/
>> dag.docx).
>> Please provide your suggestion.
>>
>> - Tushar.
>>
>>
>> On Fri, Sep 16, 2016 at 6:40 PM, Tushar Gosavi <tu...@datatorrent.com>
>> wrote:
>> > Hi All,
>> >
>> > I have opened a review pull request for dynamic dag modification
>> > through stats listener (https://github.com/apache/apex-core/pull/393).
>> > Please review and provide
>> > comments/suggestions.
>> >
>> > It provides following functionality
>> > - StatsListener can access the opearator name for easily detecting
>> > which opearator stats are being processed.
>> > - StatsListener can create a instance of object through which it can
>> > submit dag modifications to the engine.
>> > - StatsListener can return dag changes as a response to engine.
>> > - PlanModifier is modified to take a DAG and apply it on the existing
>> > running DAG and deploy the changes.
>> >
>> > The following functionality is not working yet.
>> >
>> > - The new opearator does not start from the correct windowId
>> > (https://issues.apache.org/jira/browse/APEXCORE-532)
>> > - Relanched application failed to start when it was killed after
>> > dynamic dag modification.
>> > - There is no support for resuming operator from previous state when
>> > they were removed. This could be achived through
>> >   readig state through external storage on setup.
>> > - persist operator support is not present for newly added streams.
>> >
>> > The demo application using the feature is available at
>> > https://github.com/tushargosavi/apex-dynamic-scheduling
>> >
>> > There are two variations of WordCount application. The first variation
>> > detects the presence of
>> > new files and start a disconnected DAG to process the data.
>> > (https://github.com/tushargosavi/apex-dynamic-scheduling/
>> blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java)
>> >
>> > The second application
>> > (https://github.com/tushargosavi/apex-dynamic-scheduling/
>> blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java),
>> > starts with reader operator, and provides pendingFiles as auto-metric
>> > to stat listener. On detecting pending files it attaches splitter
>> > counter and output
>> > operator to the read operator. Once files are processed the splitter,
>> > counter and output operators are removed and
>> > added back again if new data files are added into the directory.
>> >
>> > Regards,
>> > -Tushar.
>> >
>> >
>> > On Mon, Aug 1, 2016 at 6:10 PM, Tushar Gosavi <tu...@datatorrent.com>
>> wrote:
>> >> Hi All,
>> >>
>> >> I was able to prototype an simple word count application, which will
>> >> start with just a single file reader operator. File reader operator
>> >> will emit pendingFiles as metric to StatsListener. The statslistener
>> >> will change DAG once enough files are available. The listener will
>> return
>> >> plan change to add word splitter, counter and console operator to the
>> >> reader and complete the DAG for wordcount.
>> >>
>> >> After 120 windows of inactivity, the three operators will be removed
>> >> from DAG again. When new set of files are added these operators are
>> >> added back again.
>> >>
>> >> The high level proposal document:
>> >> https://docs.google.com/document/d/1gpy7NFxd6te1lTXN1-2r8WHR
>> jg83zEM8aqpPAXivFRQ/edit?usp=sharing
>> >>
>> >> The prototype code is at :
>> >> https://github.com/tushargosavi/apex-dynamic-scheduling/
>> >> The Application files are
>> >> https://github.com/tushargosavi/apex-dynamic-scheduling/
>> blob/master/src/main/java/com/datatorrent/wordcount/FileStat
>> ListenerSameDag.java
>> >> https://github.com/tushargosavi/apex-dynamic-scheduling/
>> blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java
>> >>
>> >> Please provide your feedback.
>> >>
>> >> Some challenges yet to resolve are
>> >> - Restoring operator state from previously removed operator.
>> >> - Handling cuncurrent modifications to DAG from multiple StatsListener.
>> >> - Making DAG changes persistent, user should be able to restart the
>> >> application if application was killed with modified dag.
>> >>
>> >> Thanks,
>> >> -Tushar.
>> >>
>> >> On Fri, Jul 8, 2016 at 6:48 PM, Tushar Gosavi <tu...@datatorrent.com>
>> wrote:
>> >>> Hi All,
>> >>>
>> >>> I have dome some initial prototype which allows stat listener to
>> >>> specify dag changes, and the dag changes are applied asynchronously.
>> >>>
>> >>> The changes involved are
>> >>> - Add DagChangeSet object which is inherited from DAG, supporting
>> >>> methods to remove
>> >>>   operator and streams.
>> >>>
>> >>> - The stat listener will return this object in Response, and platform
>> >>> will apply changes specified in response to the DAG.
>> >>>
>> >>>
>> >>> The Apex changes
>> >>> https://github.com/apache/apex-core/compare/master...tusharg
>> osavi:scheduler?expand=1
>> >>>
>> >>> The correspondign Demo application, which one operator monitors the
>> >>> directory for files, and launch the wordcount DAG in
>> >>> same application master when files are available.
>> >>> https://github.com/tushargosavi/incubator-apex-malhar/tree/1
>> 78ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/
>> main/java/com/datatorrent/demos/wordcount/schedular
>> >>>
>> >>> Example of stat listerner which monitors a metric and instruct master
>> >>> to start a dag.
>> >>>
>> >>>  /** look for more than 100 files in a directory, before lauching the
>> DAG */
>> >>> @Override
>> >>>   public Response processStats(BatchedOperatorStats stats)
>> >>>   {
>> >>>     for(Stats.OperatorStats ws: stats.getLastWindowedStats()) {
>> >>>       // pendingFiles is autometric.
>> >>>       Integer value = (Integer)ws.metrics.get("pendingFiles");
>> >>>       LOG.info("stats recevied for {} pendingFiles {}",
>> >>> stats.getOperatorId(), value);
>> >>>       if (value != null  && value > 100 && !dagStarted) {
>> >>>         dagStarted = true;
>> >>>         Response resp = new Response();
>> >>>         resp.dag = getWordCountDag((String)ws.met
>> rics.get("directory"));
>> >>>         counter = 0;
>> >>>         return resp;
>> >>>       }
>> >>>     }
>> >>>     return null;
>> >>>   }
>> >>>
>> >>>   DAGChangeSet getWordCountDag(String dir)
>> >>>     {
>> >>>       DAGChangeSet dag = new DAGChangeSet();
>> >>>       LineByLineFileInputOperator reader = dag.addOperator("Reader",
>> >>> new LineByLineFileInputOperator());
>> >>>       List<StatsListener> listeners = new ArrayList<>();
>> >>>       listeners.add(this);
>> >>>       dag.getMeta(reader).getAttributes().put(Context.OperatorCon
>> text.STATS_LISTENERS,
>> >>> listeners);
>> >>>       reader.setDirectory(dir);
>> >>>       LineSplitter splitter = dag.addOperator("SplitteR", new
>> LineSplitter());
>> >>>       UniqueCounter<String> counter = dag.addOperator("Counter", new
>> >>> UniqueCounter<String>());
>> >>>       ConsoleOutputOperator out = dag.addOperator("Output", new
>> >>> ConsoleOutputOperator());
>> >>>       dag.addStream("s1", reader.output, splitter.input);
>> >>>       dag.addStream("s2", splitter.words, counter.data);
>> >>>       dag.addStream("s3", counter.count, out.input);
>> >>>       return dag;
>> >>>     }
>> >>>
>> >>> Let me know if this type of API is acceptable for launching the DAG.
>> >>> This is an API to specify DAG changes. The scheduler functionality
>> >>> will use
>> >>> this API.
>> >>>
>> >>>
>> >>> Regards,
>> >>> -Tushar.
>> >>>
>> >>> On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <th...@datatorrent.com>
>> wrote:
>> >>>> I like the idea of keeping heavy lifting and custom code out of the
>> master,
>> >>>> if possible. You find that split in responsibilities even in the
>> case of
>> >>>> partitioning (Kafka connector for example). The change that requires
>> >>>> partitioning may be detected as byproduct of the regular processing
>> in the
>> >>>> container, the information relayed to the master, the action being
>> taken
>> >>>> there.
>> >>>>
>> >>>> We should separate all the different pieces and then decide where
>> they run.
>> >>>> There is detecting the need for a plan change, then effecting the
>> change
>> >>>> (which requires full DAG view and absolutely has to/should be in the
>> >>>> master).
>> >>>>
>> >>>> Thomas
>> >>>>
>> >>>> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
>> >>>> Chandni.Singh@capitalone.com> wrote:
>> >>>>
>> >>>>> We have couple of components that already run in  master -
>> partitioners,
>> >>>>> stats listeners,  metrics aggregators.  The problem of crashing the
>> master
>> >>>>> is not specific to just scheduler, isn't it?
>> >>>>> ________________________________
>> >>>>> From: Tushar Gosavi <tu...@datatorrent.com>
>> >>>>> Sent: Wednesday, June 22, 2016 2:32:39 PM
>> >>>>> To: dev@apex.apache.org
>> >>>>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
>> >>>>> application
>> >>>>>
>> >>>>> I was thinking about avoiding running user code in master, As a
>> crash
>> >>>>> in master takes down all containers with it. hence was going for
>> >>>>> scheduler as an operator, crash in scheduler won't kill the
>> >>>>> application, master can restart the scheduler back and it can start
>> >>>>> monitoring the job again and change the DAG when required. But this
>> >>>>> will require communication between master and scheduler for
>> monitoring
>> >>>>> of operator status/stats.
>> >>>>>
>> >>>>> It is considerably easy to put scheduling functionality in master,
>> as
>> >>>>> we have access to operator stats and there is communication channel
>> >>>>> already opened between master and operators. And custom scheduler
>> can
>> >>>>> be written as shared stat listener, with additional API available to
>> >>>>> listener to add/remove/deploy/undeploy etc.. operators.
>> >>>>>
>> >>>>> Regards,
>> >>>>> - Tushar.
>> >>>>>
>> >>>>>
>> >>>>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <
>> thomas@datatorrent.com>
>> >>>>> wrote:
>> >>>>> > Right, if it runs in the app master and does not rely on unmanaged
>> >>>>> external
>> >>>>> > processes, then these requirements can be met.
>> >>>>> >
>> >>>>> > This capability seeks to avoid users having to deal with external
>> >>>>> > schedulers or workflows if all they want is to split a DAG that is
>> >>>>> > logically one application into multiple stages for resource
>> optimization.
>> >>>>> > This is not very different from the need to have elasticity in
>> terms of
>> >>>>> > partitions depending to the availability of input, as you point
>> out.
>> >>>>> >
>> >>>>> >
>> >>>>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
>> >>>>> > Chandni.Singh@capitalone.com> wrote:
>> >>>>> >
>> >>>>> >> Scheduling IMO belongs to App master. Operators can influence
>> it, for
>> >>>>> eg.
>> >>>>> >> File splitter can indicate that no more file to process.
>> >>>>> >>
>> >>>>> >> I don’t understand how that can not integrate with all the
>> aspects-
>> >>>>> >> operability, fault tolerance and security.
>> >>>>> >>
>> >>>>> >> Chandni
>> >>>>> >>
>> >>>>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <
>> chinmay@datatorrent.com>
>> >>>>> wrote:
>> >>>>> >>
>> >>>>> >> >I think its a good idea to have a scheduling operator when you
>> need to
>> >>>>> >> >start a part of the DAG when some trigger happens (for eg.
>> FileSplitter
>> >>>>> >> >identifying new files in FS) and otherwise bring it down to save
>> >>>>> >> >resources.
>> >>>>> >> >
>> >>>>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>> >>>>> >> >timothytiborfarkas@gmail.com> wrote:
>> >>>>> >> >
>> >>>>> >> >> I am in agreement with Chandni. Scheduling a batch job is an
>> API
>> >>>>> >> >>completely
>> >>>>> >> >> independent of a DAG or an operator. It could be used by a
>> >>>>> commandline
>> >>>>> >> >>tool
>> >>>>> >> >> running on your laptop, a script, or it could happen to be
>> used by an
>> >>>>> >> >> Operator running in a DAG and a StatsListener.
>> >>>>> >> >>
>> >>>>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <
>> >>>>> thomas@datatorrent.com>
>> >>>>> >> >> wrote:
>> >>>>> >> >>
>> >>>>> >> >> > Scheduling can be independent, although we have use cases
>> where the
>> >>>>> >> >> > scheduling depends on completion of processing
>> (multi-staged batch
>> >>>>> >> >>jobs
>> >>>>> >> >> > where unused resources need to be freed).
>> >>>>> >> >> >
>> >>>>> >> >> > Both can be accomplished with a stats listener.
>> >>>>> >> >> >
>> >>>>> >> >> > There can be a "scheduling operator" that brings up and
>> removes DAG
>> >>>>> >> >> > fragments as needed.
>> >>>>> >> >> >
>> >>>>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>> >>>>> >> >><si...@gmail.com>
>> >>>>> >> >> > wrote:
>> >>>>> >> >> >
>> >>>>> >> >> > > Hi,
>> >>>>> >> >> > > IMO scheduling a job can be independent of any operator
>> while
>> >>>>> >> >> > > StatsListeners are not.  I understand that in a lot of
>> cases
>> >>>>> >> >> input/output
>> >>>>> >> >> > > operators will decide when the job ends but there can be
>> cases
>> >>>>> when
>> >>>>> >> >> > > scheduling can be independent of it.
>> >>>>> >> >> > >
>> >>>>> >> >> > > Thanks,
>> >>>>> >> >> > > Chandni
>> >>>>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <
>> thomas@datatorrent.com
>> >>>>> >
>> >>>>> >> >> wrote:
>> >>>>> >> >> > >
>> >>>>> >> >> > > > This looks like something that coordination wise
>> belongs into
>> >>>>> the
>> >>>>> >> >> > master
>> >>>>> >> >> > > > and can be done with a shared stats listener.
>> >>>>> >> >> > > >
>> >>>>> >> >> > > > The operator request/response protocol could be used
>> the relay
>> >>>>> the
>> >>>>> >> >> data
>> >>>>> >> >> > > for
>> >>>>> >> >> > > > the scheduling decisions.
>> >>>>> >> >> > > >
>> >>>>> >> >> > > > Thomas
>> >>>>> >> >> > > >
>> >>>>> >> >> > > >
>> >>>>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>> >>>>> >> >> > > > Chandni.Singh@capitalone.com> wrote:
>> >>>>> >> >> > > >
>> >>>>> >> >> > > > > Hi Tushar,
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > I have some questions about the use case 2: Batch
>> Support
>> >>>>> >> >> > > > > I don¹t understand the advantages of providing batch
>> support
>> >>>>> by
>> >>>>> >> >> > having
>> >>>>> >> >> > > an
>> >>>>> >> >> > > > > operator as a scheduler.
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > An approach that seemed a little more straightforward
>> to me
>> >>>>> was
>> >>>>> >> >>to
>> >>>>> >> >> > > expose
>> >>>>> >> >> > > > > an API for scheduler. If there is a scheduler set
>> then the
>> >>>>> >> >>master
>> >>>>> >> >> > uses
>> >>>>> >> >> > > > and
>> >>>>> >> >> > > > > schedules operators. By default there isn¹t any
>> scheduler and
>> >>>>> >> >>the
>> >>>>> >> >> job
>> >>>>> >> >> > > is
>> >>>>> >> >> > > > > run as it is now.
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > Maybe this is too simplistic but can you please let
>> me know
>> >>>>> why
>> >>>>> >> >> > having
>> >>>>> >> >> > > an
>> >>>>> >> >> > > > > operator as a scheduler is a better way?
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > Thanks,
>> >>>>> >> >> > > > > Chandni
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <
>> >>>>> tushar@datatorrent.com>
>> >>>>> >> >> > wrote:
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > >Hi All,
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >We have seen few use cases in field which require
>> Apex
>> >>>>> >> >>application
>> >>>>> >> >> > > > > >scheduling based on some condition. This has also
>> came up as
>> >>>>> >> >>part
>> >>>>> >> >> of
>> >>>>> >> >> > > > > >Batch Support in Apex previously
>> >>>>> >> >> > > > > >(
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > >
>> >>>>> >> >> > >
>> >>>>> >> >> >
>> >>>>> >> >>
>> >>>>> >> >>
>> >>>>> >>
>> >>>>> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbo
>> x/%3CCAKJfLDP
>> >>>>> >> >> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
>> >>>>> >> >> 40mail.gmail.com
>> >>>>> >> >> > > %3E)
>> >>>>> >> >> > > > > >. I am proposing following functionality in Apex to
>> help
>> >>>>> >> >> scheduling
>> >>>>> >> >> > > > > >and better resource utilization for batch jobs.
>> Please
>> >>>>> provide
>> >>>>> >> >> your
>> >>>>> >> >> > > > > >comments.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Usecase 1 - Dynamic Dag modification.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Each operator in DAG consumes yarn resources,
>> sometimes it
>> >>>>> is
>> >>>>> >> >> > > > > >desirable to return the resources to yarn when no
>> data is
>> >>>>> >> >> available
>> >>>>> >> >> > > > > >for processing, and deploy whole DAG once data
>> starts to
>> >>>>> >> >>appear.
>> >>>>> >> >> For
>> >>>>> >> >> > > > > >this to happen automatically, we will need some data
>> >>>>> monitoring
>> >>>>> >> >> > > > > >operators running in the DAG to trigger restart and
>> >>>>> shutdown of
>> >>>>> >> >> the
>> >>>>> >> >> > > > > >operators in the DAG.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Apex already have such api to dynamically change the
>> running
>> >>>>> >> >>dag
>> >>>>> >> >> > > > > >through cli. We could provide similar API available
>> to
>> >>>>> >> >>operators
>> >>>>> >> >> > which
>> >>>>> >> >> > > > > >will trigger dag modification at runtime. This
>> information
>> >>>>> can
>> >>>>> >> >>be
>> >>>>> >> >> > > > > >passed to master using heartbeat RPC and master will
>> make
>> >>>>> >> >> > > > > >required changed to the DAG. let me know what do you
>> think
>> >>>>> >> >>about
>> >>>>> >> >> > it..
>> >>>>> >> >> > > > > >something like below.
>> >>>>> >> >> > > > > >Context.beginDagChange();
>> >>>>> >> >> > > > > >context.addOperator("o1") <== launch operator from
>> previous
>> >>>>> >> >> > > > check-pointed
>> >>>>> >> >> > > > > >state.
>> >>>>> >> >> > > > > >context.addOperator("o2", new Operator2()) <==
>> create new
>> >>>>> >> >>operator
>> >>>>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>> >>>>> >> >> > > > > >context.shutdown("o3"); <== delete this and
>> downstream
>> >>>>> >> >>operators
>> >>>>> >> >> > from
>> >>>>> >> >> > > > the
>> >>>>> >> >> > > > > >DAG.
>> >>>>> >> >> > > > > >context.apply();  <== dag changes will be send to
>> master,
>> >>>>> and
>> >>>>> >> >> master
>> >>>>> >> >> > > > > >will apply these changes.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Similarly API for other functionalities such as
>> locality
>> >>>>> >> >>settings
>> >>>>> >> >> > > > > >needs to be provided.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Provide an API callable from operator to launch a
>> DAG. The
>> >>>>> >> >> operator
>> >>>>> >> >> > > > > >will prepare an dag object and submit it to the
>> yarn, the
>> >>>>> DAG
>> >>>>> >> >>will
>> >>>>> >> >> > be
>> >>>>> >> >> > > > > >scheduled as a new application. This way complex
>> schedulers
>> >>>>> >> >>can be
>> >>>>> >> >> > > > > >written as operators.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >public SchedulerOperator implements Operator {
>> >>>>> >> >> > > > > >   void handleIdleTime() {
>> >>>>> >> >> > > > > >      // check of conditions to start a job (for
>> example
>> >>>>> enough
>> >>>>> >> >> > files
>> >>>>> >> >> > > > > >available, enough items are available in kafa, or
>> time has
>> >>>>> >> >>reached
>> >>>>> >> >> > > > > >     Dag dag = context.createDAG();
>> >>>>> >> >> > > > > >     dag.addOperator();
>> >>>>> >> >> > > > > >     dag.addOperator();
>> >>>>> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>> >>>>> >> >> > > > > >     lOptions.oldId = ""; // start for this
>> checkpoint.
>> >>>>> >> >> > > > > >     DagHandler dagHandler = context.submit(dag,
>> lOptions);
>> >>>>> >> >> > > > > >   }
>> >>>>> >> >> > > > > >}
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >DagHandler will have methods to monitor the final
>> state of
>> >>>>> >> >> > > > > >application, or to kill the DAG
>> >>>>> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG
>> >>>>> terminates
>> >>>>> >> >> > > > > >dagHandler.status()  <== get the status of
>> application.
>> >>>>> >> >> > > > > >dagHandler.kill() <== kill the running application.
>> >>>>> >> >> > > > > >dagHandler.shutdown() <== shutdown the application.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >The more complex Scheduler operators could be
>> written to
>> >>>>> manage
>> >>>>> >> >> the
>> >>>>> >> >> > > > > >workflows, i.e DAG of DAGs. using these APIs.
>> >>>>> >> >> > > > > >
>> >>>>> >> >> > > > > >Regards,
>> >>>>> >> >> > > > > >-Tushar.
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > ______________________________
>> __________________________
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > > The information contained in this e-mail is
>> confidential
>> >>>>> and/or
>> >>>>> >> >> > > > > proprietary to Capital One and/or its affiliates and
>> may
>> >>>>> only be
>> >>>>> >> >> used
>> >>>>> >> >> > > > > solely in performance of work or services for Capital
>> One.
>> >>>>> The
>> >>>>> >> >> > > > information
>> >>>>> >> >> > > > > transmitted herewith is intended only for use by the
>> >>>>> individual
>> >>>>> >> >>or
>> >>>>> >> >> > > entity
>> >>>>> >> >> > > > > to which it is addressed. If the reader of this
>> message is
>> >>>>> not
>> >>>>> >> >>the
>> >>>>> >> >> > > > intended
>> >>>>> >> >> > > > > recipient, you are hereby notified that any review,
>> >>>>> >> >>retransmission,
>> >>>>> >> >> > > > > dissemination, distribution, copying or other use of,
>> or
>> >>>>> taking
>> >>>>> >> >>of
>> >>>>> >> >> > any
>> >>>>> >> >> > > > > action in reliance upon this information is strictly
>> >>>>> >> >>prohibited. If
>> >>>>> >> >> > you
>> >>>>> >> >> > > > > have received this communication in error, please
>> contact the
>> >>>>> >> >> sender
>> >>>>> >> >> > > and
>> >>>>> >> >> > > > > delete the material from your computer.
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > > >
>> >>>>> >> >> > > >
>> >>>>> >> >> > >
>> >>>>> >> >> >
>> >>>>> >> >>
>> >>>>> >>
>> >>>>> >> ________________________________________________________
>> >>>>> >>
>> >>>>> >> The information contained in this e-mail is confidential and/or
>> >>>>> >> proprietary to Capital One and/or its affiliates and may only be
>> used
>> >>>>> >> solely in performance of work or services for Capital One. The
>> >>>>> information
>> >>>>> >> transmitted herewith is intended only for use by the individual
>> or
>> >>>>> entity
>> >>>>> >> to which it is addressed. If the reader of this message is not
>> the
>> >>>>> intended
>> >>>>> >> recipient, you are hereby notified that any review,
>> retransmission,
>> >>>>> >> dissemination, distribution, copying or other use of, or taking
>> of any
>> >>>>> >> action in reliance upon this information is strictly prohibited.
>> If you
>> >>>>> >> have received this communication in error, please contact the
>> sender and
>> >>>>> >> delete the material from your computer.
>> >>>>> >>
>> >>>>> ________________________________________________________
>> >>>>>
>> >>>>> The information contained in this e-mail is confidential and/or
>> >>>>> proprietary to Capital One and/or its affiliates and may only be
>> used
>> >>>>> solely in performance of work or services for Capital One. The
>> information
>> >>>>> transmitted herewith is intended only for use by the individual or
>> entity
>> >>>>> to which it is addressed. If the reader of this message is not the
>> intended
>> >>>>> recipient, you are hereby notified that any review, retransmission,
>> >>>>> dissemination, distribution, copying or other use of, or taking of
>> any
>> >>>>> action in reliance upon this information is strictly prohibited. If
>> you
>> >>>>> have received this communication in error, please contact the
>> sender and
>> >>>>> delete the material from your computer.
>> >>>>>
>>
>
>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi All,

I have started working on this again. The top level design document is
located at

https://docs.google.com/document/d/1OugxutMYI-JwB9Z7hxjTWwiAKQsqbYPujDmgwlpPOXM/edit?usp=sharing

The WIP branch is
https://github.com/tushargosavi/apex-core/tree/dag_schedule.redesign

@Thomas can you please review the document and comment on design approach.

Regards,
-Tushar.


On Tue, Jan 24, 2017 at 4:47 PM, Tushar Gosavi <tu...@datatorrent.com>
wrote:

> Hi All,
>
> I have updated the design document as per review comments on pull
> request (https://issues.apache.org/jira/secure/attachment/
> 12849094/dag.docx).
> Please provide your suggestion.
>
> - Tushar.
>
>
> On Fri, Sep 16, 2016 at 6:40 PM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
> > Hi All,
> >
> > I have opened a review pull request for dynamic dag modification
> > through stats listener (https://github.com/apache/apex-core/pull/393).
> > Please review and provide
> > comments/suggestions.
> >
> > It provides following functionality
> > - StatsListener can access the opearator name for easily detecting
> > which opearator stats are being processed.
> > - StatsListener can create a instance of object through which it can
> > submit dag modifications to the engine.
> > - StatsListener can return dag changes as a response to engine.
> > - PlanModifier is modified to take a DAG and apply it on the existing
> > running DAG and deploy the changes.
> >
> > The following functionality is not working yet.
> >
> > - The new opearator does not start from the correct windowId
> > (https://issues.apache.org/jira/browse/APEXCORE-532)
> > - Relanched application failed to start when it was killed after
> > dynamic dag modification.
> > - There is no support for resuming operator from previous state when
> > they were removed. This could be achived through
> >   readig state through external storage on setup.
> > - persist operator support is not present for newly added streams.
> >
> > The demo application using the feature is available at
> > https://github.com/tushargosavi/apex-dynamic-scheduling
> >
> > There are two variations of WordCount application. The first variation
> > detects the presence of
> > new files and start a disconnected DAG to process the data.
> > (https://github.com/tushargosavi/apex-dynamic-
> scheduling/blob/master/src/main/java/com/datatorrent/
> wordcount/WordCountApp.java)
> >
> > The second application
> > (https://github.com/tushargosavi/apex-dynamic-
> scheduling/blob/master/src/main/java/com/datatorrent/
> wordcount/ExtendApp.java),
> > starts with reader operator, and provides pendingFiles as auto-metric
> > to stat listener. On detecting pending files it attaches splitter
> > counter and output
> > operator to the read operator. Once files are processed the splitter,
> > counter and output operators are removed and
> > added back again if new data files are added into the directory.
> >
> > Regards,
> > -Tushar.
> >
> >
> > On Mon, Aug 1, 2016 at 6:10 PM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
> >> Hi All,
> >>
> >> I was able to prototype an simple word count application, which will
> >> start with just a single file reader operator. File reader operator
> >> will emit pendingFiles as metric to StatsListener. The statslistener
> >> will change DAG once enough files are available. The listener will
> return
> >> plan change to add word splitter, counter and console operator to the
> >> reader and complete the DAG for wordcount.
> >>
> >> After 120 windows of inactivity, the three operators will be removed
> >> from DAG again. When new set of files are added these operators are
> >> added back again.
> >>
> >> The high level proposal document:
> >> https://docs.google.com/document/d/1gpy7NFxd6te1lTXN1-
> 2r8WHRjg83zEM8aqpPAXivFRQ/edit?usp=sharing
> >>
> >> The prototype code is at :
> >> https://github.com/tushargosavi/apex-dynamic-scheduling/
> >> The Application files are
> >> https://github.com/tushargosavi/apex-dynamic-
> scheduling/blob/master/src/main/java/com/datatorrent/wordcount/
> FileStatListenerSameDag.java
> >> https://github.com/tushargosavi/apex-dynamic-
> scheduling/blob/master/src/main/java/com/datatorrent/
> wordcount/ExtendApp.java
> >>
> >> Please provide your feedback.
> >>
> >> Some challenges yet to resolve are
> >> - Restoring operator state from previously removed operator.
> >> - Handling cuncurrent modifications to DAG from multiple StatsListener.
> >> - Making DAG changes persistent, user should be able to restart the
> >> application if application was killed with modified dag.
> >>
> >> Thanks,
> >> -Tushar.
> >>
> >> On Fri, Jul 8, 2016 at 6:48 PM, Tushar Gosavi <tu...@datatorrent.com>
> wrote:
> >>> Hi All,
> >>>
> >>> I have dome some initial prototype which allows stat listener to
> >>> specify dag changes, and the dag changes are applied asynchronously.
> >>>
> >>> The changes involved are
> >>> - Add DagChangeSet object which is inherited from DAG, supporting
> >>> methods to remove
> >>>   operator and streams.
> >>>
> >>> - The stat listener will return this object in Response, and platform
> >>> will apply changes specified in response to the DAG.
> >>>
> >>>
> >>> The Apex changes
> >>> https://github.com/apache/apex-core/compare/master...
> tushargosavi:scheduler?expand=1
> >>>
> >>> The correspondign Demo application, which one operator monitors the
> >>> directory for files, and launch the wordcount DAG in
> >>> same application master when files are available.
> >>> https://github.com/tushargosavi/incubator-apex-malhar/tree/
> 178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/
> src/main/java/com/datatorrent/demos/wordcount/schedular
> >>>
> >>> Example of stat listerner which monitors a metric and instruct master
> >>> to start a dag.
> >>>
> >>>  /** look for more than 100 files in a directory, before lauching the
> DAG */
> >>> @Override
> >>>   public Response processStats(BatchedOperatorStats stats)
> >>>   {
> >>>     for(Stats.OperatorStats ws: stats.getLastWindowedStats()) {
> >>>       // pendingFiles is autometric.
> >>>       Integer value = (Integer)ws.metrics.get("pendingFiles");
> >>>       LOG.info("stats recevied for {} pendingFiles {}",
> >>> stats.getOperatorId(), value);
> >>>       if (value != null  && value > 100 && !dagStarted) {
> >>>         dagStarted = true;
> >>>         Response resp = new Response();
> >>>         resp.dag = getWordCountDag((String)ws.
> metrics.get("directory"));
> >>>         counter = 0;
> >>>         return resp;
> >>>       }
> >>>     }
> >>>     return null;
> >>>   }
> >>>
> >>>   DAGChangeSet getWordCountDag(String dir)
> >>>     {
> >>>       DAGChangeSet dag = new DAGChangeSet();
> >>>       LineByLineFileInputOperator reader = dag.addOperator("Reader",
> >>> new LineByLineFileInputOperator());
> >>>       List<StatsListener> listeners = new ArrayList<>();
> >>>       listeners.add(this);
> >>>       dag.getMeta(reader).getAttributes().put(Context.
> OperatorContext.STATS_LISTENERS,
> >>> listeners);
> >>>       reader.setDirectory(dir);
> >>>       LineSplitter splitter = dag.addOperator("SplitteR", new
> LineSplitter());
> >>>       UniqueCounter<String> counter = dag.addOperator("Counter", new
> >>> UniqueCounter<String>());
> >>>       ConsoleOutputOperator out = dag.addOperator("Output", new
> >>> ConsoleOutputOperator());
> >>>       dag.addStream("s1", reader.output, splitter.input);
> >>>       dag.addStream("s2", splitter.words, counter.data);
> >>>       dag.addStream("s3", counter.count, out.input);
> >>>       return dag;
> >>>     }
> >>>
> >>> Let me know if this type of API is acceptable for launching the DAG.
> >>> This is an API to specify DAG changes. The scheduler functionality
> >>> will use
> >>> this API.
> >>>
> >>>
> >>> Regards,
> >>> -Tushar.
> >>>
> >>> On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <th...@datatorrent.com>
> wrote:
> >>>> I like the idea of keeping heavy lifting and custom code out of the
> master,
> >>>> if possible. You find that split in responsibilities even in the case
> of
> >>>> partitioning (Kafka connector for example). The change that requires
> >>>> partitioning may be detected as byproduct of the regular processing
> in the
> >>>> container, the information relayed to the master, the action being
> taken
> >>>> there.
> >>>>
> >>>> We should separate all the different pieces and then decide where
> they run.
> >>>> There is detecting the need for a plan change, then effecting the
> change
> >>>> (which requires full DAG view and absolutely has to/should be in the
> >>>> master).
> >>>>
> >>>> Thomas
> >>>>
> >>>> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
> >>>> Chandni.Singh@capitalone.com> wrote:
> >>>>
> >>>>> We have couple of components that already run in  master -
> partitioners,
> >>>>> stats listeners,  metrics aggregators.  The problem of crashing the
> master
> >>>>> is not specific to just scheduler, isn't it?
> >>>>> ________________________________
> >>>>> From: Tushar Gosavi <tu...@datatorrent.com>
> >>>>> Sent: Wednesday, June 22, 2016 2:32:39 PM
> >>>>> To: dev@apex.apache.org
> >>>>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
> >>>>> application
> >>>>>
> >>>>> I was thinking about avoiding running user code in master, As a crash
> >>>>> in master takes down all containers with it. hence was going for
> >>>>> scheduler as an operator, crash in scheduler won't kill the
> >>>>> application, master can restart the scheduler back and it can start
> >>>>> monitoring the job again and change the DAG when required. But this
> >>>>> will require communication between master and scheduler for
> monitoring
> >>>>> of operator status/stats.
> >>>>>
> >>>>> It is considerably easy to put scheduling functionality in master, as
> >>>>> we have access to operator stats and there is communication channel
> >>>>> already opened between master and operators. And custom scheduler can
> >>>>> be written as shared stat listener, with additional API available to
> >>>>> listener to add/remove/deploy/undeploy etc.. operators.
> >>>>>
> >>>>> Regards,
> >>>>> - Tushar.
> >>>>>
> >>>>>
> >>>>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <
> thomas@datatorrent.com>
> >>>>> wrote:
> >>>>> > Right, if it runs in the app master and does not rely on unmanaged
> >>>>> external
> >>>>> > processes, then these requirements can be met.
> >>>>> >
> >>>>> > This capability seeks to avoid users having to deal with external
> >>>>> > schedulers or workflows if all they want is to split a DAG that is
> >>>>> > logically one application into multiple stages for resource
> optimization.
> >>>>> > This is not very different from the need to have elasticity in
> terms of
> >>>>> > partitions depending to the availability of input, as you point
> out.
> >>>>> >
> >>>>> >
> >>>>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
> >>>>> > Chandni.Singh@capitalone.com> wrote:
> >>>>> >
> >>>>> >> Scheduling IMO belongs to App master. Operators can influence it,
> for
> >>>>> eg.
> >>>>> >> File splitter can indicate that no more file to process.
> >>>>> >>
> >>>>> >> I don’t understand how that can not integrate with all the
> aspects-
> >>>>> >> operability, fault tolerance and security.
> >>>>> >>
> >>>>> >> Chandni
> >>>>> >>
> >>>>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <
> chinmay@datatorrent.com>
> >>>>> wrote:
> >>>>> >>
> >>>>> >> >I think its a good idea to have a scheduling operator when you
> need to
> >>>>> >> >start a part of the DAG when some trigger happens (for eg.
> FileSplitter
> >>>>> >> >identifying new files in FS) and otherwise bring it down to save
> >>>>> >> >resources.
> >>>>> >> >
> >>>>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
> >>>>> >> >timothytiborfarkas@gmail.com> wrote:
> >>>>> >> >
> >>>>> >> >> I am in agreement with Chandni. Scheduling a batch job is an
> API
> >>>>> >> >>completely
> >>>>> >> >> independent of a DAG or an operator. It could be used by a
> >>>>> commandline
> >>>>> >> >>tool
> >>>>> >> >> running on your laptop, a script, or it could happen to be
> used by an
> >>>>> >> >> Operator running in a DAG and a StatsListener.
> >>>>> >> >>
> >>>>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <
> >>>>> thomas@datatorrent.com>
> >>>>> >> >> wrote:
> >>>>> >> >>
> >>>>> >> >> > Scheduling can be independent, although we have use cases
> where the
> >>>>> >> >> > scheduling depends on completion of processing (multi-staged
> batch
> >>>>> >> >>jobs
> >>>>> >> >> > where unused resources need to be freed).
> >>>>> >> >> >
> >>>>> >> >> > Both can be accomplished with a stats listener.
> >>>>> >> >> >
> >>>>> >> >> > There can be a "scheduling operator" that brings up and
> removes DAG
> >>>>> >> >> > fragments as needed.
> >>>>> >> >> >
> >>>>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
> >>>>> >> >><si...@gmail.com>
> >>>>> >> >> > wrote:
> >>>>> >> >> >
> >>>>> >> >> > > Hi,
> >>>>> >> >> > > IMO scheduling a job can be independent of any operator
> while
> >>>>> >> >> > > StatsListeners are not.  I understand that in a lot of
> cases
> >>>>> >> >> input/output
> >>>>> >> >> > > operators will decide when the job ends but there can be
> cases
> >>>>> when
> >>>>> >> >> > > scheduling can be independent of it.
> >>>>> >> >> > >
> >>>>> >> >> > > Thanks,
> >>>>> >> >> > > Chandni
> >>>>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <
> thomas@datatorrent.com
> >>>>> >
> >>>>> >> >> wrote:
> >>>>> >> >> > >
> >>>>> >> >> > > > This looks like something that coordination wise belongs
> into
> >>>>> the
> >>>>> >> >> > master
> >>>>> >> >> > > > and can be done with a shared stats listener.
> >>>>> >> >> > > >
> >>>>> >> >> > > > The operator request/response protocol could be used the
> relay
> >>>>> the
> >>>>> >> >> data
> >>>>> >> >> > > for
> >>>>> >> >> > > > the scheduling decisions.
> >>>>> >> >> > > >
> >>>>> >> >> > > > Thomas
> >>>>> >> >> > > >
> >>>>> >> >> > > >
> >>>>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> >>>>> >> >> > > > Chandni.Singh@capitalone.com> wrote:
> >>>>> >> >> > > >
> >>>>> >> >> > > > > Hi Tushar,
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > I have some questions about the use case 2: Batch
> Support
> >>>>> >> >> > > > > I don¹t understand the advantages of providing batch
> support
> >>>>> by
> >>>>> >> >> > having
> >>>>> >> >> > > an
> >>>>> >> >> > > > > operator as a scheduler.
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > An approach that seemed a little more straightforward
> to me
> >>>>> was
> >>>>> >> >>to
> >>>>> >> >> > > expose
> >>>>> >> >> > > > > an API for scheduler. If there is a scheduler set then
> the
> >>>>> >> >>master
> >>>>> >> >> > uses
> >>>>> >> >> > > > and
> >>>>> >> >> > > > > schedules operators. By default there isn¹t any
> scheduler and
> >>>>> >> >>the
> >>>>> >> >> job
> >>>>> >> >> > > is
> >>>>> >> >> > > > > run as it is now.
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > Maybe this is too simplistic but can you please let me
> know
> >>>>> why
> >>>>> >> >> > having
> >>>>> >> >> > > an
> >>>>> >> >> > > > > operator as a scheduler is a better way?
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > Thanks,
> >>>>> >> >> > > > > Chandni
> >>>>> >> >> > > > >
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <
> >>>>> tushar@datatorrent.com>
> >>>>> >> >> > wrote:
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > >Hi All,
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >We have seen few use cases in field which require Apex
> >>>>> >> >>application
> >>>>> >> >> > > > > >scheduling based on some condition. This has also
> came up as
> >>>>> >> >>part
> >>>>> >> >> of
> >>>>> >> >> > > > > >Batch Support in Apex previously
> >>>>> >> >> > > > > >(
> >>>>> >> >> > > > >
> >>>>> >> >> > > >
> >>>>> >> >> > >
> >>>>> >> >> >
> >>>>> >> >>
> >>>>> >> >>
> >>>>> >>
> >>>>> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.
> mbox/%3CCAKJfLDP
> >>>>> >> >> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
> >>>>> >> >> 40mail.gmail.com
> >>>>> >> >> > > %3E)
> >>>>> >> >> > > > > >. I am proposing following functionality in Apex to
> help
> >>>>> >> >> scheduling
> >>>>> >> >> > > > > >and better resource utilization for batch jobs. Please
> >>>>> provide
> >>>>> >> >> your
> >>>>> >> >> > > > > >comments.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Usecase 1 - Dynamic Dag modification.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Each operator in DAG consumes yarn resources,
> sometimes it
> >>>>> is
> >>>>> >> >> > > > > >desirable to return the resources to yarn when no
> data is
> >>>>> >> >> available
> >>>>> >> >> > > > > >for processing, and deploy whole DAG once data starts
> to
> >>>>> >> >>appear.
> >>>>> >> >> For
> >>>>> >> >> > > > > >this to happen automatically, we will need some data
> >>>>> monitoring
> >>>>> >> >> > > > > >operators running in the DAG to trigger restart and
> >>>>> shutdown of
> >>>>> >> >> the
> >>>>> >> >> > > > > >operators in the DAG.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Apex already have such api to dynamically change the
> running
> >>>>> >> >>dag
> >>>>> >> >> > > > > >through cli. We could provide similar API available to
> >>>>> >> >>operators
> >>>>> >> >> > which
> >>>>> >> >> > > > > >will trigger dag modification at runtime. This
> information
> >>>>> can
> >>>>> >> >>be
> >>>>> >> >> > > > > >passed to master using heartbeat RPC and master will
> make
> >>>>> >> >> > > > > >required changed to the DAG. let me know what do you
> think
> >>>>> >> >>about
> >>>>> >> >> > it..
> >>>>> >> >> > > > > >something like below.
> >>>>> >> >> > > > > >Context.beginDagChange();
> >>>>> >> >> > > > > >context.addOperator("o1") <== launch operator from
> previous
> >>>>> >> >> > > > check-pointed
> >>>>> >> >> > > > > >state.
> >>>>> >> >> > > > > >context.addOperator("o2", new Operator2()) <== create
> new
> >>>>> >> >>operator
> >>>>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
> >>>>> >> >> > > > > >context.shutdown("o3"); <== delete this and downstream
> >>>>> >> >>operators
> >>>>> >> >> > from
> >>>>> >> >> > > > the
> >>>>> >> >> > > > > >DAG.
> >>>>> >> >> > > > > >context.apply();  <== dag changes will be send to
> master,
> >>>>> and
> >>>>> >> >> master
> >>>>> >> >> > > > > >will apply these changes.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Similarly API for other functionalities such as
> locality
> >>>>> >> >>settings
> >>>>> >> >> > > > > >needs to be provided.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Provide an API callable from operator to launch a
> DAG. The
> >>>>> >> >> operator
> >>>>> >> >> > > > > >will prepare an dag object and submit it to the yarn,
> the
> >>>>> DAG
> >>>>> >> >>will
> >>>>> >> >> > be
> >>>>> >> >> > > > > >scheduled as a new application. This way complex
> schedulers
> >>>>> >> >>can be
> >>>>> >> >> > > > > >written as operators.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >public SchedulerOperator implements Operator {
> >>>>> >> >> > > > > >   void handleIdleTime() {
> >>>>> >> >> > > > > >      // check of conditions to start a job (for
> example
> >>>>> enough
> >>>>> >> >> > files
> >>>>> >> >> > > > > >available, enough items are available in kafa, or
> time has
> >>>>> >> >>reached
> >>>>> >> >> > > > > >     Dag dag = context.createDAG();
> >>>>> >> >> > > > > >     dag.addOperator();
> >>>>> >> >> > > > > >     dag.addOperator();
> >>>>> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
> >>>>> >> >> > > > > >     lOptions.oldId = ""; // start for this
> checkpoint.
> >>>>> >> >> > > > > >     DagHandler dagHandler = context.submit(dag,
> lOptions);
> >>>>> >> >> > > > > >   }
> >>>>> >> >> > > > > >}
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >DagHandler will have methods to monitor the final
> state of
> >>>>> >> >> > > > > >application, or to kill the DAG
> >>>>> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG
> >>>>> terminates
> >>>>> >> >> > > > > >dagHandler.status()  <== get the status of
> application.
> >>>>> >> >> > > > > >dagHandler.kill() <== kill the running application.
> >>>>> >> >> > > > > >dagHandler.shutdown() <== shutdown the application.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >The more complex Scheduler operators could be written
> to
> >>>>> manage
> >>>>> >> >> the
> >>>>> >> >> > > > > >workflows, i.e DAG of DAGs. using these APIs.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Regards,
> >>>>> >> >> > > > > >-Tushar.
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > ______________________________
> __________________________
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > The information contained in this e-mail is
> confidential
> >>>>> and/or
> >>>>> >> >> > > > > proprietary to Capital One and/or its affiliates and
> may
> >>>>> only be
> >>>>> >> >> used
> >>>>> >> >> > > > > solely in performance of work or services for Capital
> One.
> >>>>> The
> >>>>> >> >> > > > information
> >>>>> >> >> > > > > transmitted herewith is intended only for use by the
> >>>>> individual
> >>>>> >> >>or
> >>>>> >> >> > > entity
> >>>>> >> >> > > > > to which it is addressed. If the reader of this
> message is
> >>>>> not
> >>>>> >> >>the
> >>>>> >> >> > > > intended
> >>>>> >> >> > > > > recipient, you are hereby notified that any review,
> >>>>> >> >>retransmission,
> >>>>> >> >> > > > > dissemination, distribution, copying or other use of,
> or
> >>>>> taking
> >>>>> >> >>of
> >>>>> >> >> > any
> >>>>> >> >> > > > > action in reliance upon this information is strictly
> >>>>> >> >>prohibited. If
> >>>>> >> >> > you
> >>>>> >> >> > > > > have received this communication in error, please
> contact the
> >>>>> >> >> sender
> >>>>> >> >> > > and
> >>>>> >> >> > > > > delete the material from your computer.
> >>>>> >> >> > > > >
> >>>>> >> >> > > > >
> >>>>> >> >> > > >
> >>>>> >> >> > >
> >>>>> >> >> >
> >>>>> >> >>
> >>>>> >>
> >>>>> >> ________________________________________________________
> >>>>> >>
> >>>>> >> The information contained in this e-mail is confidential and/or
> >>>>> >> proprietary to Capital One and/or its affiliates and may only be
> used
> >>>>> >> solely in performance of work or services for Capital One. The
> >>>>> information
> >>>>> >> transmitted herewith is intended only for use by the individual or
> >>>>> entity
> >>>>> >> to which it is addressed. If the reader of this message is not the
> >>>>> intended
> >>>>> >> recipient, you are hereby notified that any review,
> retransmission,
> >>>>> >> dissemination, distribution, copying or other use of, or taking
> of any
> >>>>> >> action in reliance upon this information is strictly prohibited.
> If you
> >>>>> >> have received this communication in error, please contact the
> sender and
> >>>>> >> delete the material from your computer.
> >>>>> >>
> >>>>> ________________________________________________________
> >>>>>
> >>>>> The information contained in this e-mail is confidential and/or
> >>>>> proprietary to Capital One and/or its affiliates and may only be used
> >>>>> solely in performance of work or services for Capital One. The
> information
> >>>>> transmitted herewith is intended only for use by the individual or
> entity
> >>>>> to which it is addressed. If the reader of this message is not the
> intended
> >>>>> recipient, you are hereby notified that any review, retransmission,
> >>>>> dissemination, distribution, copying or other use of, or taking of
> any
> >>>>> action in reliance upon this information is strictly prohibited. If
> you
> >>>>> have received this communication in error, please contact the sender
> and
> >>>>> delete the material from your computer.
> >>>>>
>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi All,

I have updated the design document as per review comments on pull
request (https://issues.apache.org/jira/secure/attachment/12849094/dag.docx).
Please provide your suggestion.

- Tushar.


On Fri, Sep 16, 2016 at 6:40 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
> Hi All,
>
> I have opened a review pull request for dynamic dag modification
> through stats listener (https://github.com/apache/apex-core/pull/393).
> Please review and provide
> comments/suggestions.
>
> It provides following functionality
> - StatsListener can access the opearator name for easily detecting
> which opearator stats are being processed.
> - StatsListener can create a instance of object through which it can
> submit dag modifications to the engine.
> - StatsListener can return dag changes as a response to engine.
> - PlanModifier is modified to take a DAG and apply it on the existing
> running DAG and deploy the changes.
>
> The following functionality is not working yet.
>
> - The new opearator does not start from the correct windowId
> (https://issues.apache.org/jira/browse/APEXCORE-532)
> - Relanched application failed to start when it was killed after
> dynamic dag modification.
> - There is no support for resuming operator from previous state when
> they were removed. This could be achived through
>   readig state through external storage on setup.
> - persist operator support is not present for newly added streams.
>
> The demo application using the feature is available at
> https://github.com/tushargosavi/apex-dynamic-scheduling
>
> There are two variations of WordCount application. The first variation
> detects the presence of
> new files and start a disconnected DAG to process the data.
> (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java)
>
> The second application
> (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java),
> starts with reader operator, and provides pendingFiles as auto-metric
> to stat listener. On detecting pending files it attaches splitter
> counter and output
> operator to the read operator. Once files are processed the splitter,
> counter and output operators are removed and
> added back again if new data files are added into the directory.
>
> Regards,
> -Tushar.
>
>
> On Mon, Aug 1, 2016 at 6:10 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
>> Hi All,
>>
>> I was able to prototype an simple word count application, which will
>> start with just a single file reader operator. File reader operator
>> will emit pendingFiles as metric to StatsListener. The statslistener
>> will change DAG once enough files are available. The listener will return
>> plan change to add word splitter, counter and console operator to the
>> reader and complete the DAG for wordcount.
>>
>> After 120 windows of inactivity, the three operators will be removed
>> from DAG again. When new set of files are added these operators are
>> added back again.
>>
>> The high level proposal document:
>> https://docs.google.com/document/d/1gpy7NFxd6te1lTXN1-2r8WHRjg83zEM8aqpPAXivFRQ/edit?usp=sharing
>>
>> The prototype code is at :
>> https://github.com/tushargosavi/apex-dynamic-scheduling/
>> The Application files are
>> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/FileStatListenerSameDag.java
>> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java
>>
>> Please provide your feedback.
>>
>> Some challenges yet to resolve are
>> - Restoring operator state from previously removed operator.
>> - Handling cuncurrent modifications to DAG from multiple StatsListener.
>> - Making DAG changes persistent, user should be able to restart the
>> application if application was killed with modified dag.
>>
>> Thanks,
>> -Tushar.
>>
>> On Fri, Jul 8, 2016 at 6:48 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
>>> Hi All,
>>>
>>> I have dome some initial prototype which allows stat listener to
>>> specify dag changes, and the dag changes are applied asynchronously.
>>>
>>> The changes involved are
>>> - Add DagChangeSet object which is inherited from DAG, supporting
>>> methods to remove
>>>   operator and streams.
>>>
>>> - The stat listener will return this object in Response, and platform
>>> will apply changes specified in response to the DAG.
>>>
>>>
>>> The Apex changes
>>> https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1
>>>
>>> The correspondign Demo application, which one operator monitors the
>>> directory for files, and launch the wordcount DAG in
>>> same application master when files are available.
>>> https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular
>>>
>>> Example of stat listerner which monitors a metric and instruct master
>>> to start a dag.
>>>
>>>  /** look for more than 100 files in a directory, before lauching the DAG */
>>> @Override
>>>   public Response processStats(BatchedOperatorStats stats)
>>>   {
>>>     for(Stats.OperatorStats ws: stats.getLastWindowedStats()) {
>>>       // pendingFiles is autometric.
>>>       Integer value = (Integer)ws.metrics.get("pendingFiles");
>>>       LOG.info("stats recevied for {} pendingFiles {}",
>>> stats.getOperatorId(), value);
>>>       if (value != null  && value > 100 && !dagStarted) {
>>>         dagStarted = true;
>>>         Response resp = new Response();
>>>         resp.dag = getWordCountDag((String)ws.metrics.get("directory"));
>>>         counter = 0;
>>>         return resp;
>>>       }
>>>     }
>>>     return null;
>>>   }
>>>
>>>   DAGChangeSet getWordCountDag(String dir)
>>>     {
>>>       DAGChangeSet dag = new DAGChangeSet();
>>>       LineByLineFileInputOperator reader = dag.addOperator("Reader",
>>> new LineByLineFileInputOperator());
>>>       List<StatsListener> listeners = new ArrayList<>();
>>>       listeners.add(this);
>>>       dag.getMeta(reader).getAttributes().put(Context.OperatorContext.STATS_LISTENERS,
>>> listeners);
>>>       reader.setDirectory(dir);
>>>       LineSplitter splitter = dag.addOperator("SplitteR", new LineSplitter());
>>>       UniqueCounter<String> counter = dag.addOperator("Counter", new
>>> UniqueCounter<String>());
>>>       ConsoleOutputOperator out = dag.addOperator("Output", new
>>> ConsoleOutputOperator());
>>>       dag.addStream("s1", reader.output, splitter.input);
>>>       dag.addStream("s2", splitter.words, counter.data);
>>>       dag.addStream("s3", counter.count, out.input);
>>>       return dag;
>>>     }
>>>
>>> Let me know if this type of API is acceptable for launching the DAG.
>>> This is an API to specify DAG changes. The scheduler functionality
>>> will use
>>> this API.
>>>
>>>
>>> Regards,
>>> -Tushar.
>>>
>>> On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <th...@datatorrent.com> wrote:
>>>> I like the idea of keeping heavy lifting and custom code out of the master,
>>>> if possible. You find that split in responsibilities even in the case of
>>>> partitioning (Kafka connector for example). The change that requires
>>>> partitioning may be detected as byproduct of the regular processing in the
>>>> container, the information relayed to the master, the action being taken
>>>> there.
>>>>
>>>> We should separate all the different pieces and then decide where they run.
>>>> There is detecting the need for a plan change, then effecting the change
>>>> (which requires full DAG view and absolutely has to/should be in the
>>>> master).
>>>>
>>>> Thomas
>>>>
>>>> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
>>>> Chandni.Singh@capitalone.com> wrote:
>>>>
>>>>> We have couple of components that already run in  master - partitioners,
>>>>> stats listeners,  metrics aggregators.  The problem of crashing the master
>>>>> is not specific to just scheduler, isn't it?
>>>>> ________________________________
>>>>> From: Tushar Gosavi <tu...@datatorrent.com>
>>>>> Sent: Wednesday, June 22, 2016 2:32:39 PM
>>>>> To: dev@apex.apache.org
>>>>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
>>>>> application
>>>>>
>>>>> I was thinking about avoiding running user code in master, As a crash
>>>>> in master takes down all containers with it. hence was going for
>>>>> scheduler as an operator, crash in scheduler won't kill the
>>>>> application, master can restart the scheduler back and it can start
>>>>> monitoring the job again and change the DAG when required. But this
>>>>> will require communication between master and scheduler for monitoring
>>>>> of operator status/stats.
>>>>>
>>>>> It is considerably easy to put scheduling functionality in master, as
>>>>> we have access to operator stats and there is communication channel
>>>>> already opened between master and operators. And custom scheduler can
>>>>> be written as shared stat listener, with additional API available to
>>>>> listener to add/remove/deploy/undeploy etc.. operators.
>>>>>
>>>>> Regards,
>>>>> - Tushar.
>>>>>
>>>>>
>>>>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <th...@datatorrent.com>
>>>>> wrote:
>>>>> > Right, if it runs in the app master and does not rely on unmanaged
>>>>> external
>>>>> > processes, then these requirements can be met.
>>>>> >
>>>>> > This capability seeks to avoid users having to deal with external
>>>>> > schedulers or workflows if all they want is to split a DAG that is
>>>>> > logically one application into multiple stages for resource optimization.
>>>>> > This is not very different from the need to have elasticity in terms of
>>>>> > partitions depending to the availability of input, as you point out.
>>>>> >
>>>>> >
>>>>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
>>>>> > Chandni.Singh@capitalone.com> wrote:
>>>>> >
>>>>> >> Scheduling IMO belongs to App master. Operators can influence it, for
>>>>> eg.
>>>>> >> File splitter can indicate that no more file to process.
>>>>> >>
>>>>> >> I don’t understand how that can not integrate with all the aspects-
>>>>> >> operability, fault tolerance and security.
>>>>> >>
>>>>> >> Chandni
>>>>> >>
>>>>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <ch...@datatorrent.com>
>>>>> wrote:
>>>>> >>
>>>>> >> >I think its a good idea to have a scheduling operator when you need to
>>>>> >> >start a part of the DAG when some trigger happens (for eg. FileSplitter
>>>>> >> >identifying new files in FS) and otherwise bring it down to save
>>>>> >> >resources.
>>>>> >> >
>>>>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>>>>> >> >timothytiborfarkas@gmail.com> wrote:
>>>>> >> >
>>>>> >> >> I am in agreement with Chandni. Scheduling a batch job is an API
>>>>> >> >>completely
>>>>> >> >> independent of a DAG or an operator. It could be used by a
>>>>> commandline
>>>>> >> >>tool
>>>>> >> >> running on your laptop, a script, or it could happen to be used by an
>>>>> >> >> Operator running in a DAG and a StatsListener.
>>>>> >> >>
>>>>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <
>>>>> thomas@datatorrent.com>
>>>>> >> >> wrote:
>>>>> >> >>
>>>>> >> >> > Scheduling can be independent, although we have use cases where the
>>>>> >> >> > scheduling depends on completion of processing (multi-staged batch
>>>>> >> >>jobs
>>>>> >> >> > where unused resources need to be freed).
>>>>> >> >> >
>>>>> >> >> > Both can be accomplished with a stats listener.
>>>>> >> >> >
>>>>> >> >> > There can be a "scheduling operator" that brings up and removes DAG
>>>>> >> >> > fragments as needed.
>>>>> >> >> >
>>>>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>>>>> >> >><si...@gmail.com>
>>>>> >> >> > wrote:
>>>>> >> >> >
>>>>> >> >> > > Hi,
>>>>> >> >> > > IMO scheduling a job can be independent of any operator while
>>>>> >> >> > > StatsListeners are not.  I understand that in a lot of cases
>>>>> >> >> input/output
>>>>> >> >> > > operators will decide when the job ends but there can be cases
>>>>> when
>>>>> >> >> > > scheduling can be independent of it.
>>>>> >> >> > >
>>>>> >> >> > > Thanks,
>>>>> >> >> > > Chandni
>>>>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <thomas@datatorrent.com
>>>>> >
>>>>> >> >> wrote:
>>>>> >> >> > >
>>>>> >> >> > > > This looks like something that coordination wise belongs into
>>>>> the
>>>>> >> >> > master
>>>>> >> >> > > > and can be done with a shared stats listener.
>>>>> >> >> > > >
>>>>> >> >> > > > The operator request/response protocol could be used the relay
>>>>> the
>>>>> >> >> data
>>>>> >> >> > > for
>>>>> >> >> > > > the scheduling decisions.
>>>>> >> >> > > >
>>>>> >> >> > > > Thomas
>>>>> >> >> > > >
>>>>> >> >> > > >
>>>>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>>>>> >> >> > > > Chandni.Singh@capitalone.com> wrote:
>>>>> >> >> > > >
>>>>> >> >> > > > > Hi Tushar,
>>>>> >> >> > > > >
>>>>> >> >> > > > > I have some questions about the use case 2: Batch Support
>>>>> >> >> > > > > I don¹t understand the advantages of providing batch support
>>>>> by
>>>>> >> >> > having
>>>>> >> >> > > an
>>>>> >> >> > > > > operator as a scheduler.
>>>>> >> >> > > > >
>>>>> >> >> > > > > An approach that seemed a little more straightforward to me
>>>>> was
>>>>> >> >>to
>>>>> >> >> > > expose
>>>>> >> >> > > > > an API for scheduler. If there is a scheduler set then the
>>>>> >> >>master
>>>>> >> >> > uses
>>>>> >> >> > > > and
>>>>> >> >> > > > > schedules operators. By default there isn¹t any scheduler and
>>>>> >> >>the
>>>>> >> >> job
>>>>> >> >> > > is
>>>>> >> >> > > > > run as it is now.
>>>>> >> >> > > > >
>>>>> >> >> > > > > Maybe this is too simplistic but can you please let me know
>>>>> why
>>>>> >> >> > having
>>>>> >> >> > > an
>>>>> >> >> > > > > operator as a scheduler is a better way?
>>>>> >> >> > > > >
>>>>> >> >> > > > > Thanks,
>>>>> >> >> > > > > Chandni
>>>>> >> >> > > > >
>>>>> >> >> > > > >
>>>>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <
>>>>> tushar@datatorrent.com>
>>>>> >> >> > wrote:
>>>>> >> >> > > > >
>>>>> >> >> > > > > >Hi All,
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >We have seen few use cases in field which require Apex
>>>>> >> >>application
>>>>> >> >> > > > > >scheduling based on some condition. This has also came up as
>>>>> >> >>part
>>>>> >> >> of
>>>>> >> >> > > > > >Batch Support in Apex previously
>>>>> >> >> > > > > >(
>>>>> >> >> > > > >
>>>>> >> >> > > >
>>>>> >> >> > >
>>>>> >> >> >
>>>>> >> >>
>>>>> >> >>
>>>>> >>
>>>>> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
>>>>> >> >> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
>>>>> >> >> 40mail.gmail.com
>>>>> >> >> > > %3E)
>>>>> >> >> > > > > >. I am proposing following functionality in Apex to help
>>>>> >> >> scheduling
>>>>> >> >> > > > > >and better resource utilization for batch jobs. Please
>>>>> provide
>>>>> >> >> your
>>>>> >> >> > > > > >comments.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Usecase 1 - Dynamic Dag modification.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Each operator in DAG consumes yarn resources, sometimes it
>>>>> is
>>>>> >> >> > > > > >desirable to return the resources to yarn when no data is
>>>>> >> >> available
>>>>> >> >> > > > > >for processing, and deploy whole DAG once data starts to
>>>>> >> >>appear.
>>>>> >> >> For
>>>>> >> >> > > > > >this to happen automatically, we will need some data
>>>>> monitoring
>>>>> >> >> > > > > >operators running in the DAG to trigger restart and
>>>>> shutdown of
>>>>> >> >> the
>>>>> >> >> > > > > >operators in the DAG.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Apex already have such api to dynamically change the running
>>>>> >> >>dag
>>>>> >> >> > > > > >through cli. We could provide similar API available to
>>>>> >> >>operators
>>>>> >> >> > which
>>>>> >> >> > > > > >will trigger dag modification at runtime. This information
>>>>> can
>>>>> >> >>be
>>>>> >> >> > > > > >passed to master using heartbeat RPC and master will make
>>>>> >> >> > > > > >required changed to the DAG. let me know what do you think
>>>>> >> >>about
>>>>> >> >> > it..
>>>>> >> >> > > > > >something like below.
>>>>> >> >> > > > > >Context.beginDagChange();
>>>>> >> >> > > > > >context.addOperator("o1") <== launch operator from previous
>>>>> >> >> > > > check-pointed
>>>>> >> >> > > > > >state.
>>>>> >> >> > > > > >context.addOperator("o2", new Operator2()) <== create new
>>>>> >> >>operator
>>>>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>>>>> >> >> > > > > >context.shutdown("o3"); <== delete this and downstream
>>>>> >> >>operators
>>>>> >> >> > from
>>>>> >> >> > > > the
>>>>> >> >> > > > > >DAG.
>>>>> >> >> > > > > >context.apply();  <== dag changes will be send to master,
>>>>> and
>>>>> >> >> master
>>>>> >> >> > > > > >will apply these changes.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Similarly API for other functionalities such as locality
>>>>> >> >>settings
>>>>> >> >> > > > > >needs to be provided.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Provide an API callable from operator to launch a DAG. The
>>>>> >> >> operator
>>>>> >> >> > > > > >will prepare an dag object and submit it to the yarn, the
>>>>> DAG
>>>>> >> >>will
>>>>> >> >> > be
>>>>> >> >> > > > > >scheduled as a new application. This way complex schedulers
>>>>> >> >>can be
>>>>> >> >> > > > > >written as operators.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >public SchedulerOperator implements Operator {
>>>>> >> >> > > > > >   void handleIdleTime() {
>>>>> >> >> > > > > >      // check of conditions to start a job (for example
>>>>> enough
>>>>> >> >> > files
>>>>> >> >> > > > > >available, enough items are available in kafa, or time has
>>>>> >> >>reached
>>>>> >> >> > > > > >     Dag dag = context.createDAG();
>>>>> >> >> > > > > >     dag.addOperator();
>>>>> >> >> > > > > >     dag.addOperator();
>>>>> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>>>>> >> >> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
>>>>> >> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
>>>>> >> >> > > > > >   }
>>>>> >> >> > > > > >}
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >DagHandler will have methods to monitor the final state of
>>>>> >> >> > > > > >application, or to kill the DAG
>>>>> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG
>>>>> terminates
>>>>> >> >> > > > > >dagHandler.status()  <== get the status of application.
>>>>> >> >> > > > > >dagHandler.kill() <== kill the running application.
>>>>> >> >> > > > > >dagHandler.shutdown() <== shutdown the application.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >The more complex Scheduler operators could be written to
>>>>> manage
>>>>> >> >> the
>>>>> >> >> > > > > >workflows, i.e DAG of DAGs. using these APIs.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Regards,
>>>>> >> >> > > > > >-Tushar.
>>>>> >> >> > > > >
>>>>> >> >> > > > > ________________________________________________________
>>>>> >> >> > > > >
>>>>> >> >> > > > > The information contained in this e-mail is confidential
>>>>> and/or
>>>>> >> >> > > > > proprietary to Capital One and/or its affiliates and may
>>>>> only be
>>>>> >> >> used
>>>>> >> >> > > > > solely in performance of work or services for Capital One.
>>>>> The
>>>>> >> >> > > > information
>>>>> >> >> > > > > transmitted herewith is intended only for use by the
>>>>> individual
>>>>> >> >>or
>>>>> >> >> > > entity
>>>>> >> >> > > > > to which it is addressed. If the reader of this message is
>>>>> not
>>>>> >> >>the
>>>>> >> >> > > > intended
>>>>> >> >> > > > > recipient, you are hereby notified that any review,
>>>>> >> >>retransmission,
>>>>> >> >> > > > > dissemination, distribution, copying or other use of, or
>>>>> taking
>>>>> >> >>of
>>>>> >> >> > any
>>>>> >> >> > > > > action in reliance upon this information is strictly
>>>>> >> >>prohibited. If
>>>>> >> >> > you
>>>>> >> >> > > > > have received this communication in error, please contact the
>>>>> >> >> sender
>>>>> >> >> > > and
>>>>> >> >> > > > > delete the material from your computer.
>>>>> >> >> > > > >
>>>>> >> >> > > > >
>>>>> >> >> > > >
>>>>> >> >> > >
>>>>> >> >> >
>>>>> >> >>
>>>>> >>
>>>>> >> ________________________________________________________
>>>>> >>
>>>>> >> The information contained in this e-mail is confidential and/or
>>>>> >> proprietary to Capital One and/or its affiliates and may only be used
>>>>> >> solely in performance of work or services for Capital One. The
>>>>> information
>>>>> >> transmitted herewith is intended only for use by the individual or
>>>>> entity
>>>>> >> to which it is addressed. If the reader of this message is not the
>>>>> intended
>>>>> >> recipient, you are hereby notified that any review, retransmission,
>>>>> >> dissemination, distribution, copying or other use of, or taking of any
>>>>> >> action in reliance upon this information is strictly prohibited. If you
>>>>> >> have received this communication in error, please contact the sender and
>>>>> >> delete the material from your computer.
>>>>> >>
>>>>> ________________________________________________________
>>>>>
>>>>> The information contained in this e-mail is confidential and/or
>>>>> proprietary to Capital One and/or its affiliates and may only be used
>>>>> solely in performance of work or services for Capital One. The information
>>>>> transmitted herewith is intended only for use by the individual or entity
>>>>> to which it is addressed. If the reader of this message is not the intended
>>>>> recipient, you are hereby notified that any review, retransmission,
>>>>> dissemination, distribution, copying or other use of, or taking of any
>>>>> action in reliance upon this information is strictly prohibited. If you
>>>>> have received this communication in error, please contact the sender and
>>>>> delete the material from your computer.
>>>>>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi All,

I have opened a review pull request for dynamic dag modification
through stats listener (https://github.com/apache/apex-core/pull/393).
Please review and provide
comments/suggestions.

It provides following functionality
- StatsListener can access the opearator name for easily detecting
which opearator stats are being processed.
- StatsListener can create a instance of object through which it can
submit dag modifications to the engine.
- StatsListener can return dag changes as a response to engine.
- PlanModifier is modified to take a DAG and apply it on the existing
running DAG and deploy the changes.

The following functionality is not working yet.

- The new opearator does not start from the correct windowId
(https://issues.apache.org/jira/browse/APEXCORE-532)
- Relanched application failed to start when it was killed after
dynamic dag modification.
- There is no support for resuming operator from previous state when
they were removed. This could be achived through
  readig state through external storage on setup.
- persist operator support is not present for newly added streams.

The demo application using the feature is available at
https://github.com/tushargosavi/apex-dynamic-scheduling

There are two variations of WordCount application. The first variation
detects the presence of
new files and start a disconnected DAG to process the data.
(https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java)

The second application
(https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java),
starts with reader operator, and provides pendingFiles as auto-metric
to stat listener. On detecting pending files it attaches splitter
counter and output
operator to the read operator. Once files are processed the splitter,
counter and output operators are removed and
added back again if new data files are added into the directory.

Regards,
-Tushar.


On Mon, Aug 1, 2016 at 6:10 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
> Hi All,
>
> I was able to prototype an simple word count application, which will
> start with just a single file reader operator. File reader operator
> will emit pendingFiles as metric to StatsListener. The statslistener
> will change DAG once enough files are available. The listener will return
> plan change to add word splitter, counter and console operator to the
> reader and complete the DAG for wordcount.
>
> After 120 windows of inactivity, the three operators will be removed
> from DAG again. When new set of files are added these operators are
> added back again.
>
> The high level proposal document:
> https://docs.google.com/document/d/1gpy7NFxd6te1lTXN1-2r8WHRjg83zEM8aqpPAXivFRQ/edit?usp=sharing
>
> The prototype code is at :
> https://github.com/tushargosavi/apex-dynamic-scheduling/
> The Application files are
> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/FileStatListenerSameDag.java
> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java
>
> Please provide your feedback.
>
> Some challenges yet to resolve are
> - Restoring operator state from previously removed operator.
> - Handling cuncurrent modifications to DAG from multiple StatsListener.
> - Making DAG changes persistent, user should be able to restart the
> application if application was killed with modified dag.
>
> Thanks,
> -Tushar.
>
> On Fri, Jul 8, 2016 at 6:48 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
>> Hi All,
>>
>> I have dome some initial prototype which allows stat listener to
>> specify dag changes, and the dag changes are applied asynchronously.
>>
>> The changes involved are
>> - Add DagChangeSet object which is inherited from DAG, supporting
>> methods to remove
>>   operator and streams.
>>
>> - The stat listener will return this object in Response, and platform
>> will apply changes specified in response to the DAG.
>>
>>
>> The Apex changes
>> https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1
>>
>> The correspondign Demo application, which one operator monitors the
>> directory for files, and launch the wordcount DAG in
>> same application master when files are available.
>> https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular
>>
>> Example of stat listerner which monitors a metric and instruct master
>> to start a dag.
>>
>>  /** look for more than 100 files in a directory, before lauching the DAG */
>> @Override
>>   public Response processStats(BatchedOperatorStats stats)
>>   {
>>     for(Stats.OperatorStats ws: stats.getLastWindowedStats()) {
>>       // pendingFiles is autometric.
>>       Integer value = (Integer)ws.metrics.get("pendingFiles");
>>       LOG.info("stats recevied for {} pendingFiles {}",
>> stats.getOperatorId(), value);
>>       if (value != null  && value > 100 && !dagStarted) {
>>         dagStarted = true;
>>         Response resp = new Response();
>>         resp.dag = getWordCountDag((String)ws.metrics.get("directory"));
>>         counter = 0;
>>         return resp;
>>       }
>>     }
>>     return null;
>>   }
>>
>>   DAGChangeSet getWordCountDag(String dir)
>>     {
>>       DAGChangeSet dag = new DAGChangeSet();
>>       LineByLineFileInputOperator reader = dag.addOperator("Reader",
>> new LineByLineFileInputOperator());
>>       List<StatsListener> listeners = new ArrayList<>();
>>       listeners.add(this);
>>       dag.getMeta(reader).getAttributes().put(Context.OperatorContext.STATS_LISTENERS,
>> listeners);
>>       reader.setDirectory(dir);
>>       LineSplitter splitter = dag.addOperator("SplitteR", new LineSplitter());
>>       UniqueCounter<String> counter = dag.addOperator("Counter", new
>> UniqueCounter<String>());
>>       ConsoleOutputOperator out = dag.addOperator("Output", new
>> ConsoleOutputOperator());
>>       dag.addStream("s1", reader.output, splitter.input);
>>       dag.addStream("s2", splitter.words, counter.data);
>>       dag.addStream("s3", counter.count, out.input);
>>       return dag;
>>     }
>>
>> Let me know if this type of API is acceptable for launching the DAG.
>> This is an API to specify DAG changes. The scheduler functionality
>> will use
>> this API.
>>
>>
>> Regards,
>> -Tushar.
>>
>> On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <th...@datatorrent.com> wrote:
>>> I like the idea of keeping heavy lifting and custom code out of the master,
>>> if possible. You find that split in responsibilities even in the case of
>>> partitioning (Kafka connector for example). The change that requires
>>> partitioning may be detected as byproduct of the regular processing in the
>>> container, the information relayed to the master, the action being taken
>>> there.
>>>
>>> We should separate all the different pieces and then decide where they run.
>>> There is detecting the need for a plan change, then effecting the change
>>> (which requires full DAG view and absolutely has to/should be in the
>>> master).
>>>
>>> Thomas
>>>
>>> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
>>> Chandni.Singh@capitalone.com> wrote:
>>>
>>>> We have couple of components that already run in  master - partitioners,
>>>> stats listeners,  metrics aggregators.  The problem of crashing the master
>>>> is not specific to just scheduler, isn't it?
>>>> ________________________________
>>>> From: Tushar Gosavi <tu...@datatorrent.com>
>>>> Sent: Wednesday, June 22, 2016 2:32:39 PM
>>>> To: dev@apex.apache.org
>>>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
>>>> application
>>>>
>>>> I was thinking about avoiding running user code in master, As a crash
>>>> in master takes down all containers with it. hence was going for
>>>> scheduler as an operator, crash in scheduler won't kill the
>>>> application, master can restart the scheduler back and it can start
>>>> monitoring the job again and change the DAG when required. But this
>>>> will require communication between master and scheduler for monitoring
>>>> of operator status/stats.
>>>>
>>>> It is considerably easy to put scheduling functionality in master, as
>>>> we have access to operator stats and there is communication channel
>>>> already opened between master and operators. And custom scheduler can
>>>> be written as shared stat listener, with additional API available to
>>>> listener to add/remove/deploy/undeploy etc.. operators.
>>>>
>>>> Regards,
>>>> - Tushar.
>>>>
>>>>
>>>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <th...@datatorrent.com>
>>>> wrote:
>>>> > Right, if it runs in the app master and does not rely on unmanaged
>>>> external
>>>> > processes, then these requirements can be met.
>>>> >
>>>> > This capability seeks to avoid users having to deal with external
>>>> > schedulers or workflows if all they want is to split a DAG that is
>>>> > logically one application into multiple stages for resource optimization.
>>>> > This is not very different from the need to have elasticity in terms of
>>>> > partitions depending to the availability of input, as you point out.
>>>> >
>>>> >
>>>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
>>>> > Chandni.Singh@capitalone.com> wrote:
>>>> >
>>>> >> Scheduling IMO belongs to App master. Operators can influence it, for
>>>> eg.
>>>> >> File splitter can indicate that no more file to process.
>>>> >>
>>>> >> I don’t understand how that can not integrate with all the aspects-
>>>> >> operability, fault tolerance and security.
>>>> >>
>>>> >> Chandni
>>>> >>
>>>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <ch...@datatorrent.com>
>>>> wrote:
>>>> >>
>>>> >> >I think its a good idea to have a scheduling operator when you need to
>>>> >> >start a part of the DAG when some trigger happens (for eg. FileSplitter
>>>> >> >identifying new files in FS) and otherwise bring it down to save
>>>> >> >resources.
>>>> >> >
>>>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>>>> >> >timothytiborfarkas@gmail.com> wrote:
>>>> >> >
>>>> >> >> I am in agreement with Chandni. Scheduling a batch job is an API
>>>> >> >>completely
>>>> >> >> independent of a DAG or an operator. It could be used by a
>>>> commandline
>>>> >> >>tool
>>>> >> >> running on your laptop, a script, or it could happen to be used by an
>>>> >> >> Operator running in a DAG and a StatsListener.
>>>> >> >>
>>>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <
>>>> thomas@datatorrent.com>
>>>> >> >> wrote:
>>>> >> >>
>>>> >> >> > Scheduling can be independent, although we have use cases where the
>>>> >> >> > scheduling depends on completion of processing (multi-staged batch
>>>> >> >>jobs
>>>> >> >> > where unused resources need to be freed).
>>>> >> >> >
>>>> >> >> > Both can be accomplished with a stats listener.
>>>> >> >> >
>>>> >> >> > There can be a "scheduling operator" that brings up and removes DAG
>>>> >> >> > fragments as needed.
>>>> >> >> >
>>>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>>>> >> >><si...@gmail.com>
>>>> >> >> > wrote:
>>>> >> >> >
>>>> >> >> > > Hi,
>>>> >> >> > > IMO scheduling a job can be independent of any operator while
>>>> >> >> > > StatsListeners are not.  I understand that in a lot of cases
>>>> >> >> input/output
>>>> >> >> > > operators will decide when the job ends but there can be cases
>>>> when
>>>> >> >> > > scheduling can be independent of it.
>>>> >> >> > >
>>>> >> >> > > Thanks,
>>>> >> >> > > Chandni
>>>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <thomas@datatorrent.com
>>>> >
>>>> >> >> wrote:
>>>> >> >> > >
>>>> >> >> > > > This looks like something that coordination wise belongs into
>>>> the
>>>> >> >> > master
>>>> >> >> > > > and can be done with a shared stats listener.
>>>> >> >> > > >
>>>> >> >> > > > The operator request/response protocol could be used the relay
>>>> the
>>>> >> >> data
>>>> >> >> > > for
>>>> >> >> > > > the scheduling decisions.
>>>> >> >> > > >
>>>> >> >> > > > Thomas
>>>> >> >> > > >
>>>> >> >> > > >
>>>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>>>> >> >> > > > Chandni.Singh@capitalone.com> wrote:
>>>> >> >> > > >
>>>> >> >> > > > > Hi Tushar,
>>>> >> >> > > > >
>>>> >> >> > > > > I have some questions about the use case 2: Batch Support
>>>> >> >> > > > > I don¹t understand the advantages of providing batch support
>>>> by
>>>> >> >> > having
>>>> >> >> > > an
>>>> >> >> > > > > operator as a scheduler.
>>>> >> >> > > > >
>>>> >> >> > > > > An approach that seemed a little more straightforward to me
>>>> was
>>>> >> >>to
>>>> >> >> > > expose
>>>> >> >> > > > > an API for scheduler. If there is a scheduler set then the
>>>> >> >>master
>>>> >> >> > uses
>>>> >> >> > > > and
>>>> >> >> > > > > schedules operators. By default there isn¹t any scheduler and
>>>> >> >>the
>>>> >> >> job
>>>> >> >> > > is
>>>> >> >> > > > > run as it is now.
>>>> >> >> > > > >
>>>> >> >> > > > > Maybe this is too simplistic but can you please let me know
>>>> why
>>>> >> >> > having
>>>> >> >> > > an
>>>> >> >> > > > > operator as a scheduler is a better way?
>>>> >> >> > > > >
>>>> >> >> > > > > Thanks,
>>>> >> >> > > > > Chandni
>>>> >> >> > > > >
>>>> >> >> > > > >
>>>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <
>>>> tushar@datatorrent.com>
>>>> >> >> > wrote:
>>>> >> >> > > > >
>>>> >> >> > > > > >Hi All,
>>>> >> >> > > > > >
>>>> >> >> > > > > >We have seen few use cases in field which require Apex
>>>> >> >>application
>>>> >> >> > > > > >scheduling based on some condition. This has also came up as
>>>> >> >>part
>>>> >> >> of
>>>> >> >> > > > > >Batch Support in Apex previously
>>>> >> >> > > > > >(
>>>> >> >> > > > >
>>>> >> >> > > >
>>>> >> >> > >
>>>> >> >> >
>>>> >> >>
>>>> >> >>
>>>> >>
>>>> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
>>>> >> >> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
>>>> >> >> 40mail.gmail.com
>>>> >> >> > > %3E)
>>>> >> >> > > > > >. I am proposing following functionality in Apex to help
>>>> >> >> scheduling
>>>> >> >> > > > > >and better resource utilization for batch jobs. Please
>>>> provide
>>>> >> >> your
>>>> >> >> > > > > >comments.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Usecase 1 - Dynamic Dag modification.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Each operator in DAG consumes yarn resources, sometimes it
>>>> is
>>>> >> >> > > > > >desirable to return the resources to yarn when no data is
>>>> >> >> available
>>>> >> >> > > > > >for processing, and deploy whole DAG once data starts to
>>>> >> >>appear.
>>>> >> >> For
>>>> >> >> > > > > >this to happen automatically, we will need some data
>>>> monitoring
>>>> >> >> > > > > >operators running in the DAG to trigger restart and
>>>> shutdown of
>>>> >> >> the
>>>> >> >> > > > > >operators in the DAG.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Apex already have such api to dynamically change the running
>>>> >> >>dag
>>>> >> >> > > > > >through cli. We could provide similar API available to
>>>> >> >>operators
>>>> >> >> > which
>>>> >> >> > > > > >will trigger dag modification at runtime. This information
>>>> can
>>>> >> >>be
>>>> >> >> > > > > >passed to master using heartbeat RPC and master will make
>>>> >> >> > > > > >required changed to the DAG. let me know what do you think
>>>> >> >>about
>>>> >> >> > it..
>>>> >> >> > > > > >something like below.
>>>> >> >> > > > > >Context.beginDagChange();
>>>> >> >> > > > > >context.addOperator("o1") <== launch operator from previous
>>>> >> >> > > > check-pointed
>>>> >> >> > > > > >state.
>>>> >> >> > > > > >context.addOperator("o2", new Operator2()) <== create new
>>>> >> >>operator
>>>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>>>> >> >> > > > > >context.shutdown("o3"); <== delete this and downstream
>>>> >> >>operators
>>>> >> >> > from
>>>> >> >> > > > the
>>>> >> >> > > > > >DAG.
>>>> >> >> > > > > >context.apply();  <== dag changes will be send to master,
>>>> and
>>>> >> >> master
>>>> >> >> > > > > >will apply these changes.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Similarly API for other functionalities such as locality
>>>> >> >>settings
>>>> >> >> > > > > >needs to be provided.
>>>> >> >> > > > > >
>>>> >> >> > > > > >
>>>> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Provide an API callable from operator to launch a DAG. The
>>>> >> >> operator
>>>> >> >> > > > > >will prepare an dag object and submit it to the yarn, the
>>>> DAG
>>>> >> >>will
>>>> >> >> > be
>>>> >> >> > > > > >scheduled as a new application. This way complex schedulers
>>>> >> >>can be
>>>> >> >> > > > > >written as operators.
>>>> >> >> > > > > >
>>>> >> >> > > > > >public SchedulerOperator implements Operator {
>>>> >> >> > > > > >   void handleIdleTime() {
>>>> >> >> > > > > >      // check of conditions to start a job (for example
>>>> enough
>>>> >> >> > files
>>>> >> >> > > > > >available, enough items are available in kafa, or time has
>>>> >> >>reached
>>>> >> >> > > > > >     Dag dag = context.createDAG();
>>>> >> >> > > > > >     dag.addOperator();
>>>> >> >> > > > > >     dag.addOperator();
>>>> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>>>> >> >> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
>>>> >> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
>>>> >> >> > > > > >   }
>>>> >> >> > > > > >}
>>>> >> >> > > > > >
>>>> >> >> > > > > >DagHandler will have methods to monitor the final state of
>>>> >> >> > > > > >application, or to kill the DAG
>>>> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG
>>>> terminates
>>>> >> >> > > > > >dagHandler.status()  <== get the status of application.
>>>> >> >> > > > > >dagHandler.kill() <== kill the running application.
>>>> >> >> > > > > >dagHandler.shutdown() <== shutdown the application.
>>>> >> >> > > > > >
>>>> >> >> > > > > >The more complex Scheduler operators could be written to
>>>> manage
>>>> >> >> the
>>>> >> >> > > > > >workflows, i.e DAG of DAGs. using these APIs.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Regards,
>>>> >> >> > > > > >-Tushar.
>>>> >> >> > > > >
>>>> >> >> > > > > ________________________________________________________
>>>> >> >> > > > >
>>>> >> >> > > > > The information contained in this e-mail is confidential
>>>> and/or
>>>> >> >> > > > > proprietary to Capital One and/or its affiliates and may
>>>> only be
>>>> >> >> used
>>>> >> >> > > > > solely in performance of work or services for Capital One.
>>>> The
>>>> >> >> > > > information
>>>> >> >> > > > > transmitted herewith is intended only for use by the
>>>> individual
>>>> >> >>or
>>>> >> >> > > entity
>>>> >> >> > > > > to which it is addressed. If the reader of this message is
>>>> not
>>>> >> >>the
>>>> >> >> > > > intended
>>>> >> >> > > > > recipient, you are hereby notified that any review,
>>>> >> >>retransmission,
>>>> >> >> > > > > dissemination, distribution, copying or other use of, or
>>>> taking
>>>> >> >>of
>>>> >> >> > any
>>>> >> >> > > > > action in reliance upon this information is strictly
>>>> >> >>prohibited. If
>>>> >> >> > you
>>>> >> >> > > > > have received this communication in error, please contact the
>>>> >> >> sender
>>>> >> >> > > and
>>>> >> >> > > > > delete the material from your computer.
>>>> >> >> > > > >
>>>> >> >> > > > >
>>>> >> >> > > >
>>>> >> >> > >
>>>> >> >> >
>>>> >> >>
>>>> >>
>>>> >> ________________________________________________________
>>>> >>
>>>> >> The information contained in this e-mail is confidential and/or
>>>> >> proprietary to Capital One and/or its affiliates and may only be used
>>>> >> solely in performance of work or services for Capital One. The
>>>> information
>>>> >> transmitted herewith is intended only for use by the individual or
>>>> entity
>>>> >> to which it is addressed. If the reader of this message is not the
>>>> intended
>>>> >> recipient, you are hereby notified that any review, retransmission,
>>>> >> dissemination, distribution, copying or other use of, or taking of any
>>>> >> action in reliance upon this information is strictly prohibited. If you
>>>> >> have received this communication in error, please contact the sender and
>>>> >> delete the material from your computer.
>>>> >>
>>>> ________________________________________________________
>>>>
>>>> The information contained in this e-mail is confidential and/or
>>>> proprietary to Capital One and/or its affiliates and may only be used
>>>> solely in performance of work or services for Capital One. The information
>>>> transmitted herewith is intended only for use by the individual or entity
>>>> to which it is addressed. If the reader of this message is not the intended
>>>> recipient, you are hereby notified that any review, retransmission,
>>>> dissemination, distribution, copying or other use of, or taking of any
>>>> action in reliance upon this information is strictly prohibited. If you
>>>> have received this communication in error, please contact the sender and
>>>> delete the material from your computer.
>>>>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi All,

I was able to prototype an simple word count application, which will
start with just a single file reader operator. File reader operator
will emit pendingFiles as metric to StatsListener. The statslistener
will change DAG once enough files are available. The listener will return
plan change to add word splitter, counter and console operator to the
reader and complete the DAG for wordcount.

After 120 windows of inactivity, the three operators will be removed
from DAG again. When new set of files are added these operators are
added back again.

The high level proposal document:
https://docs.google.com/document/d/1gpy7NFxd6te1lTXN1-2r8WHRjg83zEM8aqpPAXivFRQ/edit?usp=sharing

The prototype code is at :
https://github.com/tushargosavi/apex-dynamic-scheduling/
The Application files are
https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/FileStatListenerSameDag.java
https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java

Please provide your feedback.

Some challenges yet to resolve are
- Restoring operator state from previously removed operator.
- Handling cuncurrent modifications to DAG from multiple StatsListener.
- Making DAG changes persistent, user should be able to restart the
application if application was killed with modified dag.

Thanks,
-Tushar.

On Fri, Jul 8, 2016 at 6:48 PM, Tushar Gosavi <tu...@datatorrent.com> wrote:
> Hi All,
>
> I have dome some initial prototype which allows stat listener to
> specify dag changes, and the dag changes are applied asynchronously.
>
> The changes involved are
> - Add DagChangeSet object which is inherited from DAG, supporting
> methods to remove
>   operator and streams.
>
> - The stat listener will return this object in Response, and platform
> will apply changes specified in response to the DAG.
>
>
> The Apex changes
> https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1
>
> The correspondign Demo application, which one operator monitors the
> directory for files, and launch the wordcount DAG in
> same application master when files are available.
> https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular
>
> Example of stat listerner which monitors a metric and instruct master
> to start a dag.
>
>  /** look for more than 100 files in a directory, before lauching the DAG */
> @Override
>   public Response processStats(BatchedOperatorStats stats)
>   {
>     for(Stats.OperatorStats ws: stats.getLastWindowedStats()) {
>       // pendingFiles is autometric.
>       Integer value = (Integer)ws.metrics.get("pendingFiles");
>       LOG.info("stats recevied for {} pendingFiles {}",
> stats.getOperatorId(), value);
>       if (value != null  && value > 100 && !dagStarted) {
>         dagStarted = true;
>         Response resp = new Response();
>         resp.dag = getWordCountDag((String)ws.metrics.get("directory"));
>         counter = 0;
>         return resp;
>       }
>     }
>     return null;
>   }
>
>   DAGChangeSet getWordCountDag(String dir)
>     {
>       DAGChangeSet dag = new DAGChangeSet();
>       LineByLineFileInputOperator reader = dag.addOperator("Reader",
> new LineByLineFileInputOperator());
>       List<StatsListener> listeners = new ArrayList<>();
>       listeners.add(this);
>       dag.getMeta(reader).getAttributes().put(Context.OperatorContext.STATS_LISTENERS,
> listeners);
>       reader.setDirectory(dir);
>       LineSplitter splitter = dag.addOperator("SplitteR", new LineSplitter());
>       UniqueCounter<String> counter = dag.addOperator("Counter", new
> UniqueCounter<String>());
>       ConsoleOutputOperator out = dag.addOperator("Output", new
> ConsoleOutputOperator());
>       dag.addStream("s1", reader.output, splitter.input);
>       dag.addStream("s2", splitter.words, counter.data);
>       dag.addStream("s3", counter.count, out.input);
>       return dag;
>     }
>
> Let me know if this type of API is acceptable for launching the DAG.
> This is an API to specify DAG changes. The scheduler functionality
> will use
> this API.
>
>
> Regards,
> -Tushar.
>
> On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <th...@datatorrent.com> wrote:
>> I like the idea of keeping heavy lifting and custom code out of the master,
>> if possible. You find that split in responsibilities even in the case of
>> partitioning (Kafka connector for example). The change that requires
>> partitioning may be detected as byproduct of the regular processing in the
>> container, the information relayed to the master, the action being taken
>> there.
>>
>> We should separate all the different pieces and then decide where they run.
>> There is detecting the need for a plan change, then effecting the change
>> (which requires full DAG view and absolutely has to/should be in the
>> master).
>>
>> Thomas
>>
>> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
>> Chandni.Singh@capitalone.com> wrote:
>>
>>> We have couple of components that already run in  master - partitioners,
>>> stats listeners,  metrics aggregators.  The problem of crashing the master
>>> is not specific to just scheduler, isn't it?
>>> ________________________________
>>> From: Tushar Gosavi <tu...@datatorrent.com>
>>> Sent: Wednesday, June 22, 2016 2:32:39 PM
>>> To: dev@apex.apache.org
>>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
>>> application
>>>
>>> I was thinking about avoiding running user code in master, As a crash
>>> in master takes down all containers with it. hence was going for
>>> scheduler as an operator, crash in scheduler won't kill the
>>> application, master can restart the scheduler back and it can start
>>> monitoring the job again and change the DAG when required. But this
>>> will require communication between master and scheduler for monitoring
>>> of operator status/stats.
>>>
>>> It is considerably easy to put scheduling functionality in master, as
>>> we have access to operator stats and there is communication channel
>>> already opened between master and operators. And custom scheduler can
>>> be written as shared stat listener, with additional API available to
>>> listener to add/remove/deploy/undeploy etc.. operators.
>>>
>>> Regards,
>>> - Tushar.
>>>
>>>
>>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <th...@datatorrent.com>
>>> wrote:
>>> > Right, if it runs in the app master and does not rely on unmanaged
>>> external
>>> > processes, then these requirements can be met.
>>> >
>>> > This capability seeks to avoid users having to deal with external
>>> > schedulers or workflows if all they want is to split a DAG that is
>>> > logically one application into multiple stages for resource optimization.
>>> > This is not very different from the need to have elasticity in terms of
>>> > partitions depending to the availability of input, as you point out.
>>> >
>>> >
>>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
>>> > Chandni.Singh@capitalone.com> wrote:
>>> >
>>> >> Scheduling IMO belongs to App master. Operators can influence it, for
>>> eg.
>>> >> File splitter can indicate that no more file to process.
>>> >>
>>> >> I don’t understand how that can not integrate with all the aspects-
>>> >> operability, fault tolerance and security.
>>> >>
>>> >> Chandni
>>> >>
>>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <ch...@datatorrent.com>
>>> wrote:
>>> >>
>>> >> >I think its a good idea to have a scheduling operator when you need to
>>> >> >start a part of the DAG when some trigger happens (for eg. FileSplitter
>>> >> >identifying new files in FS) and otherwise bring it down to save
>>> >> >resources.
>>> >> >
>>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>>> >> >timothytiborfarkas@gmail.com> wrote:
>>> >> >
>>> >> >> I am in agreement with Chandni. Scheduling a batch job is an API
>>> >> >>completely
>>> >> >> independent of a DAG or an operator. It could be used by a
>>> commandline
>>> >> >>tool
>>> >> >> running on your laptop, a script, or it could happen to be used by an
>>> >> >> Operator running in a DAG and a StatsListener.
>>> >> >>
>>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <
>>> thomas@datatorrent.com>
>>> >> >> wrote:
>>> >> >>
>>> >> >> > Scheduling can be independent, although we have use cases where the
>>> >> >> > scheduling depends on completion of processing (multi-staged batch
>>> >> >>jobs
>>> >> >> > where unused resources need to be freed).
>>> >> >> >
>>> >> >> > Both can be accomplished with a stats listener.
>>> >> >> >
>>> >> >> > There can be a "scheduling operator" that brings up and removes DAG
>>> >> >> > fragments as needed.
>>> >> >> >
>>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>>> >> >><si...@gmail.com>
>>> >> >> > wrote:
>>> >> >> >
>>> >> >> > > Hi,
>>> >> >> > > IMO scheduling a job can be independent of any operator while
>>> >> >> > > StatsListeners are not.  I understand that in a lot of cases
>>> >> >> input/output
>>> >> >> > > operators will decide when the job ends but there can be cases
>>> when
>>> >> >> > > scheduling can be independent of it.
>>> >> >> > >
>>> >> >> > > Thanks,
>>> >> >> > > Chandni
>>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <thomas@datatorrent.com
>>> >
>>> >> >> wrote:
>>> >> >> > >
>>> >> >> > > > This looks like something that coordination wise belongs into
>>> the
>>> >> >> > master
>>> >> >> > > > and can be done with a shared stats listener.
>>> >> >> > > >
>>> >> >> > > > The operator request/response protocol could be used the relay
>>> the
>>> >> >> data
>>> >> >> > > for
>>> >> >> > > > the scheduling decisions.
>>> >> >> > > >
>>> >> >> > > > Thomas
>>> >> >> > > >
>>> >> >> > > >
>>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>>> >> >> > > > Chandni.Singh@capitalone.com> wrote:
>>> >> >> > > >
>>> >> >> > > > > Hi Tushar,
>>> >> >> > > > >
>>> >> >> > > > > I have some questions about the use case 2: Batch Support
>>> >> >> > > > > I don¹t understand the advantages of providing batch support
>>> by
>>> >> >> > having
>>> >> >> > > an
>>> >> >> > > > > operator as a scheduler.
>>> >> >> > > > >
>>> >> >> > > > > An approach that seemed a little more straightforward to me
>>> was
>>> >> >>to
>>> >> >> > > expose
>>> >> >> > > > > an API for scheduler. If there is a scheduler set then the
>>> >> >>master
>>> >> >> > uses
>>> >> >> > > > and
>>> >> >> > > > > schedules operators. By default there isn¹t any scheduler and
>>> >> >>the
>>> >> >> job
>>> >> >> > > is
>>> >> >> > > > > run as it is now.
>>> >> >> > > > >
>>> >> >> > > > > Maybe this is too simplistic but can you please let me know
>>> why
>>> >> >> > having
>>> >> >> > > an
>>> >> >> > > > > operator as a scheduler is a better way?
>>> >> >> > > > >
>>> >> >> > > > > Thanks,
>>> >> >> > > > > Chandni
>>> >> >> > > > >
>>> >> >> > > > >
>>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <
>>> tushar@datatorrent.com>
>>> >> >> > wrote:
>>> >> >> > > > >
>>> >> >> > > > > >Hi All,
>>> >> >> > > > > >
>>> >> >> > > > > >We have seen few use cases in field which require Apex
>>> >> >>application
>>> >> >> > > > > >scheduling based on some condition. This has also came up as
>>> >> >>part
>>> >> >> of
>>> >> >> > > > > >Batch Support in Apex previously
>>> >> >> > > > > >(
>>> >> >> > > > >
>>> >> >> > > >
>>> >> >> > >
>>> >> >> >
>>> >> >>
>>> >> >>
>>> >>
>>> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
>>> >> >> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
>>> >> >> 40mail.gmail.com
>>> >> >> > > %3E)
>>> >> >> > > > > >. I am proposing following functionality in Apex to help
>>> >> >> scheduling
>>> >> >> > > > > >and better resource utilization for batch jobs. Please
>>> provide
>>> >> >> your
>>> >> >> > > > > >comments.
>>> >> >> > > > > >
>>> >> >> > > > > >Usecase 1 - Dynamic Dag modification.
>>> >> >> > > > > >
>>> >> >> > > > > >Each operator in DAG consumes yarn resources, sometimes it
>>> is
>>> >> >> > > > > >desirable to return the resources to yarn when no data is
>>> >> >> available
>>> >> >> > > > > >for processing, and deploy whole DAG once data starts to
>>> >> >>appear.
>>> >> >> For
>>> >> >> > > > > >this to happen automatically, we will need some data
>>> monitoring
>>> >> >> > > > > >operators running in the DAG to trigger restart and
>>> shutdown of
>>> >> >> the
>>> >> >> > > > > >operators in the DAG.
>>> >> >> > > > > >
>>> >> >> > > > > >Apex already have such api to dynamically change the running
>>> >> >>dag
>>> >> >> > > > > >through cli. We could provide similar API available to
>>> >> >>operators
>>> >> >> > which
>>> >> >> > > > > >will trigger dag modification at runtime. This information
>>> can
>>> >> >>be
>>> >> >> > > > > >passed to master using heartbeat RPC and master will make
>>> >> >> > > > > >required changed to the DAG. let me know what do you think
>>> >> >>about
>>> >> >> > it..
>>> >> >> > > > > >something like below.
>>> >> >> > > > > >Context.beginDagChange();
>>> >> >> > > > > >context.addOperator("o1") <== launch operator from previous
>>> >> >> > > > check-pointed
>>> >> >> > > > > >state.
>>> >> >> > > > > >context.addOperator("o2", new Operator2()) <== create new
>>> >> >>operator
>>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>>> >> >> > > > > >context.shutdown("o3"); <== delete this and downstream
>>> >> >>operators
>>> >> >> > from
>>> >> >> > > > the
>>> >> >> > > > > >DAG.
>>> >> >> > > > > >context.apply();  <== dag changes will be send to master,
>>> and
>>> >> >> master
>>> >> >> > > > > >will apply these changes.
>>> >> >> > > > > >
>>> >> >> > > > > >Similarly API for other functionalities such as locality
>>> >> >>settings
>>> >> >> > > > > >needs to be provided.
>>> >> >> > > > > >
>>> >> >> > > > > >
>>> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
>>> >> >> > > > > >
>>> >> >> > > > > >Provide an API callable from operator to launch a DAG. The
>>> >> >> operator
>>> >> >> > > > > >will prepare an dag object and submit it to the yarn, the
>>> DAG
>>> >> >>will
>>> >> >> > be
>>> >> >> > > > > >scheduled as a new application. This way complex schedulers
>>> >> >>can be
>>> >> >> > > > > >written as operators.
>>> >> >> > > > > >
>>> >> >> > > > > >public SchedulerOperator implements Operator {
>>> >> >> > > > > >   void handleIdleTime() {
>>> >> >> > > > > >      // check of conditions to start a job (for example
>>> enough
>>> >> >> > files
>>> >> >> > > > > >available, enough items are available in kafa, or time has
>>> >> >>reached
>>> >> >> > > > > >     Dag dag = context.createDAG();
>>> >> >> > > > > >     dag.addOperator();
>>> >> >> > > > > >     dag.addOperator();
>>> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>>> >> >> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
>>> >> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
>>> >> >> > > > > >   }
>>> >> >> > > > > >}
>>> >> >> > > > > >
>>> >> >> > > > > >DagHandler will have methods to monitor the final state of
>>> >> >> > > > > >application, or to kill the DAG
>>> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG
>>> terminates
>>> >> >> > > > > >dagHandler.status()  <== get the status of application.
>>> >> >> > > > > >dagHandler.kill() <== kill the running application.
>>> >> >> > > > > >dagHandler.shutdown() <== shutdown the application.
>>> >> >> > > > > >
>>> >> >> > > > > >The more complex Scheduler operators could be written to
>>> manage
>>> >> >> the
>>> >> >> > > > > >workflows, i.e DAG of DAGs. using these APIs.
>>> >> >> > > > > >
>>> >> >> > > > > >Regards,
>>> >> >> > > > > >-Tushar.
>>> >> >> > > > >
>>> >> >> > > > > ________________________________________________________
>>> >> >> > > > >
>>> >> >> > > > > The information contained in this e-mail is confidential
>>> and/or
>>> >> >> > > > > proprietary to Capital One and/or its affiliates and may
>>> only be
>>> >> >> used
>>> >> >> > > > > solely in performance of work or services for Capital One.
>>> The
>>> >> >> > > > information
>>> >> >> > > > > transmitted herewith is intended only for use by the
>>> individual
>>> >> >>or
>>> >> >> > > entity
>>> >> >> > > > > to which it is addressed. If the reader of this message is
>>> not
>>> >> >>the
>>> >> >> > > > intended
>>> >> >> > > > > recipient, you are hereby notified that any review,
>>> >> >>retransmission,
>>> >> >> > > > > dissemination, distribution, copying or other use of, or
>>> taking
>>> >> >>of
>>> >> >> > any
>>> >> >> > > > > action in reliance upon this information is strictly
>>> >> >>prohibited. If
>>> >> >> > you
>>> >> >> > > > > have received this communication in error, please contact the
>>> >> >> sender
>>> >> >> > > and
>>> >> >> > > > > delete the material from your computer.
>>> >> >> > > > >
>>> >> >> > > > >
>>> >> >> > > >
>>> >> >> > >
>>> >> >> >
>>> >> >>
>>> >>
>>> >> ________________________________________________________
>>> >>
>>> >> The information contained in this e-mail is confidential and/or
>>> >> proprietary to Capital One and/or its affiliates and may only be used
>>> >> solely in performance of work or services for Capital One. The
>>> information
>>> >> transmitted herewith is intended only for use by the individual or
>>> entity
>>> >> to which it is addressed. If the reader of this message is not the
>>> intended
>>> >> recipient, you are hereby notified that any review, retransmission,
>>> >> dissemination, distribution, copying or other use of, or taking of any
>>> >> action in reliance upon this information is strictly prohibited. If you
>>> >> have received this communication in error, please contact the sender and
>>> >> delete the material from your computer.
>>> >>
>>> ________________________________________________________
>>>
>>> The information contained in this e-mail is confidential and/or
>>> proprietary to Capital One and/or its affiliates and may only be used
>>> solely in performance of work or services for Capital One. The information
>>> transmitted herewith is intended only for use by the individual or entity
>>> to which it is addressed. If the reader of this message is not the intended
>>> recipient, you are hereby notified that any review, retransmission,
>>> dissemination, distribution, copying or other use of, or taking of any
>>> action in reliance upon this information is strictly prohibited. If you
>>> have received this communication in error, please contact the sender and
>>> delete the material from your computer.
>>>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Tushar Gosavi <tu...@datatorrent.com>.
Hi All,

I have dome some initial prototype which allows stat listener to
specify dag changes, and the dag changes are applied asynchronously.

The changes involved are
- Add DagChangeSet object which is inherited from DAG, supporting
methods to remove
  operator and streams.

- The stat listener will return this object in Response, and platform
will apply changes specified in response to the DAG.


The Apex changes
https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1

The correspondign Demo application, which one operator monitors the
directory for files, and launch the wordcount DAG in
same application master when files are available.
https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular

Example of stat listerner which monitors a metric and instruct master
to start a dag.

 /** look for more than 100 files in a directory, before lauching the DAG */
@Override
  public Response processStats(BatchedOperatorStats stats)
  {
    for(Stats.OperatorStats ws: stats.getLastWindowedStats()) {
      // pendingFiles is autometric.
      Integer value = (Integer)ws.metrics.get("pendingFiles");
      LOG.info("stats recevied for {} pendingFiles {}",
stats.getOperatorId(), value);
      if (value != null  && value > 100 && !dagStarted) {
        dagStarted = true;
        Response resp = new Response();
        resp.dag = getWordCountDag((String)ws.metrics.get("directory"));
        counter = 0;
        return resp;
      }
    }
    return null;
  }

  DAGChangeSet getWordCountDag(String dir)
    {
      DAGChangeSet dag = new DAGChangeSet();
      LineByLineFileInputOperator reader = dag.addOperator("Reader",
new LineByLineFileInputOperator());
      List<StatsListener> listeners = new ArrayList<>();
      listeners.add(this);
      dag.getMeta(reader).getAttributes().put(Context.OperatorContext.STATS_LISTENERS,
listeners);
      reader.setDirectory(dir);
      LineSplitter splitter = dag.addOperator("SplitteR", new LineSplitter());
      UniqueCounter<String> counter = dag.addOperator("Counter", new
UniqueCounter<String>());
      ConsoleOutputOperator out = dag.addOperator("Output", new
ConsoleOutputOperator());
      dag.addStream("s1", reader.output, splitter.input);
      dag.addStream("s2", splitter.words, counter.data);
      dag.addStream("s3", counter.count, out.input);
      return dag;
    }

Let me know if this type of API is acceptable for launching the DAG.
This is an API to specify DAG changes. The scheduler functionality
will use
this API.


Regards,
-Tushar.

On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <th...@datatorrent.com> wrote:
> I like the idea of keeping heavy lifting and custom code out of the master,
> if possible. You find that split in responsibilities even in the case of
> partitioning (Kafka connector for example). The change that requires
> partitioning may be detected as byproduct of the regular processing in the
> container, the information relayed to the master, the action being taken
> there.
>
> We should separate all the different pieces and then decide where they run.
> There is detecting the need for a plan change, then effecting the change
> (which requires full DAG view and absolutely has to/should be in the
> master).
>
> Thomas
>
> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
> Chandni.Singh@capitalone.com> wrote:
>
>> We have couple of components that already run in  master - partitioners,
>> stats listeners,  metrics aggregators.  The problem of crashing the master
>> is not specific to just scheduler, isn't it?
>> ________________________________
>> From: Tushar Gosavi <tu...@datatorrent.com>
>> Sent: Wednesday, June 22, 2016 2:32:39 PM
>> To: dev@apex.apache.org
>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
>> application
>>
>> I was thinking about avoiding running user code in master, As a crash
>> in master takes down all containers with it. hence was going for
>> scheduler as an operator, crash in scheduler won't kill the
>> application, master can restart the scheduler back and it can start
>> monitoring the job again and change the DAG when required. But this
>> will require communication between master and scheduler for monitoring
>> of operator status/stats.
>>
>> It is considerably easy to put scheduling functionality in master, as
>> we have access to operator stats and there is communication channel
>> already opened between master and operators. And custom scheduler can
>> be written as shared stat listener, with additional API available to
>> listener to add/remove/deploy/undeploy etc.. operators.
>>
>> Regards,
>> - Tushar.
>>
>>
>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <th...@datatorrent.com>
>> wrote:
>> > Right, if it runs in the app master and does not rely on unmanaged
>> external
>> > processes, then these requirements can be met.
>> >
>> > This capability seeks to avoid users having to deal with external
>> > schedulers or workflows if all they want is to split a DAG that is
>> > logically one application into multiple stages for resource optimization.
>> > This is not very different from the need to have elasticity in terms of
>> > partitions depending to the availability of input, as you point out.
>> >
>> >
>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
>> > Chandni.Singh@capitalone.com> wrote:
>> >
>> >> Scheduling IMO belongs to App master. Operators can influence it, for
>> eg.
>> >> File splitter can indicate that no more file to process.
>> >>
>> >> I don’t understand how that can not integrate with all the aspects-
>> >> operability, fault tolerance and security.
>> >>
>> >> Chandni
>> >>
>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <ch...@datatorrent.com>
>> wrote:
>> >>
>> >> >I think its a good idea to have a scheduling operator when you need to
>> >> >start a part of the DAG when some trigger happens (for eg. FileSplitter
>> >> >identifying new files in FS) and otherwise bring it down to save
>> >> >resources.
>> >> >
>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>> >> >timothytiborfarkas@gmail.com> wrote:
>> >> >
>> >> >> I am in agreement with Chandni. Scheduling a batch job is an API
>> >> >>completely
>> >> >> independent of a DAG or an operator. It could be used by a
>> commandline
>> >> >>tool
>> >> >> running on your laptop, a script, or it could happen to be used by an
>> >> >> Operator running in a DAG and a StatsListener.
>> >> >>
>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <
>> thomas@datatorrent.com>
>> >> >> wrote:
>> >> >>
>> >> >> > Scheduling can be independent, although we have use cases where the
>> >> >> > scheduling depends on completion of processing (multi-staged batch
>> >> >>jobs
>> >> >> > where unused resources need to be freed).
>> >> >> >
>> >> >> > Both can be accomplished with a stats listener.
>> >> >> >
>> >> >> > There can be a "scheduling operator" that brings up and removes DAG
>> >> >> > fragments as needed.
>> >> >> >
>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>> >> >><si...@gmail.com>
>> >> >> > wrote:
>> >> >> >
>> >> >> > > Hi,
>> >> >> > > IMO scheduling a job can be independent of any operator while
>> >> >> > > StatsListeners are not.  I understand that in a lot of cases
>> >> >> input/output
>> >> >> > > operators will decide when the job ends but there can be cases
>> when
>> >> >> > > scheduling can be independent of it.
>> >> >> > >
>> >> >> > > Thanks,
>> >> >> > > Chandni
>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <thomas@datatorrent.com
>> >
>> >> >> wrote:
>> >> >> > >
>> >> >> > > > This looks like something that coordination wise belongs into
>> the
>> >> >> > master
>> >> >> > > > and can be done with a shared stats listener.
>> >> >> > > >
>> >> >> > > > The operator request/response protocol could be used the relay
>> the
>> >> >> data
>> >> >> > > for
>> >> >> > > > the scheduling decisions.
>> >> >> > > >
>> >> >> > > > Thomas
>> >> >> > > >
>> >> >> > > >
>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>> >> >> > > > Chandni.Singh@capitalone.com> wrote:
>> >> >> > > >
>> >> >> > > > > Hi Tushar,
>> >> >> > > > >
>> >> >> > > > > I have some questions about the use case 2: Batch Support
>> >> >> > > > > I don¹t understand the advantages of providing batch support
>> by
>> >> >> > having
>> >> >> > > an
>> >> >> > > > > operator as a scheduler.
>> >> >> > > > >
>> >> >> > > > > An approach that seemed a little more straightforward to me
>> was
>> >> >>to
>> >> >> > > expose
>> >> >> > > > > an API for scheduler. If there is a scheduler set then the
>> >> >>master
>> >> >> > uses
>> >> >> > > > and
>> >> >> > > > > schedules operators. By default there isn¹t any scheduler and
>> >> >>the
>> >> >> job
>> >> >> > > is
>> >> >> > > > > run as it is now.
>> >> >> > > > >
>> >> >> > > > > Maybe this is too simplistic but can you please let me know
>> why
>> >> >> > having
>> >> >> > > an
>> >> >> > > > > operator as a scheduler is a better way?
>> >> >> > > > >
>> >> >> > > > > Thanks,
>> >> >> > > > > Chandni
>> >> >> > > > >
>> >> >> > > > >
>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <
>> tushar@datatorrent.com>
>> >> >> > wrote:
>> >> >> > > > >
>> >> >> > > > > >Hi All,
>> >> >> > > > > >
>> >> >> > > > > >We have seen few use cases in field which require Apex
>> >> >>application
>> >> >> > > > > >scheduling based on some condition. This has also came up as
>> >> >>part
>> >> >> of
>> >> >> > > > > >Batch Support in Apex previously
>> >> >> > > > > >(
>> >> >> > > > >
>> >> >> > > >
>> >> >> > >
>> >> >> >
>> >> >>
>> >> >>
>> >>
>> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
>> >> >> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
>> >> >> 40mail.gmail.com
>> >> >> > > %3E)
>> >> >> > > > > >. I am proposing following functionality in Apex to help
>> >> >> scheduling
>> >> >> > > > > >and better resource utilization for batch jobs. Please
>> provide
>> >> >> your
>> >> >> > > > > >comments.
>> >> >> > > > > >
>> >> >> > > > > >Usecase 1 - Dynamic Dag modification.
>> >> >> > > > > >
>> >> >> > > > > >Each operator in DAG consumes yarn resources, sometimes it
>> is
>> >> >> > > > > >desirable to return the resources to yarn when no data is
>> >> >> available
>> >> >> > > > > >for processing, and deploy whole DAG once data starts to
>> >> >>appear.
>> >> >> For
>> >> >> > > > > >this to happen automatically, we will need some data
>> monitoring
>> >> >> > > > > >operators running in the DAG to trigger restart and
>> shutdown of
>> >> >> the
>> >> >> > > > > >operators in the DAG.
>> >> >> > > > > >
>> >> >> > > > > >Apex already have such api to dynamically change the running
>> >> >>dag
>> >> >> > > > > >through cli. We could provide similar API available to
>> >> >>operators
>> >> >> > which
>> >> >> > > > > >will trigger dag modification at runtime. This information
>> can
>> >> >>be
>> >> >> > > > > >passed to master using heartbeat RPC and master will make
>> >> >> > > > > >required changed to the DAG. let me know what do you think
>> >> >>about
>> >> >> > it..
>> >> >> > > > > >something like below.
>> >> >> > > > > >Context.beginDagChange();
>> >> >> > > > > >context.addOperator("o1") <== launch operator from previous
>> >> >> > > > check-pointed
>> >> >> > > > > >state.
>> >> >> > > > > >context.addOperator("o2", new Operator2()) <== create new
>> >> >>operator
>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>> >> >> > > > > >context.shutdown("o3"); <== delete this and downstream
>> >> >>operators
>> >> >> > from
>> >> >> > > > the
>> >> >> > > > > >DAG.
>> >> >> > > > > >context.apply();  <== dag changes will be send to master,
>> and
>> >> >> master
>> >> >> > > > > >will apply these changes.
>> >> >> > > > > >
>> >> >> > > > > >Similarly API for other functionalities such as locality
>> >> >>settings
>> >> >> > > > > >needs to be provided.
>> >> >> > > > > >
>> >> >> > > > > >
>> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
>> >> >> > > > > >
>> >> >> > > > > >Provide an API callable from operator to launch a DAG. The
>> >> >> operator
>> >> >> > > > > >will prepare an dag object and submit it to the yarn, the
>> DAG
>> >> >>will
>> >> >> > be
>> >> >> > > > > >scheduled as a new application. This way complex schedulers
>> >> >>can be
>> >> >> > > > > >written as operators.
>> >> >> > > > > >
>> >> >> > > > > >public SchedulerOperator implements Operator {
>> >> >> > > > > >   void handleIdleTime() {
>> >> >> > > > > >      // check of conditions to start a job (for example
>> enough
>> >> >> > files
>> >> >> > > > > >available, enough items are available in kafa, or time has
>> >> >>reached
>> >> >> > > > > >     Dag dag = context.createDAG();
>> >> >> > > > > >     dag.addOperator();
>> >> >> > > > > >     dag.addOperator();
>> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>> >> >> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
>> >> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
>> >> >> > > > > >   }
>> >> >> > > > > >}
>> >> >> > > > > >
>> >> >> > > > > >DagHandler will have methods to monitor the final state of
>> >> >> > > > > >application, or to kill the DAG
>> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG
>> terminates
>> >> >> > > > > >dagHandler.status()  <== get the status of application.
>> >> >> > > > > >dagHandler.kill() <== kill the running application.
>> >> >> > > > > >dagHandler.shutdown() <== shutdown the application.
>> >> >> > > > > >
>> >> >> > > > > >The more complex Scheduler operators could be written to
>> manage
>> >> >> the
>> >> >> > > > > >workflows, i.e DAG of DAGs. using these APIs.
>> >> >> > > > > >
>> >> >> > > > > >Regards,
>> >> >> > > > > >-Tushar.
>> >> >> > > > >
>> >> >> > > > > ________________________________________________________
>> >> >> > > > >
>> >> >> > > > > The information contained in this e-mail is confidential
>> and/or
>> >> >> > > > > proprietary to Capital One and/or its affiliates and may
>> only be
>> >> >> used
>> >> >> > > > > solely in performance of work or services for Capital One.
>> The
>> >> >> > > > information
>> >> >> > > > > transmitted herewith is intended only for use by the
>> individual
>> >> >>or
>> >> >> > > entity
>> >> >> > > > > to which it is addressed. If the reader of this message is
>> not
>> >> >>the
>> >> >> > > > intended
>> >> >> > > > > recipient, you are hereby notified that any review,
>> >> >>retransmission,
>> >> >> > > > > dissemination, distribution, copying or other use of, or
>> taking
>> >> >>of
>> >> >> > any
>> >> >> > > > > action in reliance upon this information is strictly
>> >> >>prohibited. If
>> >> >> > you
>> >> >> > > > > have received this communication in error, please contact the
>> >> >> sender
>> >> >> > > and
>> >> >> > > > > delete the material from your computer.
>> >> >> > > > >
>> >> >> > > > >
>> >> >> > > >
>> >> >> > >
>> >> >> >
>> >> >>
>> >>
>> >> ________________________________________________________
>> >>
>> >> The information contained in this e-mail is confidential and/or
>> >> proprietary to Capital One and/or its affiliates and may only be used
>> >> solely in performance of work or services for Capital One. The
>> information
>> >> transmitted herewith is intended only for use by the individual or
>> entity
>> >> to which it is addressed. If the reader of this message is not the
>> intended
>> >> recipient, you are hereby notified that any review, retransmission,
>> >> dissemination, distribution, copying or other use of, or taking of any
>> >> action in reliance upon this information is strictly prohibited. If you
>> >> have received this communication in error, please contact the sender and
>> >> delete the material from your computer.
>> >>
>> ________________________________________________________
>>
>> The information contained in this e-mail is confidential and/or
>> proprietary to Capital One and/or its affiliates and may only be used
>> solely in performance of work or services for Capital One. The information
>> transmitted herewith is intended only for use by the individual or entity
>> to which it is addressed. If the reader of this message is not the intended
>> recipient, you are hereby notified that any review, retransmission,
>> dissemination, distribution, copying or other use of, or taking of any
>> action in reliance upon this information is strictly prohibited. If you
>> have received this communication in error, please contact the sender and
>> delete the material from your computer.
>>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Thomas Weise <th...@datatorrent.com>.
I like the idea of keeping heavy lifting and custom code out of the master,
if possible. You find that split in responsibilities even in the case of
partitioning (Kafka connector for example). The change that requires
partitioning may be detected as byproduct of the regular processing in the
container, the information relayed to the master, the action being taken
there.

We should separate all the different pieces and then decide where they run.
There is detecting the need for a plan change, then effecting the change
(which requires full DAG view and absolutely has to/should be in the
master).

Thomas

On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
Chandni.Singh@capitalone.com> wrote:

> We have couple of components that already run in  master - partitioners,
> stats listeners,  metrics aggregators.  The problem of crashing the master
> is not specific to just scheduler, isn't it?
> ________________________________
> From: Tushar Gosavi <tu...@datatorrent.com>
> Sent: Wednesday, June 22, 2016 2:32:39 PM
> To: dev@apex.apache.org
> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
> application
>
> I was thinking about avoiding running user code in master, As a crash
> in master takes down all containers with it. hence was going for
> scheduler as an operator, crash in scheduler won't kill the
> application, master can restart the scheduler back and it can start
> monitoring the job again and change the DAG when required. But this
> will require communication between master and scheduler for monitoring
> of operator status/stats.
>
> It is considerably easy to put scheduling functionality in master, as
> we have access to operator stats and there is communication channel
> already opened between master and operators. And custom scheduler can
> be written as shared stat listener, with additional API available to
> listener to add/remove/deploy/undeploy etc.. operators.
>
> Regards,
> - Tushar.
>
>
> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <th...@datatorrent.com>
> wrote:
> > Right, if it runs in the app master and does not rely on unmanaged
> external
> > processes, then these requirements can be met.
> >
> > This capability seeks to avoid users having to deal with external
> > schedulers or workflows if all they want is to split a DAG that is
> > logically one application into multiple stages for resource optimization.
> > This is not very different from the need to have elasticity in terms of
> > partitions depending to the availability of input, as you point out.
> >
> >
> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
> > Chandni.Singh@capitalone.com> wrote:
> >
> >> Scheduling IMO belongs to App master. Operators can influence it, for
> eg.
> >> File splitter can indicate that no more file to process.
> >>
> >> I don’t understand how that can not integrate with all the aspects-
> >> operability, fault tolerance and security.
> >>
> >> Chandni
> >>
> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <ch...@datatorrent.com>
> wrote:
> >>
> >> >I think its a good idea to have a scheduling operator when you need to
> >> >start a part of the DAG when some trigger happens (for eg. FileSplitter
> >> >identifying new files in FS) and otherwise bring it down to save
> >> >resources.
> >> >
> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
> >> >timothytiborfarkas@gmail.com> wrote:
> >> >
> >> >> I am in agreement with Chandni. Scheduling a batch job is an API
> >> >>completely
> >> >> independent of a DAG or an operator. It could be used by a
> commandline
> >> >>tool
> >> >> running on your laptop, a script, or it could happen to be used by an
> >> >> Operator running in a DAG and a StatsListener.
> >> >>
> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <
> thomas@datatorrent.com>
> >> >> wrote:
> >> >>
> >> >> > Scheduling can be independent, although we have use cases where the
> >> >> > scheduling depends on completion of processing (multi-staged batch
> >> >>jobs
> >> >> > where unused resources need to be freed).
> >> >> >
> >> >> > Both can be accomplished with a stats listener.
> >> >> >
> >> >> > There can be a "scheduling operator" that brings up and removes DAG
> >> >> > fragments as needed.
> >> >> >
> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
> >> >><si...@gmail.com>
> >> >> > wrote:
> >> >> >
> >> >> > > Hi,
> >> >> > > IMO scheduling a job can be independent of any operator while
> >> >> > > StatsListeners are not.  I understand that in a lot of cases
> >> >> input/output
> >> >> > > operators will decide when the job ends but there can be cases
> when
> >> >> > > scheduling can be independent of it.
> >> >> > >
> >> >> > > Thanks,
> >> >> > > Chandni
> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <thomas@datatorrent.com
> >
> >> >> wrote:
> >> >> > >
> >> >> > > > This looks like something that coordination wise belongs into
> the
> >> >> > master
> >> >> > > > and can be done with a shared stats listener.
> >> >> > > >
> >> >> > > > The operator request/response protocol could be used the relay
> the
> >> >> data
> >> >> > > for
> >> >> > > > the scheduling decisions.
> >> >> > > >
> >> >> > > > Thomas
> >> >> > > >
> >> >> > > >
> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> >> >> > > > Chandni.Singh@capitalone.com> wrote:
> >> >> > > >
> >> >> > > > > Hi Tushar,
> >> >> > > > >
> >> >> > > > > I have some questions about the use case 2: Batch Support
> >> >> > > > > I don¹t understand the advantages of providing batch support
> by
> >> >> > having
> >> >> > > an
> >> >> > > > > operator as a scheduler.
> >> >> > > > >
> >> >> > > > > An approach that seemed a little more straightforward to me
> was
> >> >>to
> >> >> > > expose
> >> >> > > > > an API for scheduler. If there is a scheduler set then the
> >> >>master
> >> >> > uses
> >> >> > > > and
> >> >> > > > > schedules operators. By default there isn¹t any scheduler and
> >> >>the
> >> >> job
> >> >> > > is
> >> >> > > > > run as it is now.
> >> >> > > > >
> >> >> > > > > Maybe this is too simplistic but can you please let me know
> why
> >> >> > having
> >> >> > > an
> >> >> > > > > operator as a scheduler is a better way?
> >> >> > > > >
> >> >> > > > > Thanks,
> >> >> > > > > Chandni
> >> >> > > > >
> >> >> > > > >
> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <
> tushar@datatorrent.com>
> >> >> > wrote:
> >> >> > > > >
> >> >> > > > > >Hi All,
> >> >> > > > > >
> >> >> > > > > >We have seen few use cases in field which require Apex
> >> >>application
> >> >> > > > > >scheduling based on some condition. This has also came up as
> >> >>part
> >> >> of
> >> >> > > > > >Batch Support in Apex previously
> >> >> > > > > >(
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >> >>
> >>
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
> >> >> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
> >> >> 40mail.gmail.com
> >> >> > > %3E)
> >> >> > > > > >. I am proposing following functionality in Apex to help
> >> >> scheduling
> >> >> > > > > >and better resource utilization for batch jobs. Please
> provide
> >> >> your
> >> >> > > > > >comments.
> >> >> > > > > >
> >> >> > > > > >Usecase 1 - Dynamic Dag modification.
> >> >> > > > > >
> >> >> > > > > >Each operator in DAG consumes yarn resources, sometimes it
> is
> >> >> > > > > >desirable to return the resources to yarn when no data is
> >> >> available
> >> >> > > > > >for processing, and deploy whole DAG once data starts to
> >> >>appear.
> >> >> For
> >> >> > > > > >this to happen automatically, we will need some data
> monitoring
> >> >> > > > > >operators running in the DAG to trigger restart and
> shutdown of
> >> >> the
> >> >> > > > > >operators in the DAG.
> >> >> > > > > >
> >> >> > > > > >Apex already have such api to dynamically change the running
> >> >>dag
> >> >> > > > > >through cli. We could provide similar API available to
> >> >>operators
> >> >> > which
> >> >> > > > > >will trigger dag modification at runtime. This information
> can
> >> >>be
> >> >> > > > > >passed to master using heartbeat RPC and master will make
> >> >> > > > > >required changed to the DAG. let me know what do you think
> >> >>about
> >> >> > it..
> >> >> > > > > >something like below.
> >> >> > > > > >Context.beginDagChange();
> >> >> > > > > >context.addOperator("o1") <== launch operator from previous
> >> >> > > > check-pointed
> >> >> > > > > >state.
> >> >> > > > > >context.addOperator("o2", new Operator2()) <== create new
> >> >>operator
> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
> >> >> > > > > >context.shutdown("o3"); <== delete this and downstream
> >> >>operators
> >> >> > from
> >> >> > > > the
> >> >> > > > > >DAG.
> >> >> > > > > >context.apply();  <== dag changes will be send to master,
> and
> >> >> master
> >> >> > > > > >will apply these changes.
> >> >> > > > > >
> >> >> > > > > >Similarly API for other functionalities such as locality
> >> >>settings
> >> >> > > > > >needs to be provided.
> >> >> > > > > >
> >> >> > > > > >
> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
> >> >> > > > > >
> >> >> > > > > >Provide an API callable from operator to launch a DAG. The
> >> >> operator
> >> >> > > > > >will prepare an dag object and submit it to the yarn, the
> DAG
> >> >>will
> >> >> > be
> >> >> > > > > >scheduled as a new application. This way complex schedulers
> >> >>can be
> >> >> > > > > >written as operators.
> >> >> > > > > >
> >> >> > > > > >public SchedulerOperator implements Operator {
> >> >> > > > > >   void handleIdleTime() {
> >> >> > > > > >      // check of conditions to start a job (for example
> enough
> >> >> > files
> >> >> > > > > >available, enough items are available in kafa, or time has
> >> >>reached
> >> >> > > > > >     Dag dag = context.createDAG();
> >> >> > > > > >     dag.addOperator();
> >> >> > > > > >     dag.addOperator();
> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
> >> >> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
> >> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
> >> >> > > > > >   }
> >> >> > > > > >}
> >> >> > > > > >
> >> >> > > > > >DagHandler will have methods to monitor the final state of
> >> >> > > > > >application, or to kill the DAG
> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG
> terminates
> >> >> > > > > >dagHandler.status()  <== get the status of application.
> >> >> > > > > >dagHandler.kill() <== kill the running application.
> >> >> > > > > >dagHandler.shutdown() <== shutdown the application.
> >> >> > > > > >
> >> >> > > > > >The more complex Scheduler operators could be written to
> manage
> >> >> the
> >> >> > > > > >workflows, i.e DAG of DAGs. using these APIs.
> >> >> > > > > >
> >> >> > > > > >Regards,
> >> >> > > > > >-Tushar.
> >> >> > > > >
> >> >> > > > > ________________________________________________________
> >> >> > > > >
> >> >> > > > > The information contained in this e-mail is confidential
> and/or
> >> >> > > > > proprietary to Capital One and/or its affiliates and may
> only be
> >> >> used
> >> >> > > > > solely in performance of work or services for Capital One.
> The
> >> >> > > > information
> >> >> > > > > transmitted herewith is intended only for use by the
> individual
> >> >>or
> >> >> > > entity
> >> >> > > > > to which it is addressed. If the reader of this message is
> not
> >> >>the
> >> >> > > > intended
> >> >> > > > > recipient, you are hereby notified that any review,
> >> >>retransmission,
> >> >> > > > > dissemination, distribution, copying or other use of, or
> taking
> >> >>of
> >> >> > any
> >> >> > > > > action in reliance upon this information is strictly
> >> >>prohibited. If
> >> >> > you
> >> >> > > > > have received this communication in error, please contact the
> >> >> sender
> >> >> > > and
> >> >> > > > > delete the material from your computer.
> >> >> > > > >
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> >>
> >> ________________________________________________________
> >>
> >> The information contained in this e-mail is confidential and/or
> >> proprietary to Capital One and/or its affiliates and may only be used
> >> solely in performance of work or services for Capital One. The
> information
> >> transmitted herewith is intended only for use by the individual or
> entity
> >> to which it is addressed. If the reader of this message is not the
> intended
> >> recipient, you are hereby notified that any review, retransmission,
> >> dissemination, distribution, copying or other use of, or taking of any
> >> action in reliance upon this information is strictly prohibited. If you
> >> have received this communication in error, please contact the sender and
> >> delete the material from your computer.
> >>
> ________________________________________________________
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>

RE: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by "Singh, Chandni" <Ch...@capitalone.com>.
We have couple of components that already run in  master - partitioners,  stats listeners,  metrics aggregators.  The problem of crashing the master is not specific to just scheduler, isn't it?
________________________________
From: Tushar Gosavi <tu...@datatorrent.com>
Sent: Wednesday, June 22, 2016 2:32:39 PM
To: dev@apex.apache.org
Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

I was thinking about avoiding running user code in master, As a crash
in master takes down all containers with it. hence was going for
scheduler as an operator, crash in scheduler won't kill the
application, master can restart the scheduler back and it can start
monitoring the job again and change the DAG when required. But this
will require communication between master and scheduler for monitoring
of operator status/stats.

It is considerably easy to put scheduling functionality in master, as
we have access to operator stats and there is communication channel
already opened between master and operators. And custom scheduler can
be written as shared stat listener, with additional API available to
listener to add/remove/deploy/undeploy etc.. operators.

Regards,
- Tushar.


On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <th...@datatorrent.com> wrote:
> Right, if it runs in the app master and does not rely on unmanaged external
> processes, then these requirements can be met.
>
> This capability seeks to avoid users having to deal with external
> schedulers or workflows if all they want is to split a DAG that is
> logically one application into multiple stages for resource optimization.
> This is not very different from the need to have elasticity in terms of
> partitions depending to the availability of input, as you point out.
>
>
> On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
> Chandni.Singh@capitalone.com> wrote:
>
>> Scheduling IMO belongs to App master. Operators can influence it, for eg.
>> File splitter can indicate that no more file to process.
>>
>> I don’t understand how that can not integrate with all the aspects-
>> operability, fault tolerance and security.
>>
>> Chandni
>>
>> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <ch...@datatorrent.com> wrote:
>>
>> >I think its a good idea to have a scheduling operator when you need to
>> >start a part of the DAG when some trigger happens (for eg. FileSplitter
>> >identifying new files in FS) and otherwise bring it down to save
>> >resources.
>> >
>> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>> >timothytiborfarkas@gmail.com> wrote:
>> >
>> >> I am in agreement with Chandni. Scheduling a batch job is an API
>> >>completely
>> >> independent of a DAG or an operator. It could be used by a commandline
>> >>tool
>> >> running on your laptop, a script, or it could happen to be used by an
>> >> Operator running in a DAG and a StatsListener.
>> >>
>> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <th...@datatorrent.com>
>> >> wrote:
>> >>
>> >> > Scheduling can be independent, although we have use cases where the
>> >> > scheduling depends on completion of processing (multi-staged batch
>> >>jobs
>> >> > where unused resources need to be freed).
>> >> >
>> >> > Both can be accomplished with a stats listener.
>> >> >
>> >> > There can be a "scheduling operator" that brings up and removes DAG
>> >> > fragments as needed.
>> >> >
>> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>> >><si...@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Hi,
>> >> > > IMO scheduling a job can be independent of any operator while
>> >> > > StatsListeners are not.  I understand that in a lot of cases
>> >> input/output
>> >> > > operators will decide when the job ends but there can be cases when
>> >> > > scheduling can be independent of it.
>> >> > >
>> >> > > Thanks,
>> >> > > Chandni
>> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <th...@datatorrent.com>
>> >> wrote:
>> >> > >
>> >> > > > This looks like something that coordination wise belongs into the
>> >> > master
>> >> > > > and can be done with a shared stats listener.
>> >> > > >
>> >> > > > The operator request/response protocol could be used the relay the
>> >> data
>> >> > > for
>> >> > > > the scheduling decisions.
>> >> > > >
>> >> > > > Thomas
>> >> > > >
>> >> > > >
>> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>> >> > > > Chandni.Singh@capitalone.com> wrote:
>> >> > > >
>> >> > > > > Hi Tushar,
>> >> > > > >
>> >> > > > > I have some questions about the use case 2: Batch Support
>> >> > > > > I don¹t understand the advantages of providing batch support by
>> >> > having
>> >> > > an
>> >> > > > > operator as a scheduler.
>> >> > > > >
>> >> > > > > An approach that seemed a little more straightforward to me was
>> >>to
>> >> > > expose
>> >> > > > > an API for scheduler. If there is a scheduler set then the
>> >>master
>> >> > uses
>> >> > > > and
>> >> > > > > schedules operators. By default there isn¹t any scheduler and
>> >>the
>> >> job
>> >> > > is
>> >> > > > > run as it is now.
>> >> > > > >
>> >> > > > > Maybe this is too simplistic but can you please let me know why
>> >> > having
>> >> > > an
>> >> > > > > operator as a scheduler is a better way?
>> >> > > > >
>> >> > > > > Thanks,
>> >> > > > > Chandni
>> >> > > > >
>> >> > > > >
>> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tu...@datatorrent.com>
>> >> > wrote:
>> >> > > > >
>> >> > > > > >Hi All,
>> >> > > > > >
>> >> > > > > >We have seen few use cases in field which require Apex
>> >>application
>> >> > > > > >scheduling based on some condition. This has also came up as
>> >>part
>> >> of
>> >> > > > > >Batch Support in Apex previously
>> >> > > > > >(
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> >>
>> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
>> >> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
>> >> 40mail.gmail.com
>> >> > > %3E)
>> >> > > > > >. I am proposing following functionality in Apex to help
>> >> scheduling
>> >> > > > > >and better resource utilization for batch jobs. Please provide
>> >> your
>> >> > > > > >comments.
>> >> > > > > >
>> >> > > > > >Usecase 1 - Dynamic Dag modification.
>> >> > > > > >
>> >> > > > > >Each operator in DAG consumes yarn resources, sometimes it is
>> >> > > > > >desirable to return the resources to yarn when no data is
>> >> available
>> >> > > > > >for processing, and deploy whole DAG once data starts to
>> >>appear.
>> >> For
>> >> > > > > >this to happen automatically, we will need some data monitoring
>> >> > > > > >operators running in the DAG to trigger restart and shutdown of
>> >> the
>> >> > > > > >operators in the DAG.
>> >> > > > > >
>> >> > > > > >Apex already have such api to dynamically change the running
>> >>dag
>> >> > > > > >through cli. We could provide similar API available to
>> >>operators
>> >> > which
>> >> > > > > >will trigger dag modification at runtime. This information can
>> >>be
>> >> > > > > >passed to master using heartbeat RPC and master will make
>> >> > > > > >required changed to the DAG. let me know what do you think
>> >>about
>> >> > it..
>> >> > > > > >something like below.
>> >> > > > > >Context.beginDagChange();
>> >> > > > > >context.addOperator("o1") <== launch operator from previous
>> >> > > > check-pointed
>> >> > > > > >state.
>> >> > > > > >context.addOperator("o2", new Operator2()) <== create new
>> >>operator
>> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>> >> > > > > >context.shutdown("o3"); <== delete this and downstream
>> >>operators
>> >> > from
>> >> > > > the
>> >> > > > > >DAG.
>> >> > > > > >context.apply();  <== dag changes will be send to master, and
>> >> master
>> >> > > > > >will apply these changes.
>> >> > > > > >
>> >> > > > > >Similarly API for other functionalities such as locality
>> >>settings
>> >> > > > > >needs to be provided.
>> >> > > > > >
>> >> > > > > >
>> >> > > > > >Usecase 2 - Classic Batch Scheduling.
>> >> > > > > >
>> >> > > > > >Provide an API callable from operator to launch a DAG. The
>> >> operator
>> >> > > > > >will prepare an dag object and submit it to the yarn, the DAG
>> >>will
>> >> > be
>> >> > > > > >scheduled as a new application. This way complex schedulers
>> >>can be
>> >> > > > > >written as operators.
>> >> > > > > >
>> >> > > > > >public SchedulerOperator implements Operator {
>> >> > > > > >   void handleIdleTime() {
>> >> > > > > >      // check of conditions to start a job (for example enough
>> >> > files
>> >> > > > > >available, enough items are available in kafa, or time has
>> >>reached
>> >> > > > > >     Dag dag = context.createDAG();
>> >> > > > > >     dag.addOperator();
>> >> > > > > >     dag.addOperator();
>> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>> >> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
>> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
>> >> > > > > >   }
>> >> > > > > >}
>> >> > > > > >
>> >> > > > > >DagHandler will have methods to monitor the final state of
>> >> > > > > >application, or to kill the DAG
>> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
>> >> > > > > >dagHandler.status()  <== get the status of application.
>> >> > > > > >dagHandler.kill() <== kill the running application.
>> >> > > > > >dagHandler.shutdown() <== shutdown the application.
>> >> > > > > >
>> >> > > > > >The more complex Scheduler operators could be written to manage
>> >> the
>> >> > > > > >workflows, i.e DAG of DAGs. using these APIs.
>> >> > > > > >
>> >> > > > > >Regards,
>> >> > > > > >-Tushar.
>> >> > > > >
>> >> > > > > ________________________________________________________
>> >> > > > >
>> >> > > > > The information contained in this e-mail is confidential and/or
>> >> > > > > proprietary to Capital One and/or its affiliates and may only be
>> >> used
>> >> > > > > solely in performance of work or services for Capital One. The
>> >> > > > information
>> >> > > > > transmitted herewith is intended only for use by the individual
>> >>or
>> >> > > entity
>> >> > > > > to which it is addressed. If the reader of this message is not
>> >>the
>> >> > > > intended
>> >> > > > > recipient, you are hereby notified that any review,
>> >>retransmission,
>> >> > > > > dissemination, distribution, copying or other use of, or taking
>> >>of
>> >> > any
>> >> > > > > action in reliance upon this information is strictly
>> >>prohibited. If
>> >> > you
>> >> > > > > have received this communication in error, please contact the
>> >> sender
>> >> > > and
>> >> > > > > delete the material from your computer.
>> >> > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>>
>> ________________________________________________________
>>
>> The information contained in this e-mail is confidential and/or
>> proprietary to Capital One and/or its affiliates and may only be used
>> solely in performance of work or services for Capital One. The information
>> transmitted herewith is intended only for use by the individual or entity
>> to which it is addressed. If the reader of this message is not the intended
>> recipient, you are hereby notified that any review, retransmission,
>> dissemination, distribution, copying or other use of, or taking of any
>> action in reliance upon this information is strictly prohibited. If you
>> have received this communication in error, please contact the sender and
>> delete the material from your computer.
>>
________________________________________________________

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Tushar Gosavi <tu...@datatorrent.com>.
I was thinking about avoiding running user code in master, As a crash
in master takes down all containers with it. hence was going for
scheduler as an operator, crash in scheduler won't kill the
application, master can restart the scheduler back and it can start
monitoring the job again and change the DAG when required. But this
will require communication between master and scheduler for monitoring
of operator status/stats.

It is considerably easy to put scheduling functionality in master, as
we have access to operator stats and there is communication channel
already opened between master and operators. And custom scheduler can
be written as shared stat listener, with additional API available to
listener to add/remove/deploy/undeploy etc.. operators.

Regards,
- Tushar.


On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <th...@datatorrent.com> wrote:
> Right, if it runs in the app master and does not rely on unmanaged external
> processes, then these requirements can be met.
>
> This capability seeks to avoid users having to deal with external
> schedulers or workflows if all they want is to split a DAG that is
> logically one application into multiple stages for resource optimization.
> This is not very different from the need to have elasticity in terms of
> partitions depending to the availability of input, as you point out.
>
>
> On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
> Chandni.Singh@capitalone.com> wrote:
>
>> Scheduling IMO belongs to App master. Operators can influence it, for eg.
>> File splitter can indicate that no more file to process.
>>
>> I don’t understand how that can not integrate with all the aspects-
>> operability, fault tolerance and security.
>>
>> Chandni
>>
>> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <ch...@datatorrent.com> wrote:
>>
>> >I think its a good idea to have a scheduling operator when you need to
>> >start a part of the DAG when some trigger happens (for eg. FileSplitter
>> >identifying new files in FS) and otherwise bring it down to save
>> >resources.
>> >
>> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>> >timothytiborfarkas@gmail.com> wrote:
>> >
>> >> I am in agreement with Chandni. Scheduling a batch job is an API
>> >>completely
>> >> independent of a DAG or an operator. It could be used by a commandline
>> >>tool
>> >> running on your laptop, a script, or it could happen to be used by an
>> >> Operator running in a DAG and a StatsListener.
>> >>
>> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <th...@datatorrent.com>
>> >> wrote:
>> >>
>> >> > Scheduling can be independent, although we have use cases where the
>> >> > scheduling depends on completion of processing (multi-staged batch
>> >>jobs
>> >> > where unused resources need to be freed).
>> >> >
>> >> > Both can be accomplished with a stats listener.
>> >> >
>> >> > There can be a "scheduling operator" that brings up and removes DAG
>> >> > fragments as needed.
>> >> >
>> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>> >><si...@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Hi,
>> >> > > IMO scheduling a job can be independent of any operator while
>> >> > > StatsListeners are not.  I understand that in a lot of cases
>> >> input/output
>> >> > > operators will decide when the job ends but there can be cases when
>> >> > > scheduling can be independent of it.
>> >> > >
>> >> > > Thanks,
>> >> > > Chandni
>> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <th...@datatorrent.com>
>> >> wrote:
>> >> > >
>> >> > > > This looks like something that coordination wise belongs into the
>> >> > master
>> >> > > > and can be done with a shared stats listener.
>> >> > > >
>> >> > > > The operator request/response protocol could be used the relay the
>> >> data
>> >> > > for
>> >> > > > the scheduling decisions.
>> >> > > >
>> >> > > > Thomas
>> >> > > >
>> >> > > >
>> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>> >> > > > Chandni.Singh@capitalone.com> wrote:
>> >> > > >
>> >> > > > > Hi Tushar,
>> >> > > > >
>> >> > > > > I have some questions about the use case 2: Batch Support
>> >> > > > > I don¹t understand the advantages of providing batch support by
>> >> > having
>> >> > > an
>> >> > > > > operator as a scheduler.
>> >> > > > >
>> >> > > > > An approach that seemed a little more straightforward to me was
>> >>to
>> >> > > expose
>> >> > > > > an API for scheduler. If there is a scheduler set then the
>> >>master
>> >> > uses
>> >> > > > and
>> >> > > > > schedules operators. By default there isn¹t any scheduler and
>> >>the
>> >> job
>> >> > > is
>> >> > > > > run as it is now.
>> >> > > > >
>> >> > > > > Maybe this is too simplistic but can you please let me know why
>> >> > having
>> >> > > an
>> >> > > > > operator as a scheduler is a better way?
>> >> > > > >
>> >> > > > > Thanks,
>> >> > > > > Chandni
>> >> > > > >
>> >> > > > >
>> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tu...@datatorrent.com>
>> >> > wrote:
>> >> > > > >
>> >> > > > > >Hi All,
>> >> > > > > >
>> >> > > > > >We have seen few use cases in field which require Apex
>> >>application
>> >> > > > > >scheduling based on some condition. This has also came up as
>> >>part
>> >> of
>> >> > > > > >Batch Support in Apex previously
>> >> > > > > >(
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>> >>
>> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
>> >> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
>> >> 40mail.gmail.com
>> >> > > %3E)
>> >> > > > > >. I am proposing following functionality in Apex to help
>> >> scheduling
>> >> > > > > >and better resource utilization for batch jobs. Please provide
>> >> your
>> >> > > > > >comments.
>> >> > > > > >
>> >> > > > > >Usecase 1 - Dynamic Dag modification.
>> >> > > > > >
>> >> > > > > >Each operator in DAG consumes yarn resources, sometimes it is
>> >> > > > > >desirable to return the resources to yarn when no data is
>> >> available
>> >> > > > > >for processing, and deploy whole DAG once data starts to
>> >>appear.
>> >> For
>> >> > > > > >this to happen automatically, we will need some data monitoring
>> >> > > > > >operators running in the DAG to trigger restart and shutdown of
>> >> the
>> >> > > > > >operators in the DAG.
>> >> > > > > >
>> >> > > > > >Apex already have such api to dynamically change the running
>> >>dag
>> >> > > > > >through cli. We could provide similar API available to
>> >>operators
>> >> > which
>> >> > > > > >will trigger dag modification at runtime. This information can
>> >>be
>> >> > > > > >passed to master using heartbeat RPC and master will make
>> >> > > > > >required changed to the DAG. let me know what do you think
>> >>about
>> >> > it..
>> >> > > > > >something like below.
>> >> > > > > >Context.beginDagChange();
>> >> > > > > >context.addOperator("o1") <== launch operator from previous
>> >> > > > check-pointed
>> >> > > > > >state.
>> >> > > > > >context.addOperator("o2", new Operator2()) <== create new
>> >>operator
>> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>> >> > > > > >context.shutdown("o3"); <== delete this and downstream
>> >>operators
>> >> > from
>> >> > > > the
>> >> > > > > >DAG.
>> >> > > > > >context.apply();  <== dag changes will be send to master, and
>> >> master
>> >> > > > > >will apply these changes.
>> >> > > > > >
>> >> > > > > >Similarly API for other functionalities such as locality
>> >>settings
>> >> > > > > >needs to be provided.
>> >> > > > > >
>> >> > > > > >
>> >> > > > > >Usecase 2 - Classic Batch Scheduling.
>> >> > > > > >
>> >> > > > > >Provide an API callable from operator to launch a DAG. The
>> >> operator
>> >> > > > > >will prepare an dag object and submit it to the yarn, the DAG
>> >>will
>> >> > be
>> >> > > > > >scheduled as a new application. This way complex schedulers
>> >>can be
>> >> > > > > >written as operators.
>> >> > > > > >
>> >> > > > > >public SchedulerOperator implements Operator {
>> >> > > > > >   void handleIdleTime() {
>> >> > > > > >      // check of conditions to start a job (for example enough
>> >> > files
>> >> > > > > >available, enough items are available in kafa, or time has
>> >>reached
>> >> > > > > >     Dag dag = context.createDAG();
>> >> > > > > >     dag.addOperator();
>> >> > > > > >     dag.addOperator();
>> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>> >> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
>> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
>> >> > > > > >   }
>> >> > > > > >}
>> >> > > > > >
>> >> > > > > >DagHandler will have methods to monitor the final state of
>> >> > > > > >application, or to kill the DAG
>> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
>> >> > > > > >dagHandler.status()  <== get the status of application.
>> >> > > > > >dagHandler.kill() <== kill the running application.
>> >> > > > > >dagHandler.shutdown() <== shutdown the application.
>> >> > > > > >
>> >> > > > > >The more complex Scheduler operators could be written to manage
>> >> the
>> >> > > > > >workflows, i.e DAG of DAGs. using these APIs.
>> >> > > > > >
>> >> > > > > >Regards,
>> >> > > > > >-Tushar.
>> >> > > > >
>> >> > > > > ________________________________________________________
>> >> > > > >
>> >> > > > > The information contained in this e-mail is confidential and/or
>> >> > > > > proprietary to Capital One and/or its affiliates and may only be
>> >> used
>> >> > > > > solely in performance of work or services for Capital One. The
>> >> > > > information
>> >> > > > > transmitted herewith is intended only for use by the individual
>> >>or
>> >> > > entity
>> >> > > > > to which it is addressed. If the reader of this message is not
>> >>the
>> >> > > > intended
>> >> > > > > recipient, you are hereby notified that any review,
>> >>retransmission,
>> >> > > > > dissemination, distribution, copying or other use of, or taking
>> >>of
>> >> > any
>> >> > > > > action in reliance upon this information is strictly
>> >>prohibited. If
>> >> > you
>> >> > > > > have received this communication in error, please contact the
>> >> sender
>> >> > > and
>> >> > > > > delete the material from your computer.
>> >> > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>>
>> ________________________________________________________
>>
>> The information contained in this e-mail is confidential and/or
>> proprietary to Capital One and/or its affiliates and may only be used
>> solely in performance of work or services for Capital One. The information
>> transmitted herewith is intended only for use by the individual or entity
>> to which it is addressed. If the reader of this message is not the intended
>> recipient, you are hereby notified that any review, retransmission,
>> dissemination, distribution, copying or other use of, or taking of any
>> action in reliance upon this information is strictly prohibited. If you
>> have received this communication in error, please contact the sender and
>> delete the material from your computer.
>>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Thomas Weise <th...@datatorrent.com>.
Right, if it runs in the app master and does not rely on unmanaged external
processes, then these requirements can be met.

This capability seeks to avoid users having to deal with external
schedulers or workflows if all they want is to split a DAG that is
logically one application into multiple stages for resource optimization.
This is not very different from the need to have elasticity in terms of
partitions depending to the availability of input, as you point out.


On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
Chandni.Singh@capitalone.com> wrote:

> Scheduling IMO belongs to App master. Operators can influence it, for eg.
> File splitter can indicate that no more file to process.
>
> I don’t understand how that can not integrate with all the aspects-
> operability, fault tolerance and security.
>
> Chandni
>
> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <ch...@datatorrent.com> wrote:
>
> >I think its a good idea to have a scheduling operator when you need to
> >start a part of the DAG when some trigger happens (for eg. FileSplitter
> >identifying new files in FS) and otherwise bring it down to save
> >resources.
> >
> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
> >timothytiborfarkas@gmail.com> wrote:
> >
> >> I am in agreement with Chandni. Scheduling a batch job is an API
> >>completely
> >> independent of a DAG or an operator. It could be used by a commandline
> >>tool
> >> running on your laptop, a script, or it could happen to be used by an
> >> Operator running in a DAG and a StatsListener.
> >>
> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <th...@datatorrent.com>
> >> wrote:
> >>
> >> > Scheduling can be independent, although we have use cases where the
> >> > scheduling depends on completion of processing (multi-staged batch
> >>jobs
> >> > where unused resources need to be freed).
> >> >
> >> > Both can be accomplished with a stats listener.
> >> >
> >> > There can be a "scheduling operator" that brings up and removes DAG
> >> > fragments as needed.
> >> >
> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
> >><si...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi,
> >> > > IMO scheduling a job can be independent of any operator while
> >> > > StatsListeners are not.  I understand that in a lot of cases
> >> input/output
> >> > > operators will decide when the job ends but there can be cases when
> >> > > scheduling can be independent of it.
> >> > >
> >> > > Thanks,
> >> > > Chandni
> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <th...@datatorrent.com>
> >> wrote:
> >> > >
> >> > > > This looks like something that coordination wise belongs into the
> >> > master
> >> > > > and can be done with a shared stats listener.
> >> > > >
> >> > > > The operator request/response protocol could be used the relay the
> >> data
> >> > > for
> >> > > > the scheduling decisions.
> >> > > >
> >> > > > Thomas
> >> > > >
> >> > > >
> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> >> > > > Chandni.Singh@capitalone.com> wrote:
> >> > > >
> >> > > > > Hi Tushar,
> >> > > > >
> >> > > > > I have some questions about the use case 2: Batch Support
> >> > > > > I don¹t understand the advantages of providing batch support by
> >> > having
> >> > > an
> >> > > > > operator as a scheduler.
> >> > > > >
> >> > > > > An approach that seemed a little more straightforward to me was
> >>to
> >> > > expose
> >> > > > > an API for scheduler. If there is a scheduler set then the
> >>master
> >> > uses
> >> > > > and
> >> > > > > schedules operators. By default there isn¹t any scheduler and
> >>the
> >> job
> >> > > is
> >> > > > > run as it is now.
> >> > > > >
> >> > > > > Maybe this is too simplistic but can you please let me know why
> >> > having
> >> > > an
> >> > > > > operator as a scheduler is a better way?
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Chandni
> >> > > > >
> >> > > > >
> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tu...@datatorrent.com>
> >> > wrote:
> >> > > > >
> >> > > > > >Hi All,
> >> > > > > >
> >> > > > > >We have seen few use cases in field which require Apex
> >>application
> >> > > > > >scheduling based on some condition. This has also came up as
> >>part
> >> of
> >> > > > > >Batch Support in Apex previously
> >> > > > > >(
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >>
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
> >> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
> >> 40mail.gmail.com
> >> > > %3E)
> >> > > > > >. I am proposing following functionality in Apex to help
> >> scheduling
> >> > > > > >and better resource utilization for batch jobs. Please provide
> >> your
> >> > > > > >comments.
> >> > > > > >
> >> > > > > >Usecase 1 - Dynamic Dag modification.
> >> > > > > >
> >> > > > > >Each operator in DAG consumes yarn resources, sometimes it is
> >> > > > > >desirable to return the resources to yarn when no data is
> >> available
> >> > > > > >for processing, and deploy whole DAG once data starts to
> >>appear.
> >> For
> >> > > > > >this to happen automatically, we will need some data monitoring
> >> > > > > >operators running in the DAG to trigger restart and shutdown of
> >> the
> >> > > > > >operators in the DAG.
> >> > > > > >
> >> > > > > >Apex already have such api to dynamically change the running
> >>dag
> >> > > > > >through cli. We could provide similar API available to
> >>operators
> >> > which
> >> > > > > >will trigger dag modification at runtime. This information can
> >>be
> >> > > > > >passed to master using heartbeat RPC and master will make
> >> > > > > >required changed to the DAG. let me know what do you think
> >>about
> >> > it..
> >> > > > > >something like below.
> >> > > > > >Context.beginDagChange();
> >> > > > > >context.addOperator("o1") <== launch operator from previous
> >> > > > check-pointed
> >> > > > > >state.
> >> > > > > >context.addOperator("o2", new Operator2()) <== create new
> >>operator
> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
> >> > > > > >context.shutdown("o3"); <== delete this and downstream
> >>operators
> >> > from
> >> > > > the
> >> > > > > >DAG.
> >> > > > > >context.apply();  <== dag changes will be send to master, and
> >> master
> >> > > > > >will apply these changes.
> >> > > > > >
> >> > > > > >Similarly API for other functionalities such as locality
> >>settings
> >> > > > > >needs to be provided.
> >> > > > > >
> >> > > > > >
> >> > > > > >Usecase 2 - Classic Batch Scheduling.
> >> > > > > >
> >> > > > > >Provide an API callable from operator to launch a DAG. The
> >> operator
> >> > > > > >will prepare an dag object and submit it to the yarn, the DAG
> >>will
> >> > be
> >> > > > > >scheduled as a new application. This way complex schedulers
> >>can be
> >> > > > > >written as operators.
> >> > > > > >
> >> > > > > >public SchedulerOperator implements Operator {
> >> > > > > >   void handleIdleTime() {
> >> > > > > >      // check of conditions to start a job (for example enough
> >> > files
> >> > > > > >available, enough items are available in kafa, or time has
> >>reached
> >> > > > > >     Dag dag = context.createDAG();
> >> > > > > >     dag.addOperator();
> >> > > > > >     dag.addOperator();
> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
> >> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
> >> > > > > >   }
> >> > > > > >}
> >> > > > > >
> >> > > > > >DagHandler will have methods to monitor the final state of
> >> > > > > >application, or to kill the DAG
> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
> >> > > > > >dagHandler.status()  <== get the status of application.
> >> > > > > >dagHandler.kill() <== kill the running application.
> >> > > > > >dagHandler.shutdown() <== shutdown the application.
> >> > > > > >
> >> > > > > >The more complex Scheduler operators could be written to manage
> >> the
> >> > > > > >workflows, i.e DAG of DAGs. using these APIs.
> >> > > > > >
> >> > > > > >Regards,
> >> > > > > >-Tushar.
> >> > > > >
> >> > > > > ________________________________________________________
> >> > > > >
> >> > > > > The information contained in this e-mail is confidential and/or
> >> > > > > proprietary to Capital One and/or its affiliates and may only be
> >> used
> >> > > > > solely in performance of work or services for Capital One. The
> >> > > > information
> >> > > > > transmitted herewith is intended only for use by the individual
> >>or
> >> > > entity
> >> > > > > to which it is addressed. If the reader of this message is not
> >>the
> >> > > > intended
> >> > > > > recipient, you are hereby notified that any review,
> >>retransmission,
> >> > > > > dissemination, distribution, copying or other use of, or taking
> >>of
> >> > any
> >> > > > > action in reliance upon this information is strictly
> >>prohibited. If
> >> > you
> >> > > > > have received this communication in error, please contact the
> >> sender
> >> > > and
> >> > > > > delete the material from your computer.
> >> > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
>
> ________________________________________________________
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by "Singh, Chandni" <Ch...@capitalone.com>.
Scheduling IMO belongs to App master. Operators can influence it, for eg.
File splitter can indicate that no more file to process.

I don’t understand how that can not integrate with all the aspects-
operability, fault tolerance and security.

Chandni

On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <ch...@datatorrent.com> wrote:

>I think its a good idea to have a scheduling operator when you need to
>start a part of the DAG when some trigger happens (for eg. FileSplitter
>identifying new files in FS) and otherwise bring it down to save
>resources.
>
>On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>timothytiborfarkas@gmail.com> wrote:
>
>> I am in agreement with Chandni. Scheduling a batch job is an API
>>completely
>> independent of a DAG or an operator. It could be used by a commandline
>>tool
>> running on your laptop, a script, or it could happen to be used by an
>> Operator running in a DAG and a StatsListener.
>>
>> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <th...@datatorrent.com>
>> wrote:
>>
>> > Scheduling can be independent, although we have use cases where the
>> > scheduling depends on completion of processing (multi-staged batch
>>jobs
>> > where unused resources need to be freed).
>> >
>> > Both can be accomplished with a stats listener.
>> >
>> > There can be a "scheduling operator" that brings up and removes DAG
>> > fragments as needed.
>> >
>> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>><si...@gmail.com>
>> > wrote:
>> >
>> > > Hi,
>> > > IMO scheduling a job can be independent of any operator while
>> > > StatsListeners are not.  I understand that in a lot of cases
>> input/output
>> > > operators will decide when the job ends but there can be cases when
>> > > scheduling can be independent of it.
>> > >
>> > > Thanks,
>> > > Chandni
>> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <th...@datatorrent.com>
>> wrote:
>> > >
>> > > > This looks like something that coordination wise belongs into the
>> > master
>> > > > and can be done with a shared stats listener.
>> > > >
>> > > > The operator request/response protocol could be used the relay the
>> data
>> > > for
>> > > > the scheduling decisions.
>> > > >
>> > > > Thomas
>> > > >
>> > > >
>> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>> > > > Chandni.Singh@capitalone.com> wrote:
>> > > >
>> > > > > Hi Tushar,
>> > > > >
>> > > > > I have some questions about the use case 2: Batch Support
>> > > > > I don¹t understand the advantages of providing batch support by
>> > having
>> > > an
>> > > > > operator as a scheduler.
>> > > > >
>> > > > > An approach that seemed a little more straightforward to me was
>>to
>> > > expose
>> > > > > an API for scheduler. If there is a scheduler set then the
>>master
>> > uses
>> > > > and
>> > > > > schedules operators. By default there isn¹t any scheduler and
>>the
>> job
>> > > is
>> > > > > run as it is now.
>> > > > >
>> > > > > Maybe this is too simplistic but can you please let me know why
>> > having
>> > > an
>> > > > > operator as a scheduler is a better way?
>> > > > >
>> > > > > Thanks,
>> > > > > Chandni
>> > > > >
>> > > > >
>> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tu...@datatorrent.com>
>> > wrote:
>> > > > >
>> > > > > >Hi All,
>> > > > > >
>> > > > > >We have seen few use cases in field which require Apex
>>application
>> > > > > >scheduling based on some condition. This has also came up as
>>part
>> of
>> > > > > >Batch Support in Apex previously
>> > > > > >(
>> > > > >
>> > > >
>> > >
>> >
>> 
>>http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
>> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
>> 40mail.gmail.com
>> > > %3E)
>> > > > > >. I am proposing following functionality in Apex to help
>> scheduling
>> > > > > >and better resource utilization for batch jobs. Please provide
>> your
>> > > > > >comments.
>> > > > > >
>> > > > > >Usecase 1 - Dynamic Dag modification.
>> > > > > >
>> > > > > >Each operator in DAG consumes yarn resources, sometimes it is
>> > > > > >desirable to return the resources to yarn when no data is
>> available
>> > > > > >for processing, and deploy whole DAG once data starts to
>>appear.
>> For
>> > > > > >this to happen automatically, we will need some data monitoring
>> > > > > >operators running in the DAG to trigger restart and shutdown of
>> the
>> > > > > >operators in the DAG.
>> > > > > >
>> > > > > >Apex already have such api to dynamically change the running
>>dag
>> > > > > >through cli. We could provide similar API available to
>>operators
>> > which
>> > > > > >will trigger dag modification at runtime. This information can
>>be
>> > > > > >passed to master using heartbeat RPC and master will make
>> > > > > >required changed to the DAG. let me know what do you think
>>about
>> > it..
>> > > > > >something like below.
>> > > > > >Context.beginDagChange();
>> > > > > >context.addOperator("o1") <== launch operator from previous
>> > > > check-pointed
>> > > > > >state.
>> > > > > >context.addOperator("o2", new Operator2()) <== create new
>>operator
>> > > > > >context.addStream("s1", "reader.output", "o1.input");
>> > > > > >context.shutdown("o3"); <== delete this and downstream
>>operators
>> > from
>> > > > the
>> > > > > >DAG.
>> > > > > >context.apply();  <== dag changes will be send to master, and
>> master
>> > > > > >will apply these changes.
>> > > > > >
>> > > > > >Similarly API for other functionalities such as locality
>>settings
>> > > > > >needs to be provided.
>> > > > > >
>> > > > > >
>> > > > > >Usecase 2 - Classic Batch Scheduling.
>> > > > > >
>> > > > > >Provide an API callable from operator to launch a DAG. The
>> operator
>> > > > > >will prepare an dag object and submit it to the yarn, the DAG
>>will
>> > be
>> > > > > >scheduled as a new application. This way complex schedulers
>>can be
>> > > > > >written as operators.
>> > > > > >
>> > > > > >public SchedulerOperator implements Operator {
>> > > > > >   void handleIdleTime() {
>> > > > > >      // check of conditions to start a job (for example enough
>> > files
>> > > > > >available, enough items are available in kafa, or time has
>>reached
>> > > > > >     Dag dag = context.createDAG();
>> > > > > >     dag.addOperator();
>> > > > > >     dag.addOperator();
>> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
>> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
>> > > > > >   }
>> > > > > >}
>> > > > > >
>> > > > > >DagHandler will have methods to monitor the final state of
>> > > > > >application, or to kill the DAG
>> > > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
>> > > > > >dagHandler.status()  <== get the status of application.
>> > > > > >dagHandler.kill() <== kill the running application.
>> > > > > >dagHandler.shutdown() <== shutdown the application.
>> > > > > >
>> > > > > >The more complex Scheduler operators could be written to manage
>> the
>> > > > > >workflows, i.e DAG of DAGs. using these APIs.
>> > > > > >
>> > > > > >Regards,
>> > > > > >-Tushar.
>> > > > >
>> > > > > ________________________________________________________
>> > > > >
>> > > > > The information contained in this e-mail is confidential and/or
>> > > > > proprietary to Capital One and/or its affiliates and may only be
>> used
>> > > > > solely in performance of work or services for Capital One. The
>> > > > information
>> > > > > transmitted herewith is intended only for use by the individual
>>or
>> > > entity
>> > > > > to which it is addressed. If the reader of this message is not
>>the
>> > > > intended
>> > > > > recipient, you are hereby notified that any review,
>>retransmission,
>> > > > > dissemination, distribution, copying or other use of, or taking
>>of
>> > any
>> > > > > action in reliance upon this information is strictly
>>prohibited. If
>> > you
>> > > > > have received this communication in error, please contact the
>> sender
>> > > and
>> > > > > delete the material from your computer.
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>

________________________________________________________

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Chinmay Kolhatkar <ch...@datatorrent.com>.
I think its a good idea to have a scheduling operator when you need to
start a part of the DAG when some trigger happens (for eg. FileSplitter
identifying new files in FS) and otherwise bring it down to save resources.

On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
timothytiborfarkas@gmail.com> wrote:

> I am in agreement with Chandni. Scheduling a batch job is an API completely
> independent of a DAG or an operator. It could be used by a commandline tool
> running on your laptop, a script, or it could happen to be used by an
> Operator running in a DAG and a StatsListener.
>
> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <th...@datatorrent.com>
> wrote:
>
> > Scheduling can be independent, although we have use cases where the
> > scheduling depends on completion of processing (multi-staged batch jobs
> > where unused resources need to be freed).
> >
> > Both can be accomplished with a stats listener.
> >
> > There can be a "scheduling operator" that brings up and removes DAG
> > fragments as needed.
> >
> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh <si...@gmail.com>
> > wrote:
> >
> > > Hi,
> > > IMO scheduling a job can be independent of any operator while
> > > StatsListeners are not.  I understand that in a lot of cases
> input/output
> > > operators will decide when the job ends but there can be cases when
> > > scheduling can be independent of it.
> > >
> > > Thanks,
> > > Chandni
> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <th...@datatorrent.com>
> wrote:
> > >
> > > > This looks like something that coordination wise belongs into the
> > master
> > > > and can be done with a shared stats listener.
> > > >
> > > > The operator request/response protocol could be used the relay the
> data
> > > for
> > > > the scheduling decisions.
> > > >
> > > > Thomas
> > > >
> > > >
> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> > > > Chandni.Singh@capitalone.com> wrote:
> > > >
> > > > > Hi Tushar,
> > > > >
> > > > > I have some questions about the use case 2: Batch Support
> > > > > I don¹t understand the advantages of providing batch support by
> > having
> > > an
> > > > > operator as a scheduler.
> > > > >
> > > > > An approach that seemed a little more straightforward to me was to
> > > expose
> > > > > an API for scheduler. If there is a scheduler set then the master
> > uses
> > > > and
> > > > > schedules operators. By default there isn¹t any scheduler and the
> job
> > > is
> > > > > run as it is now.
> > > > >
> > > > > Maybe this is too simplistic but can you please let me know why
> > having
> > > an
> > > > > operator as a scheduler is a better way?
> > > > >
> > > > > Thanks,
> > > > > Chandni
> > > > >
> > > > >
> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tu...@datatorrent.com>
> > wrote:
> > > > >
> > > > > >Hi All,
> > > > > >
> > > > > >We have seen few use cases in field which require Apex application
> > > > > >scheduling based on some condition. This has also came up as part
> of
> > > > > >Batch Support in Apex previously
> > > > > >(
> > > > >
> > > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
> 40mail.gmail.com
> > > %3E)
> > > > > >. I am proposing following functionality in Apex to help
> scheduling
> > > > > >and better resource utilization for batch jobs. Please provide
> your
> > > > > >comments.
> > > > > >
> > > > > >Usecase 1 - Dynamic Dag modification.
> > > > > >
> > > > > >Each operator in DAG consumes yarn resources, sometimes it is
> > > > > >desirable to return the resources to yarn when no data is
> available
> > > > > >for processing, and deploy whole DAG once data starts to appear.
> For
> > > > > >this to happen automatically, we will need some data monitoring
> > > > > >operators running in the DAG to trigger restart and shutdown of
> the
> > > > > >operators in the DAG.
> > > > > >
> > > > > >Apex already have such api to dynamically change the running dag
> > > > > >through cli. We could provide similar API available to operators
> > which
> > > > > >will trigger dag modification at runtime. This information can be
> > > > > >passed to master using heartbeat RPC and master will make
> > > > > >required changed to the DAG. let me know what do you think about
> > it..
> > > > > >something like below.
> > > > > >Context.beginDagChange();
> > > > > >context.addOperator("o1") <== launch operator from previous
> > > > check-pointed
> > > > > >state.
> > > > > >context.addOperator("o2", new Operator2()) <== create new operator
> > > > > >context.addStream("s1", "reader.output", "o1.input");
> > > > > >context.shutdown("o3"); <== delete this and downstream operators
> > from
> > > > the
> > > > > >DAG.
> > > > > >context.apply();  <== dag changes will be send to master, and
> master
> > > > > >will apply these changes.
> > > > > >
> > > > > >Similarly API for other functionalities such as locality settings
> > > > > >needs to be provided.
> > > > > >
> > > > > >
> > > > > >Usecase 2 - Classic Batch Scheduling.
> > > > > >
> > > > > >Provide an API callable from operator to launch a DAG. The
> operator
> > > > > >will prepare an dag object and submit it to the yarn, the DAG will
> > be
> > > > > >scheduled as a new application. This way complex schedulers can be
> > > > > >written as operators.
> > > > > >
> > > > > >public SchedulerOperator implements Operator {
> > > > > >   void handleIdleTime() {
> > > > > >      // check of conditions to start a job (for example enough
> > files
> > > > > >available, enough items are available in kafa, or time has reached
> > > > > >     Dag dag = context.createDAG();
> > > > > >     dag.addOperator();
> > > > > >     dag.addOperator();
> > > > > >     LaunchOptions lOptions = new LaunchOptions();
> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
> > > > > >   }
> > > > > >}
> > > > > >
> > > > > >DagHandler will have methods to monitor the final state of
> > > > > >application, or to kill the DAG
> > > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
> > > > > >dagHandler.status()  <== get the status of application.
> > > > > >dagHandler.kill() <== kill the running application.
> > > > > >dagHandler.shutdown() <== shutdown the application.
> > > > > >
> > > > > >The more complex Scheduler operators could be written to manage
> the
> > > > > >workflows, i.e DAG of DAGs. using these APIs.
> > > > > >
> > > > > >Regards,
> > > > > >-Tushar.
> > > > >
> > > > > ________________________________________________________
> > > > >
> > > > > The information contained in this e-mail is confidential and/or
> > > > > proprietary to Capital One and/or its affiliates and may only be
> used
> > > > > solely in performance of work or services for Capital One. The
> > > > information
> > > > > transmitted herewith is intended only for use by the individual or
> > > entity
> > > > > to which it is addressed. If the reader of this message is not the
> > > > intended
> > > > > recipient, you are hereby notified that any review, retransmission,
> > > > > dissemination, distribution, copying or other use of, or taking of
> > any
> > > > > action in reliance upon this information is strictly prohibited. If
> > you
> > > > > have received this communication in error, please contact the
> sender
> > > and
> > > > > delete the material from your computer.
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Thomas Weise <th...@datatorrent.com>.
This isn't just an API discussion.  We are looking for a solution that
integrates with all aspects, including operability, fault tolerance and
security.

The underlying capability we are looking at is dynamic DAG change, which
happens to be only exposed to the CLI at this point.

Thomas

On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
timothytiborfarkas@gmail.com> wrote:

> I am in agreement with Chandni. Scheduling a batch job is an API completely
> independent of a DAG or an operator. It could be used by a commandline tool
> running on your laptop, a script, or it could happen to be used by an
> Operator running in a DAG and a StatsListener.
>
> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <th...@datatorrent.com>
> wrote:
>
> > Scheduling can be independent, although we have use cases where the
> > scheduling depends on completion of processing (multi-staged batch jobs
> > where unused resources need to be freed).
> >
> > Both can be accomplished with a stats listener.
> >
> > There can be a "scheduling operator" that brings up and removes DAG
> > fragments as needed.
> >
> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh <si...@gmail.com>
> > wrote:
> >
> > > Hi,
> > > IMO scheduling a job can be independent of any operator while
> > > StatsListeners are not.  I understand that in a lot of cases
> input/output
> > > operators will decide when the job ends but there can be cases when
> > > scheduling can be independent of it.
> > >
> > > Thanks,
> > > Chandni
> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <th...@datatorrent.com>
> wrote:
> > >
> > > > This looks like something that coordination wise belongs into the
> > master
> > > > and can be done with a shared stats listener.
> > > >
> > > > The operator request/response protocol could be used the relay the
> data
> > > for
> > > > the scheduling decisions.
> > > >
> > > > Thomas
> > > >
> > > >
> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> > > > Chandni.Singh@capitalone.com> wrote:
> > > >
> > > > > Hi Tushar,
> > > > >
> > > > > I have some questions about the use case 2: Batch Support
> > > > > I don¹t understand the advantages of providing batch support by
> > having
> > > an
> > > > > operator as a scheduler.
> > > > >
> > > > > An approach that seemed a little more straightforward to me was to
> > > expose
> > > > > an API for scheduler. If there is a scheduler set then the master
> > uses
> > > > and
> > > > > schedules operators. By default there isn¹t any scheduler and the
> job
> > > is
> > > > > run as it is now.
> > > > >
> > > > > Maybe this is too simplistic but can you please let me know why
> > having
> > > an
> > > > > operator as a scheduler is a better way?
> > > > >
> > > > > Thanks,
> > > > > Chandni
> > > > >
> > > > >
> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tu...@datatorrent.com>
> > wrote:
> > > > >
> > > > > >Hi All,
> > > > > >
> > > > > >We have seen few use cases in field which require Apex application
> > > > > >scheduling based on some condition. This has also came up as part
> of
> > > > > >Batch Support in Apex previously
> > > > > >(
> > > > >
> > > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
> 40mail.gmail.com
> > > %3E)
> > > > > >. I am proposing following functionality in Apex to help
> scheduling
> > > > > >and better resource utilization for batch jobs. Please provide
> your
> > > > > >comments.
> > > > > >
> > > > > >Usecase 1 - Dynamic Dag modification.
> > > > > >
> > > > > >Each operator in DAG consumes yarn resources, sometimes it is
> > > > > >desirable to return the resources to yarn when no data is
> available
> > > > > >for processing, and deploy whole DAG once data starts to appear.
> For
> > > > > >this to happen automatically, we will need some data monitoring
> > > > > >operators running in the DAG to trigger restart and shutdown of
> the
> > > > > >operators in the DAG.
> > > > > >
> > > > > >Apex already have such api to dynamically change the running dag
> > > > > >through cli. We could provide similar API available to operators
> > which
> > > > > >will trigger dag modification at runtime. This information can be
> > > > > >passed to master using heartbeat RPC and master will make
> > > > > >required changed to the DAG. let me know what do you think about
> > it..
> > > > > >something like below.
> > > > > >Context.beginDagChange();
> > > > > >context.addOperator("o1") <== launch operator from previous
> > > > check-pointed
> > > > > >state.
> > > > > >context.addOperator("o2", new Operator2()) <== create new operator
> > > > > >context.addStream("s1", "reader.output", "o1.input");
> > > > > >context.shutdown("o3"); <== delete this and downstream operators
> > from
> > > > the
> > > > > >DAG.
> > > > > >context.apply();  <== dag changes will be send to master, and
> master
> > > > > >will apply these changes.
> > > > > >
> > > > > >Similarly API for other functionalities such as locality settings
> > > > > >needs to be provided.
> > > > > >
> > > > > >
> > > > > >Usecase 2 - Classic Batch Scheduling.
> > > > > >
> > > > > >Provide an API callable from operator to launch a DAG. The
> operator
> > > > > >will prepare an dag object and submit it to the yarn, the DAG will
> > be
> > > > > >scheduled as a new application. This way complex schedulers can be
> > > > > >written as operators.
> > > > > >
> > > > > >public SchedulerOperator implements Operator {
> > > > > >   void handleIdleTime() {
> > > > > >      // check of conditions to start a job (for example enough
> > files
> > > > > >available, enough items are available in kafa, or time has reached
> > > > > >     Dag dag = context.createDAG();
> > > > > >     dag.addOperator();
> > > > > >     dag.addOperator();
> > > > > >     LaunchOptions lOptions = new LaunchOptions();
> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
> > > > > >   }
> > > > > >}
> > > > > >
> > > > > >DagHandler will have methods to monitor the final state of
> > > > > >application, or to kill the DAG
> > > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
> > > > > >dagHandler.status()  <== get the status of application.
> > > > > >dagHandler.kill() <== kill the running application.
> > > > > >dagHandler.shutdown() <== shutdown the application.
> > > > > >
> > > > > >The more complex Scheduler operators could be written to manage
> the
> > > > > >workflows, i.e DAG of DAGs. using these APIs.
> > > > > >
> > > > > >Regards,
> > > > > >-Tushar.
> > > > >
> > > > > ________________________________________________________
> > > > >
> > > > > The information contained in this e-mail is confidential and/or
> > > > > proprietary to Capital One and/or its affiliates and may only be
> used
> > > > > solely in performance of work or services for Capital One. The
> > > > information
> > > > > transmitted herewith is intended only for use by the individual or
> > > entity
> > > > > to which it is addressed. If the reader of this message is not the
> > > > intended
> > > > > recipient, you are hereby notified that any review, retransmission,
> > > > > dissemination, distribution, copying or other use of, or taking of
> > any
> > > > > action in reliance upon this information is strictly prohibited. If
> > you
> > > > > have received this communication in error, please contact the
> sender
> > > and
> > > > > delete the material from your computer.
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Timothy Farkas <ti...@gmail.com>.
I am in agreement with Chandni. Scheduling a batch job is an API completely
independent of a DAG or an operator. It could be used by a commandline tool
running on your laptop, a script, or it could happen to be used by an
Operator running in a DAG and a StatsListener.

On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <th...@datatorrent.com>
wrote:

> Scheduling can be independent, although we have use cases where the
> scheduling depends on completion of processing (multi-staged batch jobs
> where unused resources need to be freed).
>
> Both can be accomplished with a stats listener.
>
> There can be a "scheduling operator" that brings up and removes DAG
> fragments as needed.
>
> On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh <si...@gmail.com>
> wrote:
>
> > Hi,
> > IMO scheduling a job can be independent of any operator while
> > StatsListeners are not.  I understand that in a lot of cases input/output
> > operators will decide when the job ends but there can be cases when
> > scheduling can be independent of it.
> >
> > Thanks,
> > Chandni
> > On Jun 21, 2016 12:12 PM, "Thomas Weise" <th...@datatorrent.com> wrote:
> >
> > > This looks like something that coordination wise belongs into the
> master
> > > and can be done with a shared stats listener.
> > >
> > > The operator request/response protocol could be used the relay the data
> > for
> > > the scheduling decisions.
> > >
> > > Thomas
> > >
> > >
> > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> > > Chandni.Singh@capitalone.com> wrote:
> > >
> > > > Hi Tushar,
> > > >
> > > > I have some questions about the use case 2: Batch Support
> > > > I don¹t understand the advantages of providing batch support by
> having
> > an
> > > > operator as a scheduler.
> > > >
> > > > An approach that seemed a little more straightforward to me was to
> > expose
> > > > an API for scheduler. If there is a scheduler set then the master
> uses
> > > and
> > > > schedules operators. By default there isn¹t any scheduler and the job
> > is
> > > > run as it is now.
> > > >
> > > > Maybe this is too simplistic but can you please let me know why
> having
> > an
> > > > operator as a scheduler is a better way?
> > > >
> > > > Thanks,
> > > > Chandni
> > > >
> > > >
> > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tu...@datatorrent.com>
> wrote:
> > > >
> > > > >Hi All,
> > > > >
> > > > >We have seen few use cases in field which require Apex application
> > > > >scheduling based on some condition. This has also came up as part of
> > > > >Batch Support in Apex previously
> > > > >(
> > > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
> > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com
> > %3E)
> > > > >. I am proposing following functionality in Apex to help scheduling
> > > > >and better resource utilization for batch jobs. Please provide your
> > > > >comments.
> > > > >
> > > > >Usecase 1 - Dynamic Dag modification.
> > > > >
> > > > >Each operator in DAG consumes yarn resources, sometimes it is
> > > > >desirable to return the resources to yarn when no data is available
> > > > >for processing, and deploy whole DAG once data starts to appear. For
> > > > >this to happen automatically, we will need some data monitoring
> > > > >operators running in the DAG to trigger restart and shutdown of the
> > > > >operators in the DAG.
> > > > >
> > > > >Apex already have such api to dynamically change the running dag
> > > > >through cli. We could provide similar API available to operators
> which
> > > > >will trigger dag modification at runtime. This information can be
> > > > >passed to master using heartbeat RPC and master will make
> > > > >required changed to the DAG. let me know what do you think about
> it..
> > > > >something like below.
> > > > >Context.beginDagChange();
> > > > >context.addOperator("o1") <== launch operator from previous
> > > check-pointed
> > > > >state.
> > > > >context.addOperator("o2", new Operator2()) <== create new operator
> > > > >context.addStream("s1", "reader.output", "o1.input");
> > > > >context.shutdown("o3"); <== delete this and downstream operators
> from
> > > the
> > > > >DAG.
> > > > >context.apply();  <== dag changes will be send to master, and master
> > > > >will apply these changes.
> > > > >
> > > > >Similarly API for other functionalities such as locality settings
> > > > >needs to be provided.
> > > > >
> > > > >
> > > > >Usecase 2 - Classic Batch Scheduling.
> > > > >
> > > > >Provide an API callable from operator to launch a DAG. The operator
> > > > >will prepare an dag object and submit it to the yarn, the DAG will
> be
> > > > >scheduled as a new application. This way complex schedulers can be
> > > > >written as operators.
> > > > >
> > > > >public SchedulerOperator implements Operator {
> > > > >   void handleIdleTime() {
> > > > >      // check of conditions to start a job (for example enough
> files
> > > > >available, enough items are available in kafa, or time has reached
> > > > >     Dag dag = context.createDAG();
> > > > >     dag.addOperator();
> > > > >     dag.addOperator();
> > > > >     LaunchOptions lOptions = new LaunchOptions();
> > > > >     lOptions.oldId = ""; // start for this checkpoint.
> > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
> > > > >   }
> > > > >}
> > > > >
> > > > >DagHandler will have methods to monitor the final state of
> > > > >application, or to kill the DAG
> > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
> > > > >dagHandler.status()  <== get the status of application.
> > > > >dagHandler.kill() <== kill the running application.
> > > > >dagHandler.shutdown() <== shutdown the application.
> > > > >
> > > > >The more complex Scheduler operators could be written to manage the
> > > > >workflows, i.e DAG of DAGs. using these APIs.
> > > > >
> > > > >Regards,
> > > > >-Tushar.
> > > >
> > > > ________________________________________________________
> > > >
> > > > The information contained in this e-mail is confidential and/or
> > > > proprietary to Capital One and/or its affiliates and may only be used
> > > > solely in performance of work or services for Capital One. The
> > > information
> > > > transmitted herewith is intended only for use by the individual or
> > entity
> > > > to which it is addressed. If the reader of this message is not the
> > > intended
> > > > recipient, you are hereby notified that any review, retransmission,
> > > > dissemination, distribution, copying or other use of, or taking of
> any
> > > > action in reliance upon this information is strictly prohibited. If
> you
> > > > have received this communication in error, please contact the sender
> > and
> > > > delete the material from your computer.
> > > >
> > > >
> > >
> >
>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Thomas Weise <th...@datatorrent.com>.
Scheduling can be independent, although we have use cases where the
scheduling depends on completion of processing (multi-staged batch jobs
where unused resources need to be freed).

Both can be accomplished with a stats listener.

There can be a "scheduling operator" that brings up and removes DAG
fragments as needed.

On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh <si...@gmail.com>
wrote:

> Hi,
> IMO scheduling a job can be independent of any operator while
> StatsListeners are not.  I understand that in a lot of cases input/output
> operators will decide when the job ends but there can be cases when
> scheduling can be independent of it.
>
> Thanks,
> Chandni
> On Jun 21, 2016 12:12 PM, "Thomas Weise" <th...@datatorrent.com> wrote:
>
> > This looks like something that coordination wise belongs into the master
> > and can be done with a shared stats listener.
> >
> > The operator request/response protocol could be used the relay the data
> for
> > the scheduling decisions.
> >
> > Thomas
> >
> >
> > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> > Chandni.Singh@capitalone.com> wrote:
> >
> > > Hi Tushar,
> > >
> > > I have some questions about the use case 2: Batch Support
> > > I don¹t understand the advantages of providing batch support by having
> an
> > > operator as a scheduler.
> > >
> > > An approach that seemed a little more straightforward to me was to
> expose
> > > an API for scheduler. If there is a scheduler set then the master uses
> > and
> > > schedules operators. By default there isn¹t any scheduler and the job
> is
> > > run as it is now.
> > >
> > > Maybe this is too simplistic but can you please let me know why having
> an
> > > operator as a scheduler is a better way?
> > >
> > > Thanks,
> > > Chandni
> > >
> > >
> > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tu...@datatorrent.com> wrote:
> > >
> > > >Hi All,
> > > >
> > > >We have seen few use cases in field which require Apex application
> > > >scheduling based on some condition. This has also came up as part of
> > > >Batch Support in Apex previously
> > > >(
> > >
> >
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
> > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com
> %3E)
> > > >. I am proposing following functionality in Apex to help scheduling
> > > >and better resource utilization for batch jobs. Please provide your
> > > >comments.
> > > >
> > > >Usecase 1 - Dynamic Dag modification.
> > > >
> > > >Each operator in DAG consumes yarn resources, sometimes it is
> > > >desirable to return the resources to yarn when no data is available
> > > >for processing, and deploy whole DAG once data starts to appear. For
> > > >this to happen automatically, we will need some data monitoring
> > > >operators running in the DAG to trigger restart and shutdown of the
> > > >operators in the DAG.
> > > >
> > > >Apex already have such api to dynamically change the running dag
> > > >through cli. We could provide similar API available to operators which
> > > >will trigger dag modification at runtime. This information can be
> > > >passed to master using heartbeat RPC and master will make
> > > >required changed to the DAG. let me know what do you think about it..
> > > >something like below.
> > > >Context.beginDagChange();
> > > >context.addOperator("o1") <== launch operator from previous
> > check-pointed
> > > >state.
> > > >context.addOperator("o2", new Operator2()) <== create new operator
> > > >context.addStream("s1", "reader.output", "o1.input");
> > > >context.shutdown("o3"); <== delete this and downstream operators from
> > the
> > > >DAG.
> > > >context.apply();  <== dag changes will be send to master, and master
> > > >will apply these changes.
> > > >
> > > >Similarly API for other functionalities such as locality settings
> > > >needs to be provided.
> > > >
> > > >
> > > >Usecase 2 - Classic Batch Scheduling.
> > > >
> > > >Provide an API callable from operator to launch a DAG. The operator
> > > >will prepare an dag object and submit it to the yarn, the DAG will be
> > > >scheduled as a new application. This way complex schedulers can be
> > > >written as operators.
> > > >
> > > >public SchedulerOperator implements Operator {
> > > >   void handleIdleTime() {
> > > >      // check of conditions to start a job (for example enough files
> > > >available, enough items are available in kafa, or time has reached
> > > >     Dag dag = context.createDAG();
> > > >     dag.addOperator();
> > > >     dag.addOperator();
> > > >     LaunchOptions lOptions = new LaunchOptions();
> > > >     lOptions.oldId = ""; // start for this checkpoint.
> > > >     DagHandler dagHandler = context.submit(dag, lOptions);
> > > >   }
> > > >}
> > > >
> > > >DagHandler will have methods to monitor the final state of
> > > >application, or to kill the DAG
> > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
> > > >dagHandler.status()  <== get the status of application.
> > > >dagHandler.kill() <== kill the running application.
> > > >dagHandler.shutdown() <== shutdown the application.
> > > >
> > > >The more complex Scheduler operators could be written to manage the
> > > >workflows, i.e DAG of DAGs. using these APIs.
> > > >
> > > >Regards,
> > > >-Tushar.
> > >
> > > ________________________________________________________
> > >
> > > The information contained in this e-mail is confidential and/or
> > > proprietary to Capital One and/or its affiliates and may only be used
> > > solely in performance of work or services for Capital One. The
> > information
> > > transmitted herewith is intended only for use by the individual or
> entity
> > > to which it is addressed. If the reader of this message is not the
> > intended
> > > recipient, you are hereby notified that any review, retransmission,
> > > dissemination, distribution, copying or other use of, or taking of any
> > > action in reliance upon this information is strictly prohibited. If you
> > > have received this communication in error, please contact the sender
> and
> > > delete the material from your computer.
> > >
> > >
> >
>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Chandni Singh <si...@gmail.com>.
Hi,
IMO scheduling a job can be independent of any operator while
StatsListeners are not.  I understand that in a lot of cases input/output
operators will decide when the job ends but there can be cases when
scheduling can be independent of it.

Thanks,
Chandni
On Jun 21, 2016 12:12 PM, "Thomas Weise" <th...@datatorrent.com> wrote:

> This looks like something that coordination wise belongs into the master
> and can be done with a shared stats listener.
>
> The operator request/response protocol could be used the relay the data for
> the scheduling decisions.
>
> Thomas
>
>
> On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> Chandni.Singh@capitalone.com> wrote:
>
> > Hi Tushar,
> >
> > I have some questions about the use case 2: Batch Support
> > I don¹t understand the advantages of providing batch support by having an
> > operator as a scheduler.
> >
> > An approach that seemed a little more straightforward to me was to expose
> > an API for scheduler. If there is a scheduler set then the master uses
> and
> > schedules operators. By default there isn¹t any scheduler and the job is
> > run as it is now.
> >
> > Maybe this is too simplistic but can you please let me know why having an
> > operator as a scheduler is a better way?
> >
> > Thanks,
> > Chandni
> >
> >
> > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tu...@datatorrent.com> wrote:
> >
> > >Hi All,
> > >
> > >We have seen few use cases in field which require Apex application
> > >scheduling based on some condition. This has also came up as part of
> > >Batch Support in Apex previously
> > >(
> >
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
> > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E)
> > >. I am proposing following functionality in Apex to help scheduling
> > >and better resource utilization for batch jobs. Please provide your
> > >comments.
> > >
> > >Usecase 1 - Dynamic Dag modification.
> > >
> > >Each operator in DAG consumes yarn resources, sometimes it is
> > >desirable to return the resources to yarn when no data is available
> > >for processing, and deploy whole DAG once data starts to appear. For
> > >this to happen automatically, we will need some data monitoring
> > >operators running in the DAG to trigger restart and shutdown of the
> > >operators in the DAG.
> > >
> > >Apex already have such api to dynamically change the running dag
> > >through cli. We could provide similar API available to operators which
> > >will trigger dag modification at runtime. This information can be
> > >passed to master using heartbeat RPC and master will make
> > >required changed to the DAG. let me know what do you think about it..
> > >something like below.
> > >Context.beginDagChange();
> > >context.addOperator("o1") <== launch operator from previous
> check-pointed
> > >state.
> > >context.addOperator("o2", new Operator2()) <== create new operator
> > >context.addStream("s1", "reader.output", "o1.input");
> > >context.shutdown("o3"); <== delete this and downstream operators from
> the
> > >DAG.
> > >context.apply();  <== dag changes will be send to master, and master
> > >will apply these changes.
> > >
> > >Similarly API for other functionalities such as locality settings
> > >needs to be provided.
> > >
> > >
> > >Usecase 2 - Classic Batch Scheduling.
> > >
> > >Provide an API callable from operator to launch a DAG. The operator
> > >will prepare an dag object and submit it to the yarn, the DAG will be
> > >scheduled as a new application. This way complex schedulers can be
> > >written as operators.
> > >
> > >public SchedulerOperator implements Operator {
> > >   void handleIdleTime() {
> > >      // check of conditions to start a job (for example enough files
> > >available, enough items are available in kafa, or time has reached
> > >     Dag dag = context.createDAG();
> > >     dag.addOperator();
> > >     dag.addOperator();
> > >     LaunchOptions lOptions = new LaunchOptions();
> > >     lOptions.oldId = ""; // start for this checkpoint.
> > >     DagHandler dagHandler = context.submit(dag, lOptions);
> > >   }
> > >}
> > >
> > >DagHandler will have methods to monitor the final state of
> > >application, or to kill the DAG
> > >dagHandler.waitForCompletion() <== wait till the DAG terminates
> > >dagHandler.status()  <== get the status of application.
> > >dagHandler.kill() <== kill the running application.
> > >dagHandler.shutdown() <== shutdown the application.
> > >
> > >The more complex Scheduler operators could be written to manage the
> > >workflows, i.e DAG of DAGs. using these APIs.
> > >
> > >Regards,
> > >-Tushar.
> >
> > ________________________________________________________
> >
> > The information contained in this e-mail is confidential and/or
> > proprietary to Capital One and/or its affiliates and may only be used
> > solely in performance of work or services for Capital One. The
> information
> > transmitted herewith is intended only for use by the individual or entity
> > to which it is addressed. If the reader of this message is not the
> intended
> > recipient, you are hereby notified that any review, retransmission,
> > dissemination, distribution, copying or other use of, or taking of any
> > action in reliance upon this information is strictly prohibited. If you
> > have received this communication in error, please contact the sender and
> > delete the material from your computer.
> >
> >
>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Thomas Weise <th...@datatorrent.com>.
This looks like something that coordination wise belongs into the master
and can be done with a shared stats listener.

The operator request/response protocol could be used the relay the data for
the scheduling decisions.

Thomas


On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
Chandni.Singh@capitalone.com> wrote:

> Hi Tushar,
>
> I have some questions about the use case 2: Batch Support
> I don¹t understand the advantages of providing batch support by having an
> operator as a scheduler.
>
> An approach that seemed a little more straightforward to me was to expose
> an API for scheduler. If there is a scheduler set then the master uses and
> schedules operators. By default there isn¹t any scheduler and the job is
> run as it is now.
>
> Maybe this is too simplistic but can you please let me know why having an
> operator as a scheduler is a better way?
>
> Thanks,
> Chandni
>
>
> On 6/21/16, 11:09 AM, "Tushar Gosavi" <tu...@datatorrent.com> wrote:
>
> >Hi All,
> >
> >We have seen few use cases in field which require Apex application
> >scheduling based on some condition. This has also came up as part of
> >Batch Support in Apex previously
> >(
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
> >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E)
> >. I am proposing following functionality in Apex to help scheduling
> >and better resource utilization for batch jobs. Please provide your
> >comments.
> >
> >Usecase 1 - Dynamic Dag modification.
> >
> >Each operator in DAG consumes yarn resources, sometimes it is
> >desirable to return the resources to yarn when no data is available
> >for processing, and deploy whole DAG once data starts to appear. For
> >this to happen automatically, we will need some data monitoring
> >operators running in the DAG to trigger restart and shutdown of the
> >operators in the DAG.
> >
> >Apex already have such api to dynamically change the running dag
> >through cli. We could provide similar API available to operators which
> >will trigger dag modification at runtime. This information can be
> >passed to master using heartbeat RPC and master will make
> >required changed to the DAG. let me know what do you think about it..
> >something like below.
> >Context.beginDagChange();
> >context.addOperator("o1") <== launch operator from previous check-pointed
> >state.
> >context.addOperator("o2", new Operator2()) <== create new operator
> >context.addStream("s1", "reader.output", "o1.input");
> >context.shutdown("o3"); <== delete this and downstream operators from the
> >DAG.
> >context.apply();  <== dag changes will be send to master, and master
> >will apply these changes.
> >
> >Similarly API for other functionalities such as locality settings
> >needs to be provided.
> >
> >
> >Usecase 2 - Classic Batch Scheduling.
> >
> >Provide an API callable from operator to launch a DAG. The operator
> >will prepare an dag object and submit it to the yarn, the DAG will be
> >scheduled as a new application. This way complex schedulers can be
> >written as operators.
> >
> >public SchedulerOperator implements Operator {
> >   void handleIdleTime() {
> >      // check of conditions to start a job (for example enough files
> >available, enough items are available in kafa, or time has reached
> >     Dag dag = context.createDAG();
> >     dag.addOperator();
> >     dag.addOperator();
> >     LaunchOptions lOptions = new LaunchOptions();
> >     lOptions.oldId = ""; // start for this checkpoint.
> >     DagHandler dagHandler = context.submit(dag, lOptions);
> >   }
> >}
> >
> >DagHandler will have methods to monitor the final state of
> >application, or to kill the DAG
> >dagHandler.waitForCompletion() <== wait till the DAG terminates
> >dagHandler.status()  <== get the status of application.
> >dagHandler.kill() <== kill the running application.
> >dagHandler.shutdown() <== shutdown the application.
> >
> >The more complex Scheduler operators could be written to manage the
> >workflows, i.e DAG of DAGs. using these APIs.
> >
> >Regards,
> >-Tushar.
>
> ________________________________________________________
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
>

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by "Singh, Chandni" <Ch...@capitalone.com>.
Hi Tushar,

I have some questions about the use case 2: Batch Support
I don¹t understand the advantages of providing batch support by having an
operator as a scheduler.

An approach that seemed a little more straightforward to me was to expose
an API for scheduler. If there is a scheduler set then the master uses and
schedules operators. By default there isn¹t any scheduler and the job is
run as it is now.

Maybe this is too simplistic but can you please let me know why having an
operator as a scheduler is a better way?

Thanks,
Chandni
 

On 6/21/16, 11:09 AM, "Tushar Gosavi" <tu...@datatorrent.com> wrote:

>Hi All,
>
>We have seen few use cases in field which require Apex application
>scheduling based on some condition. This has also came up as part of
>Batch Support in Apex previously
>(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
>XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E)
>. I am proposing following functionality in Apex to help scheduling
>and better resource utilization for batch jobs. Please provide your
>comments.
>
>Usecase 1 - Dynamic Dag modification.
>
>Each operator in DAG consumes yarn resources, sometimes it is
>desirable to return the resources to yarn when no data is available
>for processing, and deploy whole DAG once data starts to appear. For
>this to happen automatically, we will need some data monitoring
>operators running in the DAG to trigger restart and shutdown of the
>operators in the DAG.
>
>Apex already have such api to dynamically change the running dag
>through cli. We could provide similar API available to operators which
>will trigger dag modification at runtime. This information can be
>passed to master using heartbeat RPC and master will make
>required changed to the DAG. let me know what do you think about it..
>something like below.
>Context.beginDagChange();
>context.addOperator("o1") <== launch operator from previous check-pointed
>state.
>context.addOperator("o2", new Operator2()) <== create new operator
>context.addStream("s1", "reader.output", "o1.input");
>context.shutdown("o3"); <== delete this and downstream operators from the
>DAG.
>context.apply();  <== dag changes will be send to master, and master
>will apply these changes.
>
>Similarly API for other functionalities such as locality settings
>needs to be provided.
>
>
>Usecase 2 - Classic Batch Scheduling.
>
>Provide an API callable from operator to launch a DAG. The operator
>will prepare an dag object and submit it to the yarn, the DAG will be
>scheduled as a new application. This way complex schedulers can be
>written as operators.
>
>public SchedulerOperator implements Operator {
>   void handleIdleTime() {
>      // check of conditions to start a job (for example enough files
>available, enough items are available in kafa, or time has reached
>     Dag dag = context.createDAG();
>     dag.addOperator();
>     dag.addOperator();
>     LaunchOptions lOptions = new LaunchOptions();
>     lOptions.oldId = ""; // start for this checkpoint.
>     DagHandler dagHandler = context.submit(dag, lOptions);
>   }
>}
>
>DagHandler will have methods to monitor the final state of
>application, or to kill the DAG
>dagHandler.waitForCompletion() <== wait till the DAG terminates
>dagHandler.status()  <== get the status of application.
>dagHandler.kill() <== kill the running application.
>dagHandler.shutdown() <== shutdown the application.
>
>The more complex Scheduler operators could be written to manage the
>workflows, i.e DAG of DAGs. using these APIs.
>
>Regards,
>-Tushar.

________________________________________________________

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.


Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

Posted by Sandesh Hegde <sa...@datatorrent.com>.
For the usecase 1, is it possible to avoid changing the Context? Can we
have something along the lines of "StramToNodeRequest" ?

On Tue, Jun 21, 2016 at 11:09 AM Tushar Gosavi <tu...@datatorrent.com>
wrote:

> Hi All,
>
> We have seen few use cases in field which require Apex application
> scheduling based on some condition. This has also came up as part of
> Batch Support in Apex previously
> (
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E
> )
> . I am proposing following functionality in Apex to help scheduling
> and better resource utilization for batch jobs. Please provide your
> comments.
>
> Usecase 1 - Dynamic Dag modification.
>
> Each operator in DAG consumes yarn resources, sometimes it is
> desirable to return the resources to yarn when no data is available
> for processing, and deploy whole DAG once data starts to appear. For
> this to happen automatically, we will need some data monitoring
> operators running in the DAG to trigger restart and shutdown of the
> operators in the DAG.
>
> Apex already have such api to dynamically change the running dag
> through cli. We could provide similar API available to operators which
> will trigger dag modification at runtime. This information can be
> passed to master using heartbeat RPC and master will make
> required changed to the DAG. let me know what do you think about it..
> something like below.
> Context.beginDagChange();
> context.addOperator("o1") <== launch operator from previous check-pointed
> state.
> context.addOperator("o2", new Operator2()) <== create new operator
> context.addStream("s1", "reader.output", "o1.input");
> context.shutdown("o3"); <== delete this and downstream operators from the
> DAG.
> context.apply();  <== dag changes will be send to master, and master
> will apply these changes.
>
> Similarly API for other functionalities such as locality settings
> needs to be provided.
>
>
> Usecase 2 - Classic Batch Scheduling.
>
> Provide an API callable from operator to launch a DAG. The operator
> will prepare an dag object and submit it to the yarn, the DAG will be
> scheduled as a new application. This way complex schedulers can be
> written as operators.
>
> public SchedulerOperator implements Operator {
>    void handleIdleTime() {
>       // check of conditions to start a job (for example enough files
> available, enough items are available in kafa, or time has reached
>      Dag dag = context.createDAG();
>      dag.addOperator();
>      dag.addOperator();
>      LaunchOptions lOptions = new LaunchOptions();
>      lOptions.oldId = ""; // start for this checkpoint.
>      DagHandler dagHandler = context.submit(dag, lOptions);
>    }
> }
>
> DagHandler will have methods to monitor the final state of
> application, or to kill the DAG
> dagHandler.waitForCompletion() <== wait till the DAG terminates
> dagHandler.status()  <== get the status of application.
> dagHandler.kill() <== kill the running application.
> dagHandler.shutdown() <== shutdown the application.
>
> The more complex Scheduler operators could be written to manage the
> workflows, i.e DAG of DAGs. using these APIs.
>
> Regards,
> -Tushar.
>