You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/16 08:38:16 UTC

[incubator-inlong] branch release-1.1.0 updated: [INLONG-3753][Manager] Add configuration for plugins to support metrics (#3754)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/release-1.1.0 by this push:
     new 8df060b10 [INLONG-3753][Manager] Add configuration for plugins to support metrics (#3754)
8df060b10 is described below

commit 8df060b101f544644f6986d74f387aa82208bef1
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Sat Apr 16 16:38:10 2022 +0800

    [INLONG-3753][Manager] Add configuration for plugins to support metrics (#3754)
    
    Co-authored-by: jianchenglv <6>
    Co-authored-by: dockerzhang <do...@apache.org>
---
 .../java/org/apache/inlong/manager/plugin/flink/FlinkService.java | 6 ++++--
 .../apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java | 8 ++++----
 .../org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java   | 4 ++++
 .../org/apache/inlong/manager/plugin/flink/enums/Constants.java   | 6 ++++--
 .../org/apache/inlong/manager/plugin/util/FlinkConfiguration.java | 4 ++++
 .../src/main/resources/flink-sort-plugin.properties               | 5 +++++
 6 files changed, 25 insertions(+), 8 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 61c8d1c7d..4e7a7fd4c 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -186,7 +186,7 @@ public class FlinkService {
     private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSettings settings) throws Exception {
         String localJarPath = flinkInfo.getLocalJarPath();
         File jarFile = new File(localJarPath);
-        String[] programArgs = genProgramArgs(flinkInfo);
+        String[] programArgs = genProgramArgs(flinkInfo, flinkConfig);
 
         PackagedProgram program = PackagedProgram.newBuilder()
                 .setConfiguration(configuration)
@@ -234,7 +234,7 @@ public class FlinkService {
     /**
      * Build the program of the Flink job.
      */
-    private String[] genProgramArgs(FlinkInfo flinkInfo) {
+    private String[] genProgramArgs(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
         List<String> list = new ArrayList<>();
         list.add("-cluster-id");
         list.add(flinkInfo.getJobName());
@@ -244,6 +244,8 @@ public class FlinkService {
         list.add(flinkInfo.getSourceType());
         list.add("-sink.type");
         list.add(flinkInfo.getSinkType());
+        list.add("-metrics.audit.proxy.hosts");
+        list.add(flinkConfig.getAuditProxyHosts());
         // TODO Support more than one stream with one group
         if (flinkInfo.getInlongStreamInfoList() != null
                 && !flinkInfo.getInlongStreamInfoList().isEmpty()) {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
index 1350a7276..c321c925e 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
 
 import static org.apache.flink.api.common.JobStatus.FINISHED;
@@ -78,8 +77,9 @@ public class IntegrationTaskRunner implements Runnable {
             case RESTART:
                 try {
                     StopWithSavepointRequest stopWithSavepointRequest = new StopWithSavepointRequest();
-                    stopWithSavepointRequest.setDrain(Constants.DRAIN);
-                    stopWithSavepointRequest.setTargetDirectory(Constants.SAVEPOINT_DIRECTORY);
+                    FlinkConfig flinkConfig = flinkService.getFlinkConfig();
+                    stopWithSavepointRequest.setDrain(flinkConfig.isDrain());
+                    stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
                     String location = flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
                     flinkInfo.setSavepointPath(location);
                     log.info("the jobId: {} savepoint: {} ", flinkInfo.getJobId(), location);
@@ -116,8 +116,8 @@ public class IntegrationTaskRunner implements Runnable {
             case STOP:
                 try {
                     StopWithSavepointRequest stopWithSavepointRequest = new StopWithSavepointRequest();
-                    stopWithSavepointRequest.setDrain(Constants.DRAIN);
                     FlinkConfig flinkConfig = flinkService.getFlinkConfig();
+                    stopWithSavepointRequest.setDrain(flinkConfig.isDrain());
                     stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
                     String location = flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
                     flinkInfo.setSavepointPath(location);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
index e125e51f5..3bde24ffe 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
@@ -32,4 +32,8 @@ public class FlinkConfig {
 
     private Integer parallelism;
 
+    private boolean drain;
+
+    private String auditProxyHosts;
+
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index 0a44c2bd8..2f60164b3 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -30,6 +30,10 @@ public class Constants {
 
     public static final String SAVEPOINT_DIRECTORY = "flink.savepoint.directory";
 
+    public static final String DRAIN = "flink.drain";
+
+    public static final String METRICS_AUDIT_PROXY_HOSTS = "metrics.audit.proxy.hosts";
+
     //dataflow
     public static final String SOURCE_INFO = "source_info";
 
@@ -62,6 +66,4 @@ public class Constants {
 
     public static final String SEPARATOR = ":";
 
-    public static final boolean DRAIN = false;
-
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
index db91d5271..c591b89b7 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
@@ -30,6 +30,8 @@ import java.util.Properties;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.METRICS_AUDIT_PROXY_HOSTS;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY;
 
@@ -99,6 +101,8 @@ public class FlinkConfiguration {
         flinkConfig.setParallelism(Integer.valueOf(properties.getProperty(PARALLELISM)));
         flinkConfig.setSavepointDirectory(properties.getProperty(SAVEPOINT_DIRECTORY));
         flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
+        flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN)));
+        flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS));
         return flinkConfig;
     }
 
diff --git a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties b/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
index 7a9bd0cf1..388c6aaa5 100644
--- a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
+++ b/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
@@ -32,3 +32,8 @@ flink.jobmanager.port=6123
 flink.savepoint.directory=file:///data/inlong-sort/savepoints
 # Flink parallelism
 flink.parallelism=1
+# flink stop request drain
+flink.drain=false
+
+# metrics.audit.proxy.hosts
+metrics.audit.proxy.hosts=127.0.0.1:10081