You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/16 08:38:16 UTC
[incubator-inlong] branch release-1.1.0 updated: [INLONG-3753][Manager] Add configuration for plugins to support metrics (#3754)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/release-1.1.0 by this push:
new 8df060b10 [INLONG-3753][Manager] Add configuration for plugins to support metrics (#3754)
8df060b10 is described below
commit 8df060b101f544644f6986d74f387aa82208bef1
Author: jiancheng Lv <63...@qq.com>
AuthorDate: Sat Apr 16 16:38:10 2022 +0800
[INLONG-3753][Manager] Add configuration for plugins to support metrics (#3754)
Co-authored-by: jianchenglv <6>
Co-authored-by: dockerzhang <do...@apache.org>
---
.../java/org/apache/inlong/manager/plugin/flink/FlinkService.java | 6 ++++--
.../apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java | 8 ++++----
.../org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java | 4 ++++
.../org/apache/inlong/manager/plugin/flink/enums/Constants.java | 6 ++++--
.../org/apache/inlong/manager/plugin/util/FlinkConfiguration.java | 4 ++++
.../src/main/resources/flink-sort-plugin.properties | 5 +++++
6 files changed, 25 insertions(+), 8 deletions(-)
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 61c8d1c7d..4e7a7fd4c 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -186,7 +186,7 @@ public class FlinkService {
private String submitJobBySavepoint(FlinkInfo flinkInfo, SavepointRestoreSettings settings) throws Exception {
String localJarPath = flinkInfo.getLocalJarPath();
File jarFile = new File(localJarPath);
- String[] programArgs = genProgramArgs(flinkInfo);
+ String[] programArgs = genProgramArgs(flinkInfo, flinkConfig);
PackagedProgram program = PackagedProgram.newBuilder()
.setConfiguration(configuration)
@@ -234,7 +234,7 @@ public class FlinkService {
/**
* Build the program of the Flink job.
*/
- private String[] genProgramArgs(FlinkInfo flinkInfo) {
+ private String[] genProgramArgs(FlinkInfo flinkInfo, FlinkConfig flinkConfig) {
List<String> list = new ArrayList<>();
list.add("-cluster-id");
list.add(flinkInfo.getJobName());
@@ -244,6 +244,8 @@ public class FlinkService {
list.add(flinkInfo.getSourceType());
list.add("-sink.type");
list.add(flinkInfo.getSinkType());
+ list.add("-metrics.audit.proxy.hosts");
+ list.add(flinkConfig.getAuditProxyHosts());
// TODO Support more than one stream with one group
if (flinkInfo.getInlongStreamInfoList() != null
&& !flinkInfo.getInlongStreamInfoList().isEmpty()) {
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
index 1350a7276..c321c925e 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
-import org.apache.inlong.manager.plugin.flink.enums.Constants;
import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
import static org.apache.flink.api.common.JobStatus.FINISHED;
@@ -78,8 +77,9 @@ public class IntegrationTaskRunner implements Runnable {
case RESTART:
try {
StopWithSavepointRequest stopWithSavepointRequest = new StopWithSavepointRequest();
- stopWithSavepointRequest.setDrain(Constants.DRAIN);
- stopWithSavepointRequest.setTargetDirectory(Constants.SAVEPOINT_DIRECTORY);
+ FlinkConfig flinkConfig = flinkService.getFlinkConfig();
+ stopWithSavepointRequest.setDrain(flinkConfig.isDrain());
+ stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
String location = flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
flinkInfo.setSavepointPath(location);
log.info("the jobId: {} savepoint: {} ", flinkInfo.getJobId(), location);
@@ -116,8 +116,8 @@ public class IntegrationTaskRunner implements Runnable {
case STOP:
try {
StopWithSavepointRequest stopWithSavepointRequest = new StopWithSavepointRequest();
- stopWithSavepointRequest.setDrain(Constants.DRAIN);
FlinkConfig flinkConfig = flinkService.getFlinkConfig();
+ stopWithSavepointRequest.setDrain(flinkConfig.isDrain());
stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
String location = flinkService.stopJob(flinkInfo.getJobId(), stopWithSavepointRequest);
flinkInfo.setSavepointPath(location);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
index e125e51f5..3bde24ffe 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
@@ -32,4 +32,8 @@ public class FlinkConfig {
private Integer parallelism;
+ private boolean drain;
+
+ private String auditProxyHosts;
+
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index 0a44c2bd8..2f60164b3 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -30,6 +30,10 @@ public class Constants {
public static final String SAVEPOINT_DIRECTORY = "flink.savepoint.directory";
+ public static final String DRAIN = "flink.drain";
+
+ public static final String METRICS_AUDIT_PROXY_HOSTS = "metrics.audit.proxy.hosts";
+
//dataflow
public static final String SOURCE_INFO = "source_info";
@@ -62,6 +66,4 @@ public class Constants {
public static final String SEPARATOR = ":";
- public static final boolean DRAIN = false;
-
}
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
index db91d5271..c591b89b7 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
@@ -30,6 +30,8 @@ import java.util.Properties;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.METRICS_AUDIT_PROXY_HOSTS;
+import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
import static org.apache.inlong.manager.plugin.flink.enums.Constants.SAVEPOINT_DIRECTORY;
@@ -99,6 +101,8 @@ public class FlinkConfiguration {
flinkConfig.setParallelism(Integer.valueOf(properties.getProperty(PARALLELISM)));
flinkConfig.setSavepointDirectory(properties.getProperty(SAVEPOINT_DIRECTORY));
flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
+ flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN)));
+ flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS));
return flinkConfig;
}
diff --git a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties b/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
index 7a9bd0cf1..388c6aaa5 100644
--- a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
+++ b/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
@@ -32,3 +32,8 @@ flink.jobmanager.port=6123
flink.savepoint.directory=file:///data/inlong-sort/savepoints
# Flink parallelism
flink.parallelism=1
+# flink stop request drain
+flink.drain=false
+
+# metrics.audit.proxy.hosts
+metrics.audit.proxy.hosts=127.0.0.1:10081