You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/13 06:28:57 UTC
[incubator-inlong] branch master updated: [INLONG-4648][Manager] Read the path of Sort connectors from configuration file (#4651)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e8f36c68f [INLONG-4648][Manager] Read the path of Sort connectors from configuration file (#4651)
e8f36c68f is described below
commit e8f36c68f1970e964e2e8583877a16d0d85ef2be
Author: woofyzhao <49...@qq.com>
AuthorDate: Mon Jun 13 14:28:52 2022 +0800
[INLONG-4648][Manager] Read the path of Sort connectors from configuration file (#4651)
---
.../manager/plugin/flink/FlinkOperation.java | 36 +++++++++++++++++++---
1 file changed, 32 insertions(+), 4 deletions(-)
diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
index 68cc07d6c..ccefecd73 100644
--- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
+++ b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.plugin.flink;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
@@ -26,8 +27,14 @@ import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
import org.apache.inlong.manager.plugin.flink.enums.TaskCommitType;
import org.apache.inlong.manager.plugin.util.FlinkUtils;
+import java.io.BufferedInputStream;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -39,19 +46,34 @@ import static org.apache.flink.api.common.JobStatus.RUNNING;
@Slf4j
public class FlinkOperation {
+ private static final String CONFIG_FILE = "application.properties";
+ private static final String CONNECTOR_DIR_KEY = "sort.connector.dir";
private static final String JOB_TERMINATED_MSG = "the job not found by id %s, "
+ "or task already terminated or savepoint path is null";
private static final String INLONG_MANAGER = "inlong-manager";
private static final String INLONG_SORT = "inlong-sort";
private static final String SORT_JAR_PATTERN = "^sort-dist.*jar$";
- private static final String SORT_PLUGIN = "sort-plugin";
-
+ private static Properties properties;
private final FlinkService flinkService;
/**
 * Create a FlinkOperation that keeps the given Flink service in its {@code flinkService} field.
 */
public FlinkOperation(FlinkService flinkService) {
this.flinkService = flinkService;
}
+    /**
+     * Get sort connector directory.
+     *
+     * <p>The directory is read from the {@code sort.connector.dir} key of {@code application.properties},
+     * which is looked up in the root of the context class loader's classpath. The file is loaded on first
+     * use and cached in the static {@code properties} field. When the key is absent, the default
+     * {@code <parent>/inlong-sort/connectors} is returned.
+     *
+     * @param parent base directory used to build the default connector directory
+     * @return the configured connector directory, or the default one under {@code parent}
+     * @throws IOException if the classpath root cannot be resolved or the config file cannot be read
+     */
+    private static String getConnectorDir(String parent) throws IOException {
+        if (properties == null) {
+            java.net.URL classpathRoot = Thread.currentThread().getContextClassLoader().getResource("");
+            if (classpathRoot == null) {
+                // getResource("") returns null when the class loader exposes no directory root
+                throw new IOException("cannot locate classpath root to load " + CONFIG_FILE);
+            }
+
+            java.nio.file.Path configPath;
+            try {
+                // URL#getPath() stays percent-encoded (a space becomes "%20"), so convert through the URI
+                configPath = Paths.get(classpathRoot.toURI()).resolve(CONFIG_FILE);
+            } catch (java.net.URISyntaxException e) {
+                throw new IOException("invalid classpath root: " + classpathRoot, e);
+            }
+
+            Properties loaded = new Properties();
+            try (InputStream inputStream = new BufferedInputStream(Files.newInputStream(configPath))) {
+                loaded.load(inputStream);
+            }
+            // Publish the cache only after a successful load: a failed read is retried on the next call
+            // instead of leaving an empty Properties that silently yields the default directory.
+            properties = loaded;
+        }
+        return properties.getProperty(CONNECTOR_DIR_KEY, Paths.get(parent, INLONG_SORT, "connectors").toString());
+    }
+
/**
* Get Sort connector jar patterns from the Flink info.
*/
@@ -139,8 +161,14 @@ public class FlinkOperation {
flinkInfo.setLocalJarPath(jarPath);
log.info("get sort jar path success, path: {}", jarPath);
- String pluginPath = startPath + SORT_PLUGIN;
- List<String> connectorPaths = FlinkUtils.listFiles(pluginPath, getConnectorJarPattern(flinkInfo), -1);
+ String connectorDir = getConnectorDir(startPath);
+ List<String> connectorPaths = FlinkUtils.listFiles(connectorDir, getConnectorJarPattern(flinkInfo), -1);
+ if (CollectionUtils.isEmpty(connectorPaths)) {
+ String message = String.format("no sort connectors found in %s", connectorDir);
+ log.error(message);
+ throw new RuntimeException(message);
+ }
+
flinkInfo.setConnectorJarPaths(connectorPaths);
log.info("get sort connector paths success, paths: {}", connectorPaths);