You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/04/12 02:18:29 UTC

[kylin] branch engine-flink updated: KYLIN-3915 Make HADOOP_CLASSPATH configurable for Flink engine

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/engine-flink by this push:
     new 2ade921  KYLIN-3915 Make HADOOP_CLASSPATH configurable for Flink engine
2ade921 is described below

commit 2ade9215712e1b8020f2583721667a966a1525fe
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Mar 27 23:23:08 2019 +0800

    KYLIN-3915 Make HADOOP_CLASSPATH configurable for Flink engine
---
 .../java/org/apache/kylin/engine/flink/FlinkExecutable.java | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 83cc815..6afd4b3 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -45,6 +45,7 @@ import org.apache.kylin.metadata.model.Segments;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -151,8 +152,7 @@ public class FlinkExecutable extends AbstractExecutable {
 
             String jars = this.getParam(JARS);
 
-            String hadoopConf = null;
-            hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
+            String hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
 
             if (StringUtils.isEmpty(hadoopConf)) {
                 throw new RuntimeException(
@@ -161,6 +161,8 @@ public class FlinkExecutable extends AbstractExecutable {
 
             logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
 
+            String hadoopClasspathEnv = new File(hadoopConf).getParentFile().getAbsolutePath();
+
             String jobJar = config.getKylinJobJarPath();
             if (StringUtils.isEmpty(jars)) {
                 jars = jobJar;
@@ -173,9 +175,9 @@ public class FlinkExecutable extends AbstractExecutable {
 
             StringBuilder sb = new StringBuilder();
             if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
-                sb.append("set HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=/usr/hdp/2.4.0.0-169/hadoop/ && %s/bin/flink run -m yarn-cluster ");
+                sb.append("set HADOOP_CONF_DIR=%s && set HADOOP_CLASSPATH=%s && %s/bin/flink run -m yarn-cluster ");
             } else {
-                sb.append("export HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=/usr/hdp/2.4.0.0-169/hadoop/ && %s/bin/flink run -m yarn-cluster ");
+                sb.append("export HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=%s && %s/bin/flink run -m yarn-cluster ");
             }
 
             Map<String, String> flinkConfs = config.getFlinkConfigOverride();
@@ -194,12 +196,11 @@ public class FlinkExecutable extends AbstractExecutable {
                 }
 
                 String onYarnConfigOptionKey = FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.get(entry.getKey());
-
                 sb.append(" ").append(onYarnConfigOptionKey).append(" ").append(entry.getValue());
             }
 
             sb.append(" -c org.apache.kylin.common.util.FlinkEntry %s %s ");
-            final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf,
+            final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, hadoopClasspathEnv,
                     KylinConfig.getFlinkHome(), jars, formatArgs());
             logger.info("cmd: " + cmd);
             final ExecutorService executorService = Executors.newSingleThreadExecutor();