You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/16 07:44:44 UTC

[incubator-inlong] branch master updated: [INLONG-3751][Manager] Add configuration for plugins to support Audit metrics (#3752)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b1928827 [INLONG-3751][Manager] Add configuration for plugins to support Audit metrics (#3752)
9b1928827 is described below

commit 9b1928827629a78930b995f7eab2d7fb7a49479c
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Sat Apr 16 15:44:37 2022 +0800

    [INLONG-3751][Manager] Add configuration for plugins to support Audit metrics (#3752)
---
 .../java/org/apache/inlong/manager/plugin/flink/FlinkService.java   | 6 ++++--
 .../org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java     | 2 ++
 .../org/apache/inlong/manager/plugin/flink/enums/Constants.java     | 2 ++
 .../org/apache/inlong/manager/plugin/util/FlinkConfiguration.java   | 2 ++
 .../manager-plugins/src/main/resources/flink-sort-plugin.properties | 2 ++
 5 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 61c8d1c7d..963fae569 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -186,7 +186,7 @@ public class FlinkService {
     private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSettings settings) throws Exception {
         String localJarPath = flinkInfo.getLocalJarPath();
         File jarFile = new File(localJarPath);
-        String[] programArgs = genProgramArgs(flinkInfo);
+        String[] programArgs = genProgramArgs(flinkInfo,flinkConfig);
 
         PackagedProgram program = PackagedProgram.newBuilder()
                 .setConfiguration(configuration)
@@ -234,7 +234,7 @@ public class FlinkService {
     /**
      * Build the program of the Flink job.
      */
-    private String[] genProgramArgs(FlinkInfo flinkInfo) {
+    private String[] genProgramArgs(FlinkInfo flinkInfo,FlinkConfig flinkConfig) {
         List<String> list = new ArrayList<>();
         list.add("-cluster-id");
         list.add(flinkInfo.getJobName());
@@ -244,6 +244,8 @@ public class FlinkService {
         list.add(flinkInfo.getSourceType());
         list.add("-sink.type");
         list.add(flinkInfo.getSinkType());
+        list.add("-metrics.audit.proxy.hosts");
+        list.add(flinkConfig.getAuditProxyHosts());
         // TODO Support more than one stream with one group
         if (flinkInfo.getInlongStreamInfoList() != null
                 && !flinkInfo.getInlongStreamInfoList().isEmpty()) {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
index e125e51f5..986774820 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
@@ -32,4 +32,6 @@ public class FlinkConfig {
 
     private Integer parallelism;
 
+    private String auditProxyHosts;
+
 }
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index 0a44c2bd8..4ffcc4212 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -30,6 +30,8 @@ public class Constants {
 
     public static final String SAVEPOINT_DIRECTORY = "flink.savepoint.directory";
 
+    public static final String METRICS_AUDIT_PROXY_HOSTS = "metrics.audit.proxy.hosts";
+
     //dataflow
     public static final String SOURCE_INFO = "source_info";
 
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
index db91d5271..9f325f4da 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
@@ -29,6 +29,7 @@ import java.util.Properties;
 
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.METRICS_AUDIT_PROXY_HOSTS;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY;
@@ -99,6 +100,7 @@ public class FlinkConfiguration {
         flinkConfig.setParallelism(Integer.valueOf(properties.getProperty(PARALLELISM)));
         flinkConfig.setSavepointDirectory(properties.getProperty(SAVEPOINT_DIRECTORY));
         flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
+        flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS));
         return flinkConfig;
     }
 
diff --git a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties b/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
index 7a9bd0cf1..f4867d1e6 100644
--- a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
+++ b/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
@@ -32,3 +32,5 @@ flink.jobmanager.port=6123
 flink.savepoint.directory=file:///data/inlong-sort/savepoints
 # Flink parallelism
 flink.parallelism=1
+# Audit proxy hosts (host:port), passed to the Flink sort job as -metrics.audit.proxy.hosts
+metrics.audit.proxy.hosts=127.0.0.1:10081
\ No newline at end of file