You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/08 03:03:38 UTC
[inlong] branch master updated: [INLONG-4915][Manager] Add Pulsar source type and improve connector jar discovery mechanism (#4918)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 683ea805b [INLONG-4915][Manager] Add Pulsar source type and improve connector jar discovery mechanism (#4918)
683ea805b is described below
commit 683ea805b6fb5e0935916793944b244e32298a91
Author: woofyzhao <49...@qq.com>
AuthorDate: Fri Jul 8 11:03:33 2022 +0800
[INLONG-4915][Manager] Add Pulsar source type and improve connector jar discovery mechanism (#4918)
---
.../java/org/apache/inlong/manager/plugin/flink/FlinkService.java | 5 +++--
.../apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java | 2 ++
2 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 5a8161579..5b0b2252b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -198,7 +198,7 @@ public class FlinkService {
final File jarFile = new File(localJarPath);
final String[] programArgs = genProgramArgsV2(flinkInfo, flinkConfig);
- List<URL> classPaths = flinkInfo.getConnectorJarPaths().stream().map(p -> {
+ List<URL> connectorJars = flinkInfo.getConnectorJarPaths().stream().map(p -> {
try {
return new File(p).toURI().toURL();
} catch (MalformedURLException e) {
@@ -210,10 +210,11 @@ public class FlinkService {
.setConfiguration(configuration)
.setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
.setJarFile(jarFile)
- .setUserClassPaths(classPaths)
+ .setUserClassPaths(connectorJars)
.setArguments(programArgs)
.setSavepointRestoreSettings(settings).build();
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
+ jobGraph.addJars(connectorJars);
RestClusterClient<StandaloneClusterId> client = getFlinkClient();
CompletableFuture<JobID> result = client.submitJob(jobGraph);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
index 9f383c610..87f2ec2e1 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
@@ -40,6 +40,8 @@ public enum ConnectorJarType {
SQLSERVER_SOURCE("sqlserverExtract", "sqlserver-cdc"),
+ PULSAR_SOURCE("pulsarExtract", "pulsar"),
+
/**
* load datasource type
*/