Posted to dev@samza.apache.org by xinyu liu <xi...@gmail.com> on 2017/05/02 01:58:02 UTC

Re: [DISCUSS] SEP-2: ApplicationRunner Design

Looked again at Chris's beam-samza-runner implementation. It seems
LocalApplicationRunner.run() should be asynchronous too. The current
implementation uses a latch to wait for the StreamProcessors to finish,
which seems unnecessary. We can provide a waitUntilFinish() counterpart
for the user. I created
https://issues.apache.org/jira/browse/SAMZA-1252 to track it.
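
A minimal sketch of the non-blocking shape this could take (illustration
only; apart from run(), the names here are assumptions rather than the
final API):

  import java.util.concurrent.CountDownLatch;

  public class LocalApplicationRunner {
    // Counted down once all StreamProcessors have finished.
    private final CountDownLatch finished = new CountDownLatch(1);

    /** Starts the StreamProcessors and returns immediately. */
    public void run() {
      // start processors; their completion callback counts down `finished`
    }

    /** Optional blocking counterpart for callers that want to wait. */
    public void waitUntilFinish() throws InterruptedException {
      finished.await();
    }
  }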

Thanks,
Xinyu

On Fri, Apr 28, 2017 at 5:55 PM, xinyu liu <xi...@gmail.com> wrote:

> Right, after further discussion here, option #2 seems redundant for
> defining streams. StreamSpec itself is flexible enough to achieve both
> static and programmatic specification of the stream. Agreed it's not
> convenient for now (pretty obvious after looking at your bsr
> beam.runners.samza.wrapper), and we should provide similar predefined
> convenience wrappers for users to create the StreamSpec. In your case it
> would be something like BoundedStreamSpec.file(....), which will generate
> the system and serialize the data as you did.
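>
> A rough sketch of such a wrapper, using the StreamSpec(id, system, stream)
> shape from earlier in this thread (the class and method here are
> illustrative, not an agreed API):
>
>   // Hypothetical convenience factory for bounded, file-backed input.
>   public final class BoundedStreamSpec {
>     private BoundedStreamSpec() {}
>
>     /** Auto-generates the system and stream for a local file. */
>     public static StreamSpec file(String path) {
>       String id = "bounded-file-" + Math.abs(path.hashCode());
>       // the generated system's consumer would read and deserialize the file
>       return new StreamSpec(id, "bounded-file-system", path);
>     }
>   }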
>
> We're still thinking the callback proposed in #2 can be useful for
> requirement #6: injecting other user objects at run time, such as stores
> and metrics. To simplify things further for users, I think we might hide
> the ApplicationRunner and expose StreamApplication instead, which makes
> requirement #3 not user-facing. So the API becomes something like:
>
>   StreamApplication app = StreamApplication.local(config)
>     .init(env -> {
>        env.registerStore("my-store", new MyStoreFactory());
>        env.registerMetricsReporter("my-reporter", new MyMetricsReporterFactory());
>     })
>     .withLifeCycleListener(myListener);
>
>   app.input(BoundedStreamSpec.create("/sample/input.txt"))
>         .map(...)
>         .window(...)
>
>   app.run();
>
> For requirement #5, I added a .withLifeCycleListener() to the API, which
> can trigger callbacks on life cycle events.
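>
> A minimal sketch of what the listener could look like (the callback names
> are assumptions, chosen to mirror the StreamProcessor callbacks discussed
> further down the thread):
>
>   // Hypothetical listener interface; not an agreed API.
>   public interface StreamApplicationLifeCycleListener {
>     void onStart();
>     void onShutdown();
>     // surfaces the cause of failure, per requirement #5 / SAMZA-1246
>     void onFailure(Throwable cause);
>   }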
>
> For #4: distribution of the jars will be what we have today: Yarn
> localization with a remote store like Artifactory or an HTTP server. We
> discussed where to put the graph serialization. The current thinking is to
> define a general interface which can be backed by a remote store, like
> Kafka, Artifactory or an HTTP server. For Kafka it's straightforward, but
> we will hit the per-message size limit unless we chunk the payload
> ourselves. For the other two, we need to investigate whether we can easily
> upload jars to our Artifactory and localize them with Yarn. Any opinions
> on this?
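>
> For the graph serialization, one possible shape for that general interface
> (a sketch only; the name and methods are placeholders, not a concrete
> proposal):
>
>   // Hypothetical store for the serialized StreamGraph; implementations
>   // could be backed by Kafka, Artifactory or an HTTP server.
>   public interface SerializedGraphStore {
>     /** Writes the serialized graph, returning a location for localization. */
>     String write(String appId, byte[] serializedGraph);
>
>     /** Reads the serialized graph back on the remote side. */
>     byte[] read(String location);
>   }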
>
> Thanks,
> Xinyu
>
> On Fri, Apr 28, 2017 at 11:34 AM, Chris Pettitt <
> cpettitt@linkedin.com.invalid> wrote:
>
>> Your proposal for #1 looks good.
>>
>> I'm not quite sure how to reconcile the proposals for #1 and #2. In #1 you add
>> the stream spec straight onto the runner while in #2 you do it in a
>> callback. If it is either-or, #1 looks a lot better for my purposes.
>>
>> For #4 what mechanism are you using to distribute the JARs? Can you use
>> the
>> same mechanism to distribute the serialized graph?
>>
>> On Fri, Apr 28, 2017 at 12:14 AM, xinyu liu <xi...@gmail.com>
>> wrote:
>>
>> > btw, I will get to SAMZA-1246 as soon as possible.
>> >
>> > Thanks,
>> > Xinyu
>> >
>> > On Thu, Apr 27, 2017 at 9:11 PM, xinyu liu <xi...@gmail.com>
>> wrote:
>> >
>> > > Let me try to capture the updated requirements:
>> > >
>> > > 1. Set up input streams outside StreamGraph, and treat graph building
>> > > as a library (*Fluent, Beam*).
>> > >
>> > > 2. Improve ease of use for ApplicationRunner to avoid complex
>> > > configurations such as zkCoordinator, zkCoordinationService
>> > > (*Standalone*). Provide some programmatic way to tweak them in the API.
>> > >
>> > > 3. Clean up ApplicationRunner into a single interface (*Fluent*). We
>> > > can have one or more implementations, but they are hidden from the
>> > > users.
>> > >
>> > > 4. Separate StreamGraph from the runtime environment so it can be
>> > > serialized (*Beam, Yarn*).
>> > >
>> > > 5. Better life cycle management of the application, at parity with
>> > > StreamProcessor (*Standalone, Beam*). Stats should include the
>> > > exception in case of failure (tracked in SAMZA-1246).
>> > >
>> > > 6. Support injecting user-defined objects into ApplicationRunner.
>> > >
>> > > Prateek and I iterated on the ApplicationRunner API based on these
>> > > requirements. To support #1, we can set up input streams at the runner
>> > > level, which returns the MessageStream and allows graph building
>> > > afterwards. The code looks like below:
>> > >
>> > >   ApplicationRunner runner = ApplicationRunner.local();
>> > >   runner.input(streamSpec)
>> > >             .map(..)
>> > >             .window(...)
>> > >   runner.run();
>> > >
>> > > StreamSpec is the building block for setting up streams here. It can
>> > > be set up in different ways (a short sketch follows this list):
>> > >
>> > >   - Direct creation of the stream spec, like
>> > >     runner.input(new StreamSpec(id, system, stream))
>> > >   - Load by streamId from env or config, like
>> > >     runner.input(runner.env().getStreamSpec(id))
>> > >   - A canned spec that generates the StreamSpec with id, system and
>> > >     stream to minimize the configuration. For example,
>> > >     CollectionSpec.create(new Integer[]{1, 2, 3, 4}), which will
>> > >     auto-generate the system and stream in the spec.
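>> > >
>> > > Putting the three styles side by side (the ids, system names and
>> > > CollectionSpec are only illustrative):
>> > >
>> > >   // 1. Direct creation of the spec.
>> > >   runner.input(new StreamSpec("page-views", "kafka", "PageViewEvent"));
>> > >
>> > >   // 2. Look up a pre-configured spec by streamId.
>> > >   runner.input(runner.env().getStreamSpec("page-views"));
>> > >
>> > >   // 3. Canned spec that auto-generates the system and stream.
>> > >   runner.input(CollectionSpec.create(new Integer[]{1, 2, 3, 4}));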
>> > >
>> > > To support #2, we need to be able to set up StreamSpec-related objects
>> > > and factories programmatically in env. Suppose we have the following
>> > > before runner.input(...):
>> > >
>> > >   runner.setup(env /* a writable interface of env*/ -> {
>> > >     env.setStreamSpec(streamId, streamSpec);
>> > >     env.setSystem(systemName, systemFactory);
>> > >   })
>> > >
>> > > runner.setup(...) also provides setup for stores and other runtime
>> > > objects needed for the execution. The setup should be serializable to
>> > > config. For #6, I haven't figured out a good way to inject user-defined
>> > > objects here yet.
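>> > >
>> > > On serializing the setup to config: roughly, the writable env could
>> > > translate the programmatic calls into plain config under the hood,
>> > > along these lines (names and keys are for illustration only):
>> > >
>> > >   Map<String, String> generated = new HashMap<>();
>> > >   // env.setSystem("my-system", new MySystemFactory()) becomes:
>> > >   generated.put("systems.my-system.samza.factory",
>> > >       MySystemFactory.class.getName());
>> > >   // env.setStreamSpec("my-stream", spec) becomes:
>> > >   generated.put("streams.my-stream.samza.system", "my-system");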
>> > >
>> > > With this API, we should be able to also support #4. For remote
>> > > runner.run(), the operator user classes/lambdas in the StreamGraph need
>> > > to be serialized. As of today, the existing option is to serialize to a
>> > > stream, either the coordinator stream or the pipeline control stream,
>> > > which will have the size limit per message. Do you see RPC as an option?
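>> > >
>> > > If we stay with a stream, one simple way around the per-message size
>> > > limit would be to chunk the serialized graph ourselves, roughly
>> > > (illustrative only):
>> > >
>> > >   // Split the serialized graph into chunks that fit one message each.
>> > >   static List<byte[]> chunk(byte[] serializedGraph, int maxMessageBytes) {
>> > >     List<byte[]> chunks = new ArrayList<>();
>> > >     for (int start = 0; start < serializedGraph.length; start += maxMessageBytes) {
>> > >       int end = Math.min(serializedGraph.length, start + maxMessageBytes);
>> > >       chunks.add(Arrays.copyOfRange(serializedGraph, start, end));
>> > >     }
>> > >     return chunks;
>> > >   }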
>> > >
>> > > For this version of the API, it seems we don't need the
>> > > StreamApplication wrapper, nor do we need to expose the StreamGraph. Do
>> > > you think we are on the right path?
>> > >
>> > > Thanks,
>> > > Xinyu
>> > >
>> > >
>> > > On Thu, Apr 27, 2017 at 6:09 AM, Chris Pettitt <
>> > > cpettitt@linkedin.com.invalid> wrote:
>> > >
>> > >> That should have been:
>> > >>
>> > >> For #1, Beam doesn't have a hard requirement...
>> > >>
>> > >> On Thu, Apr 27, 2017 at 9:07 AM, Chris Pettitt <
>> cpettitt@linkedin.com>
>> > >> wrote:
>> > >>
>> > >> > For #1, I doesn't have a hard requirement for any change from
>> Samza. A
>> > >> > very nice to have would be to allow the input systems to be set up
>> at
>> > >> the
>> > >> > same time as the rest of the StreamGraph. An even nicer to have
>> would
>> > >> be to
>> > >> > do away with the callback based approach and treat graph building
>> as a
>> > >> > library, a la Beam and Flink.
>> > >> >
>> > >> > For the moment I've worked around the two pass requirement (once
>> for
>> > >> > config, once for StreamGraph) by introducing an IR layer between
>> Beam
>> > >> and
>> > >> > the Samza Fluent translation. The IR layer is convenient
>> independent
>> > of
>> > >> > this problem because it makes it easier to switch between the
>> Fluent
>> > and
>> > >> > low-level APIs.
>> > >> >
>> > >> >
>> > >> > For #4, if we had parity with StreamProcessor for lifecycle we'd
>> be in
>> > >> > great shape. One additional issue with the status call that I may
>> not
>> > >> have
>> > >> > mentioned is that it provides you no way to get at the cause of
>> > failure.
>> > >> > The StreamProcessor API does allow this via the callback.
>> > >> >
>> > >> >
>> > >> > Re. #2 and #3, I'm a big fan of getting rid of the extra
>> configuration
>> > >> > indirection you currently have to jump through (this is also
>> related
>> > to
>> > >> > system consumer configuration from #1. It makes it much easier to
>> > >> discover
>> > >> > what the configurable parameters are too, if we provide some
>> > >> programmatic
>> > >> > way to tweak them in the API - which can turn into config under the
>> > >> hood.
>> > >> >
>> > >> > On Wed, Apr 26, 2017 at 9:20 PM, xinyu liu <xi...@gmail.com>
>> > >> wrote:
>> > >> >
>> > >> >> Let me give a shot to summarize the requirements for
>> > ApplicationRunner
>> > >> we
>> > >> >> have discussed so far:
>> > >> >>
>> > >> >> - Support environment for passing in user-defined objects (streams
>> > >> >> potentially) into ApplicationRunner (*Beam*)
>> > >> >>
>> > >> >> - Improve ease of use for ApplicationRunner to avoid complex
>> > >> >> configurations
>> > >> >> such as zkCoordinator, zkCoordinationService. (*Standalone*)
>> > >> >>
>> > >> >> - Clean up ApplicationRunner into a single interface (*Fluent*).
>> We
>> > can
>> > >> >> have one or more implementations but it's hidden from the users.
>> > >> >>
>> > >> >> - Separate StreamGraph from environment so it can be serializable
>> > >> (*Beam,
>> > >> >> Yarn*)
>> > >> >>
>> > >> >> - Better life cycle management of application, including
>> > >> >> start/stop/stats (*Standalone,
>> > >> >> Beam*)
>> > >> >>
>> > >> >>
>> > >> >> One way to address 2 and 3 is to provide pre-packaged runners using
>> > >> >> static factory methods, and the return type will be the
>> > >> >> ApplicationRunner interface. So we can have:
>> > >> >>
>> > >> >>   ApplicationRunner runner = ApplicationRunner.zk() /
>> > >> >> ApplicationRunner.local()
>> > >> >> / ApplicationRunner.remote() / ApplicationRunner.test().
>> > >> >>
>> > >> >> Internally we will package the right configs and run-time
>> > >> >> environment with the runner. For example, ApplicationRunner.zk()
>> > >> >> will define all the configs needed for zk coordination.
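>> > >> >>
>> > >> >> For illustration, the factories could look roughly like this (a
>> > >> >> sketch only; the method names mirror the calls above, everything
>> > >> >> else is assumed):
>> > >> >>
>> > >> >>   public interface ApplicationRunner {
>> > >> >>     void run(StreamGraph streamGraph);
>> > >> >>
>> > >> >>     // Each factory pre-packages the coordination configs for its
>> > >> >>     // environment.
>> > >> >>     static ApplicationRunner zk() {
>> > >> >>       return graph -> { /* run with zk-coordination configs */ };
>> > >> >>     }
>> > >> >>
>> > >> >>     static ApplicationRunner local() {
>> > >> >>       return graph -> { /* run in-process with local configs */ };
>> > >> >>     }
>> > >> >>   }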
>> > >> >>
>> > >> >> To support 1 and 4, can we pass in a lambda function in the
>> runner,
>> > and
>> > >> >> then we can run the stream graph? Like the following:
>> > >> >>
>> > >> >>   ApplicationRunner.zk().env(config ->
>> > environment).run(streamGraph);
>> > >> >>
>> > >> >> Then we need a way to pass the environment into the StreamGraph.
>> This
>> > >> can
>> > >> >> be done by either adding an extra parameter to each operator, or
>> > have a
>> > >> >> getEnv() function in the MessageStream, which seems to be pretty
>> > hacky.
>> > >> >>
>> > >> >> What do you think?
>> > >> >>
>> > >> >> Thanks,
>> > >> >> Xinyu
>> > >> >>
>> > >> >>
>> > >> >>
>> > >> >>
>> > >> >>
>> > >> >> On Sun, Apr 23, 2017 at 11:01 PM, Prateek Maheshwari <
>> > >> >> pmaheshwari@linkedin.com.invalid> wrote:
>> > >> >>
>> > >> >> > Thanks for putting this together Yi!
>> > >> >> >
>> > >> >> > I agree with Jake, it does seem like there are a few too many
>> > moving
>> > >> >> parts
>> > >> >> > here. That said, the problem being solved is pretty broad, so
>> let
>> > me
>> > >> >> try to
>> > >> >> > summarize my current understanding of the requirements. Please
>> > >> correct
>> > >> >> me
>> > >> >> > if I'm wrong or missing something.
>> > >> >> >
>> > >> >> > ApplicationRunner and JobRunner first, ignoring test environment
>> > for
>> > >> the
>> > >> >> > moment.
>> > >> >> > ApplicationRunner:
>> > >> >> > 1. Create execution plan: Same in Standalone and Yarn
>> > >> >> > 2. Create intermediate streams: Same logic but different leader
>> > >> election
>> > >> >> > (ZK-based or pre-configured in standalone, AM in Yarn).
>> > >> >> > 3. Run jobs: In JVM in standalone. Submit to the cluster in
>> Yarn.
>> > >> >> >
>> > >> >> > JobRunner:
>> > >> >> > 1. Run the StreamProcessors: Same process in Standalone & Test.
>> > >> Remote
>> > >> >> host
>> > >> >> > in Yarn.
>> > >> >> >
>> > >> >> > To get a single ApplicationRunner implementation, like Jake
>> > >> suggested,
>> > >> >> we
>> > >> >> > need to make leader election and JobRunner implementation
>> > pluggable.
>> > >> >> > There's still the question of whether ApplicationRunner#run API
>> > >> should
>> > >> >> be
>> > >> >> > blocking or non-blocking. It has to be non-blocking in YARN. We
>> > want
>> > >> it
>> > >> >> to
>> > >> >> > be blocking in standalone, but seems like the main reason is
>> ease
>> > of
>> > >> use
>> > >> >> > when launched from main(). I'd prefer making it consistently
>> > >> non-blocking
>> > >> >> > instead, esp. since in embedded standalone mode (where the
>> > processor
>> > >> is
>> > >> >> > running in another container) a blocking API would not be
>> > >> user-friendly
>> > >> >> > either. If not, we can add both run and runBlocking.
>> > >> >> >
>> > >> >> > Coming to RuntimeEnvironment, which is the least clear to me so
>> > far:
>> > >> >> > 1. I don't think RuntimeEnvironment should be responsible for
>> > >> providing
>> > >> >> > StreamSpecs for streamIds - they can be obtained with a
>> config/util
>> > >> >> class.
>> > >> >> > The StreamProcessor should only know about logical streamIds and
>> > the
>> > >> >> > streamId <-> actual stream mapping should happen within the
>> > >> >> > SystemProducer/Consumer/Admins provided by the
>> RuntimeEnvironment.
>> > >> >> > 2. There are also other components that the user might be
>> interested
>> > in
>> > >> >> > providing implementations of in embedded Standalone mode (i.e.,
>> not
>> > >> >> just in
>> > >> >> > tests) - MetricsRegistry and JMXServer come to mind.
>> > >> >> > 3. Most importantly, it's not clear to me who creates and
>> manages
>> > the
>> > >> >> > RuntimeEnvironment. It seems like it should be the
>> > ApplicationRunner
>> > >> or
>> > >> >> the
>> > >> >> > user because of (2) above and because StreamManager also needs
>> > >> access to
>> > >> >> > SystemAdmins for creating intermediate streams which users might
>> > >> want to
>> > >> >> > mock. But it also needs to be passed down to the
>> StreamProcessor -
>> > >> how
>> > >> >> > would this work on Yarn?
>> > >> >> >
>> > >> >> > I think we should figure out how to integrate RuntimeEnvironment
>> > with
>> > >> >> > ApplicationRunner before we can make a call on one vs. multiple
>> > >> >> > ApplicationRunner implementations. If we do keep
>> > >> LocalApplicationRunner
>> > >> >> and
>> > >> >> > RemoteApplication (and TestApplicationRunner) separate, agree
>> with
>> > >> Jake
>> > >> >> > that we should remove the JobRunners and roll them up into the
>> > >> >> respective
>> > >> >> > ApplicationRunners.
>> > >> >> >
>> > >> >> > - Prateek
>> > >> >> >
>> > >> >> > On Thu, Apr 20, 2017 at 10:06 AM, Jacob Maes <
>> jacob.maes@gmail.com
>> > >
>> > >> >> wrote:
>> > >> >> >
>> > >> >> > > Thanks for the SEP!
>> > >> >> > >
>> > >> >> > > +1 on introducing these new components
>> > >> >> > > -1 on the current definition of their roles (see Design
>> feedback
>> > >> >> below)
>> > >> >> > >
>> > >> >> > > *Design*
>> > >> >> > >
>> > >> >> > >    - If LocalJobRunner and RemoteJobRunner handle the
>> different
>> > >> >> methods
>> > >> >> > of
>> > >> >> > >    launching a Job, what additional value do the different
>> types
>> > of
>> > >> >> > >    ApplicationRunner and RuntimeEnvironment provide? It seems
>> > like
>> > >> a
>> > >> >> red
>> > >> >> > > flag
>> > >> >> > >    that all 3 would need to change from environment to
>> > >> environment. It
>> > >> >> > >    indicates that they don't have proper modularity. The
>> > >> >> > > call-sequence-figures
>> > >> >> > >    support this; LocalApplicationRunner and
>> > RemoteApplicationRunner
>> > >> >> make
>> > >> >> > > the
>> > >> >> > >    same calls and the diagram only varies after
>> jobRunner.start()
>> > >> >> > >    - As far as I can tell, the only difference between Local
>> and
>> > >> >> Remote
>> > >> >> > >    ApplicationRunner is that one is blocking and the other is
>> > >> >> > > non-blocking. If
>> > >> >> > >    that's all they're for then either the names should be
>> changed
>> > >> to
>> > >> >> > > reflect
>> > >> >> > >    this, or they should be combined into one ApplicationRunner
>> > and
>> > >> >> just
>> > >> >> > > expose
>> > >> >> > >    separate methods for run() and runBlocking()
>> > >> >> > >    - There isn't much detail on why the main() methods for
>> > >> >> Local/Remote
>> > >> >> > >    have such different implementations, how they receive the
>> > >> >> Application
>> > >> >> > >    (direct vs config), and concretely how the deployment
>> scripts,
>> > >> if
>> > >> >> any,
>> > >> >> > >    should interact with them.
>> > >> >> > >
>> > >> >> > >
>> > >> >> > > *Style*
>> > >> >> > >
>> > >> >> > >    - nit: None of the 11 uses of the word "actual" in the doc
>> are
>> > >> >> > > *actually*
>> > >> >> > >    needed. :-)
>> > >> >> > >    - nit: Colors of the runtime blocks in the diagrams are
>> > >> >> unconventional
>> > >> >> > >    and a little distracting. Reminds me of nai won bao. Now
>> I'm
>> > >> >> hungry.
>> > >> >> > :-)
>> > >> >> > >    - Prefer the name "ExecutionEnvironment" over
>> > >> "RuntimeEnvironment".
>> > >> >> > The
>> > >> >> > >    term "execution environment" is used
>> > >> >> > >    - The code comparisons for the ApplicationRunners are not
>> > >> >> > apples-apples.
>> > >> >> > >    The local runner example is an application that USES the
>> local
>> > >> >> runner.
>> > >> >> > > The
>> > >> >> > >    remote runner example is just the runner code itself. So,
>> So,
>> > >> it's
>> > >> >> not
>> > >> >> > >    readily apparent that we're comparing the main() methods
>> and
>> > not
>> > >> >> the
>> > >> >> > >    application itself.
>> > >> >> > >
>> > >> >> > >
>> > >> >> > > On Mon, Apr 17, 2017 at 5:02 PM, Yi Pan <ni...@gmail.com>
>> > >> wrote:
>> > >> >> > >
>> > >> >> > > > Made some updates to clarify the role and functions of
>> > >> >> > RuntimeEnvironment
>> > >> >> > > > in SEP-2.
>> > >> >> > > >
>> > >> >> > > > On Fri, Apr 14, 2017 at 9:30 AM, Yi Pan <
>> nickpan47@gmail.com>
>> > >> >> wrote:
>> > >> >> > > >
>> > >> >> > > > > Hi, everyone,
>> > >> >> > > > >
>> > >> >> > > > > In light of new features such as fluent API and standalone
>> > >> >> > > > > that introduce new deployment / application launch models in
>> > >> >> > > > > Samza, I created a new SEP-2 to address the new use cases.
>> > >> >> > > > > SEP-2 link:
>> > >> >> > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-2%3A+ApplicationRunner+Design
>> > >> >> > > > >
>> > >> >> > > > > Please take a look and give feedbacks!
>> > >> >> > > > >
>> > >> >> > > > > Thanks!
>> > >> >> > > > >
>> > >> >> > > > > -Yi
>> > >> >> > > > >
>> > >> >> > > >
>> > >> >> > >
>> > >> >> >
>> > >> >>
>> > >> >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>
>

Re: [DISCUSS] SEP-2: ApplicationRunner Design

Posted by Chris Pettitt <cp...@linkedin.com.INVALID>.
Thanks Xinyu, that would be a nice improvement!

- Chris
