Posted to user-zh@flink.apache.org by 周涛 <06...@163.com> on 2022/04/08 01:37:33 UTC
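Error when submitting via the YARN API
Hi,
I ran into a problem while testing Flink job submission through the Java API and would like to ask for help:
Flink version: 1.14.4
Hadoop version: 3.0.0-cdh6.2.1
In application mode, submitting from the command line runs normally, but submitting through the API fails.
YARN log for the failed submission: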
LogType:jobmanager.err
LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
LogLength:107
LogContents:
Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
End of LogType:jobmanager.err
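One quick check for the error above, offered as a sketch rather than a confirmed diagnosis: verify that the entrypoint class is actually packaged in the jar that was uploaded as flink-dist.jar. The class name EntrypointCheck, its methods, and the default local path are hypothetical. The code assumes a local copy of the jar downloaded from HDFS and uses only java.util.jar from the JDK.

```java
import java.io.IOException;
import java.util.jar.JarFile;

class EntrypointCheck {

    /** Converts a fully qualified class name to the entry path used inside a jar. */
    static String toEntryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar at jarPath contains an entry for the given class. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(toEntryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical local copy of the jar stored on HDFS; pass the real path as args[0].
        String jarPath = args.length > 0 ? args[0] : "flink-dist.jar";
        String entrypoint = "org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint";
        System.out.println(entrypoint + " present: " + containsClass(jarPath, entrypoint));
    }
}
```

If this reports that the class is missing, the uploaded jar is probably not a complete flink-dist. If the class is present, the jar itself is fine and the classpath of the YARN container is the next thing to inspect.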
Here is the code:
System.setProperty("HADOOP_USER_NAME", "hdfs");
// Local Flink configuration directory, used to load the Flink configuration
String configurationDirectory = "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
// Directory holding the Flink cluster jars
String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
// User jar
String userJarPath = "hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
String flinkDistJar = "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";
YarnClientService yarnClientService = new YarnClientService();
// Create the YarnClient
YarnClient yarnClient = yarnClientService.getYarnClient();
yarnClient.start();
// Retrieves cluster information through the YarnClient
YarnClusterInformationRetriever clusterInformationRetriever =
        YarnClientYarnClusterInformationRetriever.create(yarnClient);
// Load the Flink configuration
Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
flinkConfiguration.setString(
        ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
        "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\");
flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));
Path remoteLib = new Path(flinkLibs);
flinkConfiguration.set(
        YarnConfigOptions.PROVIDED_LIB_DIRS,
        Collections.singletonList(remoteLib.toString()));
flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);
// Use application mode
flinkConfiguration.set(
        DeploymentOptions.TARGET,
        YarnDeploymentTarget.APPLICATION.getName());
// YARN application name
flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");
// Logging setup; without it no logs are visible
YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
// Arguments and main class of the user jar
ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[]{}, "com.zt.FlinkTest1");
final int jobManagerMemoryMB =
        JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                        flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
                .getTotalProcessMemorySize()
                .getMebiBytes();
final int taskManagerMemoryMB =
        TaskExecutorProcessUtils.processSpecFromConfig(
                        TaskExecutorProcessUtils
                                .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
                                        flinkConfiguration,
                                        TaskManagerOptions.TOTAL_PROCESS_MEMORY))
                .getTotalProcessMemorySize()
                .getMebiBytes();
ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
        .setMasterMemoryMB(jobManagerMemoryMB)
        .setTaskManagerMemoryMB(taskManagerMemoryMB)
        .setSlotsPerTaskManager(flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
        .createClusterSpecification();
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
        flinkConfiguration,
        (YarnConfiguration) yarnClient.getConfig(),
        yarnClient,
        clusterInformationRetriever,
        true);
try {
    ClusterClientProvider<ApplicationId> clusterClientProvider =
            yarnClusterDescriptor.deployApplicationCluster(clusterSpecification, appConfig);
    ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
    ApplicationId applicationId = clusterClient.getClusterId();
    String webInterfaceURL = clusterClient.getWebInterfaceURL();
    // Successful deployment details are informational, not errors
    log.info("applicationId is {}", applicationId);
    log.info("webInterfaceURL is {}", webInterfaceURL);
    // Exit
    // yarnClusterDescriptor.killCluster(applicationId);
} catch (Exception e) {
    log.error(e.getMessage(), e);
} finally {
    // yarnClient.close();
}
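A possible cause worth ruling out, stated as an assumption because the logs in this post do not confirm it: the local paths start with E:\\, so the client JVM runs on Windows, where java.io.File.pathSeparator is ';', while a Linux YARN container separates classpath entries with ':'. If the container classpath is assembled on the client with the Windows separator, the Linux JVM sees one unusable entry and cannot load any class from it. The sketch below only illustrates that mismatch with plain strings; ClasspathSeparatorDemo and its methods are hypothetical and call no Flink or YARN API.

```java
import java.util.Arrays;
import java.util.List;

class ClasspathSeparatorDemo {

    /** Joins classpath entries with the given separator (';' on Windows, ':' on Linux). */
    static String join(List<String> entries, char separator) {
        return String.join(String.valueOf(separator), entries);
    }

    /** Splits a classpath string the way a Linux JVM interprets it: on ':'. */
    static List<String> splitAsLinux(String classpath) {
        return Arrays.asList(classpath.split(":"));
    }

    public static void main(String[] args) {
        List<String> entries = Arrays.asList("flink-dist.jar", "lib/*");
        // Classpath built on a Windows client, then read inside a Linux container.
        System.out.println(splitAsLinux(join(entries, ';')));
        // Classpath built with the separator the container expects.
        System.out.println(splitAsLinux(join(entries, ':')));
    }
}
```

Comparing the CLASSPATH value in the container's launch script on the NodeManager against what a command-line submission produces would show whether this mismatch actually occurs here.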
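Below is part of the submission log:
That is the full description of the problem. I have been troubleshooting it for several days without finding the cause, so I am asking the experts here for guidance. Thank you.
Looking forward to your reply!
Best regards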