Posted to commits@samza.apache.org by "Ke Wu (Confluence)" <no...@apache.org> on 2019/11/15 22:30:00 UTC

[CONF] Apache Samza > SEP-23: Simplify Job Runner

Ke Wu edited this page. Here's what changed:

...

**JIRA** : SAMZA-TBD

**Released:**

#  **Problem**

#  **Motivation**

Samza on Yarn follows a split deployment model: the Job Runner, which runs on
the submission host, reads the configuration, performs planning, and persists
the config in the coordinator stream before submitting the job to the Yarn
cluster. In Yarn, the Application Master (AM) reads the config from the
coordinator stream before spinning up containers to execute. This split of
responsibility between the Job Runner and the AM is operationally confusing
and makes debugging the pipeline difficult. In addition, since planning
invokes user code, the runner requires isolation, from a security
perspective, to guard the framework against malicious user code. Finally, the
config file is already packed in the tarball submitted to Yarn, so it could
be easier for the AM to pick up the config locally.

#  **Proposed Changes**

We will provide a pluggable config retrieval and planning interface on the AM.
When it is set, the Job Runner will simply submit the job to Yarn, without
involving any complex logic. The AM, on the other hand, will read the job
config through the provided config loader, perform planning, generate the DAG,
and persist the result back to the coordinator stream.

#  **Public Interfaces**

Two job configs to configure the job to use the alternative workflow:

  * job.config.loader.class
  * job.config.loader.properties

Fully backward compatible. To use the new workflow, simply supply
"job.config.loader.class" and "job.config.loader.properties". For example, in
the Hello Samza example, the application will be invoked by:

      
    
    deploy/samza/bin/run-app.sh \
      --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
      --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties \
      --config job.config.loader.class=org.apache.samza.config.loader.PropertiesConfigLoader \
      --config job.config.loader.properties.path=./__package/config/wikipedia-feed.properties
      
  
#  **Implementation and Test Plan**

##  JobConfig

We add two extra configs to JobConfig to control whether the AM reads config
from a ConfigLoader instead of the coordinator stream.

      
    
    // Configuration to a fully qualified class name to load config from.
    public static final String CONFIG_LOADER_CLASS = "job.config.loader.class";
    // Properties needed for the config loader to load config.
    public static final String CONFIG_LOADER_PROPERTIES_PREFIX = "job.config.loader.properties.";
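
To illustrate how these two keys might be consumed, here is a minimal sketch that reads them from a plain `Map<String, String>` instead of Samza's `Config`. The class `ConfigLoaderSettings` and its methods are hypothetical names introduced for this example. The sketch assumes the loader properties keep their full key, since the PropertiesConfigLoader in this proposal looks up the prefix plus "path".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Samza: shows one way the two proposed keys could be read.
final class ConfigLoaderSettings {
  static final String CONFIG_LOADER_CLASS = "job.config.loader.class";
  static final String CONFIG_LOADER_PROPERTIES_PREFIX = "job.config.loader.properties.";

  private ConfigLoaderSettings() { }

  /** Returns the configured loader class name, or null when the key is absent. */
  static String loaderClass(Map<String, String> jobConfig) {
    return jobConfig.get(CONFIG_LOADER_CLASS);
  }

  /** Collects every entry under the loader prefix, keeping the full key (an assumption). */
  static Map<String, String> loaderProperties(Map<String, String> jobConfig) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : jobConfig.entrySet()) {
      if (entry.getKey().startsWith(CONFIG_LOADER_PROPERTIES_PREFIX)) {
        result.put(entry.getKey(), entry.getValue());
      }
    }
    return result;
  }
}
```

A null result from `loaderClass` would correspond to a job that stays on the existing coordinator stream workflow.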
      
  
##  ConfigLoader

The interface the AM relies on to read configuration. It takes in a
properties map that defines the variables it needs in order to get the proper
config.

      
    
    public interface ConfigLoader {
      /**
       * Build a specific Config.
       * @param properties Resource containing information necessary for this Config.
       * @return Newly constructed Config.
       */
      Config getConfig(Config properties);
    }
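
Because the loader is named by a fully qualified class name in config, the AM presumably has to instantiate it reflectively. The sketch below shows one way that could look. It is an assumption, not the proposal's implementation: `SketchConfigLoader` is a stand-in for the interface above that uses plain maps instead of `Config`, and `FixedConfigLoader` and `ConfigLoaderFactory` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;

// Stand-in for the proposed interface; plain maps replace Samza's Config to keep this self-contained.
interface SketchConfigLoader {
  Map<String, String> getConfig(Map<String, String> properties);
}

// Hypothetical loader used only for illustration: builds a one-entry config from its input.
class FixedConfigLoader implements SketchConfigLoader {
  @Override
  public Map<String, String> getConfig(Map<String, String> properties) {
    return Collections.singletonMap("job.name", properties.getOrDefault("name", "unnamed"));
  }
}

final class ConfigLoaderFactory {
  private ConfigLoaderFactory() { }

  /** Instantiates the loader named by className through its no-argument constructor. */
  static SketchConfigLoader create(String className) {
    try {
      Class<?> clazz = Class.forName(className);
      return clazz.asSubclass(SketchConfigLoader.class).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate config loader: " + className, e);
    }
  }
}
```

This design requires every loader to expose a no-argument constructor, which is why all loader-specific inputs travel through the properties map.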
      
  
##  PropertiesConfigLoader

The default implementation of ConfigLoader. It reads "path" from the input
properties, which points to a properties file.

      
    
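    import java.io.FileInputStream
    import java.util.Properties

    import scala.collection.JavaConverters._

    import org.apache.samza.config.{Config, JobConfig, MapConfig}
    import org.apache.samza.util.Logging

    // ConfigLoader is the interface defined above; import it from wherever it is packaged.
    class PropertiesConfigLoader extends ConfigLoader with Logging {
      /**
       * Build a specific Config.
       * @param properties Resource containing information necessary for this Config.
       * @return Newly constructed Config.
       */
      override def getConfig(properties: Config): Config = {
        val path = properties.get(JobConfig.CONFIG_LOADER_PROPERTIES_PREFIX + "path")

        val props = new Properties()
        val in = new FileInputStream(path)

        // Close the stream even if loading fails.
        try {
          props.load(in)
        } finally {
          in.close()
        }

        debug("got config %s from path %s" format (props, path))

        new MapConfig(props.asScala.asJava)
      }
    }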
      
  
  

##  RemoteApplicationRunner

Depending on the existence of "job.config.loader.class" and
"job.config.loader.properties", RemoteApplicationRunner#run will either keep
its current behavior or simply submit the job to Yarn.

      
    
    @Override
    public void run(ExternalContext externalContext) {
      if (new JobConfig(config).getConfigLoaderClass() != null) {
        // New workflow: submit the job directly and leave planning to the AM.
        JobRunner runner = new JobRunner(config);
        runner.getJobFactory().getJob(config).submit();
      } else {
        // Keep existing behavior
      }
    }
      
  
##  YarnJob

YarnJob#buildEnvironment will build either the coordinator stream env
variable or the config loader env variables, depending on the existence of
"job.config.loader.class" and "job.config.loader.properties".

  

      
    
    private[yarn] def buildEnvironment(config: Config, yarnConfig: YarnConfig,
        jobConfig: JobConfig): Map[String, String] = {
      val envMapBuilder = Map.newBuilder[String, String]

      if (jobConfig.getConfigLoaderClass != null) {
        // New workflow: pass the config loader settings to the AM.
        envMapBuilder += ShellCommandConfig.ENV_REMOTE_CONFIG_FACTORY ->
          Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(jobConfig.getRemoteConfigFactoryClassName))
        envMapBuilder += ShellCommandConfig.ENV_REMOTE_CONFIG_PROPERTIES ->
          Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(jobConfig.getRemoteConfigProperties))
      } else {
        // Existing workflow: pass the coordinator stream config to the AM.
        val coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)
        envMapBuilder += ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG ->
          Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig))
      }

      envMapBuilder.result()
    }
      
  
  

##  ClusterBasedJobCoordinator

ClusterBasedJobCoordinator#main will construct the application config through
the coordinator stream or the config loader, depending on the env variables it
gets.
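
The branching described here could be sketched as follows. This is only an illustration under stated assumptions: the environment variable names are placeholders (the actual names are defined in ShellCommandConfig), `ConfigSourceSelector` is a hypothetical class, and preferring the config loader when both variables are present is a choice made for this example.

```java
import java.util.Map;

// Hypothetical helper, not part of Samza.
final class ConfigSourceSelector {
  // Placeholder variable names for illustration only.
  static final String ENV_CONFIG_LOADER = "EXAMPLE_CONFIG_LOADER_CLASS";
  static final String ENV_COORDINATOR_SYSTEM_CONFIG = "EXAMPLE_COORDINATOR_SYSTEM_CONFIG";

  enum Source { CONFIG_LOADER, COORDINATOR_STREAM }

  private ConfigSourceSelector() { }

  /** Picks the config source from the environment, preferring the config loader when present. */
  static Source select(Map<String, String> env) {
    if (env.containsKey(ENV_CONFIG_LOADER)) {
      return Source.CONFIG_LOADER;
    }
    if (env.containsKey(ENV_COORDINATOR_SYSTEM_CONFIG)) {
      return Source.COORDINATOR_STREAM;
    }
    throw new IllegalStateException("No config source found in environment");
  }
}
```

In a main method this would be called as `ConfigSourceSelector.select(System.getenv())`, failing fast when neither variable is set.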

#  **Compatibility, Deprecation, and Migration Plan**

...

Fully backward compatible: jobs can still follow the existing workflow. We
will gradually deprecate the current flow and make the two new configs
required.

  
  
  