You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/02 04:29:29 UTC
incubator-kylin git commit: KYLIN-953 Let MapReduceExecutable pick up
patched hadoop config
Repository: incubator-kylin
Updated Branches:
refs/heads/1.x-staging 0f9acf002 -> bff402f46
KYLIN-953 Let MapReduceExecutable pick up patched hadoop config
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/bff402f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/bff402f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/bff402f4
Branch: refs/heads/1.x-staging
Commit: bff402f466ccbb93102330eace7c6904dfb1d242
Parents: 0f9acf0
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Nov 2 11:29:21 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Nov 2 11:29:21 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/common/util/HadoopUtil.java | 26 ++++++++++++--------
.../kylin/job/common/MapReduceExecutable.java | 7 ++++--
.../kylin/rest/service/PerformService.java | 7 ++++--
3 files changed, 26 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bff402f4/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 2dc8be0..b815969 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -46,7 +46,7 @@ public class HadoopUtil {
public static Configuration getCurrentConfiguration() {
if (hadoopConfig.get() == null) {
- hadoopConfig.set(new Configuration());
+ hadoopConfig.set(newConfiguration());
}
return hadoopConfig.get();
}
@@ -89,8 +89,11 @@ public class HadoopUtil {
}
}
- /**
- */
+ public static Configuration newConfiguration() {
+ Configuration conf = new Configuration();
+ return healSickConfig(conf);
+ }
+
public static Configuration newHBaseConfiguration(String url) {
Configuration conf = HBaseConfiguration.create(getCurrentConfiguration());
@@ -103,6 +106,16 @@ public class HadoopUtil {
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
}
+ // reduce rpc retry
+ conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
+ conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
+ conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
+ // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
+
+ return healSickConfig(conf);
+ }
+
+ private static Configuration healSickConfig(Configuration conf) {
// https://issues.apache.org/jira/browse/KYLIN-953
if (StringUtils.isBlank(conf.get("hadoop.tmp.dir"))) {
conf.set("hadoop.tmp.dir", "/tmp");
@@ -110,13 +123,6 @@ public class HadoopUtil {
if (StringUtils.isBlank(conf.get("hbase.fs.tmp.dir"))) {
conf.set("hbase.fs.tmp.dir", "/tmp");
}
-
- // reduce rpc retry
- conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
- conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
- conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
- // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
-
return conf;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bff402f4/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
index cbf453d..2220884 100644
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.RMHAUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.constant.JobStepStatusEnum;
import org.apache.kylin.job.exception.ExecuteException;
@@ -73,7 +74,8 @@ public class MapReduceExecutable extends AbstractExecutable {
return;
}
try {
- Job job = new Cluster(new Configuration()).getJob(JobID.forName(mrJobId));
+ Configuration conf = HadoopUtil.getCurrentConfiguration();
+ Job job = new Cluster(conf).getJob(JobID.forName(mrJobId));
if (job.getJobState() == JobStatus.State.FAILED) {
//remove previous mr job info
super.onExecuteStart(executableContext);
@@ -102,7 +104,8 @@ public class MapReduceExecutable extends AbstractExecutable {
Job job;
final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
- job = new Cluster(new Configuration()).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
+ Configuration conf = HadoopUtil.getCurrentConfiguration();
+ job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID + " resumed"));
} else {
final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bff402f4/server/src/main/java/org/apache/kylin/rest/service/PerformService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/PerformService.java b/server/src/main/java/org/apache/kylin/rest/service/PerformService.java
index 8d6ed7b..e34aa1f 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/PerformService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/PerformService.java
@@ -22,11 +22,13 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -101,7 +103,7 @@ public class PerformService extends BasicService {
List<String[]> allRows = null;
CSVReader reader = null;
FileSystem fs = null;
- Configuration conf = new Configuration();
+ Configuration conf = HadoopUtil.newConfiguration();
try {
fs = FileSystem.newInstance(conf);
@@ -114,7 +116,8 @@ public class PerformService extends BasicService {
} catch (IOException e) {
logger.info("failed to read hdfs file:", e);
} finally {
- fs.close();
+ IOUtils.closeQuietly(reader);
+ IOUtils.closeQuietly(fs);
}
return allRows;
}