You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/04/12 02:18:29 UTC
[kylin] branch engine-flink updated: KYLIN-3915 Make HADOOP_CLASSPATH configurable for Flink engine
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch engine-flink
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/engine-flink by this push:
new 2ade921 KYLIN-3915 Make HADOOP_CLASSPATH configurable for Flink engine
2ade921 is described below
commit 2ade9215712e1b8020f2583721667a966a1525fe
Author: yanghua <ya...@gmail.com>
AuthorDate: Wed Mar 27 23:23:08 2019 +0800
KYLIN-3915 Make HADOOP_CLASSPATH configurable for Flink engine
---
.../java/org/apache/kylin/engine/flink/FlinkExecutable.java | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
index 83cc815..6afd4b3 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/FlinkExecutable.java
@@ -45,6 +45,7 @@ import org.apache.kylin.metadata.model.Segments;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -151,8 +152,7 @@ public class FlinkExecutable extends AbstractExecutable {
String jars = this.getParam(JARS);
- String hadoopConf = null;
- hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
+ String hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
if (StringUtils.isEmpty(hadoopConf)) {
throw new RuntimeException(
@@ -161,6 +161,8 @@ public class FlinkExecutable extends AbstractExecutable {
logger.info("Using " + hadoopConf + " as HADOOP_CONF_DIR");
+ String hadoopClasspathEnv = new File(hadoopConf).getParentFile().getAbsolutePath();
+
String jobJar = config.getKylinJobJarPath();
if (StringUtils.isEmpty(jars)) {
jars = jobJar;
@@ -173,9 +175,9 @@ public class FlinkExecutable extends AbstractExecutable {
StringBuilder sb = new StringBuilder();
if (Shell.osType == Shell.OSType.OS_TYPE_WIN) {
- sb.append("set HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=/usr/hdp/2.4.0.0-169/hadoop/ && %s/bin/flink run -m yarn-cluster ");
+ sb.append("set HADOOP_CONF_DIR=%s && set HADOOP_CLASSPATH=%s && %s/bin/flink run -m yarn-cluster ");
} else {
- sb.append("export HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=/usr/hdp/2.4.0.0-169/hadoop/ && %s/bin/flink run -m yarn-cluster ");
+ sb.append("export HADOOP_CONF_DIR=%s && export HADOOP_CLASSPATH=%s && %s/bin/flink run -m yarn-cluster ");
}
Map<String, String> flinkConfs = config.getFlinkConfigOverride();
@@ -194,12 +196,11 @@ public class FlinkExecutable extends AbstractExecutable {
}
String onYarnConfigOptionKey = FlinkOnYarnConfigMapping.flinkOnYarnConfigMap.get(entry.getKey());
-
sb.append(" ").append(onYarnConfigOptionKey).append(" ").append(entry.getValue());
}
sb.append(" -c org.apache.kylin.common.util.FlinkEntry %s %s ");
- final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf,
+ final String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, hadoopClasspathEnv,
KylinConfig.getFlinkHome(), jars, formatArgs());
logger.info("cmd: " + cmd);
final ExecutorService executorService = Executors.newSingleThreadExecutor();