You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2017/02/04 04:43:46 UTC

[17/47] kylin git commit: Revert "KYLIN-2242 write multiple files in FactDistinctColumnsReducer with MultipleOutputs"

Revert "KYLIN-2242 write multiple files in FactDistinctColumnsReducer with MultipleOutputs"

This reverts commit 7de8aa1203a72bad105ed692f7100535939b03af.


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7576a09f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7576a09f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7576a09f

Branch: refs/heads/KYLIN-2361
Commit: 7576a09f29b75f2eb80a1b5ca9c65fe5c81b1144
Parents: 7de8aa1
Author: shaofengshi <sh...@apache.org>
Authored: Sun Jan 22 11:38:17 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sun Jan 22 11:38:17 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/HadoopUtil.java    |  16 --
 .../kylin/engine/mr/JobBuilderSupport.java      |   2 +-
 .../kylin/engine/mr/common/BatchConstants.java  |   9 +-
 .../engine/mr/steps/CreateDictionaryJob.java    |  43 ++--
 .../engine/mr/steps/FactDistinctColumnsJob.java |  32 +--
 .../mr/steps/FactDistinctColumnsReducer.java    | 240 ++++++++++++-------
 .../engine/mr/steps/SaveStatisticsStep.java     |  10 +-
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  |  10 +-
 8 files changed, 187 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index b9ffe38..bdc4c3e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -26,10 +26,8 @@ import java.net.URISyntaxException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
 import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
@@ -142,18 +140,4 @@ public class HadoopUtil {
         }
     }
 
-    public static Path getFilterOnlyPath(FileSystem fs, Path baseDir, final String filter) throws IOException {
-        FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() {
-            @Override
-            public boolean accept(Path path) {
-                return path.getName().startsWith(filter);
-            }
-        });
-
-        if (fileStatus.length == 1) {
-            return fileStatus[0].getPath();
-        } else {
-            return null;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index c34a904..696b22a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -171,7 +171,7 @@ public class JobBuilderSupport {
     }
 
     public String getStatisticsPath(String jobId) {
-        return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+        return getRealizationRootPath(jobId) + "/statistics";
     }
 
     // ============================================================================

http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 602b4bb..0281539 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -53,16 +53,9 @@ public interface BatchConstants {
     String CFG_STATISTICS_ENABLED = "statistics.enabled";
     String CFG_STATISTICS_OUTPUT = "statistics.ouput";//spell error, for compatibility issue better not change it
     String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
+    String CFG_STATISTICS_CUBE_ESTIMATION_FILENAME = "cube_statistics.txt";
     String CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME = "cuboid_statistics.seq";
 
-    String CFG_MAPRED_OUTPUT_COMPRESS = "mapred.output.compress";
-
-    String CFG_OUTPUT_COLUMN = "column";
-    String CFG_OUTPUT_DICT = "dict";
-    String CFG_OUTPUT_STATISTICS = "statistics";
-    String CFG_OUTPUT_PARTITION = "partition";
-
-
     /**
      * command line ARGuments
      */

http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index e5d053b..95d8cb1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -18,20 +18,15 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ByteBufferBackedInputStream;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.HadoopUtil;
@@ -68,27 +63,21 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
 
             @Override
             public Dictionary<String> getDictionary(TblColRef col) throws IOException {
-                Path colDir = new Path(factColumnsInputPath, col.getName());
-                FileSystem fs = HadoopUtil.getFileSystem(colDir.toString());
-
-                Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
-                if (dictFile == null) {
+                Path dictFile = new Path(factColumnsInputPath, col.getIdentity() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
+                FileSystem fs = HadoopUtil.getWorkingFileSystem();
+                if (fs.exists(dictFile) == false)
                     return null;
-                }
-
-                try (SequenceFile.Reader reader = new SequenceFile.Reader(HadoopUtil.getCurrentConfiguration(), SequenceFile.Reader.file(dictFile))) {
-                    NullWritable key = NullWritable.get();
-                    BytesWritable value = new BytesWritable();
-                    reader.next(key, value);
-
-                    ByteBuffer buffer = new ByteArray(value.getBytes()).asBuffer();
-                    try (DataInputStream is = new DataInputStream(new ByteBufferBackedInputStream(buffer))) {
-                        String dictClassName = is.readUTF();
-                        Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
-                        dict.readFields(is);
-                        logger.info("DictionaryProvider read dict from file: " + dictFile);
-                        return dict;
-                    }
+                
+                FSDataInputStream is = null;
+                try {
+                    is = fs.open(dictFile);
+                    String dictClassName = is.readUTF();
+                    Dictionary<String> dict = (Dictionary<String>) ClassUtil.newInstance(dictClassName);
+                    dict.readFields(is);
+                    logger.info("DictionaryProvider read dict from file: " + dictFile);
+                    return dict;
+                } finally {
+                    IOUtils.closeQuietly(is);
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index aded600..ce01eb6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -23,16 +23,11 @@ import java.util.List;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
@@ -87,8 +82,8 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             int uhcReducerCount = cube.getConfig().getUHCReducerCount();
 
             int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
-            for (int index : uhcIndex) {
-                if (index == 1) {
+            for(int index : uhcIndex) {
+                if(index == 1) {
                     reducerCount += uhcReducerCount - 1;
                 }
             }
@@ -97,6 +92,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.engine.mr.uhc-reducer-count'");
             }
 
+
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
@@ -121,12 +117,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
 
             attachCubeMetadata(cube, job.getConfiguration());
 
-            /**
-             * don't compress the reducer output so that {@link CreateDictionaryJob} and {@link UpdateCubeInfoAfterBuildStep}
-             * could read the reducer file directly
-             */
-            job.getConfiguration().set(BatchConstants.CFG_MAPRED_OUTPUT_COMPRESS, "false");
-
             return waitForCompletion(job);
 
         } finally {
@@ -148,22 +138,18 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
 
     private void setupReducer(Path output, int numberOfReducers) throws IOException {
         job.setReducerClass(FactDistinctColumnsReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
         job.setPartitionerClass(FactDistinctColumnPartitioner.class);
         job.setNumReduceTasks(numberOfReducers);
 
-        //make each reducer output to respective dir
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
-        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class);
-
-
+        // important, reducer writes HDFS directly at the moment
+        job.setReduceSpeculativeExecution(false);
+        
         FileOutputFormat.setOutputPath(job, output);
         job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
 
-        //prevent to create zero-sized default output
-        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
-
         deletePath(job.getConfiguration(), output);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 5d2fb72..711d991 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -18,25 +18,27 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -45,7 +47,7 @@ import org.apache.kylin.dict.IDictionaryBuilder;
 import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -61,12 +63,14 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
 
     private List<TblColRef> columnList;
+    private String statisticsOutput = null;
     private List<Long> baseCuboidRowCountInMappers;
     protected Map<Long, HLLCounter> cuboidHLLMap = null;
     protected long baseCuboidId;
     protected CubeDesc cubeDesc;
     private long totalRowsBeforeMerge = 0;
     private int samplingPercentage;
+    private List<ByteArray> colValues;
     private TblColRef col = null;
     private boolean isStatistics = false;
     private KylinConfig cubeConfig;
@@ -84,14 +88,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
     public static final String DICT_FILE_POSTFIX = ".rldict";
     public static final String PARTITION_COL_INFO_FILE_POSTFIX = ".pci";
 
-    private MultipleOutputs mos;
-
     @Override
     protected void setup(Context context) throws IOException {
         super.bindCurrentConfiguration(context.getConfiguration());
         Configuration conf = context.getConfiguration();
-        mos = new MultipleOutputs(context);
-
         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
@@ -109,20 +109,26 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         if (collectStatistics && (taskId == numberOfTasks - 1)) {
             // hll
             isStatistics = true;
+            statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
             baseCuboidRowCountInMappers = Lists.newArrayList();
             cuboidHLLMap = Maps.newHashMap();
             samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
             logger.info("Reducer " + taskId + " handling stats");
         } else if (collectStatistics && (taskId == numberOfTasks - 2)) {
             // partition col
+            isStatistics = false;
             isPartitionCol = true;
             col = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
             if (col == null) {
                 logger.info("Do not have partition col. This reducer will keep empty");
             }
+            colValues = Lists.newLinkedList();
+            logger.info("Reducer " + taskId + " handling partition column " + col);
         } else {
             // normal col
+            isStatistics = false;
             col = columnList.get(reducerIdToColumnIndex.get(taskId));
+            colValues = Lists.newLinkedList();
 
             // local build dict
             isReducerLocalBuildDict = config.isReducerLocalBuildDict();
@@ -188,13 +194,15 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
                 logAFewRows(value);
                 builder.addValue(value);
             } else {
-                byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
-                // output written to baseDir/colName/-r-00000 (etc)
-                String fileName = col.getName() + "/";
-                mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
+                colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
+                if (colValues.size() == 1000000) { //spill every 1 million
+                    logger.info("spill values to disk...");
+                    outputDistinctValues(col, colValues, context);
+                    colValues.clear();
+                }
             }
         }
-
+        
         rowCount++;
     }
 
@@ -204,104 +212,162 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
         }
     }
 
-    @Override
-    protected void doCleanup(Context context) throws IOException, InterruptedException {
-        if (isStatistics) {
-            //output the hll info;
-            List<Long> allCuboids = Lists.newArrayList();
-            allCuboids.addAll(cuboidHLLMap.keySet());
-            Collections.sort(allCuboids);
+    private void outputDistinctValues(TblColRef col, Collection<ByteArray> values, Context context) throws IOException {
+        final Configuration conf = context.getConfiguration();
+        final FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
+        final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
+        final Path colDir = new Path(outputPath, col.getIdentity());
+        final String fileName = col.getIdentity() + "-" + taskId % uhcReducerCount;
+        final Path outputFile = new Path(colDir, fileName);
+
+        FSDataOutputStream out = null;
+        try {
+            if (!fs.exists(colDir)) {
+                fs.mkdirs(colDir);
+            }
 
-            logMapperAndCuboidStatistics(allCuboids); // for human check
-            outputStatistics(allCuboids);
-        } else if (isPartitionCol) {
-            // partition col
-            outputPartitionInfo();
-        } else {
-            // normal col
-            if (isReducerLocalBuildDict) {
-                Dictionary<String> dict = builder.build();
-                outputDict(col, dict);
+            if (fs.exists(outputFile)) {
+                out = fs.append(outputFile);
+                logger.info("append file " + outputFile);
+            } else {
+                out = fs.create(outputFile);
+                logger.info("create file " + outputFile);
             }
-        }
 
-        mos.close();
+            for (ByteArray value : values) {
+                out.write(value.array(), value.offset(), value.length());
+                out.write('\n');
+            }
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
     }
 
-    private void outputPartitionInfo() throws IOException, InterruptedException {
-        if (col != null) {
-            // output written to baseDir/colName/colName.pci-r-00000 (etc)
-            String partitionFileName = col.getName() + "/" + col.getName() + PARTITION_COL_INFO_FILE_POSTFIX;
+    private void outputDict(TblColRef col, Dictionary<String> dict, Context context) throws IOException {
+        final String fileName = col.getIdentity() + DICT_FILE_POSTFIX;
+        FSDataOutputStream out = getOutputStream(context, fileName);
+        try {
+            String dictClassName = dict.getClass().getName();
+            out.writeUTF(dictClassName);
+            dict.write(out);
+            logger.info("reducer id is:+" + taskId + " colName:" + col.getName() + "  writing dict at file : " + fileName + "  dict class:" + dictClassName);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
 
-            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMinValue), partitionFileName);
-            mos.write(BatchConstants.CFG_OUTPUT_PARTITION, NullWritable.get(), new LongWritable(timeMaxValue), partitionFileName);
+    private void outputPartitionInfo(Context context) throws IOException {
+        final String fileName = col.getIdentity() + PARTITION_COL_INFO_FILE_POSTFIX;
+        FSDataOutputStream out = getOutputStream(context, fileName);
+        try {
+            out.writeLong(timeMinValue);
+            out.writeLong(timeMaxValue);
             logger.info("write partition info for col : " + col.getName() + "  minValue:" + timeMinValue + " maxValue:" + timeMaxValue);
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    private FSDataOutputStream getOutputStream(Context context, String outputFileName) throws IOException {
+        final Configuration conf = context.getConfiguration();
+        final FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
+        final Path outputPath = new Path(conf.get(BatchConstants.CFG_OUTPUT_PATH));
+        final Path outputFile = new Path(outputPath, outputFileName);
+        if (!fs.exists(outputPath)) {
+            fs.mkdirs(outputPath);
         }
+        FSDataOutputStream out = fs.create(outputFile);
+        return out;
     }
 
-    private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException {
-        // output written to baseDir/colName/colName.rldict-r-00000 (etc)
-        String dictFileName = col.getName() + "/" + col.getName() + DICT_FILE_POSTFIX;
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        if (isStatistics) {
+            // output the hll info
+            long grandTotal = 0;
+            for (HLLCounter hll : cuboidHLLMap.values()) {
+                grandTotal += hll.getCountEstimate();
+            }
+            double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
 
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos);) {
-            outputStream.writeUTF(dict.getClass().getName());
-            dict.write(outputStream);
+            int mapperNumber = baseCuboidRowCountInMappers.size();
 
-            mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName);
+            writeMapperAndCuboidStatistics(context); // for human check
+            CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), //
+                    cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio);
+        } else if (isPartitionCol) {
+            // partition col
+            if (col != null) {
+                outputPartitionInfo(context);
+            }
+        } else {
+            // normal col
+            if (isReducerLocalBuildDict) {
+                Dictionary<String> dict = builder.build();
+                outputDict(col, dict, context);
+            } else {
+                if (colValues.size() > 0) {
+                    outputDistinctValues(col, colValues, context);
+                    colValues.clear();
+                }
+            }
         }
     }
 
-    private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
-        // output written to baseDir/statistics/statistics-r-00000 (etc)
-        String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
+    private void writeMapperAndCuboidStatistics(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
+        Path path = new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION_FILENAME);
+        FSDataOutputStream out = fs.create(path);
 
-        ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+        try {
+            String msg;
 
-        // mapper overlap ratio at key -1
-        long grandTotal = 0;
-        for (HLLCounter hll : cuboidHLLMap.values()) {
-            grandTotal += hll.getCountEstimate();
-        }
-        double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
-        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
+            List<Long> allCuboids = Lists.newArrayList();
+            allCuboids.addAll(cuboidHLLMap.keySet());
+            Collections.sort(allCuboids);
 
-        // mapper number at key -2
-        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
+            msg = "Total cuboid number: \t" + allCuboids.size();
+            writeLine(out, msg);
+            msg = "Samping percentage: \t" + samplingPercentage;
+            writeLine(out, msg);
+
+            writeLine(out, "The following statistics are collected based on sampling data.");
+            writeLine(out, "Number of Mappers: " + baseCuboidRowCountInMappers.size());
+            for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
+                if (baseCuboidRowCountInMappers.get(i) > 0) {
+                    msg = "Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i);
+                    writeLine(out, msg);
+                }
+            }
 
-        // sampling percentage at key 0
-        mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
+            long grantTotal = 0;
+            for (long i : allCuboids) {
+                grantTotal += cuboidHLLMap.get(i).getCountEstimate();
+                msg = "Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate();
+                writeLine(out, msg);
+            }
 
-        for (long i : allCuboids) {
-            valueBuf.clear();
-            cuboidHLLMap.get(i).writeRegisters(valueBuf);
-            valueBuf.flip();
-            mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
-        }
-    }
+            msg = "Sum of all the cube segments (before merge) is: \t " + totalRowsBeforeMerge;
+            writeLine(out, msg);
 
-    private void logMapperAndCuboidStatistics(List<Long> allCuboids) throws IOException {
-        logger.info("Total cuboid number: \t" + allCuboids.size());
-        logger.info("Samping percentage: \t" + samplingPercentage);
-        logger.info("The following statistics are collected based on sampling data.");
-        logger.info("Number of Mappers: " + baseCuboidRowCountInMappers.size());
+            msg = "After merge, the cube has row count: \t " + grantTotal;
+            writeLine(out, msg);
 
-        for (int i = 0; i < baseCuboidRowCountInMappers.size(); i++) {
-            if (baseCuboidRowCountInMappers.get(i) > 0) {
-                logger.info("Base Cuboid in Mapper " + i + " row count: \t " + baseCuboidRowCountInMappers.get(i));
+            if (grantTotal > 0) {
+                msg = "The mapper overlap ratio is: \t" + totalRowsBeforeMerge / grantTotal;
+                writeLine(out, msg);
             }
-        }
 
-        long grantTotal = 0;
-        for (long i : allCuboids) {
-            grantTotal += cuboidHLLMap.get(i).getCountEstimate();
-            logger.info("Cuboid " + i + " row count is: \t " + cuboidHLLMap.get(i).getCountEstimate());
+        } finally {
+            IOUtils.closeQuietly(out);
         }
+    }
+
+    private void writeLine(FSDataOutputStream out, String msg) throws IOException {
+        out.write(msg.getBytes());
+        out.write('\n');
 
-        logger.info("Sum of all the cube segments (before merge) is: \t " + totalRowsBeforeMerge);
-        logger.info("After merge, the cube has row count: \t " + grantTotal);
-        if (grantTotal > 0) {
-            logger.info("The mapper overlap ratio is: \t" + totalRowsBeforeMerge / grantTotal);
-        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index 28f99fb..2671042 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -60,11 +60,9 @@ public class SaveStatisticsStep extends AbstractExecutable {
         ResourceStore rs = ResourceStore.getStore(kylinConf);
         try {
             FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            Path statisticsDir = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()));
-            Path statisticsFilePath = HadoopUtil.getFilterOnlyPath(fs, statisticsDir, BatchConstants.CFG_OUTPUT_STATISTICS);
-            if (statisticsFilePath == null) {
-                throw new IOException("fail to find the statistics file in base dir: " + statisticsDir);
-            }
+            Path statisticsFilePath = new Path(CubingExecutableUtil.getStatisticsPath(this.getParams()), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            if (!fs.exists(statisticsFilePath))
+                throw new IOException("File " + statisticsFilePath + " does not exists");
 
             FSDataInputStream is = fs.open(statisticsFilePath);
             try {
@@ -112,7 +110,7 @@ public class SaveStatisticsStep extends AbstractExecutable {
                 double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold();
                 logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit);
                 logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + overlapThreshold);
-
+ 
                 // in-mem cubing is good when
                 // 1) the cluster has enough mapper slots to run in parallel
                 // 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage

http://git-wip-us.apache.org/repos/asf/kylin/blob/7576a09f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index 81d5c42..dc80399 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -80,13 +80,8 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
     private void updateTimeRange(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
         final String factColumnsInputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        Path colDir = new Path(factColumnsInputPath, partitionCol.getName());
-        FileSystem fs = HadoopUtil.getFileSystem(colDir.toString());
-        Path outputFile = HadoopUtil.getFilterOnlyPath(fs, colDir, partitionCol.getName() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
-        if (outputFile == null) {
-            throw new IOException("fail to find the partition file in base dir: " + colDir);
-        }
-
+        Path outputFile = new Path(factColumnsInputPath, partitionCol.getIdentity() + FactDistinctColumnsReducer.PARTITION_COL_INFO_FILE_POSTFIX);
+        FileSystem fs = HadoopUtil.getFileSystem(outputFile.toString());
         FSDataInputStream is = null;
         long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE;
         try {
@@ -102,4 +97,5 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         segment.setDateRangeStart(minValue);
         segment.setDateRangeEnd(maxValue);
     }
+
 }