You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/11 14:51:42 UTC

kylin git commit: minor, mr code clean up

Repository: kylin
Updated Branches:
  refs/heads/yang21 b42338448 -> f5e619802


minor, mr code clean up


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f5e61980
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f5e61980
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f5e61980

Branch: refs/heads/yang21
Commit: f5e619802c54aaca03a1771c9a38af33f6adf50c
Parents: b423384
Author: Yang Li <li...@apache.org>
Authored: Fri Nov 11 22:51:23 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Fri Nov 11 22:51:23 2016 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/kylin/engine/mr/KylinMapper.java    | 6 ++++--
 .../main/java/org/apache/kylin/engine/mr/KylinReducer.java   | 6 ++++--
 .../java/org/apache/kylin/engine/mr/steps/CuboidJob.java     | 8 +++-----
 .../org/apache/kylin/engine/mr/steps/InMemCuboidJob.java     | 3 ++-
 4 files changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
index a527b3d..a01f7a2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  */
-abstract public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
     private static final Logger logger = LoggerFactory.getLogger(KylinMapper.class);
 
     protected void bindCurrentConfiguration(Configuration conf) {
@@ -54,7 +54,9 @@ abstract public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapp
         }
     }
     
-    abstract protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException;
+    protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+        super.map(key, value, context);
+    }
     
     @Override
     final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
index 2987032..2b63ce0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
@@ -27,7 +27,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  */
-abstract public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
     private static final Logger logger = LoggerFactory.getLogger(KylinReducer.class);
     
     protected void bindCurrentConfiguration(Configuration conf) {
@@ -53,7 +53,9 @@ abstract public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Red
         }
     }
     
-    abstract protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException;
+    protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+        super.reduce(key, values, context);
+    }
     
     @Override
     final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index 6b0c86e..9486e60 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -21,11 +21,11 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.IOException;
 
 import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer.Context;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory;
 public class CuboidJob extends AbstractHadoopJob {
 
     protected static final Logger logger = LoggerFactory.getLogger(CuboidJob.class);
-    private static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks";
 
     @SuppressWarnings("rawtypes")
     private Class<? extends Mapper> mapperClass;
@@ -165,7 +164,6 @@ public class CuboidJob extends AbstractHadoopJob {
     }
 
     protected void setReduceTaskNum(Job job, CubeDesc cubeDesc, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        Configuration jobConf = job.getConfiguration();
         KylinConfig kylinConfig = cubeDesc.getConfig();
 
         double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
@@ -200,12 +198,12 @@ public class CuboidJob extends AbstractHadoopJob {
         // no more than 500 reducer by default
         numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
 
-        jobConf.setInt(MAPRED_REDUCE_TASKS, numReduceTasks);
+        job.setNumReduceTasks(numReduceTasks);
 
         logger.info("Having total map input MB " + Math.round(totalMapInputMB));
         logger.info("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids);
         logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio);
-        logger.info("Setting " + MAPRED_REDUCE_TASKS + "=" + numReduceTasks);
+        logger.info("Setting " + Context.NUM_REDUCES + "=" + numReduceTasks);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/f5e61980/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 013f2c9..d9558a4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer.Context;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
@@ -157,7 +158,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
         logger.info("Having total map input MB " + Math.round(totalSizeInM));
         logger.info("Having per reduce MB " + perReduceInputMB);
-        logger.info("Setting " + "mapred.reduce.tasks" + "=" + numReduceTasks);
+        logger.info("Setting " + Context.NUM_REDUCES + "=" + numReduceTasks);
         return numReduceTasks;
     }