You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 周涛 <06...@163.com> on 2022/04/08 01:37:33 UTC

yarn api 提交报错

hi,
    我在测试使用 Java API 提交 Flink 任务时遇到了问题,想请教一下:
    flink版本1.14.4
    Hadoop版本:3.0.0-cdh6.2.1
    部署模式:application 模式。使用命令行提交可以正常运行,但通过 Java API 提交失败。
    API 提交失败时的 YARN 日志如下:
               LogType:jobmanager.err
               LogLastModifiedTime:Fri Apr 08 09:24:01 +0800 2022
               LogLength:107
               LogContents:
               Error: Could not find or load main class org.apache.flink.yarn.entrypoint.YarnApplicationClusterEntryPoint
               End of LogType:jobmanager.err
以下是代码:


         // Submits a Flink job to YARN in application mode through the Java client API
         // (YarnClusterDescriptor#deployApplicationCluster), then logs the resulting
         // application id and web UI address.
         //
         // NOTE(review): the local paths below are Windows paths (E:\...), so this client
         // presumably runs on Windows while the YARN containers run on Linux. A possible cause
         // of "Could not find or load main class ...YarnApplicationClusterEntryPoint" is a
         // container classpath joined with the client's ';' separator instead of ':' --
         // confirm by inspecting launch_container.sh of the failed container, or by running
         // this same code from a Linux host.

         // Hadoop user under which the submission is performed.
         System.setProperty("HADOOP_USER_NAME", "hdfs");

         // Local Flink configuration directory; flink-conf.yaml is loaded from here.
         String configurationDirectory = "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\flink114\\";
         // Local directory passed to Flink through the "fs.hdfs.hadoopconf" option.
         String hadoopConfDirectory = "E:\\AtourWork\\dctool\\dctool-common\\dctool-common-hdfs\\src\\main\\resources\\test\\yarn\\";

         // HDFS directory holding the Flink cluster jars (used as provided lib dir).
         String flinkLibs = "hdfs://nameservice1/flink/jar/libs/lib/";
         // User job jar.
         String userJarPath = "hdfs://nameservice1/flink/jar/userTask/flink-streaming-test-1.0-SNAPSHOT.jar";
         // flink-dist jar; note that it lives one level above the provided lib dir.
         String flinkDistJar = "hdfs://nameservice1/flink/jar/libs/flink-dist.jar";

         // Create and start the YARN client.
         YarnClientService yarnClientService = new YarnClientService();
         YarnClient yarnClient = yarnClientService.getYarnClient();
         yarnClient.start();

         // Retriever the descriptor uses to query cluster information through yarnClient.
         YarnClusterInformationRetriever clusterInformationRetriever =
                 YarnClientYarnClusterInformationRetriever.create(yarnClient);

         // Load the Flink configuration from the local configuration directory.
         Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);

         flinkConfiguration.setString(
                 ConfigOptions.key("fs.hdfs.hadoopconf").stringType().noDefaultValue(),
                 hadoopConfDirectory);

         flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);

         flinkConfiguration.set(PipelineOptions.JARS, Collections.singletonList(userJarPath));

         Path remoteLib = new Path(flinkLibs);
         flinkConfiguration.set(
                 YarnConfigOptions.PROVIDED_LIB_DIRS,
                 Collections.singletonList(remoteLib.toString()));

         flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR, flinkDistJar);

         // Deploy in application mode.
         flinkConfiguration.set(
                 DeploymentOptions.TARGET,
                 YarnDeploymentTarget.APPLICATION.getName());

         // YARN application name.
         flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "flink-application");

         // Register the log configuration file found in the configuration directory;
         // per the original author, no logs are visible without this.
         YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

         // Program arguments (none) and main class of the user jar.
         ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[]{}, "com.zt.FlinkTest1");

         // Derive JobManager / TaskManager total process memory (MiB) from the configuration.
         final int jobManagerMemoryMB =
                 JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                                 flinkConfiguration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
                         .getTotalProcessMemorySize()
                         .getMebiBytes();
         final int taskManagerMemoryMB =
                 TaskExecutorProcessUtils.processSpecFromConfig(
                                 TaskExecutorProcessUtils
                                         .getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
                                                 flinkConfiguration,
                                                 TaskManagerOptions.TOTAL_PROCESS_MEMORY))
                         .getTotalProcessMemorySize()
                         .getMebiBytes();

         ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
                 .setMasterMemoryMB(jobManagerMemoryMB)
                 .setTaskManagerMemoryMB(taskManagerMemoryMB)
                 .setSlotsPerTaskManager(flinkConfiguration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS))
                 .createClusterSpecification();

         // The trailing `true` is presumably the sharedYarnClient flag of the
         // YarnClusterDescriptor constructor (verify against the Flink version in use);
         // if so, closing the descriptor does not stop yarnClient.
         YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                 flinkConfiguration,
                 (YarnConfiguration) yarnClient.getConfig(),
                 yarnClient,
                 clusterInformationRetriever,
                 true);

         // try-with-resources releases the cluster client once the id and URL have been read;
         // the original never closed it.
         try (ClusterClient<ApplicationId> clusterClient = yarnClusterDescriptor
                 .deployApplicationCluster(clusterSpecification, appConfig)
                 .getClusterClient()) {

             ApplicationId applicationId = clusterClient.getClusterId();
             String webInterfaceURL = clusterClient.getWebInterfaceURL();
             // Successful submission is informational, not an error.
             log.info("applicationId is {}", applicationId);
             log.info("webInterfaceURL is {}", webInterfaceURL);

             // To tear the application down again, call
             // yarnClusterDescriptor.killCluster(applicationId) here.
         } catch (Exception e) {
             log.error(e.getMessage(), e);
         } finally {
             yarnClusterDescriptor.close();
             // TODO(review): yarnClient is started above and never stopped. Stop it here
             // (yarnClient.stop()) unless YarnClientService hands out a shared instance that
             // is still needed afterwards -- ownership is not visible from this snippet.
         }

以下是提交的部分日志:(日志内容未包含在此纯文本版本中)


    以上是相关问题描述。我已排查多天,仍未找到原因,特此向社区请教,烦请给予一些指导,感谢。
    期待回复!


以上,祝好