Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/09/14 14:06:06 UTC

[GitHub] [dolphinscheduler] github-code-scanning[bot] commented on a diff in pull request #11939: [Feature-11440][Flink Stream] Use flink SDK replace the shell cli

github-code-scanning[bot] commented on code in PR #11939:
URL: https://github.com/apache/dolphinscheduler/pull/11939#discussion_r970856087


##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * submit job
+     *
+     * @return
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * kill yarn job and clean application files in hdfs.
+     *
+     * @return
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));
+            logger.info("killed applicationId {} was unsuccessful.", applicationId);
+        } catch (YarnException e) {
+            logger.error("killed applicationId {0} was failed.", e);
+            return new ResultInfo("", "");
+        }
+
+        try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+            Path applicationDir =
+                    new Path(
+                            checkNotNull(fs.getHomeDirectory()),
+                            ".flink/" + checkNotNull(applicationId) + '/');
+            if (!fs.delete(applicationDir, true)) {
+                logger.error(
+                        "Deleting yarn application files under {} was unsuccessful.",
+                        applicationDir);
+            } else {
+                logger.error(
+                        "Deleting yarn application files under {} was successful.", applicationDir);
+            }
+        } catch (Exception e) {
+            logger.error("Deleting yarn application files was failed!", e);
+        }
+        return new ResultInfo("", "");
+    }
+
+    public ResultInfo cancelJob(boolean doSavepoint) {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient()) {
+                if (doSavepoint) {
+                    CompletableFuture<String> savepointFuture =
+                            clusterClient.cancelWithSavepoint(flinkJobId, null, SavepointFormatType.DEFAULT);
+                    Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job savepoint path: {}", result.toString());
+                } else {
+                    CompletableFuture<Acknowledge> cancelFuture = clusterClient.cancel(flinkJobId);
+                    Object result = cancelFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job cancel result: {}", result.toString());
+                }
+            } catch (Exception e) {
+                try {
+                    logger.error("cancel job error, will kill job:", e);
+                    clusterDescriptor.killCluster(applicationId);
+                } catch (FlinkException e1) {
+                    logger.error("yarn cluster Descriptor kill cluster error:", e);
+                    return new ResultInfo("", "");
+                }
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("cancel job failed,appId:{}, jobId:", appId, jobId), e);

Review Comment:
   ## Unused format argument
   
   This format call refers to 0 argument(s) but supplies 2 argument(s).
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1250)
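
   A possible fix, sketched here rather than taken from the PR, is to drop `String.format` and let the SLF4J placeholders consume both arguments (SLF4J still renders a trailing exception argument as the throwable):

   ```java
   // Sketch: parameterized logging actually uses appId and jobId, and the
   // trailing exception keeps its stack trace.
   logger.error("cancel job failed, appId:{}, jobId:{}", appId, jobId, e);
   ```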



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * submit job
+     *
+     * @return
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * kill yarn job and clean application files in hdfs.
+     *
+     * @return
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ConverterUtils.toApplicationId](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1244)
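
   Assuming the build targets Hadoop 2.8 or later, the usual non-deprecated replacement is `ApplicationId.fromString(String)`; a sketch of how this call site could look (`ApplicationId` is already imported in the file):

   ```java
   // Sketch only: parse the id with the non-deprecated factory method.
   yarnClient.killApplication(ApplicationId.fromString(applicationId));
   ```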



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * submit job
+     *
+     * @return
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * kill yarn job and clean application files in hdfs.
+     *
+     * @return
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));
+            logger.info("killed applicationId {} was unsuccessful.", applicationId);
+        } catch (YarnException e) {
+            logger.error("killed applicationId {0} was failed.", e);
+            return new ResultInfo("", "");
+        }
+
+        try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+            Path applicationDir =
+                    new Path(
+                            checkNotNull(fs.getHomeDirectory()),
+                            ".flink/" + checkNotNull(applicationId) + '/');
+            if (!fs.delete(applicationDir, true)) {
+                logger.error(
+                        "Deleting yarn application files under {} was unsuccessful.",
+                        applicationDir);
+            } else {
+                logger.error(
+                        "Deleting yarn application files under {} was successful.", applicationDir);
+            }
+        } catch (Exception e) {
+            logger.error("Deleting yarn application files was failed!", e);
+        }
+        return new ResultInfo("", "");
+    }
+
+    public ResultInfo cancelJob(boolean doSavepoint) {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient()) {
+                if (doSavepoint) {
+                    CompletableFuture<String> savepointFuture =
+                            clusterClient.cancelWithSavepoint(flinkJobId, null, SavepointFormatType.DEFAULT);
+                    Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job savepoint path: {}", result.toString());
+                } else {
+                    CompletableFuture<Acknowledge> cancelFuture = clusterClient.cancel(flinkJobId);
+                    Object result = cancelFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job cancel result: {}", result.toString());
+                }
+            } catch (Exception e) {
+                try {
+                    logger.error("cancel job error, will kill job:", e);
+                    clusterDescriptor.killCluster(applicationId);
+                } catch (FlinkException e1) {
+                    logger.error("yarn cluster Descriptor kill cluster error:", e);
+                    return new ResultInfo("", "");
+                }
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("cancel job failed,appId:{}, jobId:", appId, jobId), e);
+            return new ResultInfo(appId, jobId);
+        }
+
+        return new ResultInfo(appId, jobId);
+    }
+
+    public ResultInfo savePoint() {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ConverterUtils.toApplicationId](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1247)
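
   The replacement sketched for the earlier finding would apply at this call site as well, again assuming Hadoop 2.8+:

   ```java
   ApplicationId applicationId = ApplicationId.fromString(appId);
   ```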



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * submit job
+     *
+     * @return
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * kill yarn job and clean application files in hdfs.
+     *
+     * @return
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));
+            logger.info("killed applicationId {} was unsuccessful.", applicationId);
+        } catch (YarnException e) {
+            logger.error("killed applicationId {0} was failed.", e);
+            return new ResultInfo("", "");
+        }
+
+        try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+            Path applicationDir =
+                    new Path(
+                            checkNotNull(fs.getHomeDirectory()),
+                            ".flink/" + checkNotNull(applicationId) + '/');
+            if (!fs.delete(applicationDir, true)) {
+                logger.error(
+                        "Deleting yarn application files under {} was unsuccessful.",
+                        applicationDir);
+            } else {
+                logger.error(
+                        "Deleting yarn application files under {} was successful.", applicationDir);
+            }
+        } catch (Exception e) {
+            logger.error("Deleting yarn application files was failed!", e);
+        }
+        return new ResultInfo("", "");
+    }
+
+    public ResultInfo cancelJob(boolean doSavepoint) {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient()) {
+                if (doSavepoint) {
+                    CompletableFuture<String> savepointFuture =
+                            clusterClient.cancelWithSavepoint(flinkJobId, null, SavepointFormatType.DEFAULT);
+                    Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job savepoint path: {}", result.toString());
+                } else {
+                    CompletableFuture<Acknowledge> cancelFuture = clusterClient.cancel(flinkJobId);
+                    Object result = cancelFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job cancel result: {}", result.toString());
+                }
+            } catch (Exception e) {
+                try {
+                    logger.error("cancel job error, will kill job:", e);
+                    clusterDescriptor.killCluster(applicationId);
+                } catch (FlinkException e1) {
+                    logger.error("yarn cluster Descriptor kill cluster error:", e);
+                    return new ResultInfo("", "");
+                }
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("cancel job failed,appId:{}, jobId:", appId, jobId), e);
+            return new ResultInfo(appId, jobId);
+        }
+
+        return new ResultInfo(appId, jobId);
+    }
+
+    public ResultInfo savePoint() {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient()) {
+                CompletableFuture<String> savepointFuture =
+                        clusterClient.triggerSavepoint(flinkJobId, null, SavepointFormatType.DEFAULT);
+                Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                logger.info("flink job savepoint path: {}", result.toString());
+            } catch (Exception e) {
+                logger.error("flink job savepoint error", e);
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("flink job savepoint failed, appId:{}, jobId:", appId, jobId), e);
+            return new ResultInfo(appId, jobId);
+        }
+
+        return new ResultInfo(appId, jobId);
+    }
+
+    protected Configuration getFlinkConfigFromParamsInfo() {
+        Configuration defaultGlobalConfig =
+                JobGraphBuildUtil.getFlinkConfiguration(jobParamsInfo.getFlinkConfDir());
+        replaceDefaultGlobalConfig(defaultGlobalConfig, jobParamsInfo);
+        return defaultGlobalConfig;
+    }
+
+    /**
+     * replace the default configuration items in the flink-conf.yaml
+     *
+     * @param flinkConfig
+     * @param jobParamsInfo
+     */
+    protected void replaceDefaultGlobalConfig(Configuration flinkConfig, ParamsInfo jobParamsInfo) {
+        if (!StringUtils.isEmpty(jobParamsInfo.getName())) {
+            flinkConfig.setString(YarnConfigOptions.APPLICATION_NAME, jobParamsInfo.getName());
+        }
+
+        if (!StringUtils.isEmpty(jobParamsInfo.getQueue())) {
+            flinkConfig.setString(YarnConfigOptions.APPLICATION_QUEUE, jobParamsInfo.getQueue());
+        }
+
+        if (!StringUtils.isEmpty(jobParamsInfo.getFlinkConfDir())) {
+            discoverLogConfigFile(jobParamsInfo.getFlinkConfDir())
+                    .ifPresent(
+                            file -> flinkConfig.setString(
+                                    YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE,
+                                    file.getPath()));
+        }
+
+        if (!flinkConfig.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY)) {
+            flinkConfig.setString(
+                    TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(), DEFAULT_TOTAL_PROCESS_MEMORY);
+        }
+
+        // fill security config
+        if (jobParamsInfo.isOpenSecurity()) {
+            flinkConfig.setString(
+                    SecurityOptions.KERBEROS_LOGIN_KEYTAB.key(),
+                    PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
+            Optional.ofNullable(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME))
+                    .ifPresent(
+                            principal -> flinkConfig.setString(
+                                    SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(),
+                                    principal));
+        }
+
+        Properties flinkConfigProperties = jobParamsInfo.getConfProperties();
+        if (!Objects.isNull(flinkConfigProperties)) {
+            flinkConfigProperties.stringPropertyNames().stream()
+                    .forEach(
+                            key -> flinkConfig.setString(
+                                    key, flinkConfigProperties.getProperty(key)));
+        }
+    }
+
+    /**
+     * find log4 files from flink conf
+     *
+     * @param configurationDirectory
+     * @return
+     */
+    protected Optional<File> discoverLogConfigFile(final String configurationDirectory) {
+        Optional<File> logConfigFile = Optional.empty();
+
+        final File log4jFile =
+                new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
+        if (log4jFile.exists()) {
+            logConfigFile = Optional.of(log4jFile);
+        }
+
+        final File logbackFile =
+                new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
+        if (logbackFile.exists()) {
+            if (logConfigFile.isPresent()) {
+                logger.warn(
+                        "The configuration directory ('"
+                                + configurationDirectory
+                                + "') already contains a logger4J config file."
+                                + "If you want to use logback, then please delete or rename the log configuration file.");

Review Comment:
   ## Missing space in string literal
   
   This string appears to be missing a space after 'file.'.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1249)
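
   One way to clear the warning is simply to end the first fragment with a trailing space. The sketch below also spells the logger name as "log4j", which goes slightly beyond what the scanner flags and is only a suggestion:

   ```java
   logger.warn(
           "The configuration directory ('"
                   + configurationDirectory
                   + "') already contains a log4j config file. "
                   + "If you want to use logback, then please delete or rename the log configuration file.");
   ```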



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/utils/YarnLogHelper.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.*;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * reference for yarn LogCLIHelpers
+ *
+ */
+public class YarnLogHelper {
+
+    private static final Logger logger = LoggerFactory.getLogger(YarnLogHelper.class);
+
+    public static String printAllContainersLogsReturnFilePath(
+                                                              YarnConfiguration configuration, String finishedJobLogDir,
+                                                              String applicationId) throws IOException {
+        Path remoteRootLogDir =
+                new Path(
+                        configuration.get(
+                                YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+                                YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+        ApplicationId appId = ConverterUtils.toApplicationId(applicationId);

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ConverterUtils.toApplicationId](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1248)
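
   As with the other findings in this series, `ApplicationId.fromString` is the non-deprecated equivalent (Hadoop 2.8+ assumed):

   ```java
   ApplicationId appId = ApplicationId.fromString(applicationId);
   ```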



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * submit job
+     *
+     * @return
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * kill yarn job and clean application files in hdfs.
+     *
+     * @return
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));
+            logger.info("killed applicationId {} was unsuccessful.", applicationId);
+        } catch (YarnException e) {
+            logger.error("killed applicationId {0} was failed.", e);
+            return new ResultInfo("", "");
+        }
+
+        try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+            Path applicationDir =
+                    new Path(
+                            checkNotNull(fs.getHomeDirectory()),
+                            ".flink/" + checkNotNull(applicationId) + '/');
+            if (!fs.delete(applicationDir, true)) {
+                logger.error(
+                        "Deleting yarn application files under {} was unsuccessful.",
+                        applicationDir);
+            } else {
+                logger.error(
+                        "Deleting yarn application files under {} was successful.", applicationDir);
+            }
+        } catch (Exception e) {
+            logger.error("Deleting yarn application files was failed!", e);
+        }
+        return new ResultInfo("", "");
+    }
+
+    public ResultInfo cancelJob(boolean doSavepoint) {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ConverterUtils.toApplicationId](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1246)
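
   Same pattern as above, sketched for this call site (Hadoop 2.8+ assumed):

   ```java
   ApplicationId applicationId = ApplicationId.fromString(appId);
   ```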



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java:
##########
@@ -43,6 +54,13 @@
      */
     private TaskExecutionContext taskExecutionContext;
 
+
+
+    private ResultInfo flinkStreamResultInfo;
+
+    protected final Logger logger =

Review Comment:
   ## Field masks field in super class
   
   This field shadows another field called [logger](1) in a superclass.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1253)
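
   If the inherited field is usable at this point, the simplest resolution is to delete the redeclaration and rely on the superclass `logger`; otherwise a rename avoids the shadowing. A sketch of the rename option, where both the field name and the initializer are illustrative rather than taken from the PR:

   ```java
   // Hypothetical rename so the inherited "logger" field is no longer shadowed.
   protected final Logger flinkStreamLogger =
           LoggerFactory.getLogger(FlinkStreamTask.class);
   ```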



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/utils/YarnLogHelper.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.*;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * reference for yarn LogCLIHelpers
+ *
+ */
+public class YarnLogHelper {
+
+    private static final Logger logger = LoggerFactory.getLogger(YarnLogHelper.class);
+
+    public static String printAllContainersLogsReturnFilePath(
+                                                              YarnConfiguration configuration, String finishedJobLogDir,
+                                                              String applicationId) throws IOException {
+        Path remoteRootLogDir =
+                new Path(
+                        configuration.get(
+                                YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+                                YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+        ApplicationId appId = ConverterUtils.toApplicationId(applicationId);
+        // mkdir if not exist
+        FileUtils.forceMkdir(new File(finishedJobLogDir));
+        String logFilePath = finishedJobLogDir + "/" + applicationId + ".log";
+        logger.info("finished job log file is:{} ", logFilePath);
+        File localLogFile = new File(logFilePath);
+        if (localLogFile.exists() && localLogFile.isFile() && localLogFile.length() > 0) {
+            logger.info("yarn log exist in local, log file path is:{}", localLogFile);
+            return logFilePath;
+        }
+
+        String hadoopUser = UserGroupInformation.getCurrentUser().getShortUserName();
+        String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(configuration);
+
+        logger.info("Current Hadoop/Kerberos user: {}", hadoopUser);
+        Path remoteAppLogDir =
+                LogAggregationUtils.getRemoteAppLogDir(
+                        remoteRootLogDir, appId, hadoopUser, logDirSuffix);
+
+        long logFileSize = getLogFileSize(configuration, remoteAppLogDir.toString());
+        Preconditions.checkArgument(logFileSize > 0, "log file size =0");
+
+        // hdfs log file exist and create file and print stream
+        FileUtils.touch(localLogFile);
+        FileOutputStream fileOutputStream = new FileOutputStream(logFilePath);
+
+        try (PrintStream printStream = new PrintStream(fileOutputStream, true)) {
+            RemoteIterator<FileStatus> nodeFiles = null;
+            try {
+                Path qualifiedLogDir =
+                        FileContext.getFileContext(configuration).makeQualified(remoteAppLogDir);
+                nodeFiles =
+                        FileContext.getFileContext(qualifiedLogDir.toUri(), configuration)
+                                .listStatus(remoteAppLogDir);
+            } catch (FileNotFoundException fnf) {
+                logDirNotExist(remoteAppLogDir.toString(), printStream);
+            }
+
+            boolean foundAnyLogs = false;
+
+            while (nodeFiles.hasNext()) {

Review Comment:
   ## Dereferenced variable may be null
   
   Variable [nodeFiles](1) may be null here because of [this](2) assignment.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1252)
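
   A minimal sketch of one way to satisfy the check: guard the iterator before the loop so the `FileNotFoundException` path can never lead to a null dereference. Whether returning the still-empty local log file is the desired behaviour is a separate design question for the author:

   ```java
   // Sketch: bail out before dereferencing a possibly-null iterator.
   if (nodeFiles == null) {
       return logFilePath;
   }
   ```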



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/enums/ClusterClient.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.enums;
+
+import org.apache.dolphinscheduler.plugin.task.flink.client.IClusterClient;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.CheckpointInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.executor.KerberosSecurityContext;
+import org.apache.dolphinscheduler.plugin.task.flink.executor.YarnApplicationClusterExecutor;
+import org.apache.dolphinscheduler.plugin.task.flink.executor.YarnPerJobClusterExecutor;
+import org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.HdfsUtil;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.YarnLogHelper;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.FunctionUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public enum ClusterClient implements IClusterClient {
+
+    INSTANCE;
+
+    private static final Logger logger = LoggerFactory.getLogger(ClusterClient.class);
+
+    private static Cache<String, YarnClient> YARN_CLIENT_CACHE =
+            CacheBuilder.newBuilder()
+                    .expireAfterWrite(12, TimeUnit.HOURS)
+                    .removalListener(new YarnClientRemovalListener())
+                    .build();
+
+    @Override
+    public ResultInfo submitFlinkJob(ParamsInfo jobParamsInfo) throws Exception {
+        FlinkStreamDeployMode mode =
+                jobParamsInfo.getRunMode() != null
+                        ? FlinkStreamDeployMode.YARN_PER_JOB
+                        : jobParamsInfo.getRunMode();
+        ResultInfo resultInfo;
+        switch (mode) {
+            case YARN_APPLICATION:
+                resultInfo = new YarnApplicationClusterExecutor(jobParamsInfo).submitJob();
+                break;
+            case YARN_PER_JOB:
+            default:
+                resultInfo = new YarnPerJobClusterExecutor(jobParamsInfo).submitJob();
+        }
+        return resultInfo;
+    }
+
+    @Override
+    public ResultInfo submitFlinkJobWithKerberos(ParamsInfo jobParamsInfo) throws Exception {
+        return KerberosSecurityContext.runSecured(
+                jobParamsInfo,
+                FunctionUtils.uncheckedSupplier(() -> submitFlinkJob(jobParamsInfo)));
+    }
+
+    @Override
+    public ResultInfo killYarnJob(ParamsInfo jobParamsInfo) throws IOException {
+        return new YarnPerJobClusterExecutor(jobParamsInfo).killJob();
+    }
+
+    @Override
+    public ResultInfo killYarnJobWithKerberos(ParamsInfo jobParamsInfo) throws Exception {
+        return KerberosSecurityContext.runSecured(
+                jobParamsInfo, FunctionUtils.uncheckedSupplier(() -> killYarnJob(jobParamsInfo)));
+    }
+
+    @Override
+    public YarnTaskStatus getYarnJobStatus(ParamsInfo jobParamsInfo) throws Exception {
+        String applicationId = jobParamsInfo.getApplicationId();
+        String hadoopConfDir = jobParamsInfo.getHadoopConfDir();
+        Preconditions.checkNotNull(applicationId, "yarn applicationId cannot be null!");
+        Preconditions.checkNotNull(hadoopConfDir, "hadoop conf dir cannot be null!");
+
+        YarnClient yarnClient =
+                YARN_CLIENT_CACHE.get(
+                        hadoopConfDir,
+                        () -> {
+                            try {
+                                logger.info("create yarn client,create time:{}", LocalDateTime.now());
+                                return new YarnPerJobClusterExecutor(jobParamsInfo)
+                                        .createYarnClient();
+                            } catch (IOException e) {
+                                logger.error("create yarn client error!", e);
+                            }
+                            return null;
+                        });
+
+        if (Objects.nonNull(yarnClient)) {
+            try {
+                ApplicationId appId = ConverterUtils.toApplicationId(applicationId);

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ConverterUtils.toApplicationId](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1245)
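   As a reference, one non-deprecated way to build the same object (a minimal sketch, assuming Hadoop 2.8+ where this factory method exists) is `ApplicationId.fromString`:
   
   ```java
   import org.apache.hadoop.yarn.api.records.ApplicationId;
   
   // Parses the textual id (e.g. "application_1663150000000_0001") directly,
   // avoiding the deprecated ConverterUtils.toApplicationId helper.
   ApplicationId appId = ApplicationId.fromString(applicationId);
   ```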



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/executor/AbstractClusterExecutor.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.plugin.task.flink.executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOG4J_NAME;
+import static org.apache.flink.yarn.configuration.YarnLogConfigUtil.CONFIG_FILE_LOGBACK_NAME;
+
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ParamsInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.entity.ResultInfo;
+import org.apache.dolphinscheduler.plugin.task.flink.factory.YarnClusterDescriptorFactory;
+import org.apache.dolphinscheduler.plugin.task.flink.utils.JobGraphBuildUtil;
+import org.apache.dolphinscheduler.spi.utils.Constants;
+import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractClusterExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractClusterExecutor.class);
+
+    private static final String DEFAULT_TOTAL_PROCESS_MEMORY = "1024m";
+
+    public ParamsInfo jobParamsInfo;
+
+    public AbstractClusterExecutor(ParamsInfo jobParamsInfo) {
+        this.jobParamsInfo = jobParamsInfo;
+    }
+
+    /**
+     * Submit the flink job to the cluster.
+     *
+     * @return the submit result info
+     */
+    public abstract ResultInfo submitJob();
+
+    public YarnClient createYarnClient() throws IOException {
+        YarnClient yarnClient =
+                YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromHadoopConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+        logger.info(
+                "yarn client successfully created, hadoop conf dir:{}",
+                jobParamsInfo.getHadoopConfDir());
+        return yarnClient;
+    }
+
+    /**
+     * Kill the yarn job and clean up the application files in HDFS.
+     *
+     * @return the kill result info
+     */
+    public ResultInfo killJob() throws IOException {
+        String applicationId = jobParamsInfo.getApplicationId();
+        if (StringUtils.isEmpty(applicationId)) {
+            throw new NullPointerException("kill yarn job applicationId is required!");
+        }
+        logger.info("killed applicationId is:{}", applicationId);
+        YarnConfiguration yarnConfiguration =
+                YarnClusterDescriptorFactory.INSTANCE.parseYarnConfFromConfDir(
+                        jobParamsInfo.getHadoopConfDir());
+
+        try (
+                YarnClient yarnClient =
+                        YarnClusterDescriptorFactory.INSTANCE.createYarnClientFromYarnConf(
+                                yarnConfiguration);) {
+            yarnClient.killApplication(ConverterUtils.toApplicationId(applicationId));
+            logger.info("killed applicationId {} was unsuccessful.", applicationId);
+        } catch (YarnException e) {
+            logger.error("killed applicationId {0} was failed.", e);
+            return new ResultInfo("", "");
+        }
+
+        try (FileSystem fs = FileSystem.get(yarnConfiguration)) {
+            Path applicationDir =
+                    new Path(
+                            checkNotNull(fs.getHomeDirectory()),
+                            ".flink/" + checkNotNull(applicationId) + '/');
+            if (!fs.delete(applicationDir, true)) {
+                logger.error(
+                        "Deleting yarn application files under {} was unsuccessful.",
+                        applicationDir);
+            } else {
+                logger.info(
+                        "Deleting yarn application files under {} was successful.", applicationDir);
+            }
+        } catch (Exception e) {
+            logger.error("Deleting yarn application files was failed!", e);
+        }
+        return new ResultInfo("", "");
+    }
+
+    public ResultInfo cancelJob(boolean doSavepoint) {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient()) {
+                if (doSavepoint) {
+                    CompletableFuture<String> savepointFuture =
+                            clusterClient.cancelWithSavepoint(flinkJobId, null, SavepointFormatType.DEFAULT);
+                    Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job savepoint path: {}", result.toString());
+                } else {
+                    CompletableFuture<Acknowledge> cancelFuture = clusterClient.cancel(flinkJobId);
+                    Object result = cancelFuture.get(2, TimeUnit.MINUTES);
+                    logger.info("flink job cancel result: {}", result.toString());
+                }
+            } catch (Exception e) {
+                try {
+                    logger.error("cancel job error, will kill job:", e);
+                    clusterDescriptor.killCluster(applicationId);
+                } catch (FlinkException e1) {
+                    logger.error("yarn cluster Descriptor kill cluster error:", e);
+                    return new ResultInfo("", "");
+                }
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("cancel job failed,appId:{}, jobId:", appId, jobId), e);
+            return new ResultInfo(appId, jobId);
+        }
+
+        return new ResultInfo(appId, jobId);
+    }
+
+    public ResultInfo savePoint() {
+        String appId = jobParamsInfo.getApplicationId();
+        String jobId = jobParamsInfo.getFlinkJobId();
+
+        logger.info("cancel Job appId:{}, jobId:{}", appId, jobId);
+
+        ApplicationId applicationId = ConverterUtils.toApplicationId(appId);
+        JobID flinkJobId = new JobID(org.apache.flink.util.StringUtils.hexStringToByte(jobId));
+
+        Configuration flinkConfig = getFlinkConfigFromParamsInfo();
+        try (
+                YarnClusterDescriptor clusterDescriptor =
+                        (YarnClusterDescriptor) YarnClusterDescriptorFactory.INSTANCE.createClusterDescriptor(
+                                jobParamsInfo.getHadoopConfDir(), flinkConfig)) {
+
+            ClusterClientProvider<ApplicationId> retrieve =
+                    clusterDescriptor.retrieve(applicationId);
+            try (ClusterClient<ApplicationId> clusterClient = retrieve.getClusterClient()) {
+                CompletableFuture<String> savepointFuture =
+                        clusterClient.triggerSavepoint(flinkJobId, null, SavepointFormatType.DEFAULT);
+                Object result = savepointFuture.get(2, TimeUnit.MINUTES);
+                logger.info("flink job savepoint path: {}", result.toString());
+            } catch (Exception e) {
+                logger.error("flink job savepoint error", e);
+            }
+
+        } catch (Exception e) {
+            logger.error(String.format("flink job savepoint failed, appId:{}, jobId:", appId, jobId), e);

Review Comment:
   ## Unused format argument
   
   This format call refers to 0 argument(s) but supplies 2 argument(s).
   
   [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/1251)
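   A minimal sketch of one way to fix this is to drop `String.format` and let SLF4J fill in the `{}` placeholders, passing the exception as the last argument so the stack trace is still logged:
   
   ```java
   logger.error("flink job savepoint failed, appId:{}, jobId:{}", appId, jobId, e);
   ```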



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org