You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/08 13:33:45 UTC

[1/4] kylin git commit: KYLIN-2167 FactDistinctColumnsReducer may get wrong max/min partition col value

Repository: kylin
Updated Branches:
  refs/heads/master 47de9611b -> e7a20a063


KYLIN-2167 FactDistinctColumnsReducer may get wrong max/min partition col value


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e7a20a06
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e7a20a06
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e7a20a06

Branch: refs/heads/master
Commit: e7a20a063a3f007aa2a0ed5f39c616880ba46118
Parents: 4d9a923
Author: shaofengshi <sh...@apache.org>
Authored: Tue Nov 8 14:23:11 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Nov 8 21:29:22 2016 +0800

----------------------------------------------------------------------
 .../mr/steps/FactDistinctColumnsReducer.java    |  8 ----
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  | 40 +++++++++-----------
 2 files changed, 18 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e7a20a06/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 5b00381..b09e614 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -152,14 +152,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
                     cuboidHLLMap.put(cuboidId, hll);
                 }
             }
-        } else if (isPartitionCol == true) {
-            // for partition col min/max value
-            ByteArray value = new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1));
-            if (colValues.size() > 1) {
-                colValues.set(1, value);
-            } else {
-                colValues.add(value);
-            }
         } else {
             colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1)));
             if (colValues.size() == 1000000) { //spill every 1 million

http://git-wip-us.apache.org/repos/asf/kylin/blob/e7a20a06/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index d285799..4d71f5d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+import java.text.ParseException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -82,23 +83,8 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
     private void updateTimeRange(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-        final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1);
-        final ReadableTable.TableReader tableReader = readableTable.getReader();
-        String minValue = null, maxValue = null;
-        try {
-            while (tableReader.next()) {
-                if (minValue == null) {
-                    minValue = tableReader.getRow()[0];
-                }
-                maxValue = tableReader.getRow()[0];
-            }
-        } finally {
-            IOUtils.closeQuietly(tableReader);
-        }
-
         final DataType partitionColType = partitionCol.getType();
-        FastDateFormat dateFormat;
+        final FastDateFormat dateFormat;
         if (partitionColType.isDate()) {
             dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN);
         } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) {
@@ -113,14 +99,24 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
             throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type");
         }
 
+        final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+        final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1);
+        final ReadableTable.TableReader tableReader = readableTable.getReader();
+        long minValue = Long.MAX_VALUE, maxValue = Long.MIN_VALUE;
         try {
-            long startTime = dateFormat.parse(minValue).getTime();
-            long endTime = dateFormat.parse(maxValue).getTime();
-            segment.setDateRangeStart(startTime);
-            segment.setDateRangeEnd(endTime);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
+            while (tableReader.next()) {
+                long time = dateFormat.parse(tableReader.getRow()[0]).getTime();
+                minValue = Math.min(minValue, time);
+                maxValue = Math.max(maxValue, time);
+            }
+        } catch (ParseException e) {
+            throw new IOException(e);
+        } finally {
+            IOUtils.closeQuietly(tableReader);
         }
+
+        segment.setDateRangeStart(minValue);
+        segment.setDateRangeEnd(maxValue);
     }
 
 }


[2/4] kylin git commit: KYLIN-2135 minor format update

Posted by sh...@apache.org.
KYLIN-2135 minor format update

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dd496a69
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dd496a69
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dd496a69

Branch: refs/heads/master
Commit: dd496a6945ba20192c2fa2bda845c055357ab44a
Parents: 7421403
Author: shaofengshi <sh...@apache.org>
Authored: Thu Nov 3 18:49:50 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Nov 8 21:29:22 2016 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/DFSFileTableReader.java     | 92 ++++++++++----------
 .../engine/mr/steps/FactDistinctColumnsJob.java | 34 ++++----
 2 files changed, 61 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/dd496a69/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index dda1d6f..173c908 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -23,14 +23,15 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -57,7 +58,7 @@ public class DFSFileTableReader implements TableReader {
 
     private String filePath;
     private String delim;
-    private List<RowReader> readerList;
+    private List<RowReader> readerList;
 
     private String curLine;
     private String[] curColumns;
@@ -72,33 +73,33 @@ public class DFSFileTableReader implements TableReader {
         this.filePath = filePath;
         this.delim = delim;
         this.expectedColumnNumber = expectedColumnNumber;
-        this.readerList = new ArrayList<RowReader>();
+        this.readerList = new ArrayList<RowReader>();
 
         FileSystem fs = HadoopUtil.getFileSystem(filePath);
 
-        ArrayList<FileStatus> allFiles = new ArrayList<>();
-        FileStatus status = fs.getFileStatus(new Path(filePath));
-        if (status.isFile()) {
-            allFiles.add(status);
-        } else {
-            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
-            allFiles.addAll(Arrays.asList(listStatus));
-        }
-
-        try {
-            for (FileStatus f : allFiles) {
-                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
-                this.readerList.add(rowReader);
-            }
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(filePath));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
+
+        try {
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         } catch (IOException e) {
             if (isExceptionSayingNotSeqFile(e) == false)
                 throw e;
 
-            this.readerList = new ArrayList<RowReader>();
-            for (FileStatus f : allFiles) {
-                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
-                this.readerList.add(rowReader);
-            }
+            this.readerList = new ArrayList<RowReader>();
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         }
     }
 
@@ -114,20 +115,20 @@ public class DFSFileTableReader implements TableReader {
 
     @Override
     public boolean next() throws IOException {
-        int curReaderIndex = -1;
-        RowReader curReader;
-
-        while (++curReaderIndex < readerList.size()) {
-            curReader = readerList.get(curReaderIndex);
-            curLine = curReader.nextLine();
-            curColumns = null;
-
-            if (curLine != null) {
-                return true;
-            }
-        }
-
-        return false;
+        int curReaderIndex = -1;
+        RowReader curReader;
+
+        while (++curReaderIndex < readerList.size()) {
+            curReader = readerList.get(curReaderIndex);
+            curLine = curReader.nextLine();
+            curColumns = null;
+
+            if (curLine != null) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     public String getLine() {
@@ -176,15 +177,10 @@ public class DFSFileTableReader implements TableReader {
     }
 
     @Override
-    public void close() {
-        for (RowReader reader : readerList) {
-            try {
-                if (reader != null)
-                    reader.close();
-            } catch (IOException e) {
-                logger.warn("close file failed:", e);
-            }
-        }
+    public void close() {
+        for (RowReader reader : readerList) {
+            IOUtils.closeQuietly(reader);
+        }
     }
 
     private String autoDetectDelim(String line) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/dd496a69/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 92da7d1..551ce33 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -78,27 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             CubeInstance cube = cubeMgr.getCube(cubeName);
             List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
 
-            int reducerCount = columnsNeedDict.size();
-            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
-
-            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
-            for(int index : uhcIndex) {
-                if(index == 1) {
-                    reducerCount += uhcReducerCount - 1;
-                }
-            }
-
-            if (reducerCount > 255) {
-                throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, please decrease the 'kylin.job.global.dictionary.column.reducer.count' ");
-            }
-
-
+            int reducerCount = columnsNeedDict.size();
+            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
+
+            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
+            for(int index : uhcIndex) {
+                if(index == 1) {
+                    reducerCount += uhcReducerCount - 1;
+                }
+            }
+
+            if (reducerCount > 255) {
+                throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.job.uhc.reducer.count'");
+            }
+
+
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
-
+
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
@@ -114,7 +114,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 logger.info("Found segment: " + segment);
             }
             setupMapper(cube.getSegmentById(segmentID));
-            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
+            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
 
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 


[3/4] kylin git commit: KYLIN-2135 update UpdateCubeInfoAfterBuildStep for the new folder structure

Posted by sh...@apache.org.
KYLIN-2135 update UpdateCubeInfoAfterBuildStep for the new folder structure


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4d9a9231
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4d9a9231
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4d9a9231

Branch: refs/heads/master
Commit: 4d9a92319ae6f0f778328f06d153cc6a7c9c93a8
Parents: dd496a6
Author: shaofengshi <sh...@apache.org>
Authored: Tue Nov 8 13:54:35 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Nov 8 21:29:22 2016 +0800

----------------------------------------------------------------------
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java  | 35 +++++++-------------
 1 file changed, 12 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4d9a9231/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index f7af42e..d285799 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -18,22 +18,17 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.FastDateFormat;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.DFSFileTable;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -41,6 +36,7 @@ import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,26 +82,19 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
 
     private void updateTimeRange(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
-        final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path outputFile = new Path(outputPath, partitionCol.getName());
-
-        String minValue = null, maxValue = null, currentValue = null;
-        FSDataInputStream inputStream = null;
-        BufferedReader bufferedReader = null;
+        final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH);
+        final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1);
+        final ReadableTable.TableReader tableReader = readableTable.getReader();
+        String minValue = null, maxValue = null;
         try {
-            FileSystem fs = HadoopUtil.getFileSystem(outputPath);
-            inputStream = fs.open(outputFile);
-            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
-            minValue = currentValue = bufferedReader.readLine();
-            while (currentValue != null) {
-                maxValue = currentValue;
-                currentValue = bufferedReader.readLine();
+            while (tableReader.next()) {
+                if (minValue == null) {
+                    minValue = tableReader.getRow()[0];
+                }
+                maxValue = tableReader.getRow()[0];
             }
-        } catch (IOException e) {
-            throw e;
         } finally {
-            IOUtils.closeQuietly(bufferedReader);
-            IOUtils.closeQuietly(inputStream);
+            IOUtils.closeQuietly(tableReader);
         }
 
         final DataType partitionColType = partitionCol.getType();


[4/4] kylin git commit: KYLIN-2135 Enlarge FactDistinctColumns reducer number

Posted by sh...@apache.org.
KYLIN-2135 Enlarge FactDistinctColumns reducer number

Signed-off-by: shaofengshi <sh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/74214030
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/74214030
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/74214030

Branch: refs/heads/master
Commit: 74214030272ffef275ccf0359b583b3278aec468
Parents: 47de961
Author: kangkaisen <ka...@live.com>
Authored: Wed Oct 26 19:35:20 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Nov 8 21:29:22 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  5 ++
 .../java/org/apache/kylin/cube/CubeManager.java | 35 ++++++++++++
 .../kylin/engine/mr/DFSFileTableReader.java     | 59 ++++++++++++++++----
 .../kylin/engine/mr/common/BatchConstants.java  |  5 ++
 .../mr/steps/FactDistinctColumnPartitioner.java | 11 +---
 .../engine/mr/steps/FactDistinctColumnsJob.java | 18 +++++-
 .../mr/steps/FactDistinctColumnsMapperBase.java | 17 +++++-
 .../mr/steps/FactDistinctColumnsReducer.java    | 34 ++++++++++-
 .../mr/steps/FactDistinctHiveColumnsMapper.java | 12 +++-
 9 files changed, 170 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index d9d10bb..6d3e807 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -374,6 +374,11 @@ abstract public class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true"));
     }
 
+    //UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
+    public int getUHCReducerCount() {
+        return Integer.parseInt(getOptional("kylin.job.uhc.reducer.count", "3"));
+    }
+
     public String getOverrideHiveTableLocation(String table) {
         return getOptional("hive.table.location." + table.toUpperCase());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 87bb93d..9893040 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -43,6 +44,7 @@ import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DictionaryDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
@@ -1049,4 +1051,37 @@ public class CubeManager implements IRealizationProvider {
         }
         return holes;
     }
+
+    private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+
+    //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
+    public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+        List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc);
+        int[] uhcIndex = new int[factDictCols.size()];
+
+        //add GlobalDictionaryColumns
+        List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
+        if (dictionaryDescList != null) {
+            for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+                if (dictionaryDesc.getBuilderClass() != null && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
+                    for (int i = 0; i < factDictCols.size(); i++) {
+                        if (factDictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
+                            uhcIndex[i] = 1;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        //add ShardByColumns
+        Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
+        for (int i = 0; i < factDictCols.size(); i++) {
+            if (shardByColumns.contains(factDictCols.get(i))) {
+                uhcIndex[i] = 1;
+            }
+        }
+
+        return uhcIndex;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
index 300b123..dda1d6f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -23,10 +23,14 @@ import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -53,7 +57,7 @@ public class DFSFileTableReader implements TableReader {
 
     private String filePath;
     private String delim;
-    private RowReader reader;
+    private List<RowReader> readerList;
 
     private String curLine;
     private String[] curColumns;
@@ -68,17 +72,33 @@ public class DFSFileTableReader implements TableReader {
         this.filePath = filePath;
         this.delim = delim;
         this.expectedColumnNumber = expectedColumnNumber;
+        this.readerList = new ArrayList<RowReader>();
 
         FileSystem fs = HadoopUtil.getFileSystem(filePath);
 
-        try {
-            this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
-
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(filePath));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(filePath));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
+
+        try {
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         } catch (IOException e) {
             if (isExceptionSayingNotSeqFile(e) == false)
                 throw e;
 
-            this.reader = new CsvRowReader(fs, filePath);
+            this.readerList = new ArrayList<RowReader>();
+            for (FileStatus f : allFiles) {
+                RowReader rowReader = new CsvRowReader(fs, f.getPath().toString());
+                this.readerList.add(rowReader);
+            }
         }
     }
 
@@ -94,9 +114,20 @@ public class DFSFileTableReader implements TableReader {
 
     @Override
     public boolean next() throws IOException {
-        curLine = reader.nextLine();
-        curColumns = null;
-        return curLine != null;
+        int curReaderIndex = -1;
+        RowReader curReader;
+
+        while (++curReaderIndex < readerList.size()) {
+            curReader = readerList.get(curReaderIndex);
+            curLine = curReader.nextLine();
+            curColumns = null;
+
+            if (curLine != null) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     public String getLine() {
@@ -145,9 +176,15 @@ public class DFSFileTableReader implements TableReader {
     }
 
     @Override
-    public void close() throws IOException {
-        if (reader != null)
-            reader.close();
+    public void close() {
+        for (RowReader reader : readerList) {
+            try {
+                if (reader != null)
+                    reader.close();
+            } catch (IOException e) {
+                logger.warn("close file failed:", e);
+            }
+        }
     }
 
     private String autoDetectDelim(String line) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index e4a8808..078d80f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -81,4 +81,9 @@ public interface BatchConstants {
     String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";
     int NORMAL_RECORD_LOG_THRESHOLD = 100000;
     int ERROR_RECORD_LOG_THRESHOLD = 100;
+
+    /**
+     * dictionaries builder class
+     */
+    String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index 6973c4b..b36e422 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.kylin.common.util.BytesUtil;
@@ -26,22 +25,16 @@ import org.apache.kylin.common.util.BytesUtil;
 /**
  */
 public class FactDistinctColumnPartitioner extends Partitioner<Text, Text> {
-    private Configuration conf;
-
     @Override
     public int getPartition(Text key, Text value, int numReduceTasks) {
-
         if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
             // the last reducer is for merging hll
             return numReduceTasks - 1;
         } else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) {
-            // the last reducer is for merging hll
+            // the last but one reducer is for partition col
             return numReduceTasks - 2;
         } else {
-            int colIndex = BytesUtil.readUnsigned(key.getBytes(), 0, 1);
-            return colIndex;
+            return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
         }
-
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index 28ee335..92da7d1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -78,11 +78,27 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             CubeInstance cube = cubeMgr.getCube(cubeName);
             List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
 
+            int reducerCount = columnsNeedDict.size();
+            int uhcReducerCount = cube.getConfig().getUHCReducerCount();
+
+            int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor());
+            for(int index : uhcIndex) {
+                if(index == 1) {
+                    reducerCount += uhcReducerCount - 1;
+                }
+            }
+
+            if (reducerCount > 255) {
+                throw new IOException("The max reducer number for FactDistinctColumnsJob is 255, please decrease the 'kylin.job.global.dictionary.column.reducer.count' ");
+            }
+
+
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
             job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
@@ -98,7 +114,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 logger.info("Found segment: " + segment);
             }
             setupMapper(cube.getSegmentById(segmentID));
-            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
+            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? reducerCount + 2 : reducerCount);
 
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
index 3fa966d..196bf1e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperBase.java
@@ -20,7 +20,9 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
@@ -58,6 +60,10 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
     protected CubeJoinedFlatTableEnrich intermediateTableDesc;
     protected int[] dictionaryColumnIndex;
 
+    protected int uhcReducerCount;
+    protected int[] uhcIndex;
+    protected Map<Integer, Integer> columnIndexToReducerBeginId = new HashMap<>();
+
     @Override
     protected void setup(Context context) throws IOException {
         Configuration conf = context.getConfiguration();
@@ -73,7 +79,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
 
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
 
-        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg),  cubeDesc);
+        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
         dictionaryColumnIndex = new int[factDictCols.size()];
         for (int i = 0; i < factDictCols.size(); i++) {
             TblColRef colRef = factDictCols.get(i);
@@ -81,6 +87,15 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
             dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
         }
 
+        uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
+        uhcReducerCount = cube.getConfig().getUHCReducerCount();
+        int count = 0;
+        for (int i = 0; i < uhcIndex.length; i++) {
+            columnIndexToReducerBeginId.put(i, count * (uhcReducerCount - 1) + i);
+            if (uhcIndex[i] == 1) {
+                count++;
+            }
+        }
     }
 
     protected void handleErrorRecord(String[] record, Exception ex) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index ecbc6c2..5b00381 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -67,6 +68,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
     private boolean isStatistics = false;
     private boolean isPartitionCol = false;
     private KylinConfig cubeConfig;
+    private int uhcReducerCount;
+    private Map<Integer, Integer> ReducerIdToColumnIndex = new HashMap<>();
+    private int taskId;
+
     protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsReducer.class);
 
     @Override
@@ -83,7 +88,10 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
 
         boolean collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
         int numberOfTasks = context.getNumReduceTasks();
-        int taskId = context.getTaskAttemptID().getTaskID().getId();
+        taskId = context.getTaskAttemptID().getTaskID().getId();
+
+        uhcReducerCount = cube.getConfig().getUHCReducerCount();
+        initReducerIdToColumnIndex(config);
 
         if (collectStatistics && (taskId == numberOfTasks - 1)) {
             // hll
@@ -102,11 +110,25 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
             // col
             isStatistics = false;
             isPartitionCol = false;
-            col = columnList.get(taskId);
+            col = columnList.get(ReducerIdToColumnIndex.get(taskId));
             colValues = Lists.newLinkedList();
         }
     }
 
+    private void initReducerIdToColumnIndex(KylinConfig config) throws IOException {
+        int[] uhcIndex = CubeManager.getInstance(config).getUHCIndex(cubeDesc);
+        int count = 0;
+        for (int i = 0; i < uhcIndex.length; i++) {
+            ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + i, i);
+            if (uhcIndex[i] == 1) {
+                for (int j = 1; j < uhcReducerCount; j++) {
+                    ReducerIdToColumnIndex.put(count * (uhcReducerCount - 1) + j + i, i);
+                }
+                count++;
+            }
+        }
+    }
+
     @Override
     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 
@@ -153,10 +175,16 @@ public class FactDistinctColumnsReducer extends KylinReducer<Text, Text, NullWri
         final Configuration conf = context.getConfiguration();
         final FileSystem fs = FileSystem.get(conf);
         final String outputPath = conf.get(BatchConstants.CFG_OUTPUT_PATH);
-        final Path outputFile = new Path(outputPath, col.getName());
+        final Path colDir = new Path(outputPath, col.getName());
+        final String fileName = col.getName() + "-" + taskId % uhcReducerCount;
+        final Path outputFile = new Path(colDir, fileName);
 
         FSDataOutputStream out = null;
         try {
+            if (!fs.exists(colDir)) {
+                fs.mkdirs(colDir);
+            }
+
             if (fs.exists(outputFile)) {
                 out = fs.append(outputFile);
                 logger.info("append file " + outputFile);

http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index 177c9f6..7a183b8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -141,7 +141,17 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
                 if (fieldValue == null)
                     continue;
                 int offset = keyBuffer.position();
-                keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough
+
+                int reducerIndex;
+                if (uhcIndex[i] == 0) {
+                    //for the normal dictionary column
+                    reducerIndex = columnIndexToReducerBeginId.get(i);
+                } else {
+                    //for the uhc column: pick one of its uhcReducerCount reducers by value hash
+                    reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+                }
+
+                keyBuffer.put(Bytes.toBytes(reducerIndex)[3]);
                 keyBuffer.put(Bytes.toBytes(fieldValue));
                 outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
                 context.write(outputKey, EMPTY_TEXT);