You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/02/21 12:52:03 UTC
[08/52] [abbrv] kylin git commit: Feature changed: automatically append hive dependent jars to 'tmpjars'
Feature changed: automatically append hive dependent jars to 'tmpjars'
Signed-off-by: shaofengshi <sh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bfccac90
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bfccac90
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bfccac90
Branch: refs/heads/1.x-HBase1.1.3
Commit: bfccac90f54a39f87738eb7a6955a0f0e404bc60
Parents: 73cfbef
Author: Zhong Yanghong <ya...@ebay.com>
Authored: Sun Jan 24 09:35:22 2016 +0000
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Jan 25 09:47:11 2016 +0800
----------------------------------------------------------------------
.../kylin/job/hadoop/AbstractHadoopJob.java | 123 ++++++++++++++++---
1 file changed, 103 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/bfccac90/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index a851756..f5c85eb 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -30,6 +30,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
@@ -127,6 +129,26 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
return retVal;
}
+ private static final String KYLIN_HIVE_DEPENDENCY_JARS = "[^,]*hive-exec.jar|[^,]*hive-metastore.jar|[^,]*hive-hcatalog-core[0-9.-]*jar";
+
+ String filterKylinHiveDependency(String kylinHiveDependency) {
+ if (StringUtils.isBlank(kylinHiveDependency))
+ return "";
+
+ StringBuilder jarList = new StringBuilder();
+
+ Pattern hivePattern = Pattern.compile(KYLIN_HIVE_DEPENDENCY_JARS);
+ Matcher matcher = hivePattern.matcher(kylinHiveDependency);
+
+ while (matcher.find()) {
+ if (jarList.length() > 0)
+ jarList.append(",");
+ jarList.append(matcher.group());
+ }
+
+ return jarList.toString();
+ }
+
private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
protected void setJobClasspath(Job job) {
@@ -141,14 +163,13 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
- logger.info("append kylin.hive.dependency: " + kylinHiveDependency + " and kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
+ logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
Configuration jobConf = job.getConfiguration();
String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
if (classpath == null || classpath.length() == 0) {
logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
classpath = getDefaultMapRedClasspath();
- classpath = classpath.replace(":", ","); // yarn classpath is comma separated
logger.info("The default mapred classpath is: " + classpath);
}
@@ -158,40 +179,102 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
classpath = classpath + "," + kylinHBaseDependency;
}
+ jobConf.set(MAP_REDUCE_CLASSPATH, classpath);
+ logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
+
+ /*
+ * set extra dependencies as tmpjars & tmpfiles if configured
+ */
+ StringBuilder kylinDependency = new StringBuilder();
+
+ // for hive dependencies
if (kylinHiveDependency != null) {
// yarn classpath is comma separated
kylinHiveDependency = kylinHiveDependency.replace(":", ",");
- classpath = classpath + "," + kylinHiveDependency;
+
+ logger.info("Hive Dependencies Before Filtered: " + kylinHiveDependency);
+ String filteredHive = filterKylinHiveDependency(kylinHiveDependency);
+ logger.info("Hive Dependencies After Filtered: " + filteredHive);
+
+ if (kylinDependency.length() > 0)
+ kylinDependency.append(",");
+ kylinDependency.append(filteredHive);
}
- jobConf.set(MAP_REDUCE_CLASSPATH, classpath);
- logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
+ // for KylinJobMRLibDir
+ String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir();
+ if (!StringUtils.isBlank(mrLibDir)) {
+ File dirFileMRLIB = new File(mrLibDir);
+ if (dirFileMRLIB.exists()) {
+ if (kylinDependency.length() > 0)
+ kylinDependency.append(",");
+ kylinDependency.append(mrLibDir);
+ } else {
+ logger.info("The directory '" + mrLibDir + "' for 'kylin.job.mr.lib.dir' does not exist!!!");
+ }
+ }
- // set extra dependencies as tmpjars & tmpfiles if configured
- setJobTmpJarsAndFiles(job);
+ setJobTmpJarsAndFiles(job, kylinDependency.toString());
}
- private void setJobTmpJarsAndFiles(Job job) {
- String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir();
- if (StringUtils.isBlank(mrLibDir))
+ private void setJobTmpJarsAndFiles(Job job, String kylinDependency) {
+ if (StringUtils.isBlank(kylinDependency))
return;
+ String[] fNameList = kylinDependency.split(",");
+
try {
Configuration jobConf = job.getConfiguration();
- FileSystem fs = FileSystem.get(new Configuration(jobConf));
- FileStatus[] fList = fs.listStatus(new Path(mrLibDir));
-
+ FileSystem fs = FileSystem.getLocal(jobConf);
+
StringBuilder jarList = new StringBuilder();
StringBuilder fileList = new StringBuilder();
-
+
+ for (String fileName : fNameList) {
+ Path p = new Path(fileName);
+ if (fs.getFileStatus(p).isDirectory()) {
+ appendTmpDir(job, fileName);
+ continue;
+ }
+
+ StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
+ if (list.length() > 0)
+ list.append(",");
+ list.append(fs.getFileStatus(p).getPath().toString());
+ }
+
+ appendTmpFiles(fileList.toString(), jobConf);
+ appendTmpJars(jarList.toString(), jobConf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void appendTmpDir(Job job, String tmpDir) {
+ if (StringUtils.isBlank(tmpDir))
+ return;
+
+ try {
+ Configuration jobConf = job.getConfiguration();
+ FileSystem fs = FileSystem.getLocal(jobConf);
+ FileStatus[] fList = fs.listStatus(new Path(tmpDir));
+
+ StringBuilder jarList = new StringBuilder();
+ StringBuilder fileList = new StringBuilder();
+
for (FileStatus file : fList) {
Path p = file.getPath();
+ if (fs.getFileStatus(p).isDirectory()) {
+ appendTmpDir(job, p.toString());
+ continue;
+ }
+
StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
if (list.length() > 0)
list.append(",");
- list.append(mrLibDir + "/" + file.getPath().getName());
+ list.append(fs.getFileStatus(p).getPath().toString());
}
-
+
appendTmpFiles(fileList.toString(), jobConf);
appendTmpJars(jarList.toString(), jobConf);
@@ -203,7 +286,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
private void appendTmpJars(String jarList, Configuration conf) {
if (StringUtils.isBlank(jarList))
return;
-
+
String tmpJars = conf.get("tmpjars", null);
if (tmpJars == null) {
tmpJars = jarList;
@@ -217,7 +300,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
private void appendTmpFiles(String fileList, Configuration conf) {
if (StringUtils.isBlank(fileList))
return;
-
+
String tmpFiles = conf.get("tmpfiles", null);
if (tmpFiles == null) {
tmpFiles = fileList;
@@ -338,10 +421,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
else
hdfsMetaDir = "file:///" + hdfsMetaDir;
logger.info("HDFS meta dir is: " + hdfsMetaDir);
-
+
appendTmpFiles(hdfsMetaDir, conf);
}
-
+
private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
ResourceStore from = ResourceStore.getStore(kylinConfig);
KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());