You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/11 14:51:42 UTC
kylin git commit: minor, mr code clean up
Repository: kylin
Updated Branches:
refs/heads/yang21 b42338448 -> f5e619802
minor, mr code clean up
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f5e61980
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f5e61980
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f5e61980
Branch: refs/heads/yang21
Commit: f5e619802c54aaca03a1771c9a38af33f6adf50c
Parents: b423384
Author: Yang Li <li...@apache.org>
Authored: Fri Nov 11 22:51:23 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Fri Nov 11 22:51:23 2016 +0800
----------------------------------------------------------------------
.../main/java/org/apache/kylin/engine/mr/KylinMapper.java | 6 ++++--
.../main/java/org/apache/kylin/engine/mr/KylinReducer.java | 6 ++++--
.../java/org/apache/kylin/engine/mr/steps/CuboidJob.java | 8 +++-----
.../org/apache/kylin/engine/mr/steps/InMemCuboidJob.java | 3 ++-
4 files changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
index a527b3d..a01f7a2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
/**
*/
-abstract public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private static final Logger logger = LoggerFactory.getLogger(KylinMapper.class);
protected void bindCurrentConfiguration(Configuration conf) {
@@ -54,7 +54,9 @@ abstract public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapp
}
}
- abstract protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException;
+ protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+ super.map(key, value, context);
+ }
@Override
final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
index 2987032..2b63ce0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
/**
*/
-abstract public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
private static final Logger logger = LoggerFactory.getLogger(KylinReducer.class);
protected void bindCurrentConfiguration(Configuration conf) {
@@ -53,7 +53,9 @@ abstract public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Red
}
}
- abstract protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException;
+ protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+ super.reduce(key, values, context);
+ }
@Override
final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index 6b0c86e..9486e60 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -21,11 +21,11 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory;
public class CuboidJob extends AbstractHadoopJob {
protected static final Logger logger = LoggerFactory.getLogger(CuboidJob.class);
- private static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks";
@SuppressWarnings("rawtypes")
private Class<? extends Mapper> mapperClass;
@@ -165,7 +164,6 @@ public class CuboidJob extends AbstractHadoopJob {
}
protected void setReduceTaskNum(Job job, CubeDesc cubeDesc, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
- Configuration jobConf = job.getConfiguration();
KylinConfig kylinConfig = cubeDesc.getConfig();
double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
@@ -200,12 +198,12 @@ public class CuboidJob extends AbstractHadoopJob {
// no more than 500 reducer by default
numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
- jobConf.setInt(MAPRED_REDUCE_TASKS, numReduceTasks);
+ job.setNumReduceTasks(numReduceTasks);
logger.info("Having total map input MB " + Math.round(totalMapInputMB));
logger.info("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids);
logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio);
- logger.info("Setting " + MAPRED_REDUCE_TASKS + "=" + numReduceTasks);
+ logger.info("Setting " + Context.NUM_REDUCES + "=" + numReduceTasks);
}
/**
http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 013f2c9..d9558a4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
@@ -157,7 +158,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
logger.info("Having total map input MB " + Math.round(totalSizeInM));
logger.info("Having per reduce MB " + perReduceInputMB);
- logger.info("Setting " + "mapred.reduce.tasks" + "=" + numReduceTasks);
+ logger.info("Setting " + Context.NUM_REDUCES + "=" + numReduceTasks);
return numReduceTasks;
}