You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/16 07:44:44 UTC
[incubator-inlong] branch master updated: [INLONG-3751][Manager] Add configuration for plugins to support Audit metrics (#3752)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9b1928827 [INLONG-3751][Manager] Add configuration for plugins to support Audit metrics (#3752)
9b1928827 is described below
commit 9b1928827629a78930b995f7eab2d7fb7a49479c
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Sat Apr 16 15:44:37 2022 +0800
[INLONG-3751][Manager] Add configuration for plugins to support Audit metrics (#3752)
---
.../java/org/apache/inlong/manager/plugin/flink/FlinkService.java | 6 ++++--
.../org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java | 2 ++
.../org/apache/inlong/manager/plugin/flink/enums/Constants.java | 2 ++
.../org/apache/inlong/manager/plugin/util/FlinkConfiguration.java | 2 ++
.../manager-plugins/src/main/resources/flink-sort-plugin.properties | 2 ++
5 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 61c8d1c7d..963fae569 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -186,7 +186,7 @@ public class FlinkService {
private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSettings settings) throws Exception {
String localJarPath = flinkInfo.getLocalJarPath();
File jarFile = new File(localJarPath);
- String[] programArgs = genProgramArgs(flinkInfo);
+ String[] programArgs = genProgramArgs(flinkInfo,flinkConfig);
PackagedProgram program = PackagedProgram.newBuilder()
.setConfiguration(configuration)
@@ -234,7 +234,7 @@ public class FlinkService {
/**
* Build the program of the Flink job.
*/
- private String[] genProgramArgs(FlinkInfo flinkInfo) {
+ private String[] genProgramArgs(FlinkInfo flinkInfo,FlinkConfig flinkConfig) {
List<String> list = new ArrayList<>();
list.add("-cluster-id");
list.add(flinkInfo.getJobName());
@@ -244,6 +244,8 @@ public class FlinkService {
list.add(flinkInfo.getSourceType());
list.add("-sink.type");
list.add(flinkInfo.getSinkType());
+ list.add("-metrics.audit.proxy.hosts");
+ list.add(flinkConfig.getAuditProxyHosts());
// TODO Support more than one stream with one group
if (flinkInfo.getInlongStreamInfoList() != null
&& !flinkInfo.getInlongStreamInfoList().isEmpty()) {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
index e125e51f5..986774820 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
@@ -32,4 +32,6 @@ public class FlinkConfig {
private Integer parallelism;
+ private String auditProxyHosts;
+
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index 0a44c2bd8..4ffcc4212 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -30,6 +30,8 @@ public class Constants {
public static final String SAVEPOINT_DIRECTORY = "flink.savepoint.directory";
+ public static final String METRICS_AUDIT_PROXY_HOSTS = "metrics.audit.proxy.hosts";
+
//dataflow
public static final String SOURCE_INFO = "source_info";
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
index db91d5271..9f325f4da 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
@@ -29,6 +29,7 @@ import java.util.Properties;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.METRICS_AUDIT_PROXY_HOSTS;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY;
@@ -99,6 +100,7 @@ public class FlinkConfiguration {
flinkConfig.setParallelism(Integer.valueOf(properties.getProperty(PARALLELISM)));
flinkConfig.setSavepointDirectory(properties.getProperty(SAVEPOINT_DIRECTORY));
flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
+ flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS));
return flinkConfig;
}
diff --git a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties b/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
index 7a9bd0cf1..f4867d1e6 100644
--- a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
+++ b/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
@@ -32,3 +32,5 @@ flink.jobmanager.port=6123
flink.savepoint.directory=file:///data/inlong-sort/savepoints
# Flink parallelism
flink.parallelism=1
+# metrics.audit.proxy.hosts
+metrics.audit.proxy.hosts=127.0.0.1:10081
\ No newline at end of file