You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/03/29 11:35:37 UTC
[kylin] branch master updated: KYLIN-3904 Support more dependency jars in FlinkExecutable for FlinkCubeHFile
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 1a7a5a2 KYLIN-3904 Support more dependency jars in FlinkExecutable for FlinkCubeHFile
1a7a5a2 is described below
commit 1a7a5a2644b40b9a5cdd74a2c4b6c7e0ab78193e
Author: harveyyue <yw...@126.com>
AuthorDate: Tue Mar 10 15:02:55 2020 +0800
KYLIN-3904 Support more dependency jars in FlinkExecutable for FlinkCubeHFile
---
.../org/apache/kylin/engine/flink/FlinkExecutable.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 5344cb5..5fb7190 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -48,7 +48,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
@@ -163,9 +165,6 @@ public class FlinkExecutable extends AbstractExecutable {
String hadoopClasspathEnv = new File(hadoopConf).getParentFile().getAbsolutePath();
String jobJar = config.getKylinJobJarPath();
- if (StringUtils.isEmpty(jars)) {
- jars = jobJar;
- }
String segmentID = this.getParam(FlinkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
CubeSegment segment = cube.getSegmentById(segmentID);
@@ -205,16 +204,24 @@ public class FlinkExecutable extends AbstractExecutable {
//flink on yarn specific option (pattern : -yn 1)
if (configOptionKey.startsWith("-y") && !entry.getValue().isEmpty()) {
sb.append(" ").append(configOptionKey).append(" ").append(entry.getValue());
- } else if(!configOptionKey.startsWith("-y")){
+ } else if (!configOptionKey.startsWith("-y")) {
//flink on yarn specific option (pattern : -yD taskmanager.network.memory.min=536346624)
sb.append(" ").append(configOptionKey).append("=").append(entry.getValue());
}
}
}
+ if (StringUtils.isNotBlank(jars)) {
+ String[] splitJars = jars.split(",\\s*");
+ Set<String> setJars = new HashSet();
+ setJars.addAll(Arrays.asList(splitJars));
+ for (String jar : setJars) {
+ sb.append(String.format(Locale.ROOT, " -C file://%s", jar));
+ }
+ }
sb.append(" -c org.apache.kylin.common.util.FlinkEntry -p %s %s %s ");
final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, hadoopClasspathEnv,
- KylinConfig.getFlinkHome(), parallelism, jars, formatArgs());
+ KylinConfig.getFlinkHome(), parallelism, jobJar, formatArgs());
logger.info("cmd: " + cmd);
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final CliCommandExecutor exec = new CliCommandExecutor();