You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/09 08:11:34 UTC

kylin git commit: KYLIN-2170 refactor doMap/doReduce/doCleanup

Repository: kylin
Updated Branches:
  refs/heads/yang21 3b07c26a0 -> 1e24228ea


KYLIN-2170 refactor doMap/doReduce/doCleanup


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/1e24228e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/1e24228e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/1e24228e

Branch: refs/heads/yang21
Commit: 1e24228ea9e4a070923a854e6a50fbe561ccc378
Parents: 3b07c26
Author: Li Yang <li...@apache.org>
Authored: Wed Nov 9 16:11:24 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Wed Nov 9 16:11:24 2016 +0800

----------------------------------------------------------------------
 build/conf/kylin_job_conf_inmem.xml             |  2 +-
 .../org/apache/kylin/engine/mr/KylinMapper.java | 47 +++++++++++++-
 .../apache/kylin/engine/mr/KylinReducer.java    | 51 ++++++++++++++-
 .../engine/mr/steps/BaseCuboidMapperBase.java   |  2 +-
 .../kylin/engine/mr/steps/CuboidReducer.java    |  2 +-
 .../mr/steps/FactDistinctColumnPartitioner.java |  1 +
 .../mr/steps/FactDistinctColumnsCombiner.java   |  2 +-
 .../mr/steps/FactDistinctColumnsMapperBase.java |  2 +-
 .../mr/steps/FactDistinctColumnsReducer.java    | 42 ++++++------
 .../mr/steps/FactDistinctHiveColumnsMapper.java | 38 +++++------
 .../engine/mr/steps/HiveToBaseCuboidMapper.java |  2 +-
 .../engine/mr/steps/InMemCuboidMapper.java      | 28 ++++----
 .../engine/mr/steps/InMemCuboidReducer.java     |  2 +-
 .../engine/mr/steps/MergeCuboidMapper.java      |  2 +-
 .../kylin/engine/mr/steps/NDCuboidMapper.java   |  2 +-
 .../steps/RowKeyDistributionCheckerMapper.java  | 16 ++---
 .../steps/RowKeyDistributionCheckerReducer.java |  2 +-
 .../cardinality/ColumnCardinalityMapper.java    | 26 ++++----
 .../cardinality/ColumnCardinalityReducer.java   | 40 +++++-------
 .../storage/hbase/steps/CubeHFileMapper.java    |  2 +-
 .../hbase/steps/RangeKeyDistributionMapper.java | 14 ++--
 .../steps/RangeKeyDistributionReducer.java      | 68 +++++++++-----------
 .../storage/hbase/steps/RowValueDecoder.java    |  4 --
 23 files changed, 227 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/build/conf/kylin_job_conf_inmem.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin_job_conf_inmem.xml b/build/conf/kylin_job_conf_inmem.xml
index d6799d5..7e6dc08 100644
--- a/build/conf/kylin_job_conf_inmem.xml
+++ b/build/conf/kylin_job_conf_inmem.xml
@@ -94,7 +94,7 @@
 
     <property>
         <name>mapreduce.map.java.opts</name>
-        <value>-Xmx2700m</value>
+        <value>-Xmx2700m -XX:OnOutOfMemoryError="kill -9 %p"</value>
         <description></description>
     </property>
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
index 29c6844..a527b3d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.engine.mr;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.slf4j.Logger;
@@ -25,11 +27,54 @@ import org.slf4j.LoggerFactory;
 
 /**
  */
-public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+abstract public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
     private static final Logger logger = LoggerFactory.getLogger(KylinMapper.class);
 
     protected void bindCurrentConfiguration(Configuration conf) {
         logger.info("The conf for current mapper will be " + System.identityHashCode(conf));
         HadoopUtil.setCurrentConfiguration(conf);
     }
+
+    @Override
+    final public void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+        try {
+            doMap(key, value, context);
+        } catch (IOException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (InterruptedException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (RuntimeException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (Error ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        }
+    }
+    
+    abstract protected void doMap(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException;
+    
+    @Override
+    final protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+        try {
+            doCleanup(context);
+        } catch (IOException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (InterruptedException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (RuntimeException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (Error ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        }
+    }
+    
+    protected void doCleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
index 846c849..2987032 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
@@ -18,13 +18,62 @@
 
 package org.apache.kylin.engine.mr;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
-public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+abstract public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+    private static final Logger logger = LoggerFactory.getLogger(KylinReducer.class);
+    
     protected void bindCurrentConfiguration(Configuration conf) {
         HadoopUtil.setCurrentConfiguration(conf);
     }
+
+    @Override
+    final public void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+        try {
+            doReduce(key, values, context);
+        } catch (IOException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (InterruptedException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (RuntimeException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (Error ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        }
+    }
+    
+    abstract protected void doReduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException;
+    
+    @Override
+    final protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+        try {
+            doCleanup(context);
+        } catch (IOException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (InterruptedException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (RuntimeException ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        } catch (Error ex) { // KYLIN-2170
+            logger.error("", ex);
+            throw ex;
+        }
+    }
+
+    protected void doCleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 4f0d3fd..0649a0c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -55,7 +55,7 @@ import com.google.common.collect.Lists;
 
 /**
  */
-public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
+abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
     protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class);
     public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
     public static final byte[] ONE = Bytes.toBytes("1");

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index d6e1d7e..9543f0a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -86,7 +86,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
     }
 
     @Override
-    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+    public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
         aggs.reset();
 
         for (Text value : values) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index a631cf4..7801563 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -26,6 +26,7 @@ import org.apache.kylin.common.util.BytesUtil;
 /**
  */
 public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> {
+    @SuppressWarnings("unused")
     private Configuration conf;
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
index 1821828..2dda047 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsCombiner.java
@@ -34,7 +34,7 @@ public class FactDistinctColumnsCombiner extends KylinReducer<Text, Text, Text,
     }
 
     @Override
-    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+    public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 
         // for hll, each key only has one output, no need to do local combine;
         // for normal col, values are empty text

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 3fa966d..cb30b94 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -41,7 +41,7 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  */
-public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
+abstract public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
 
     protected String cubeName;
     protected CubeInstance cube;

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index a9c5d4b..6f0bf4c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -101,7 +101,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
     }
 
     @Override
-    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+    public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 
         if (isStatistics == false) {
             colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
@@ -161,29 +161,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
     }
 
     @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        try {
-            if (isStatistics == false) {
-                if (!outputTouched || colValues.size() > 0) {
-                    outputDistinctValues(col, colValues, context);
-                    colValues.clear();
-                }
-            } else {
-                //output the hll info;
-                long grandTotal = 0;
-                for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) {
-                    grandTotal += hll.getCountEstimate();
-                }
-                double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
-                
-                int mapperNumber = baseCuboidRowCountInMappers.size();
-    
-                writeMapperAndCuboidStatistics(context); // for human check
-                CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
-                        cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        if (isStatistics == false) {
+            if (!outputTouched || colValues.size() > 0) {
+                outputDistinctValues(col, colValues, context);
+                colValues.clear();
             }
-        } catch (Throwable ex) {
-            logger.error("", ex);
+        } else {
+            //output the hll info;
+            long grandTotal = 0;
+            for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) {
+                grandTotal += hll.getCountEstimate();
+            }
+            double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
+            
+            int mapperNumber = baseCuboidRowCountInMappers.size();
+
+            writeMapperAndCuboidStatistics(context); // for human check
+            CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
+                    cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 2154bc6..061fc80 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -106,7 +106,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     }
 
     @Override
-    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+    public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
         String[] row = flatTableInputFormat.parseMapperInput(record);
         try {
             for (int i = 0; i < factDictCols.size(); i++) {
@@ -157,27 +157,23 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     }
 
     @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        try {
-            if (collectStatistics) {
-                ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-                // output each cuboid's hll to reducer, key is 0 - cuboidId
-                HyperLogLogPlusCounter hll;
-                for (int i = 0; i < cuboidIds.length; i++) {
-                    hll = allCuboidsHLL[i];
-    
-                    keyBuffer.clear();
-                    keyBuffer.put(MARK_FOR_HLL); // one byte
-                    keyBuffer.putLong(cuboidIds[i]);
-                    outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
-                    hllBuf.clear();
-                    hll.writeRegisters(hllBuf);
-                    outputValue.set(hllBuf.array(), 0, hllBuf.position());
-                    context.write(outputKey, outputValue);
-                }
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        if (collectStatistics) {
+            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+            // output each cuboid's hll to reducer, key is 0 - cuboidId
+            HyperLogLogPlusCounter hll;
+            for (int i = 0; i < cuboidIds.length; i++) {
+                hll = allCuboidsHLL[i];
+
+                keyBuffer.clear();
+                keyBuffer.put(MARK_FOR_HLL); // one byte
+                keyBuffer.putLong(cuboidIds[i]);
+                outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
+                hllBuf.clear();
+                hll.writeRegisters(hllBuf);
+                outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                context.write(outputKey, outputValue);
             }
-        } catch (Throwable ex) {
-            ex.printStackTrace();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index 83926cc..d9c5312 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -38,7 +38,7 @@ public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, O
     }
 
     @Override
-    public void map(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
+    public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
         counter++;
         if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
             logger.info("Handled " + counter + " records!");

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index dac93cb..15bfd2e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -113,7 +113,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
     }
 
     @Override
-    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+    public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
         // put each row to the queue
         String[] row = flatTableInputFormat.parseMapperInput(record);
         List<String> rowAsList = Arrays.asList(row);
@@ -130,25 +130,21 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
     }
 
     @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
         logger.info("Totally handled " + counter + " records!");
 
-        try {
-            while (!future.isDone()) {
-                if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
-                    break;
-                }
-            }
-    
-            try {
-                future.get();
-            } catch (Exception e) {
-                throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+        while (!future.isDone()) {
+            if (queue.offer(Collections.<String> emptyList(), 1, TimeUnit.SECONDS)) {
+                break;
             }
-            queue.clear();
-        } catch (Throwable ex) {
-            logger.error("", ex);
         }
+
+        try {
+            future.get();
+        } catch (Exception e) {
+            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
+        }
+        queue.clear();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index cfecf23..d0a7062 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -73,7 +73,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
     }
 
     @Override
-    public void reduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
+    public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
 
         aggs.reset();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 55b8474..67c0f4c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -169,7 +169,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     }
 
     @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
         long cuboidID = rowKeySplitter.split(key.getBytes());
         Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
         RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index b566c2e..8107e52 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -120,7 +120,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     }
 
     @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
         long cuboidId = rowKeySplitter.split(key.getBytes());
         Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
index 21e97a3..fca91a6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
@@ -63,7 +63,7 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex
     }
 
     @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
         for (Text t : keyList) {
             if (key.compareTo(t) < 0) {
                 Long v = resultMap.get(t);
@@ -76,15 +76,11 @@ public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Tex
     }
 
     @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        try {
-            LongWritable outputValue = new LongWritable();
-            for (Entry<Text, Long> kv : resultMap.entrySet()) {
-                outputValue.set(kv.getValue());
-                context.write(kv.getKey(), outputValue);
-            }
-        } catch (Throwable ex) {
-            ex.printStackTrace();
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        LongWritable outputValue = new LongWritable();
+        for (Entry<Text, Long> kv : resultMap.entrySet()) {
+            outputValue.set(kv.getValue());
+            context.write(kv.getKey(), outputValue);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
index 332cba5..d203e8c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
@@ -38,7 +38,7 @@ public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWri
     }
 
     @Override
-    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+    public void doReduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
 
         long length = 0;
         for (LongWritable v : values) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index 8c624e3..06a07ca 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -66,7 +66,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
     }
 
     @Override
-    public void map(T key, Object value, Context context) throws IOException, InterruptedException {
+    public void doMap(T key, Object value, Context context) throws IOException, InterruptedException {
         ColumnDesc[] columns = tableDesc.getColumns();
         String[] values = tableInputFormat.parseMapperInput(value);
 
@@ -95,20 +95,16 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritab
     }
 
     @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        try {
-            Iterator<Integer> it = hllcMap.keySet().iterator();
-            ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-            while (it.hasNext()) {
-                int key = it.next();
-                HyperLogLogPlusCounter hllc = hllcMap.get(key);
-                buf.clear();
-                hllc.writeRegisters(buf);
-                buf.flip();
-                context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
-            }
-        } catch (Throwable ex) {
-            ex.printStackTrace();
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        Iterator<Integer> it = hllcMap.keySet().iterator();
+        ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+        while (it.hasNext()) {
+            int key = it.next();
+            HyperLogLogPlusCounter hllc = hllcMap.get(key);
+            buf.clear();
+            hllc.writeRegisters(buf);
+            buf.flip();
+            context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
index 2551af3..ea66999 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
@@ -49,7 +49,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
     }
 
     @Override
-    public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
+    public void doReduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
         int skey = key.get();
         for (BytesWritable v : values) {
             ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
@@ -68,28 +68,22 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
     }
 
     @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        try {
-            List<Integer> keys = new ArrayList<Integer>();
-            Iterator<Integer> it = hllcMap.keySet().iterator();
-            while (it.hasNext()) {
-                keys.add(it.next());
-            }
-            Collections.sort(keys);
-            it = keys.iterator();
-            while (it.hasNext()) {
-                int key = it.next();
-                HyperLogLogPlusCounter hllc = hllcMap.get(key);
-                ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-                buf.clear();
-                hllc.writeRegisters(buf);
-                buf.flip();
-                context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
-                // context.write(new Text("ErrorRate_" + key), new
-                // LongWritable((long)hllc.getErrorRate()));
-            }
-        } catch (Throwable ex) {
-            ex.printStackTrace();
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        List<Integer> keys = new ArrayList<Integer>();
+        Iterator<Integer> it = hllcMap.keySet().iterator();
+        while (it.hasNext()) {
+            keys.add(it.next());
+        }
+        Collections.sort(keys);
+        it = keys.iterator();
+        while (it.hasNext()) {
+            int key = it.next();
+            HyperLogLogPlusCounter hllc = hllcMap.get(key);
+            ByteBuffer buf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+            buf.clear();
+            hllc.writeRegisters(buf);
+            buf.flip();
+            context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
index 8205ff7..371a83b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapper.java
@@ -74,7 +74,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
     }
 
     @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
         outputKey.set(key.getBytes(), 0, key.getLength());
         KeyValue outputValue;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
index c82d58d..c66ccb3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionMapper.java
@@ -44,7 +44,7 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo
     }
 
     @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
+    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
         lastKey = key;
 
         int bytesLength = key.getLength() + value.getLength();
@@ -61,14 +61,10 @@ public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, Lo
     }
 
     @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        try {
-            if (lastKey != null) {
-                outputValue.set(bytesRead);
-                context.write(lastKey, outputValue);
-            }
-        } catch (Throwable ex) {
-            ex.printStackTrace();
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        if (lastKey != null) {
+            outputValue.set(bytesRead);
+            context.write(lastKey, outputValue);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
index e9918d4..63433dd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RangeKeyDistributionReducer.java
@@ -84,7 +84,7 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
     }
 
     @Override
-    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
+    public void doReduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
         for (LongWritable v : values) {
             bytesRead += v.get();
         }
@@ -96,42 +96,38 @@ public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable
     }
 
     @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        try {
-            int nRegion = Math.round((float) gbPoints.size() / cut);
-            nRegion = Math.max(minRegionCount, nRegion);
-            nRegion = Math.min(maxRegionCount, nRegion);
-    
-            int gbPerRegion = gbPoints.size() / nRegion;
-            gbPerRegion = Math.max(1, gbPerRegion);
-    
-            if (hfileSizeGB <= 0) {
-                hfileSizeGB = gbPerRegion;
-            }
-            int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB);
-            hfilePerRegion = Math.max(1, hfilePerRegion);
-    
-            System.out.println(nRegion + " regions");
-            System.out.println(gbPerRegion + " GB per region");
-            System.out.println(hfilePerRegion + " hfile per region");
-    
-            Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
-            SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class);
-            int hfileCountInOneRegion = 0;
-            for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
-                hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
-                if (++hfileCountInOneRegion >= hfilePerRegion) {
-                    Text key = gbPoints.get(i);
-                    outputValue.set(i);
-                    System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
-                    context.write(key, outputValue);
-    
-                    hfileCountInOneRegion = 0;
-                }
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        int nRegion = Math.round((float) gbPoints.size() / cut);
+        nRegion = Math.max(minRegionCount, nRegion);
+        nRegion = Math.min(maxRegionCount, nRegion);
+
+        int gbPerRegion = gbPoints.size() / nRegion;
+        gbPerRegion = Math.max(1, gbPerRegion);
+
+        if (hfileSizeGB <= 0) {
+            hfileSizeGB = gbPerRegion;
+        }
+        int hfilePerRegion = (int) (gbPerRegion / hfileSizeGB);
+        hfilePerRegion = Math.max(1, hfilePerRegion);
+
+        System.out.println(nRegion + " regions");
+        System.out.println(gbPerRegion + " GB per region");
+        System.out.println(hfilePerRegion + " hfile per region");
+
+        Path hfilePartitionFile = new Path(output + "/part-r-00000_hfile");
+        SequenceFile.Writer hfilePartitionWriter = new SequenceFile.Writer(hfilePartitionFile.getFileSystem(context.getConfiguration()), context.getConfiguration(), hfilePartitionFile, ImmutableBytesWritable.class, NullWritable.class);
+        int hfileCountInOneRegion = 0;
+        for (int i = hfileSizeGB; i < gbPoints.size(); i += hfileSizeGB) {
+            hfilePartitionWriter.append(new ImmutableBytesWritable(gbPoints.get(i).getBytes()), NullWritable.get());
+            if (++hfileCountInOneRegion >= hfilePerRegion) {
+                Text key = gbPoints.get(i);
+                outputValue.set(i);
+                System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
+                context.write(key, outputValue);
+
+                hfileCountInOneRegion = 0;
             }
-            hfilePartitionWriter.close();
-        } catch (Throwable ex) {
-            logger.error("", ex);
         }
+        hfilePartitionWriter.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/1e24228e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
index 86104e2..b02183a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -31,15 +31,11 @@ import org.apache.kylin.metadata.datatype.LongMutable;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.storage.hbase.util.Results;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  */
 public class RowValueDecoder implements Cloneable {
 
-    private static final Logger logger = LoggerFactory.getLogger(RowValueDecoder.class);
-
     private final HBaseColumnDesc hbaseColumn;
     private final byte[] hbaseColumnFamily;
     private final byte[] hbaseColumnQualifier;