You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/03/29 11:35:37 UTC

[kylin] branch master updated: KYLIN-3904 Support more dependency jars in FlinkExecutable for FlinkCubeHFile

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a7a5a2  KYLIN-3904 Support more dependency jars in FlinkExecutable for FlinkCubeHFile
1a7a5a2 is described below

commit 1a7a5a2644b40b9a5cdd74a2c4b6c7e0ab78193e
Author: harveyyue <yw...@126.com>
AuthorDate: Tue Mar 10 15:02:55 2020 +0800

    KYLIN-3904 Support more dependency jars in FlinkExecutable for FlinkCubeHFile
---
 .../org/apache/kylin/engine/flink/FlinkExecutable.java  | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 5344cb5..5fb7190 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -48,7 +48,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
@@ -163,9 +165,6 @@ public class FlinkExecutable extends AbstractExecutable {
             String hadoopClasspathEnv = new File(hadoopConf).getParentFile().getAbsolutePath();
 
             String jobJar = config.getKylinJobJarPath();
-            if (StringUtils.isEmpty(jars)) {
-                jars = jobJar;
-            }
 
             String segmentID = this.getParam(FlinkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
             CubeSegment segment = cube.getSegmentById(segmentID);
@@ -205,16 +204,24 @@ public class FlinkExecutable extends AbstractExecutable {
                     //flink on yarn specific option (pattern : -yn 1)
                     if (configOptionKey.startsWith("-y") && !entry.getValue().isEmpty()) {
                         sb.append(" ").append(configOptionKey).append(" ").append(entry.getValue());
-                    } else if(!configOptionKey.startsWith("-y")){
+                    } else if (!configOptionKey.startsWith("-y")) {
                         //flink on yarn specific option (pattern : -yD taskmanager.network.memory.min=536346624)
                         sb.append(" ").append(configOptionKey).append("=").append(entry.getValue());
                     }
                 }
             }
+            if (StringUtils.isNotBlank(jars)) {
+                String[] splitJars = jars.split(",\\s*");
+                Set<String> setJars = new HashSet();
+                setJars.addAll(Arrays.asList(splitJars));
+                for (String jar : setJars) {
+                    sb.append(String.format(Locale.ROOT, " -C file://%s", jar));
+                }
+            }
 
             sb.append(" -c org.apache.kylin.common.util.FlinkEntry -p %s %s %s ");
             final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, hadoopClasspathEnv,
-                    KylinConfig.getFlinkHome(), parallelism, jars, formatArgs());
+                    KylinConfig.getFlinkHome(), parallelism, jobJar, formatArgs());
             logger.info("cmd: " + cmd);
             final ExecutorService executorService = Executors.newSingleThreadExecutor();
             final CliCommandExecutor exec = new CliCommandExecutor();