Posted to commits@kylin.apache.org by bi...@apache.org on 2017/02/14 03:19:03 UTC

[08/11] kylin git commit: KYLIN-2442 calculate raw data size using custom counter

KYLIN-2442 calculate raw data size using custom counter


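For context: rather than reading the generic "HDFS_BYTES_READ" filesystem counter, the job now accumulates a user-defined Hadoop counter while parsing each record, so the saved source size reflects the logical raw bytes of the flat table instead of physical I/O (which compression and non-source reads can skew). A minimal sketch of that pattern, assuming an illustrative mapper; only the RawDataCounter enum mirrors the commit:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Illustrative mapper: the input/output types are assumptions, not the commit's.
    public class RawSizeMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // A custom counter is just an enum; Hadoop groups it under the enum's class name.
        public enum RawDataCounter { BYTES }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Measure the raw payload of every record ourselves, independent
            // of how many bytes the filesystem layer happened to read.
            context.getCounter(RawDataCounter.BYTES).increment(value.getLength());
        }
    }
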
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/405dee26
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/405dee26
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/405dee26

Branch: refs/heads/KYLIN-2428
Commit: 405dee26d7fe463d15bf5f1d7690359c9e83f678
Parents: 43c0566
Author: Li Yang <li...@apache.org>
Authored: Fri Feb 10 16:56:21 2017 +0800
Committer: Li Yang <li...@apache.org>
Committed: Mon Feb 13 10:20:54 2017 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |   1 -
 .../kylin/engine/mr/common/HadoopCmdOutput.java |   9 +-
 .../engine/mr/common/MapReduceExecutable.java   |   8 +-
 .../mr/steps/FactDistinctColumnPartitioner.java |   4 +-
 .../engine/mr/steps/FactDistinctColumnsJob.java |   2 +-
 .../mr/steps/FactDistinctColumnsMapper.java     | 262 +++++++++++++++++++
 .../mr/steps/FactDistinctHiveColumnsMapper.java | 230 ----------------
 7 files changed, 272 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 36c12a1..1ec23b6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -100,7 +100,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
         return baseCuboidStep;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
index 9d016cc..2a480e6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopCmdOutput.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.RawDataCounter;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +68,7 @@ public class HadoopCmdOutput {
 
     private String mapInputRecords;
     private String hdfsBytesWritten;
-    private String hdfsBytesRead;
+    private String rawInputBytesRead;
 
     public String getMapInputRecords() {
         return mapInputRecords;
@@ -77,8 +78,8 @@ public class HadoopCmdOutput {
         return hdfsBytesWritten;
     }
 
-    public String getHdfsBytesRead() {
-        return hdfsBytesRead;
+    public String getRawInputBytesRead() {
+        return rawInputBytesRead;
     }
 
     public void updateJobCounter() {
@@ -95,7 +96,7 @@ public class HadoopCmdOutput {
 
             mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
             hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
-            hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
+            rawInputBytesRead = String.valueOf(counters.findCounter(RawDataCounter.BYTES).getValue());
         } catch (Exception e) {
             logger.error(e.getLocalizedMessage(), e);
             output.append(e.getLocalizedMessage());

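On the driver side, the size is now resolved from the enum counter rather than the "FileSystemCounters" group. A minimal sketch contrasting the two lookups, assuming a completed Job (the wrapper class and method are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.RawDataCounter;

    public class CounterReadSketch {
        static void printSourceSize(Job job) throws IOException {
            Counters counters = job.getCounters();
            // Old metric: physical HDFS reads, skewed by compression and any
            // non-source I/O the tasks perform.
            System.out.println("HDFS bytes read: "
                    + counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
            // New metric: the raw record bytes the mapper accumulated itself.
            System.out.println("Raw input bytes: "
                    + counters.findCounter(RawDataCounter.BYTES).getValue());
        }
    }
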
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index 6de07ca..2e7a289 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -202,14 +202,14 @@ public class MapReduceExecutable extends AbstractExecutable {
     private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
         hadoopCmdOutput.updateJobCounter();
         info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
-        info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
+        info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getRawInputBytesRead());
         info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
 
         String saveAs = getParam(KEY_COUNTER_SAVEAS);
         if (saveAs != null) {
             String[] saveAsNames = saveAs.split(",");
             saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
-            saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
+            saveCounterAs(hadoopCmdOutput.getRawInputBytesRead(), saveAsNames, 1, info);
             saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
         }
     }
@@ -244,10 +244,6 @@ public class MapReduceExecutable extends AbstractExecutable {
         setParam(KEY_PARAMS, param);
     }
 
-    public String getCounterSaveAs() {
-        return getParam(KEY_COUNTER_SAVEAS);
-    }
-
     public void setCounterSaveAs(String value) {
         setParam(KEY_COUNTER_SAVEAS, value);
     }

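The "saveAs" parameter stays a comma-separated list whose positions map to fixed counters: position 0 is map input records, position 1 the raw input bytes (the new counter), position 2 HDFS bytes written. A hedged sketch of that contract; the setCounterSaveAs call mirrors the line removed from BatchCubingJobBuilder above, and "step" is illustrative:

    // Save the first two positional counters under cube-level names; with only
    // two names supplied, saveCounterAs (not shown in this diff) presumably
    // skips the third position.
    step.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
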
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
index e8817a5..5fcfe42 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java
@@ -29,10 +29,10 @@ public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortabl
     @Override
     public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) {
         Text key = skey.getText();
-        if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_HLL) {
+        if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_HLL) {
             // the last reducer is for merging hll
             return numReduceTasks - 1;
-        } else if (key.getBytes()[0] == FactDistinctHiveColumnsMapper.MARK_FOR_PARTITION_COL) {
+        } else if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) {
             // the last but one reducer is for partition col
             return numReduceTasks - 2;
         } else {

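Only the class reference changes here; the routing itself is untouched. Restated as a standalone sketch using the marker bytes the mapper defines (0xFF for HLL, 0xFE for the partition column; the method name is illustrative):

    // Dictionary-column keys keep their encoded reducer index; statistics and
    // the partition column are pinned to the two tail reducers.
    static int route(byte firstKeyByte, int ordinaryReducer, int numReduceTasks) {
        if (firstKeyByte == (byte) 0xFF)    // FactDistinctColumnsMapper.MARK_FOR_HLL
            return numReduceTasks - 1;      // last reducer merges HLL statistics
        if (firstKeyByte == (byte) 0xFE)    // FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL
            return numReduceTasks - 2;      // last-but-one collects the partition column
        return ordinaryReducer;
    }
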
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
index aded600..ee0989a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java
@@ -140,7 +140,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
         IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
         flatTableInputFormat.configureJob(job);
 
-        job.setMapperClass(FactDistinctHiveColumnsMapper.class);
+        job.setMapperClass(FactDistinctColumnsMapper.class);
         job.setCombinerClass(FactDistinctColumnsCombiner.class);
         job.setMapOutputKeyClass(SelfDefineSortableKey.class);
         job.setMapOutputValueClass(Text.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
new file mode 100644
index 0000000..9d0ff10
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+/**
+ */
+public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
+
+    private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsMapper.class);
+    
+    public enum RawDataCounter { BYTES }
+
+    protected boolean collectStatistics = false;
+    protected CuboidScheduler cuboidScheduler = null;
+    protected int nRowKey;
+    private Integer[][] allCuboidsBitSet = null;
+    private HLLCounter[] allCuboidsHLL = null;
+    private Long[] cuboidIds;
+    private HashFunction hf = null;
+    private int rowCount = 0;
+    private int samplingPercentage;
+    private ByteArray[] row_hashcodes = null;
+    private ByteBuffer tmpbuf;
+    private static final Text EMPTY_TEXT = new Text();
+    public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
+    public static final byte MARK_FOR_HLL = (byte) 0xFF;
+
+    private int partitionColumnIndex = -1;
+    private boolean needFetchPartitionCol = true;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+        tmpbuf = ByteBuffer.allocate(4096);
+        collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
+        if (collectStatistics) {
+            samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+            cuboidScheduler = new CuboidScheduler(cubeDesc);
+            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+            List<Long> cuboidIdList = Lists.newArrayList();
+            List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+            addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
+
+            allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
+            cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
+
+            allCuboidsHLL = new HLLCounter[cuboidIds.length];
+            for (int i = 0; i < cuboidIds.length; i++) {
+                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+            }
+
+            hf = Hashing.murmur3_32();
+            row_hashcodes = new ByteArray[nRowKey];
+            for (int i = 0; i < nRowKey; i++) {
+                row_hashcodes[i] = new ByteArray();
+            }
+
+            TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            if (partitionColRef != null) {
+                partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
+            }
+
+            // check whether we need to fetch the partition col values
+            if (partitionColumnIndex < 0) {
+                // if partition col not on cube, no need
+                needFetchPartitionCol = false;
+            } else {
+                needFetchPartitionCol = true;
+            }
+        }
+    }
+
+    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
+        allCuboids.add(cuboidId);
+        Integer[] indice = new Integer[Long.bitCount(cuboidId)];
+
+        long mask = Long.highestOneBit(baseCuboidId);
+        int position = 0;
+        for (int i = 0; i < nRowKey; i++) {
+            if ((mask & cuboidId) > 0) {
+                indice[position] = i;
+                position++;
+            }
+            mask = mask >> 1;
+        }
+
+        allCuboidsBitSet.add(indice);
+        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+        for (Long childId : children) {
+            addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
+        }
+    }
+
+    @Override
+    public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        String[] row = flatTableInputFormat.parseMapperInput(record);
+        
+        context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
+
+        for (int i = 0; i < factDictCols.size(); i++) {
+            String fieldValue = row[dictionaryColumnIndex[i]];
+            if (fieldValue == null)
+                continue;
+
+            int reducerIndex;
+            if (uhcIndex[i] == 0) {
+                //for the normal dictionary column
+                reducerIndex = columnIndexToReducerBeginId.get(i);
+            } else {
+                //for the uhc
+                reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+            }
+
+            tmpbuf.clear();
+            tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+            tmpbuf.put(Bytes.toBytes(fieldValue));
+            outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+            sortableKey.setText(outputKey);
+            //judge type
+            sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType());
+            context.write(sortableKey, EMPTY_TEXT);
+
+            // log a few rows for troubleshooting
+            if (rowCount < 10) {
+                logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
+            }
+        }
+
+        if (collectStatistics) {
+            if (rowCount % 100 < samplingPercentage) {
+                putRowKeyToHLL(row);
+            }
+
+            if (needFetchPartitionCol) {
+                String fieldValue = row[partitionColumnIndex];
+                if (fieldValue != null) {
+                    tmpbuf.clear();
+                    tmpbuf.put(MARK_FOR_PARTITION_COL);
+                    tmpbuf.put(Bytes.toBytes(fieldValue));
+                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                    sortableKey.setText(outputKey);
+                    sortableKey.setTypeId((byte) 0);
+                    context.write(sortableKey, EMPTY_TEXT);
+                }
+            }
+        }
+        rowCount++;
+    }
+
+    private long countSizeInBytes(String[] row) {
+        int size = 0;
+        for (String s : row) {
+            size += s == null ? 1 : utf8Length(s);
+            size++; // delimiter
+        }
+        return size;
+    }
+    
+    // calculate the UTF-8 length of a Java String without actually encoding it
+    public static int utf8Length(CharSequence sequence) {
+        int count = 0;
+        for (int i = 0, len = sequence.length(); i < len; i++) {
+            char ch = sequence.charAt(i);
+            if (ch <= 0x7F) {
+                count++;
+            } else if (ch <= 0x7FF) {
+                count += 2;
+            } else if (Character.isHighSurrogate(ch)) {
+                count += 4;
+                ++i;
+            } else {
+                count += 3;
+            }
+        }
+        return count;
+    }
+
+    private void putRowKeyToHLL(String[] row) {
+
+        //generate hash for each row key column
+        for (int i = 0; i < nRowKey; i++) {
+            Hasher hc = hf.newHasher();
+            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+            if (colValue != null) {
+                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
+            } else {
+                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+            }
+        }
+
+        // use the row key column hashes to get a consolidated hash for each cuboid
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            Hasher hc = hf.newHasher();
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
+            }
+
+            allCuboidsHLL[i].add(hc.hash().asBytes());
+        }
+    }
+
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        if (collectStatistics) {
+            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+            // output each cuboid's HLL to the reducer; key is the HLL marker byte followed by the cuboid id
+            HLLCounter hll;
+            for (int i = 0; i < cuboidIds.length; i++) {
+                hll = allCuboidsHLL[i];
+
+                tmpbuf.clear();
+                tmpbuf.put(MARK_FOR_HLL); // one byte
+                tmpbuf.putLong(cuboidIds[i]);
+                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                hllBuf.clear();
+                hll.writeRegisters(hllBuf);
+                outputValue.set(hllBuf.array(), 0, hllBuf.position());
+                sortableKey.setText(outputKey);
+                sortableKey.setTypeId((byte) 0);
+                context.write(sortableKey, outputValue);
+            }
+        }
+    }
+}

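countSizeInBytes above charges one byte per null field plus one per delimiter, and utf8Length computes a string's UTF-8 byte count without encoding it. A small sketch checking that arithmetic against an actual encode (the class name and sample strings are illustrative):

    import java.nio.charset.StandardCharsets;

    import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper;

    public class Utf8LengthCheck {
        public static void main(String[] args) {
            // 1-, 2-, 3- and 4-byte-per-char cases: ASCII, Latin-1, CJK, emoji.
            String[] samples = { "kylin", "caf\u00e9", "\u4e2d\u6587", "\ud83d\ude00" };
            for (String s : samples) {
                int expected = s.getBytes(StandardCharsets.UTF_8).length;
                int computed = FactDistinctColumnsMapper.utf8Length(s);
                System.out.println(s + ": encoded=" + expected + ", computed=" + computed);
            }
        }
    }
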
http://git-wip-us.apache.org/repos/asf/kylin/blob/405dee26/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
deleted file mode 100644
index ed65343..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-
-/**
- */
-public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
-
-    private static final Logger logger = LoggerFactory.getLogger(FactDistinctHiveColumnsMapper.class);
-
-    protected boolean collectStatistics = false;
-    protected CuboidScheduler cuboidScheduler = null;
-    protected int nRowKey;
-    private Integer[][] allCuboidsBitSet = null;
-    private HLLCounter[] allCuboidsHLL = null;
-    private Long[] cuboidIds;
-    private HashFunction hf = null;
-    private int rowCount = 0;
-    private int samplingPercentage;
-    private ByteArray[] row_hashcodes = null;
-    private ByteBuffer tmpbuf;
-    private static final Text EMPTY_TEXT = new Text();
-    public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
-    public static final byte MARK_FOR_HLL = (byte) 0xFF;
-
-    private int partitionColumnIndex = -1;
-    private boolean needFetchPartitionCol = true;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.setup(context);
-        tmpbuf = ByteBuffer.allocate(4096);
-        collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
-        if (collectStatistics) {
-            samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
-            cuboidScheduler = new CuboidScheduler(cubeDesc);
-            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
-
-            List<Long> cuboidIdList = Lists.newArrayList();
-            List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
-            addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
-
-            allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
-            cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
-
-            allCuboidsHLL = new HLLCounter[cuboidIds.length];
-            for (int i = 0; i < cuboidIds.length; i++) {
-                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
-            }
-
-            hf = Hashing.murmur3_32();
-            row_hashcodes = new ByteArray[nRowKey];
-            for (int i = 0; i < nRowKey; i++) {
-                row_hashcodes[i] = new ByteArray();
-            }
-
-            TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
-            if (partitionColRef != null) {
-                partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
-            }
-
-            // check whether need fetch the partition col values
-            if (partitionColumnIndex < 0) {
-                // if partition col not on cube, no need
-                needFetchPartitionCol = false;
-            } else {
-                needFetchPartitionCol = true;
-            }
-        }
-    }
-
-    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
-        allCuboids.add(cuboidId);
-        Integer[] indice = new Integer[Long.bitCount(cuboidId)];
-
-        long mask = Long.highestOneBit(baseCuboidId);
-        int position = 0;
-        for (int i = 0; i < nRowKey; i++) {
-            if ((mask & cuboidId) > 0) {
-                indice[position] = i;
-                position++;
-            }
-            mask = mask >> 1;
-        }
-
-        allCuboidsBitSet.add(indice);
-        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
-        for (Long childId : children) {
-            addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
-        }
-    }
-
-    @Override
-    public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
-        String[] row = flatTableInputFormat.parseMapperInput(record);
-
-        for (int i = 0; i < factDictCols.size(); i++) {
-            String fieldValue = row[dictionaryColumnIndex[i]];
-            if (fieldValue == null)
-                continue;
-
-            int reducerIndex;
-            if (uhcIndex[i] == 0) {
-                //for the normal dictionary column
-                reducerIndex = columnIndexToReducerBeginId.get(i);
-            } else {
-                //for the uhc
-                reducerIndex = columnIndexToReducerBeginId.get(i) + (fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
-            }
-
-            tmpbuf.clear();
-            tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
-            tmpbuf.put(Bytes.toBytes(fieldValue));
-            outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-            sortableKey.setText(outputKey);
-            //judge type
-            sortableKey.setTypeIdByDatatype(factDictCols.get(i).getType());
-            context.write(sortableKey, EMPTY_TEXT);
-
-            // log a few rows for troubleshooting
-            if (rowCount < 10) {
-                logger.info("Sample output: " + factDictCols.get(i) + " '" + fieldValue + "' => reducer " + reducerIndex);
-            }
-        }
-
-        if (collectStatistics) {
-            if (rowCount % 100 < samplingPercentage) {
-                putRowKeyToHLL(row);
-            }
-
-            if (needFetchPartitionCol == true) {
-                String fieldValue = row[partitionColumnIndex];
-                if (fieldValue != null) {
-                    tmpbuf.clear();
-                    tmpbuf.put(MARK_FOR_PARTITION_COL);
-                    tmpbuf.put(Bytes.toBytes(fieldValue));
-                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                    sortableKey.setText(outputKey);
-                    sortableKey.setTypeId((byte) 0);
-                    context.write(sortableKey, EMPTY_TEXT);
-                }
-            }
-        }
-        rowCount++;
-    }
-
-    private void putRowKeyToHLL(String[] row) {
-
-        //generate hash for each row key column
-        for (int i = 0; i < nRowKey; i++) {
-            Hasher hc = hf.newHasher();
-            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-            if (colValue != null) {
-                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
-            } else {
-                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
-            }
-        }
-
-        // user the row key column hash to get a consolidated hash for each cuboid
-        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            Hasher hc = hf.newHasher();
-            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
-            }
-
-            allCuboidsHLL[i].add(hc.hash().asBytes());
-        }
-    }
-
-    @Override
-    protected void doCleanup(Context context) throws IOException, InterruptedException {
-        if (collectStatistics) {
-            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
-            // output each cuboid's hll to reducer, key is 0 - cuboidId
-            HLLCounter hll;
-            for (int i = 0; i < cuboidIds.length; i++) {
-                hll = allCuboidsHLL[i];
-
-                tmpbuf.clear();
-                tmpbuf.put(MARK_FOR_HLL); // one byte
-                tmpbuf.putLong(cuboidIds[i]);
-                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                hllBuf.clear();
-                hll.writeRegisters(hllBuf);
-                outputValue.set(hllBuf.array(), 0, hllBuf.position());
-                sortableKey.setText(outputKey);
-                sortableKey.setTypeId((byte) 0);
-                context.write(sortableKey, outputValue);
-            }
-        }
-    }
-}