Posted to user-zh@flink.apache.org by melin li <li...@gmail.com> on 2020/01/20 06:47:32 UTC
Support a flink.yarn.jars parameter
Spark has a spark.yarn.jars parameter: job dependency jars can be placed directly on HDFS, which avoids uploading jars from the client and re-distributing them, and speeds up job startup. Something similar could be done in Flink, for example:
YarnClusterDescriptor.java

// upload and register ship files
String systemJarHdfsDir =
        configuration.getString("stream.flink.system.jars.dir", "");
List<String> systemClassPaths =
        findHdfsJars(fs, systemJarHdfsDir, paths, localResources, envShipFileList);

String userJars = configuration.getString("stream.flink.use.jars", "");
List<String> userClassPaths;
if (userJars != null && !"".equals(userJars)) {
    userClassPaths =
            registerUserJars(fs, userJars.split(","), paths, localResources, envShipFileList);
} else {
    userClassPaths = Collections.emptyList();
}

if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
    systemClassPaths.addAll(userClassPaths);
}

// normalize classpath by sorting
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);

// classpath assembler
StringBuilder classPathBuilder = new StringBuilder();
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
    for (String userClassPath : userClassPaths) {
        classPathBuilder.append(userClassPath).append(File.pathSeparator);
    }
}
for (String classPath : systemClassPaths) {
    classPathBuilder.append(classPath).append(File.pathSeparator);
}
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
    for (String userClassPath : userClassPaths) {
        classPathBuilder.append(userClassPath).append(File.pathSeparator);
    }
}

// Setup jar for the ApplicationMaster
Path remotePathJar = setupFlinkJar("flink.jar", fs, flinkJarPath, localResources);
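The FIRST/ORDER/LAST classpath-assembly step above can be isolated and tested on its own. The following is a self-contained sketch of just that ordering logic, not the actual Flink code: `UserJarInclusion` here is a stand-in enum mirroring `YarnConfigOptions.UserJarInclusion`, and the helper works on plain string lists instead of registered YARN resources.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ClassPathAssembler {

    // Stand-in for YarnConfigOptions.UserJarInclusion in the snippet above.
    enum UserJarInclusion { FIRST, ORDER, LAST }

    static String assemble(List<String> systemClassPaths,
                           List<String> userClassPaths,
                           UserJarInclusion inclusion) {
        // Copy the inputs so the caller's lists are not mutated.
        List<String> system = new ArrayList<>(systemClassPaths);
        List<String> user = new ArrayList<>(userClassPaths);

        // ORDER merges user jars into the system classpath before sorting.
        if (inclusion == UserJarInclusion.ORDER) {
            system.addAll(user);
        }

        // Normalize both lists by sorting, as the snippet does.
        Collections.sort(system);
        Collections.sort(user);

        StringBuilder cp = new StringBuilder();
        if (inclusion == UserJarInclusion.FIRST) {
            for (String p : user) {
                cp.append(p).append(File.pathSeparator);
            }
        }
        for (String p : system) {
            cp.append(p).append(File.pathSeparator);
        }
        if (inclusion == UserJarInclusion.LAST) {
            for (String p : user) {
                cp.append(p).append(File.pathSeparator);
            }
        }
        return cp.toString();
    }

    public static void main(String[] args) {
        // User jars placed before the system classpath.
        System.out.println(assemble(
                List.of("lib/flink-dist.jar"),
                List.of("hdfs:///jars/user.jar"),
                UserJarInclusion.FIRST));
    }
}
```

One consequence of this structure worth noting: with ORDER, user jars lose their identity and are interleaved with system jars by the sort, while FIRST/LAST keep the two groups contiguous.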
Re: Support a flink.yarn.jars parameter
Posted by tison <wa...@gmail.com>.
There is interest in this. Tencent has already implemented this functionality internally, and I recall Yang Wang (in cc) built something similar at Alibaba. Doing it cleanly will probably require cleaning up the YarnClusterDescriptor code along the way. This need does come up frequently, so let's try to get it into 1.11.
You could also describe the desired behavior in more detail, or implement it yourself and the community will help review. I don't remember whether there is already a JIRA for this; you can search for one or file a new one directly.
Best,
tison.
melin li <li...@gmail.com> wrote on Mon, Jan 20, 2020, 4:59 PM: