You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:39 UTC

[02/50] incubator-kylin git commit: KYLIN-630 add distinct column mapper for II storage

KYLIN-630 add distinct column mapper for II storage


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1b52438e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1b52438e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1b52438e

Branch: refs/heads/streaming-localdict
Commit: 1b52438e2eec3dd271b66b6a6352ccf1bc0278d3
Parents: 8e0695b
Author: honma <ho...@ebay.com>
Authored: Thu Mar 26 16:03:30 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Mar 26 16:03:30 2015 +0800

----------------------------------------------------------------------
 .../model/IIJoinedFlatTableDesc.java            |  12 +-
 .../apache/kylin/invertedindex/model/IIRow.java |  13 ++
 .../org/apache/kylin/job/JoinedFlatTable.java   |   1 -
 .../kylin/job/hadoop/AbstractHadoopJob.java     |   2 +-
 .../kylin/job/hadoop/cube/CubeHFileMapper.java  |   2 +-
 .../kylin/job/hadoop/cube/CuboidReducer.java    |   2 +-
 .../job/hadoop/cube/FactDistinctColumnsJob.java |   2 +-
 .../hadoop/cube/FactDistinctColumnsMapper.java  | 129 -------------------
 .../cube/FactDistinctColumnsMapperBase.java     |   2 +-
 .../hadoop/cube/FactDistinctColumnsReducer.java |   2 +-
 .../cube/FactDistinctHiveColumnsMapper.java     | 129 +++++++++++++++++++
 .../cube/FactDistinctIIColumnsMapper.java       | 129 +++++++++++++++++++
 .../job/hadoop/cube/MergeCuboidMapper.java      |   2 +-
 .../kylin/job/hadoop/cube/NDCuboidMapper.java   |   2 +-
 .../job/hadoop/cube/NewBaseCuboidMapper.java    |   2 +-
 .../job/hadoop/cubev2/InMemCuboidMapper.java    |   2 +-
 .../job/hadoop/cubev2/InMemCuboidReducer.java   |   2 +-
 .../invertedindex/InvertedIndexMapper.java      |   2 +-
 .../invertedindex/InvertedIndexPartitioner.java |   2 +-
 .../invertedindex/InvertedIndexReducer.java     |   2 +-
 .../metadata/model/IJoinedFlatTableDesc.java    |   2 -
 .../metadata/model/IntermediateColumnDesc.java  |   4 +
 .../endpoint/HbaseServerKVIterator.java         |   9 +-
 23 files changed, 296 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
index 44114da..14934dc 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
@@ -19,13 +19,13 @@
 package org.apache.kylin.invertedindex.model;
 
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
 
-import com.google.common.collect.Lists;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.*;
+import com.google.common.collect.Lists;
 
 /**
  * Created by Hongbin Ma(Binmahone) on 12/30/14.
@@ -35,7 +35,6 @@ public class IIJoinedFlatTableDesc implements IJoinedFlatTableDesc {
     private IIDesc iiDesc;
     private String tableName;
     private List<IntermediateColumnDesc> columnList = Lists.newArrayList();
-    private Map<String, String> tableAliasMap;
 
     public IIJoinedFlatTableDesc(IIDesc iiDesc) {
         this.iiDesc = iiDesc;
@@ -57,6 +56,7 @@ public class IIJoinedFlatTableDesc implements IJoinedFlatTableDesc {
         return tableName + "_" + jobUUID.replace("-", "_");
     }
 
+    @Override
     public List<IntermediateColumnDesc> getColumnList() {
         return columnList;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
index aba4fff..f3d398a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIRow.java
@@ -34,7 +34,9 @@
 
 package org.apache.kylin.invertedindex.model;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.util.BytesUtil;
 
 /**
  * Created by qianzhou on 3/10/15.
@@ -50,6 +52,7 @@ public final class IIRow {
         this.value = value;
         this.dictionary = dictionary;
     }
+
     public IIRow() {
         this(new ImmutableBytesWritable(), new ImmutableBytesWritable(), new ImmutableBytesWritable());
     }
@@ -61,7 +64,39 @@ public final class IIRow {
     public ImmutableBytesWritable getValue() {
         return value;
     }
+
     public ImmutableBytesWritable getDictionary() {
         return dictionary;
     }
+
+    /**
+     * Copies the content of one HBase cell into this row, dispatching on the
+     * cell's column qualifier.
+     * <p>
+     * A cell whose qualifier equals {@link IIDesc#HBASE_QUALIFIER_BYTES} supplies
+     * both the row key and the value; a cell whose qualifier equals
+     * {@link IIDesc#HBASE_DICTIONARY_BYTES} supplies the dictionary. Cells with
+     * any other qualifier are ignored.
+     *
+     * @param c the cell to read from
+     */
+    public void updateWith(Cell c) {
+        if (qualifierEquals(c, IIDesc.HBASE_QUALIFIER_BYTES)) {
+            this.getKey().set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
+            this.getValue().set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+        } else if (qualifierEquals(c, IIDesc.HBASE_DICTIONARY_BYTES)) {
+            this.getDictionary().set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+        }
+    }
+
+    /**
+     * Returns whether the qualifier of {@code c} is exactly {@code expected}.
+     * The length check matters: comparing only {@code expected.length} bytes
+     * would also accept a longer qualifier that shares the same prefix, and
+     * would compare bytes beyond the end of a shorter qualifier.
+     */
+    private static boolean qualifierEquals(Cell c, byte[] expected) {
+        return c.getQualifierLength() == expected.length
+                && BytesUtil.compareBytes(expected, 0, c.getQualifierArray(), c.getQualifierOffset(), expected.length) == 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index cc3dc1b..100fbca 100644
--- a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -27,7 +27,6 @@ import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
 
-import org.apache.kylin.cube.model.DimensionDesc;
 import org.w3c.dom.Document;
 import org.w3c.dom.NodeList;
 import org.xml.sax.SAXException;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 038fe2f..9f73488 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -330,7 +330,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         return input.getSplits(job).size();
     }
 
-    public static KylinConfig loadKylinPropsAndMetadata(Configuration conf) throws IOException {
+    public static KylinConfig loadKylinPropsAndMetadata() throws IOException {
         File metaDir = new File("meta");
         System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
         logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
index 17dc24e..1236f8c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
@@ -62,7 +62,7 @@ public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWrita
         super.publishConfiguration(context.getConfiguration());
         cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
 
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         CubeManager cubeMgr = CubeManager.getInstance(config);
         cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
index 7181fa1..b747dff 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
@@ -64,7 +64,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
         super.publishConfiguration(context.getConfiguration());
         cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
 
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
         measuresDescs = cubeDesc.getMeasures();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index 094014e..17c5e9b 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -101,7 +101,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
                 dbTableNames[1]);
         
         job.setInputFormatClass(HCatInputFormat.class);
-        job.setMapperClass(FactDistinctColumnsMapper.class);
+        job.setMapperClass(FactDistinctHiveColumnsMapper.class);
         job.setCombinerClass(FactDistinctColumnsCombiner.class);
         job.setMapOutputKeyClass(ShortWritable.class);
         job.setMapOutputValueClass(Text.class);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
deleted file mode 100644
index 3a50249..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.lookup.HiveTableReader;
-import org.apache.kylin.job.constant.BatchConstants;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, HCatRecord> {
-
-    private HCatSchema schema = null;
-    private CubeJoinedFlatTableDesc intermediateTableDesc;
-
-    protected boolean collectStatistics = false;
-    protected CuboidScheduler cuboidScheduler = null;
-    protected List<String> rowKeyValues = null;
-    protected HyperLogLogPlusCounter hll;
-    protected int nRowKey;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.setup(context);
-
-        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-
-
-        collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
-        if (collectStatistics) {
-            cuboidScheduler = new CuboidScheduler(cubeDesc);
-            hll = new HyperLogLogPlusCounter(16);
-            rowKeyValues = Lists.newArrayList();
-            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
-        }
-    }
-
-    @Override
-    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-        try {
-            int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
-            HCatFieldSchema fieldSchema;
-            for (int i : factDictCols) {
-                outputKey.set((short) i);
-                fieldSchema = schema.get(flatTableIndexes[i]);
-                Object fieldValue = record.get(fieldSchema.getName(), schema);
-                if (fieldValue == null)
-                    continue;
-                byte[] bytes = Bytes.toBytes(fieldValue.toString());
-                outputValue.set(bytes, 0, bytes.length);
-                context.write(outputKey, outputValue);
-            }
-        } catch (Exception ex) {
-            handleErrorRecord(record, ex);
-        }
-
-        if (collectStatistics) {
-            String[] row = HiveTableReader.getRowAsStringArray(record);
-            putRowKeyToHLL(row, baseCuboidId);
-        }
-    }
-
-    private void putRowKeyToHLL(String[] row, long cuboidId) {
-        rowKeyValues.clear();
-        long mask = Long.highestOneBit(baseCuboidId);
-        for (int i = 0; i < nRowKey; i++) {
-            if ((mask & cuboidId) == 1) {
-                rowKeyValues.add(row[intermediateTableDesc.getRowKeyColumnIndexes()[i]]);
-            }
-            mask = mask >> 1;
-        }
-
-        String key = StringUtils.join(rowKeyValues, ",");
-        hll.add(key);
-
-        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
-        for (Long childId : children) {
-            putRowKeyToHLL(row, childId);
-        }
-
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        if (collectStatistics) {
-            // output hll to reducer, key is -1
-            // keyBuf = Bytes.toBytes(-1);
-            outputKey.set((short) -1);
-            ByteBuffer hllBuf = ByteBuffer.allocate(64 * 1024);
-            hll.writeRegisters(hllBuf);
-            outputValue.set(hllBuf.array());
-            context.write(outputKey, outputValue);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
index 603277c..c0455ff 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
@@ -40,7 +40,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
     protected void setup(Context context) throws IOException {
         Configuration conf = context.getConfiguration();
         publishConfiguration(conf);
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         cube = CubeManager.getInstance(config).getCube(cubeName);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index 383def4..2052d08 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -61,7 +61,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
         super.publishConfiguration(context.getConfiguration());
 
         Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         CubeDesc cubeDesc = cube.getDescriptor();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
new file mode 100644
index 0000000..64ae353
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.job.constant.BatchConstants;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Fact-distinct-columns mapper that reads the flat table from Hive through
+ * HCatalog.
+ * <p>
+ * For every record it emits one (column index, value) pair for each column in
+ * {@code factDictCols}, skipping null values. When
+ * {@link BatchConstants#CFG_STATISTICS_ENABLED} is {@code "true"}, it also adds
+ * the row key of the base cuboid, and of every cuboid reached from it through
+ * {@code CuboidScheduler.getSpanningCuboid}, to a HyperLogLog counter whose
+ * registers are emitted under key -1 in {@code cleanup}.
+ *
+ * @author yangli9
+ */
+public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, HCatRecord> {
+
+    private HCatSchema schema = null;
+    private CubeJoinedFlatTableDesc intermediateTableDesc;
+
+    protected boolean collectStatistics = false;
+    protected CuboidScheduler cuboidScheduler = null;
+    protected List<String> rowKeyValues = null;
+    protected HyperLogLogPlusCounter hll;
+    protected int nRowKey;
+
+    /**
+     * Loads the HCatalog schema of the input table and, when statistics
+     * collection is enabled, prepares the cuboid scheduler and HLL counter.
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+
+        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+
+        collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
+        if (collectStatistics) {
+            cuboidScheduler = new CuboidScheduler(cubeDesc);
+            hll = new HyperLogLogPlusCounter(16);
+            rowKeyValues = Lists.newArrayList();
+            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+        }
+    }
+
+    /**
+     * Emits the distinct-value candidates of one record; any exception raised
+     * while emitting is passed to {@code handleErrorRecord}.
+     */
+    @Override
+    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+        try {
+            int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+            HCatFieldSchema fieldSchema;
+            for (int i : factDictCols) {
+                outputKey.set((short) i);
+                fieldSchema = schema.get(flatTableIndexes[i]);
+                Object fieldValue = record.get(fieldSchema.getName(), schema);
+                if (fieldValue == null)
+                    continue;
+                byte[] bytes = Bytes.toBytes(fieldValue.toString());
+                outputValue.set(bytes, 0, bytes.length);
+                context.write(outputKey, outputValue);
+            }
+        } catch (Exception ex) {
+            handleErrorRecord(record, ex);
+        }
+
+        if (collectStatistics) {
+            String[] row = HiveTableReader.getRowAsStringArray(record);
+            putRowKeyToHLL(row, baseCuboidId);
+        }
+    }
+
+    /**
+     * Adds the row key of {@code cuboidId} for {@code row} to the HLL counter,
+     * then recurses into every cuboid the scheduler reports as spanned from it.
+     * <p>
+     * Row-key columns are scanned from the highest bit of the base cuboid id
+     * downwards; a column is included when its bit is set in
+     * {@code cuboidId}.
+     */
+    private void putRowKeyToHLL(String[] row, long cuboidId) {
+        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+        rowKeyValues.clear();
+        long mask = Long.highestOneBit(baseCuboidId);
+        for (int i = 0; i < nRowKey; i++) {
+            // Test for non-zero: the masked bit is generally not bit 0, so
+            // comparing with 1 would only ever match the last row-key column.
+            if ((mask & cuboidId) != 0) {
+                rowKeyValues.add(row[rowKeyColumnIndexes[i]]);
+            }
+            mask = mask >> 1;
+        }
+
+        String key = StringUtils.join(rowKeyValues, ",");
+        hll.add(key);
+
+        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+        for (Long childId : children) {
+            putRowKeyToHLL(row, childId);
+        }
+    }
+
+    /**
+     * When statistics are enabled, emits the HLL registers under key -1.
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        if (collectStatistics) {
+            // output hll to reducer, key is -1
+            outputKey.set((short) -1);
+            ByteBuffer hllBuf = ByteBuffer.allocate(64 * 1024);
+            hll.writeRegisters(hllBuf);
+            outputValue.set(hllBuf.array());
+            context.write(outputKey, outputValue);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
new file mode 100644
index 0000000..75e127e
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.*;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
+/**
+ * Fact-distinct-columns mapper that reads inverted-index (II) rows from HBase.
+ * <p>
+ * Each input {@link Result} is converted into an {@link IIRow} and appended to
+ * a buffer that feeds a stateful key/value codec. Whenever the codec yields a
+ * {@link Slice}, every record of that slice is decoded through the slice's local
+ * dictionaries and one (column index, value) pair is emitted per column in
+ * {@code factDictCols}, skipping null values.
+ */
+public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<ImmutableBytesWritable, Result> {
+
+    private IIJoinedFlatTableDesc intermediateTableDesc;
+    private ArrayList<IIRow> buffer = Lists.newArrayList();
+    private Iterable<Slice> slices;
+
+    private String iiName;
+    private IIInstance ii;
+    private IIDesc iiDesc;
+
+    /**
+     * For each position i in {@code factDictCols}, the index of that column in
+     * the II flat table's column list; used to pick the column in a decoded slice.
+     */
+    private int[] baseCuboidCol2FlattenTableCol;
+
+    /**
+     * Resolves the II named by {@link BatchConstants#CFG_II_NAME}, binds the
+     * slice decoder to the row buffer, and maps every fact dictionary column to
+     * its position in the II flat table.
+     *
+     * @throws IllegalStateException if a fact dictionary column is not part of
+     *         the II flat table
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        iiName = conf.get(BatchConstants.CFG_II_NAME);
+        ii = IIManager.getInstance(config).getII(iiName);
+        iiDesc = ii.getDescriptor();
+
+        intermediateTableDesc = new IIJoinedFlatTableDesc(iiDesc);
+        TableRecordInfo info = new TableRecordInfo(iiDesc);
+        KeyValueCodec codec = new IIKeyValueCodecWithState(info.getDigest());
+        slices = codec.decodeKeyValue(buffer);
+
+        baseCuboidCol2FlattenTableCol = new int[factDictCols.size()];
+        for (int i = 0; i < factDictCols.size(); ++i) {
+            TblColRef col = columns.get(factDictCols.get(i));
+            int index = findTblCol(intermediateTableDesc.getColumnList(), col);
+            if (index < 0) {
+                // Fail fast rather than failing later in map() with index -1.
+                throw new IllegalStateException("Column " + col + " not found in flat table of II " + iiName);
+            }
+            baseCuboidCol2FlattenTableCol[i] = index;
+        }
+    }
+
+    /**
+     * Returns the position of {@code col} in {@code columns}, or -1 if no element matches.
+     */
+    private int findTblCol(List<IntermediateColumnDesc> columns, final TblColRef col) {
+        return Iterators.indexOf(columns.iterator(), new Predicate<IntermediateColumnDesc>() {
+            @Override
+            public boolean apply(IntermediateColumnDesc input) {
+                return input.getColRef().equals(col);
+            }
+        });
+    }
+
+    /**
+     * Buffers the II row carried by {@code cells} and, if the codec can now
+     * produce a slice, emits the distinct-value candidates of that slice.
+     */
+    @Override
+    public void map(ImmutableBytesWritable key, Result cells, Context context) throws IOException, InterruptedException {
+        IIRow iiRow = new IIRow();
+        for (Cell c : cells.rawCells()) {
+            iiRow.updateWith(c);
+        }
+        buffer.add(iiRow);
+
+        // Obtain one iterator and use it for both hasNext() and next().
+        Iterator<Slice> sliceIterator = slices.iterator();
+        if (!sliceIterator.hasNext()) {
+            return;
+        }
+        Slice slice = sliceIterator.next();
+
+        byte[] vBytesBuffer = null;
+        for (RawTableRecord record : slice) {
+            for (int i = 0; i < factDictCols.size(); ++i) {
+                int baseCuboidIndex = factDictCols.get(i);
+                outputKey.set((short) baseCuboidIndex);
+                int indexInRecord = baseCuboidCol2FlattenTableCol[i];
+
+                Dictionary<?> dictionary = slice.getLocalDictionaries().get(indexInRecord);
+                if (vBytesBuffer == null || dictionary.getSizeOfValue() > vBytesBuffer.length) {
+                    vBytesBuffer = new byte[dictionary.getSizeOfValue() * 2];
+                }
+
+                // Read the value id from the same record column the dictionary belongs to
+                // (the flat-table position), not from the base cuboid column index.
+                int vid = record.getValueID(indexInRecord);
+                if (vid == dictionary.nullId()) {
+                    continue;
+                }
+                int vBytesSize = dictionary.getValueBytesFromId(vid, vBytesBuffer, 0);
+
+                outputValue.set(vBytesBuffer, 0, vBytesSize);
+                context.write(outputKey, outputValue);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
index 431f2b7..417e996 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
@@ -114,7 +114,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
         segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
 
-        config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         cubeManager = CubeManager.getInstance(config);
         cube = cubeManager.getCube(cubeName);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
index dc65baa..e476bd7 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
@@ -68,7 +68,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
         segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
 
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
index e75457e..79c334c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
@@ -125,7 +125,7 @@ public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, T
         cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
         segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
 
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         metadataManager = MetadataManager.getInstance(config);
         cube = CubeManager.getInstance(config).getCube(cubeName);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index a58369f..5a3565a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -81,7 +81,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Tex
 
         Configuration conf = context.getConfiguration();
 
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         cube = CubeManager.getInstance(config).getCube(cubeName);
         cubeDesc = cube.getDescriptor();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
index de2539c..48fe3a1 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidReducer.java
@@ -44,7 +44,7 @@ public class InMemCuboidReducer extends KylinReducer<Text, Text, Text, Text> {
         super.publishConfiguration(context.getConfiguration());
         cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
 
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
         measuresDescs = cubeDesc.getMeasures();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
index 735a945..0344043 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
@@ -58,7 +58,7 @@ public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, L
 
         Configuration conf = context.getConfiguration();
 
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         IIManager mgr = IIManager.getInstance(config);
         IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
         IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
index 141565f..fa4dccf 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
@@ -54,7 +54,7 @@ public class InvertedIndexPartitioner extends Partitioner<LongWritable, Immutabl
     public void setConf(Configuration conf) {
         this.conf = conf;
         try {
-            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
             IIManager mgr = IIManager.getInstance(config);
             IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
             IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
index a1c0811..9f238b0 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
@@ -53,7 +53,7 @@ public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBy
         super.publishConfiguration(context.getConfiguration());
 
         Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
         IIManager mgr = IIManager.getInstance(config);
         IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
         IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index 917e12b..abf87b7 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -20,8 +20,6 @@ package org.apache.kylin.metadata.model;
 
 import java.util.List;
 
-import org.apache.kylin.metadata.model.DataModelDesc;
-
 /**
  * Created by Hongbin Ma(Binmahone) on 12/30/14.
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
index 860773c..1b931a0 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/IntermediateColumnDesc.java
@@ -36,6 +36,10 @@ public class IntermediateColumnDesc {
         return id;
     }
 
+    public TblColRef getColRef() {
+        return this.colRef;
+    }
+
     public String getColumnName() {
         return colRef.getName();
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1b52438e/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
index a7b67d8..5d9f633 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/HbaseServerKVIterator.java
@@ -63,7 +63,6 @@ public class HbaseServerKVIterator implements Iterable<IIRow>, Closeable {
             this.regionScanner = innerScanner;
         }
 
-
         @Override
         public boolean hasNext() {
             return hasMore;
@@ -81,12 +80,7 @@ public class HbaseServerKVIterator implements Iterable<IIRow>, Closeable {
                 throw new IllegalStateException("Hbase row contains less than 1 cell");
             }
             for (Cell c : results) {
-                if (BytesUtil.compareBytes(IIDesc.HBASE_QUALIFIER_BYTES, 0, c.getQualifierArray(), c.getQualifierOffset(), IIDesc.HBASE_QUALIFIER_BYTES.length) == 0) {
-                    row.getKey().set(c.getRowArray(), c.getRowOffset(), c.getRowLength());
-                    row.getValue().set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
-                } else if (BytesUtil.compareBytes(IIDesc.HBASE_DICTIONARY_BYTES, 0, c.getQualifierArray(), c.getQualifierOffset(), IIDesc.HBASE_DICTIONARY_BYTES.length) == 0) {
-                    row.getDictionary().set(c.getValueArray(), c.getValueOffset(), c.getValueLength());
-                }
+                row.updateWith(c);
             }
             return row;
         }
@@ -97,7 +91,6 @@ public class HbaseServerKVIterator implements Iterable<IIRow>, Closeable {
         }
     }
 
-
     @Override
     public Iterator<IIRow> iterator() {
         return new IIRowIterator(innerScanner);