You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/08 03:03:38 UTC

[inlong] branch master updated: [INLONG-4915][Manager] Add Pulsar source type and improve connector jar discovery mechanism (#4918)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 683ea805b [INLONG-4915][Manager] Add Pulsar source type and improve connector jar discovery mechanism (#4918)
683ea805b is described below

commit 683ea805b6fb5e0935916793944b244e32298a91
Author: woofyzhao <49...@qq.com>
AuthorDate: Fri Jul 8 11:03:33 2022 +0800

    [INLONG-4915][Manager] Add Pulsar source type and improve connector jar discovery mechanism (#4918)
---
 .../java/org/apache/inlong/manager/plugin/flink/FlinkService.java    | 5 +++--
 .../apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java   | 2 ++
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 5a8161579..5b0b2252b 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -198,7 +198,7 @@ public class FlinkService {
         final File jarFile = new File(localJarPath);
         final String[] programArgs = genProgramArgsV2(flinkInfo, flinkConfig);
 
-        List<URL> classPaths = flinkInfo.getConnectorJarPaths().stream().map(p -> {
+        List<URL> connectorJars = flinkInfo.getConnectorJarPaths().stream().map(p -> {
             try {
                 return new File(p).toURI().toURL();
             } catch (MalformedURLException e) {
@@ -210,10 +210,11 @@ public class FlinkService {
                 .setConfiguration(configuration)
                 .setEntryPointClassName(Constants.ENTRYPOINT_CLASS)
                 .setJarFile(jarFile)
-                .setUserClassPaths(classPaths)
+                .setUserClassPaths(connectorJars)
                 .setArguments(programArgs)
                 .setSavepointRestoreSettings(settings).build();
         JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false);
+        jobGraph.addJars(connectorJars);
 
         RestClusterClient<StandaloneClusterId> client = getFlinkClient();
         CompletableFuture<JobID> result = client.submitJob(jobGraph);
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
index 9f383c610..87f2ec2e1 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
@@ -40,6 +40,8 @@ public enum ConnectorJarType {
 
     SQLSERVER_SOURCE("sqlserverExtract", "sqlserver-cdc"),
 
+    PULSAR_SOURCE("pulsarExtract", "pulsar"),
+
     /**
      * load datasource type
      */