You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/13 06:28:57 UTC

[incubator-inlong] branch master updated: [INLONG-4648][Manager] Read the path of Sort connectors from configuration file (#4651)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e8f36c68f [INLONG-4648][Manager] Read the path of Sort connectors from configuration file (#4651)
e8f36c68f is described below

commit e8f36c68f1970e964e2e8583877a16d0d85ef2be
Author: woofyzhao <49...@qq.com>
AuthorDate: Mon Jun 13 14:28:52 2022 +0800

    [INLONG-4648][Manager] Read the path of Sort connectors from configuration file (#4651)
---
 .../manager/plugin/flink/FlinkOperation.java       | 36 +++++++++++++++++++---
 1 file changed, 32 insertions(+), 4 deletions(-)

diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index 68cc07d6c..ccefecd73 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.plugin.flink;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
@@ -26,8 +27,14 @@ import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
 import org.apache.inlong.manager.plugin.util.FlinkUtils;
 
+import java.io.BufferedInputStream;
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -39,19 +46,34 @@ import static org.apache.flink.api.common.JobStatus.RUNNING;
 @Slf4j
 public class FlinkOperation {
 
+    private static final String CONFIG_FILE = "application.properties";
+    private static final String CONNECTOR_DIR_KEY = "sort.connector.dir";
     private static final String JOB_TERMINATED_MSG = "the job not found by id %s, "
             + "or task already terminated or savepoint path is null";
     private static final String INLONG_MANAGER = "inlong-manager";
     private static final String INLONG_SORT = "inlong-sort";
     private static final String SORT_JAR_PATTERN = "^sort-dist.*jar$";
-    private static final String SORT_PLUGIN = "sort-plugin";
-
+    private static Properties properties;
     private final FlinkService flinkService;
 
     public FlinkOperation(FlinkService flinkService) {
         this.flinkService = flinkService;
     }
 
+    /**
+     * Get sort connector directory: "sort.connector.dir" from application.properties, else parent/inlong-sort/connectors
+     */
+    private static String getConnectorDir(String parent) throws IOException {
+        if (properties == null) {
+            Properties loaded = new Properties();
+            String path = Thread.currentThread().getContextClassLoader().getResource("").getPath() + CONFIG_FILE;
+            try (InputStream inputStream = new BufferedInputStream(Files.newInputStream(Paths.get(path)))) {
+                loaded.load(inputStream);
+                properties = loaded; // cache only after a successful load, so a failed read is retried next call
+            }
+        }
+        return properties.getProperty(CONNECTOR_DIR_KEY, Paths.get(parent, INLONG_SORT, "connectors").toString());
+    }
+
     /**
      * Get Sort connector jar patterns from the Flink info.
      */
@@ -139,8 +161,14 @@ public class FlinkOperation {
         flinkInfo.setLocalJarPath(jarPath);
         log.info("get sort jar path success, path: {}", jarPath);
 
-        String pluginPath = startPath + SORT_PLUGIN;
-        List<String> connectorPaths = FlinkUtils.listFiles(pluginPath, getConnectorJarPattern(flinkInfo), -1);
+        String connectorDir = getConnectorDir(startPath);
+        List<String> connectorPaths = FlinkUtils.listFiles(connectorDir, getConnectorJarPattern(flinkInfo), -1);
+        if (CollectionUtils.isEmpty(connectorPaths)) {
+            String message = String.format("no sort connectors found in %s", connectorDir);
+            log.error(message);
+            throw new RuntimeException(message);
+        }
+
         flinkInfo.setConnectorJarPaths(connectorPaths);
         log.info("get sort connector paths success, paths: {}", connectorPaths);