You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/08 16:15:53 UTC

[15/18] kylin git commit: KYLIN-1851:unfinished add TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey

KYLIN-1851:unfinished add TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey

Signed-off-by: Yang Li <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f0804f95
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f0804f95
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f0804f95

Branch: refs/heads/KYLIN-2006
Commit: f0804f95ae59ef7adcc0e6e4fe9a3b3620586b96
Parents: ddec049
Author: xiefan46 <95...@qq.com>
Authored: Mon Nov 7 14:37:22 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Tue Nov 8 23:23:34 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/dict/ByteComparator.java   |  44 ++
 .../kylin/dict/NumberDictionaryForest.java      | 278 ++++++++
 .../dict/NumberDictionaryForestBuilder.java     |  58 ++
 .../apache/kylin/dict/TrieDictionaryForest.java | 406 ++++++++++++
 .../kylin/dict/TrieDictionaryForestBuilder.java | 125 ++++
 .../kylin/dict/TrieDictionaryForestTest.java    | 657 +++++++++++++++++++
 .../fdc2/FactDistinctColumnPartitioner2.java    |  47 ++
 .../fdc2/FactDistinctColumnsCombiner2.java      |  44 ++
 .../mr/steps/fdc2/FactDistinctColumnsJob2.java  | 149 +++++
 .../fdc2/FactDistinctColumnsMapperBase2.java    | 102 +++
 .../fdc2/FactDistinctHiveColumnsMapper2.java    | 232 +++++++
 .../mr/steps/fdc2/SelfDefineSortableKey.java    | 130 ++++
 .../kylin/engine/mr/steps/fdc2/TypeFlag.java    |  28 +
 .../mr/steps/NumberDictionaryForestTest.java    | 214 ++++++
 .../mr/steps/SelfDefineSortableKeyTest.java     | 228 +++++++
 15 files changed, 2742 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java
new file mode 100644
index 0000000..74d5ec5
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.ByteArray;
+
+import java.util.Comparator;
+
+/**
+ * Created by xiefan on 16-10-28.
+ */
+public class ByteComparator<T> implements Comparator<T> {
+    private BytesConverter<T> converter;
+
+    public ByteComparator(BytesConverter<T> converter) {
+        this.converter = converter;
+    }
+
+    @Override
+    public int compare(T o1, T o2) {
+        // Convert both values to bytes with the converter, wrap them in ByteArray, and return ByteArray.compareTo.
+        byte[] b1 = converter.convertToBytes(o1);
+        byte[] b2 = converter.convertToBytes(o2);
+        ByteArray ba1 = new ByteArray(b1, 0, b1.length);
+        ByteArray ba2 = new ByteArray(b2, 0, b2.length);
+        return ba1.compareTo(ba2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java
new file mode 100644
index 0000000..8caa4b6
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.PrintStream;
+
+/**
+ * Created by xiefan on 16-11-1.
+ * <p>
+ * Note: the number dictionary forest currently cannot handle
+ * very large or very small double and float values such as 4.9E-324
+ */
+public class NumberDictionaryForest<T> extends Dictionary<T> {
+
+    public static final int MAX_DIGITS_BEFORE_DECIMAL_POINT = 19;
+
+    // encode a number into an order preserving byte sequence
+    // for positives -- padding '0'
+    // for negatives -- '-' sign, padding '9', invert digits, and terminate by ';'
+    static class NumberBytesCodec {
+        int maxDigitsBeforeDecimalPoint;
+        byte[] buf;
+        int bufOffset;
+        int bufLen;
+
+        NumberBytesCodec(int maxDigitsBeforeDecimalPoint) {
+            this.maxDigitsBeforeDecimalPoint = maxDigitsBeforeDecimalPoint;
+            this.buf = new byte[maxDigitsBeforeDecimalPoint * 3];
+            this.bufOffset = 0;
+            this.bufLen = 0;
+        }
+
+        void encodeNumber(byte[] value, int offset, int len) {
+            if (len == 0) {
+                bufOffset = 0;
+                bufLen = 0;
+                return;
+            }
+
+            if (len > buf.length) {
+                throw new IllegalArgumentException("Too many digits for NumberDictionary: " + Bytes.toString(value, offset, len) + ". Internal buffer is only " + buf.length + " bytes");
+            }
+
+            boolean negative = value[offset] == '-';
+
+            // terminate negative ';'
+            int start = buf.length - len;
+            int end = buf.length;
+            if (negative) {
+                start--;
+                end--;
+                buf[end] = ';';
+            }
+
+            // copy & find decimal point
+            int decimalPoint = end;
+            for (int i = start, j = offset; i < end; i++, j++) {
+                buf[i] = value[j];
+                if (buf[i] == '.' && i < decimalPoint) {
+                    decimalPoint = i;
+                }
+            }
+            // remove '-' sign
+            if (negative) {
+                start++;
+            }
+
+            // prepend '0'
+            int nZeroPadding = maxDigitsBeforeDecimalPoint - (decimalPoint - start);
+            if (nZeroPadding < 0 || nZeroPadding + 1 > start)
+                throw new IllegalArgumentException("Too many digits for NumberDictionary: " + Bytes.toString(value, offset, len) + ". Expect " + maxDigitsBeforeDecimalPoint + " digits before decimal point at max.");
+            for (int i = 0; i < nZeroPadding; i++) {
+                buf[--start] = '0';
+            }
+
+            // consider negative
+            if (negative) {
+                buf[--start] = '-';
+                for (int i = start + 1; i < buf.length; i++) {
+                    int c = buf[i];
+                    if (c >= '0' && c <= '9') {
+                        buf[i] = (byte) ('9' - (c - '0'));
+                    }
+                }
+            } else {
+                buf[--start] = '0';
+            }
+
+            bufOffset = start;
+            bufLen = buf.length - start;
+        }
+
+        int decodeNumber(byte[] returnValue, int offset) {
+            if (bufLen == 0) {
+                return 0;
+            }
+
+            int in = bufOffset;
+            int end = bufOffset + bufLen;
+            int out = offset;
+
+            // sign
+            boolean negative = buf[in] == '-';
+            if (negative) {
+                returnValue[out++] = '-';
+                in++;
+                end--;
+            }
+
+            // remove padding
+            byte padding = (byte) (negative ? '9' : '0');
+            for (; in < end; in++) {
+                if (buf[in] != padding)
+                    break;
+            }
+
+            // all paddings before '.', special case for '0'
+            if (in == end || !(buf[in] >= '0' && buf[in] <= '9')) {
+                returnValue[out++] = '0';
+            }
+
+            // copy the rest
+            if (negative) {
+                for (; in < end; in++, out++) {
+                    int c = buf[in];
+                    if (c >= '0' && c <= '9') {
+                        c = '9' - (c - '0');
+                    }
+                    returnValue[out] = (byte) c;
+                }
+            } else {
+                System.arraycopy(buf, in, returnValue, out, end - in);
+                out += end - in;
+            }
+
+            return out - offset;
+        }
+    }
+
+    static ThreadLocal<NumberBytesCodec> localCodec =
+            new ThreadLocal<NumberBytesCodec>();
+
+    // ============================================================================
+
+    private TrieDictionaryForest<T> dict;
+
+    private BytesConverter<T> converter;
+
+    public NumberDictionaryForest() {
+    }
+
+    public NumberDictionaryForest(TrieDictionaryForest<T> dict, BytesConverter<T> converter) {
+        this.dict = dict;
+        this.converter = converter;
+    }
+
+    protected NumberBytesCodec getCodec() {
+        NumberBytesCodec codec = localCodec.get();
+        if (codec == null) {
+            codec = new NumberBytesCodec(MAX_DIGITS_BEFORE_DECIMAL_POINT);
+            localCodec.set(codec);
+        }
+        return codec;
+    }
+
+    @Override
+    public int getMinId() {
+        return dict.getMinId();
+    }
+
+    @Override
+    public int getMaxId() {
+        return dict.getMaxId();
+    }
+
+    @Override
+    public int getSizeOfId() {
+        return dict.getSizeOfId();
+    }
+
+    @Override
+    public int getSizeOfValue() {
+        return dict.getSizeOfValue();
+    }
+
+    @Override
+    public boolean contains(Dictionary<?> another) {
+        return dict.contains(another);
+    }
+
+    @Override
+    protected int getIdFromValueImpl(T value, int roundingFlag) {
+        if (value == null) return -1;
+        byte[] data = converter.convertToBytes(value);
+        return getIdFromValueBytesImpl(data, 0, data.length, roundingFlag);
+    }
+
+    @Override
+    protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
+        NumberBytesCodec codec = getCodec();
+        codec.encodeNumber(value, offset, len);
+        return this.dict.getIdFromValueBytesImpl(codec.buf, codec.bufOffset, codec.bufLen, roundingFlag);
+    }
+
+    @Override
+    protected T getValueFromIdImpl(int id) {
+        byte[] data = getValueBytesFromIdImpl(id);
+        if (data == null) return null;
+        else return converter.convertFromBytes(data, 0, data.length);
+    }
+
+    @Override
+    protected byte[] getValueBytesFromIdImpl(int id) {
+        NumberBytesCodec codec = getCodec();
+        codec.bufOffset = 0;
+        byte[] buf = new byte[dict.getSizeOfValue()];
+        codec.bufLen = getValueBytesFromIdImpl(id, buf, 0);
+
+        if (codec.bufLen == buf.length) {
+            return buf;
+        } else {
+            byte[] result = new byte[codec.bufLen];
+            System.arraycopy(buf, 0, result, 0, codec.bufLen);
+            return result;
+        }
+    }
+
+    @Override
+    protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset) {
+        NumberBytesCodec codec = getCodec();
+        codec.bufOffset = 0;
+        codec.bufLen = this.dict.getValueBytesFromIdImpl(id, codec.buf, 0);
+        return codec.decodeNumber(returnValue, offset);
+    }
+
+    @Override
+    public void dump(PrintStream out) {
+        dict.dump(out);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        dict.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.dict = new TrieDictionaryForest<>();
+        dict.readFields(in);
+        this.converter = this.dict.getBytesConvert();
+    }
+
+    public BytesConverter<T> getConverter() {
+        return converter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
new file mode 100644
index 0000000..5444bb7
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+
+/**
+ * Created by xiefan on 16-11-2.
+ */
+public class NumberDictionaryForestBuilder<T> {
+
+    private TrieDictionaryForestBuilder<T> trieBuilder;
+
+    private BytesConverter<T> bytesConverter;
+
+    private NumberDictionaryForest.NumberBytesCodec codec = new NumberDictionaryForest.NumberBytesCodec(
+            NumberDictionaryForest.MAX_DIGITS_BEFORE_DECIMAL_POINT);
+
+    public NumberDictionaryForestBuilder(BytesConverter<T> bytesConverter) {
+        this(bytesConverter, 0);
+    }
+
+    public NumberDictionaryForestBuilder(BytesConverter<T> bytesConverter, int baseId) {
+        this.trieBuilder = new TrieDictionaryForestBuilder<T>(bytesConverter, baseId);
+        this.bytesConverter = bytesConverter;
+    }
+
+    public void addValue(T value) {
+        addValue(bytesConverter.convertToBytes(value));
+    }
+
+    public void addValue(byte[] value) {
+        codec.encodeNumber(value, 0, value.length);
+        byte[] copy = Bytes.copy(codec.buf, codec.bufOffset, codec.bufLen);
+        this.trieBuilder.addValue(copy);
+    }
+
+    public NumberDictionaryForest<T> build() {
+        TrieDictionaryForest<T> forest = trieBuilder.build();
+        return new NumberDictionaryForest<T>(forest, bytesConverter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
new file mode 100755
index 0000000..e9ccc56
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+
+/**
+ * use trie forest to optimize trie dictionary
+ * <p>
+ * the input data must be in increasing order (sorted by org.apache.kylin.dict.ByteComparator)
+ * <p>
+ * Created by xiefan on 16-10-26.
+ */
+public class TrieDictionaryForest<T> extends Dictionary<T> {
+
+    private static final Logger logger = LoggerFactory.getLogger(TrieDictionaryForest.class);
+
+    private ArrayList<TrieDictionary<T>> trees;
+
+    //private ArrayList<byte[]> valueDivide; //find tree
+
+    private ArrayList<ByteArray> valueDivide;
+
+    private ArrayList<Integer> accuOffset;  //find tree
+
+    private BytesConverter<T> bytesConvert;
+
+    private int baseId;
+
+    /*public AtomicLong getValueIndexTime = new AtomicLong(0);
+
+    public AtomicLong getValueTime = new AtomicLong(0);
+
+    public AtomicLong binarySearchTime = new AtomicLong(0);
+
+    public AtomicLong copyTime = new AtomicLong(0);
+
+    public AtomicLong getValueIndexTime2 = new AtomicLong(0);
+
+    public AtomicLong getValueTime2 = new AtomicLong(0);*/
+
+    public TrieDictionaryForest() { // default constructor for Writable interface
+
+    }
+
+    public TrieDictionaryForest(ArrayList<TrieDictionary<T>> trees,
+                                ArrayList<ByteArray> valueDivide, ArrayList<Integer> accuOffset, BytesConverter<T> bytesConverter, int baseId) {
+        this.trees = trees;
+        this.valueDivide = valueDivide;
+        this.accuOffset = accuOffset;
+        this.bytesConvert = bytesConverter;
+        this.baseId = baseId;
+    }
+
+
+    @Override
+    public int getMinId() {
+        if (trees.isEmpty()) return -1;
+        return trees.get(0).getMinId() + baseId;
+    }
+
+    @Override
+    public int getMaxId() {
+        if (trees.isEmpty()) return -1;
+        int index = trees.size() - 1;
+        int id = accuOffset.get(index) + trees.get(index).getMaxId() + baseId;
+        return id;
+    }
+
+    @Override
+    public int getSizeOfId() {
+        if (trees.isEmpty()) return -1;
+        int maxOffset = accuOffset.get(accuOffset.size() - 1);
+        TrieDictionary<T> lastTree = trees.get(trees.size() - 1);
+        int sizeOfId = BytesUtil.sizeForValue(baseId + maxOffset + lastTree.getMaxId() + 1);
+        return sizeOfId;
+    }
+
+    @Override
+    public int getSizeOfValue() {
+        int maxValue = -1;
+        for (TrieDictionary<T> tree : trees)
+            maxValue = Math.max(maxValue, tree.getSizeOfValue());
+        return maxValue;
+    }
+
+    //value --> id
+    @Override
+    protected int getIdFromValueImpl(T value, int roundingFlag)
+            throws IllegalArgumentException {
+        byte[] valueBytes = bytesConvert.convertToBytes(value);
+        return getIdFromValueBytesImpl(valueBytes, 0, valueBytes.length, roundingFlag);
+    }
+
+
+    // Resolves a value (given as bytes) to its forest-wide id:
+    //   id = tree_inner_offset + accumulate_offset + baseId
+    // Throws IllegalArgumentException when findIndexByValue returns a negative index,
+    // i.e. no tree could be selected for the value.
+    @Override
+    protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag)
+            throws IllegalArgumentException {
+
+        // pick the tree by searching the valueDivide boundaries
+        ByteArray search = new ByteArray(value, offset, len);
+        int index = findIndexByValue(search);
+        if (index < 0) {
+            // the slice is [offset, offset + len): copyOfRange takes an exclusive end index,
+            // not a length, so passing len would print the wrong bytes or throw on its own
+            throw new IllegalArgumentException("Tree Not Found. index < 0.Value:" + new String(Arrays.copyOfRange(value, offset, offset + len)));
+        }
+        TrieDictionary<T> tree = trees.get(index);
+        // id local to the selected tree
+        int id = tree.getIdFromValueBytes(value, offset, len, roundingFlag);
+        // shift the tree-local id into the forest-wide id space
+        id = id + accuOffset.get(index);
+        id += baseId;
+        return id;
+    }
+
+    //id --> value
+    @Override
+    protected T getValueFromIdImpl(int id) throws IllegalArgumentException {
+        //System.out.println("here");
+        byte[] data = getValueBytesFromIdImpl(id);
+        if (data != null) {
+            return bytesConvert.convertFromBytes(data, 0, data.length);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset)
+            throws IllegalArgumentException {
+        int index = findIndexById(id); // position of id (minus baseId) within accuOffset
+        if (index < 0) {
+            // same guard as the byte[]-returning overload; without it accuOffset.get(-1) fails with an index error
+            throw new IllegalArgumentException("Tree Not Found. index < 0");
+        }
+        int treeInnerOffset = getTreeInnerOffset(id, index);
+        TrieDictionary<T> tree = trees.get(index);
+        // the selected tree fills returnValue starting at offset; its return value is passed through unchanged
+        return tree.getValueBytesFromIdImpl(treeInnerOffset, returnValue, offset);
+    }
+
+
+    @Override
+    protected byte[] getValueBytesFromIdImpl(int id) throws IllegalArgumentException {
+        int index = findIndexById(id); //lower bound
+        if (index < 0) {
+            throw new IllegalArgumentException("Tree Not Found. index < 0");
+        }
+        int treeInnerOffset = getTreeInnerOffset(id, index);
+        TrieDictionary<T> tree = trees.get(index);
+        byte[] result = tree.getValueBytesFromId(treeInnerOffset);
+        return result;
+    }
+
+
+    private int getTreeInnerOffset(int id, int index) {
+        id -= baseId;
+        id = id - accuOffset.get(index);
+        return id;
+    }
+
+    @Override
+    public void dump(PrintStream out) {
+        for (int i = 0; i < trees.size(); i++) { // one section per tree, in list order
+            out.println("----tree " + i + "--------"); // header goes to the caller's stream, not System.out
+            trees.get(i).dump(out);
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        writeHead(out);
+        writeBody(out);
+    }
+
+    private void writeHead(DataOutput out) throws IOException {
+        ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+        DataOutputStream headOut = new DataOutputStream(byteBuf);
+        headOut.writeInt(baseId);
+        headOut.writeUTF(bytesConvert == null ? "" : bytesConvert.getClass().getName());
+        //write accuOffset
+        headOut.writeInt(accuOffset.size());
+        for (int i = 0; i < accuOffset.size(); i++)
+            headOut.writeInt(accuOffset.get(i));
+        //write valueDivide
+        headOut.writeInt(valueDivide.size());
+        for (int i = 0; i < valueDivide.size(); i++) {
+            ByteArray ba = valueDivide.get(i);
+            byte[] byteStr = ba.toBytes();
+            headOut.writeInt(byteStr.length);
+            headOut.write(byteStr);
+        }
+        //write tree size
+        headOut.writeInt(trees.size());
+        headOut.close();
+        byte[] head = byteBuf.toByteArray();
+        //output
+        out.writeInt(head.length);
+        out.write(head);
+    }
+
+
+    private void writeBody(DataOutput out) throws IOException {
+        for (int i = 0; i < trees.size(); i++) {
+            TrieDictionary<T> tree = trees.get(i);
+            tree.write(out);
+        }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        try {
+            int headSize = in.readInt();
+            this.baseId = in.readInt();
+            String converterName = in.readUTF();
+            if (converterName.isEmpty() == false)
+                this.bytesConvert = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+            //init accuOffset
+            int accuSize = in.readInt();
+            this.accuOffset = new ArrayList<>();
+            for (int i = 0; i < accuSize; i++) {
+                accuOffset.add(in.readInt());
+            }
+            //init valueDivide
+            int valueDivideSize = in.readInt();
+            this.valueDivide = new ArrayList<>();
+            for (int i = 0; i < valueDivideSize; i++) {
+                int length = in.readInt();
+                byte[] buffer = new byte[length];
+                in.readFully(buffer);
+                valueDivide.add(new ByteArray(buffer, 0, buffer.length));
+            }
+            int treeSize = in.readInt();
+            this.trees = new ArrayList<>();
+            for (int i = 0; i < treeSize; i++) {
+                TrieDictionary<T> dict = new TrieDictionary<>();
+                dict.readFields(in);
+                trees.add(dict);
+            }
+        } catch (Exception e) {
+            if (e instanceof RuntimeException)
+                throw (RuntimeException) e;
+            else
+                throw new RuntimeException(e);
+        }
+
+    }
+
+    @Override
+    public boolean contains(Dictionary other) {
+        if (other.getSize() > this.getSize()) {
+            return false;
+        }
+
+        for (int i = other.getMinId(); i <= other.getMaxId(); ++i) {
+            T v = (T) other.getValueFromId(i);
+            if (!this.containsValue(v)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public List<TrieDictionary<T>> getTrees() {
+        return Collections.unmodifiableList(this.trees);
+    }
+
+    private boolean onlyOneTree() {
+        return trees.size() == 1;
+    }
+
+    private int findIndexByValue(T value) {
+        byte[] valueBytes = bytesConvert.convertToBytes(value);
+        return findIndexByValue(new ByteArray(valueBytes, 0, valueBytes.length));
+    }
+
+    private int findIndexByValue(ByteArray value) {
+        int index = lowerBound(value, new Comparator<ByteArray>() {
+            @Override
+            public int compare(ByteArray o1, ByteArray o2) {
+                return o1.compareTo(o2);
+            }
+        }, this.valueDivide);
+        return index;
+    }
+
+    private int findIndexById(Integer id) {
+        id -= baseId;
+        int index = lowerBound(id, new Comparator<Integer>() {
+            @Override
+            public int compare(Integer o1, Integer o2) {
+                return o1.compareTo(o2);
+            }
+        }, this.accuOffset);
+        return index;
+    }
+
+
+    private static <K> int lowerBound(K lookfor, Comparator<K> comparator, ArrayList<K> list) {
+        if (list == null || list.isEmpty())
+            return 0; //return the first tree
+        int left = 0;
+        int right = list.size() - 1;
+        int mid = 0;
+        boolean found = false;
+        int comp = 0;
+        while (!found && left <= right) {
+            mid = left + (right - left) / 2;
+            comp = comparator.compare(lookfor, list.get(mid));
+            if (comp < 0)
+                right = mid - 1;
+            else if (comp > 0)
+                left = mid + 1;
+            else
+                found = true;
+        }
+        if (found) {
+            //System.out.println("look for:"+lookfor+" index:"+mid);
+            return mid;
+        } else {
+            //System.out.println("look for:"+lookfor+" index:"+Math.max(left,right));
+            return Math.min(left, right);  //value may be bigger than the right tree
+        }
+    }
+
+    public static void main(String[] args) {
+        /*ArrayList<Integer> list = new ArrayList<>();
+        list.add(3);
+        list.add(10);
+        list.add(15);
+        Comparator<Integer> comp = new Comparator<Integer>() {
+            @Override
+            public int compare(Integer o1, Integer o2) {
+                return o1.compareTo(o2);
+            }
+        };
+        int[] nums = {-1,0,1,2,3,4,13,15,16};
+        for(int i : nums){
+            System.out.println("found value:"+i+" index:"+lowerBound(i,comp,list));
+        }*/
+        ArrayList<String> list = new ArrayList<>();
+        list.add("\u4e00");
+        list.add("\u4e8c");
+        list.add("\u4e09");
+        list.add("");
+        list.add("part");
+        list.add("par");
+        list.add("partition");
+        list.add("party");
+        list.add("parties");
+        list.add("paint");
+        Collections.sort(list);
+        for (String str : list) {
+            System.out.println("found value:" + str + " index:" + lowerBound(str, new Comparator<String>() {
+                @Override
+                public int compare(String o1, String o2) {
+                    return o1.compareTo(o2);
+                }
+            }, list));
+        }
+        //System.out.println(BytesUtil.safeCompareBytes("\u4e8c".getBytes(),"\u4e09".getBytes()));
+    }
+
+    public BytesConverter<T> getBytesConvert() {
+        return bytesConvert;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
new file mode 100755
index 0000000..3c03c08
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+
+
+/**
+ * Builds a {@link TrieDictionaryForest} by packing the added values into consecutive
+ * {@link TrieDictionary} trees; the current tree is closed as soon as the bytes added to it
+ * reach {@link #MaxTrieTreeSize}. Callers are expected to add values in ascending order:
+ * only a value equal to the one added just before it is dropped, and out-of-order values
+ * are not rejected.
+ *
+ * @param <T> value type, turned into bytes by the supplied {@link BytesConverter}
+ */
+public class TrieDictionaryForestBuilder<T> {
+
+    /** Byte budget of a single tree; non-final, so it can be reassigned. */
+    public static int MaxTrieTreeSize = 1024 * 1024;//1M
+
+    private static final Logger logger = LoggerFactory.getLogger(TrieDictionaryForestBuilder.class);
+
+    private BytesConverter<T> bytesConverter;
+    private int curTreeSize = 0; // bytes added to the tree under construction
+    private boolean curTreeHasValue = false; // set once the tree under construction holds a value
+    private TrieDictionaryBuilder<T> trieBuilder;
+    private ArrayList<TrieDictionary<T>> trees = new ArrayList<>();
+    private ArrayList<ByteArray> valueDivide = new ArrayList<>(); // per tree: value of its min id
+    private ArrayList<Integer> accuOffset = new ArrayList<>(); // per tree: curOffset when it was added
+    private ByteArray previousValue = null; // last accepted value, used to drop duplicates
+    private int baseId;
+    private int curOffset;
+
+    /** Creates a builder with a base id of 0. */
+    public TrieDictionaryForestBuilder(BytesConverter<T> bytesConverter) {
+        this(bytesConverter, 0);
+    }
+
+    /** Creates a builder; {@code baseId} is handed to the {@link TrieDictionaryForest} that gets built. */
+    public TrieDictionaryForestBuilder(BytesConverter<T> bytesConverter, int baseId) {
+        this.bytesConverter = bytesConverter;
+        this.trieBuilder = new TrieDictionaryBuilder<T>(bytesConverter);
+        this.baseId = baseId;
+        this.curOffset = 0;
+    }
+
+    /** Converts {@code value} to bytes and adds it; a {@code null} value or conversion result is ignored. */
+    public void addValue(T value) {
+        if (value == null)
+            return;
+        addValue(bytesConverter.convertToBytes(value)); // the byte[] overload also skips null
+    }
+
+    /** Adds the whole array as one value; {@code null} is ignored. */
+    public void addValue(byte[] value) {
+        if (value == null)
+            return;
+        addValue(new ByteArray(value, 0, value.length));
+    }
+
+    /**
+     * Adds the bytes {@code value} refers to, honouring its offset and length, and closes the
+     * current tree once {@link #MaxTrieTreeSize} is reached. {@code null} and a value equal to
+     * the previously added one are ignored.
+     */
+    public void addValue(ByteArray value) {
+        if (value == null || (previousValue != null && previousValue.compareTo(value) == 0))
+            return; // nothing to add, or a duplicate of the previous value
+        // array() ignores offset/length, so copy when value is only a window of its backing array
+        boolean wholeArray = value.offset() == 0 && value.length() == value.array().length;
+        this.trieBuilder.addValue(wholeArray ? value.array() : value.toBytes());
+        previousValue = value;
+        curTreeHasValue = true;
+        this.curTreeSize += value.length();
+        if (curTreeSize >= MaxTrieTreeSize) {
+            addTree(trieBuilder.build(0));
+            reset();
+        }
+    }
+
+    /** Closes the tree under construction, if it holds a value, and returns the finished forest. */
+    public TrieDictionaryForest<T> build() {
+        if (curTreeHasValue) { // curTreeSize alone would miss a tree holding only the empty value
+            addTree(trieBuilder.build(0));
+            reset();
+        }
+        return new TrieDictionaryForest<T>(this.trees, this.valueDivide, this.accuOffset, this.bytesConverter, baseId);
+    }
+
+    /** Records a finished tree together with the value of its minimum id and its id offset. */
+    private void addTree(TrieDictionary<T> tree) {
+        trees.add(tree);
+        accuOffset.add(curOffset);
+        byte[] firstValue = tree.getValueBytesFromId(tree.getMinId());
+        valueDivide.add(new ByteArray(firstValue, 0, firstValue.length));
+        curOffset += (tree.getMaxId() + 1);
+    }
+
+    /** Starts a new, empty tree. */
+    private void reset() {
+        curTreeSize = 0;
+        curTreeHasValue = false;
+        trieBuilder = new TrieDictionaryBuilder<T>(bytesConverter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java
new file mode 100755
index 0000000..624d6ba
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.dict;
+
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by xiefan on 16-10-26.
+ */
+
+public class TrieDictionaryForestTest {
+
+
+    @Test
+    public void testBasicFound() {
+        ArrayList<String> strs = new ArrayList<String>();
+        strs.add("part");
+        strs.add("par");
+        strs.add("partition");
+        strs.add("party");
+        strs.add("parties");
+        strs.add("paint");
+        Collections.sort(strs);
+        int baseId = 0;
+        TrieDictionaryForestBuilder<String> builder = newDictBuilder(strs, baseId);
+        TrieDictionaryForest<String> dict = builder.build();
+        dict.dump(System.out);
+        int expectId = baseId;
+        for (String s : strs) {
+            System.out.println("value:" + s + "  expect id:" + expectId);
+            assertEquals(expectId, dict.getIdFromValue(s));
+            expectId++;
+        }
+        System.out.println("test ok");
+    }
+
+    @Test  //one string one tree
+    public void testMultiTree() {
+        ArrayList<String> strs = new ArrayList<String>();
+        strs.add("part");
+        strs.add("par");
+        strs.add("partition");
+        strs.add("party");
+        strs.add("parties");
+        strs.add("paint");
+        strs.add("\u4e00\u4e8c\u4e09");  //Chinese test
+        strs.add("\u56db\u4e94\u516d");
+        strs.add("");
+        Collections.sort(strs, new ByteComparator<String>(new StringBytesConverter()));
+        int baseId = 5;
+        int maxTreeSize = 0;
+        TrieDictionaryForestBuilder<String> builder = newDictBuilder(strs, baseId, maxTreeSize);
+        TrieDictionaryForest<String> dict = builder.build();
+        dict.dump(System.out);
+        assertEquals(strs.size(), dict.getTrees().size());
+        int expectId = baseId;
+        for (String s : strs) {
+            System.out.println("value:" + s + "  expect id:" + expectId);
+            assertEquals(expectId, dict.getIdFromValue(s));
+            expectId++;
+        }
+        System.out.println("test ok");
+    }
+
+    public void duplicateDataTest() {
+        //todo
+    }
+
+    @Test
+    public void testBigDataSet() {
+        //h=generate data
+        ArrayList<String> strs = new ArrayList<>();
+        Iterator<String> it = new RandomStrings(100 * 10000).iterator();
+        int totalSize = 0;
+        final StringBytesConverter converter = new StringBytesConverter();
+        while (it.hasNext()) {
+            String str = it.next();
+            byte[] data = converter.convertToBytes(str);
+            if (data != null) {
+                totalSize += data.length;
+            }
+            strs.add(str);
+        }
+        Collections.sort(strs);
+        int baseId = 20;
+        int maxTreeSize = totalSize / 10;
+        System.out.println("data size:" + totalSize / 1024 + "KB  max tree size:" + maxTreeSize / 1024 + "KB");
+        //create the answer set
+        Map<String, Integer> idMap = rightIdMap(baseId, strs);
+        //build tree
+        TrieDictionaryForestBuilder<String> builder = newDictBuilder(strs, baseId, maxTreeSize);
+        TrieDictionaryForest<String> dict = builder.build();
+        System.out.println("tree num:" + dict.getTrees().size());
+        //check
+        for (Map.Entry<String, Integer> entry : idMap.entrySet()) {
+            //System.out.println("my id:"+dict.getIdFromValue(entry.getKey())+" right id:"+entry.getValue());
+            assertEquals(0, dict.getIdFromValue(entry.getKey()) - entry.getValue());
+            assertEquals(entry.getKey(), dict.getValueFromId(entry.getValue()));
+        }
+    }
+
+    @Test
+    public void partOverflowTest() {
+        ArrayList<String> str = new ArrayList<String>();
+        // str.add("");
+        str.add("part");
+        str.add("par");
+        str.add("partition");
+        str.add("party");
+        str.add("parties");
+        str.add("paint");
+        String longStr = "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+                + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk";
+        System.out.println("The length of the long string is " + longStr.length());
+        str.add(longStr);
+
+        str.add("zzzzzz" + longStr);// another long string
+        int baseId = 10;
+        int maxSize = 100 * 1024 * 1024;
+        TrieDictionaryForestBuilder<String> b = newDictBuilder(str, baseId, maxSize);
+        TrieDictionaryForest<String> dict = b.build();
+        TreeSet<String> set = new TreeSet<String>();
+        for (String s : str) {
+            set.add(s);
+        }
+        // test basic id<==>value
+        Iterator<String> it = set.iterator();
+        int id = 0;
+        int previousId = -1;
+        for (; it.hasNext(); id++) {
+            String value = it.next();
+
+            // in case of overflow parts, there exist interpolation nodes
+            // they exist to make sure that any node's part is shorter than 255
+            int actualId = dict.getIdFromValue(value);
+            assertTrue(actualId >= id);
+            assertTrue(actualId > previousId);
+            previousId = actualId;
+
+            assertEquals(value, dict.getValueFromId(actualId));
+        }
+    }
+
+    @Test
+    public void notFoundTest() {
+        ArrayList<String> str = new ArrayList<String>();
+        str.add("part");
+        str.add("par");
+        str.add("partition");
+        str.add("party");
+        str.add("parties");
+        str.add("paint");
+        Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+
+        ArrayList<String> notFound = new ArrayList<String>();
+        notFound.add("");
+        notFound.add("p");
+        notFound.add("pa");
+        notFound.add("pb");
+        notFound.add("parti");
+        notFound.add("partz");
+        notFound.add("partyz");
+
+        testStringDictionary(str, notFound);
+    }
+
+
+    // A forest built from a superset of the values must contain the smaller one, but not vice versa.
+    @Test
+    public void dictionaryContainTest() {
+        ByteComparator<String> byteOrder = new ByteComparator<String>(new StringBytesConverter());
+        ArrayList<String> str = new ArrayList<String>();
+        str.add("part");
+        str.add("part"); // meant to be dup
+        str.add("par");
+        str.add("partition");
+        str.add("party");
+        str.add("parties");
+        str.add("paint");
+        Collections.sort(str, byteOrder);
+        int baseId = new Random().nextInt(100);
+        TrieDictionaryForest<String> dict = newDictBuilder(str, baseId).build();
+
+        str.add("py"); // the second forest holds one extra value
+        Collections.sort(str, byteOrder);
+        TrieDictionaryForest<String> dict2 = newDictBuilder(str, baseId).build();
+
+        assertTrue(dict2.contains(dict));
+        assertFalse(dict.contains(dict2));
+    }
+
+    @Test
+    public void englishWordsTest() throws Exception {
+        InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
+        ArrayList<String> str = loadStrings(is);
+        Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+        testStringDictionary(str, null);
+    }
+
+    @Test
+    public void categoryNamesTest() throws Exception {
+        InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat");
+        ArrayList<String> str = loadStrings(is);
+        Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+        testStringDictionary(str, null);
+    }
+
+    @Test
+    public void serializeTest() {
+        ArrayList<String> testData = getTestData(10);
+        TrieDictionaryForestBuilder<String> b = newDictBuilder(testData, 10, 0);
+        TrieDictionaryForest<String> dict = b.build();
+        dict = testSerialize(dict);
+        dict.dump(System.out);
+        for (String str : testData) {
+            assertEquals(str, dict.getValueFromId(dict.getIdFromValue(str)));
+        }
+    }
+
+
+    private static TrieDictionaryForest<String> testSerialize(TrieDictionaryForest<String> dict) {
+        try {
+            ByteArrayOutputStream bout = new ByteArrayOutputStream();
+            DataOutputStream dataout = new DataOutputStream(bout);
+            dict.write(dataout);
+            dataout.close();
+            ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+            DataInputStream datain = new DataInputStream(bin);
+            TrieDictionaryForest<String> r = new TrieDictionaryForest<>();
+            //r.dump(System.out);
+            r.readFields(datain);
+            //r.dump(System.out);
+            datain.close();
+            return r;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /*@Test
+    public void getIdFromValueBytesTest() throws Exception{
+        String value = "\u4e00\u4e8c\u4e09";
+        BytesConverter<String> converter = new StringBytesConverter();
+        TrieDictionaryForestBuilder<String> b = new TrieDictionaryForestBuilder<>(converter,0);
+        b.addValue(value);
+        TrieDictionaryForest<String> dict = b.build();
+        dict.dump(System.out);
+        byte[] data = converter.convertToBytes(value);
+        int id = dict.getIdFromValueBytes(data,0,data.length);
+
+    }*/
+
+    //benchmark
+    @Deprecated
+    public void memoryUsageBenchmarkTest() throws Exception {
+        //create data
+        ArrayList<String> testData = getTestData((int) (Integer.MAX_VALUE * 0.8 / 640));
+        int testTimes = 1;
+        System.out.println("start memory:" + Runtime.getRuntime().maxMemory());
+        System.out.println("start memory:" + Runtime.getRuntime().totalMemory());
+        for (int i = 0; i < testTimes; i++) {
+            long start = MemoryBudgetController.gcAndGetSystemAvailMB();
+            TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<>(new StringBytesConverter());
+            for (String str : testData)
+                b.addValue(str);
+            long end = MemoryBudgetController.gcAndGetSystemAvailMB();
+            System.out.println("object trie memory usage:" + (end - start) + "MB");
+            System.out.println("start memory:" + Runtime.getRuntime().maxMemory());
+            System.out.println("start memory:" + Runtime.getRuntime().totalMemory());
+            /*System.out.println(b == null);
+            startMemUse = getSystemCurUsedMemory();
+            TrieDictionary<String> dict = b.build(0);
+            memUse = getSystemCurUsedMemory();
+            System.out.println("array trie memory usage:"+(memUse-startMemUse)/(1024*1024)+"MB");
+            System.out.println(b == null );
+            System.out.println(dict == null);*/
+        }
+
+
+    }
+
+    @Deprecated
+    private long getSystemCurUsedMemory() throws Exception {
+        System.gc();
+        Thread.currentThread().sleep(1000);
+        long totalMem = Runtime.getRuntime().totalMemory();
+        long useMem = totalMem - Runtime.getRuntime().freeMemory();
+        return useMem;
+    }
+
+    //@Test
+    public void buildTimeBenchmarkTest() throws Exception {
+        //create data
+        ArrayList<String> testData = getTestData((int) (Integer.MAX_VALUE * 0.8 / 640));
+        //build time compare
+        int testTimes = 5;
+        long oldDictTotalBuildTime = 0;
+        long newDictTotalBuildTime = 0;
+
+        //old dict
+        System.gc();
+        Thread.currentThread().sleep(1000);
+        for (int i = 0; i < testTimes; i++) {
+            int keep = 0;
+            long startTime = System.currentTimeMillis();
+            TrieDictionaryBuilder<String> oldTrieBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+            for (String str : testData)
+                oldTrieBuilder.addValue(str);
+            TrieDictionary<String> oldDict = oldTrieBuilder.build(0);
+            keep |= oldDict.getIdFromValue(testData.get(0));
+            oldDictTotalBuildTime += (System.currentTimeMillis() - startTime);
+            System.out.println("times:" + i);
+        }
+
+        //new dict
+        System.gc();
+        Thread.currentThread().sleep(1000);
+        for (int i = 0; i < testTimes; i++) {
+            int keep = 0;
+            long startTime = System.currentTimeMillis();
+            BytesConverter<String> converter = new StringBytesConverter();
+            TrieDictionaryForestBuilder<String> newTrieBuilder = new TrieDictionaryForestBuilder<String>(converter, 0);
+            for (String str : testData)
+                newTrieBuilder.addValue(str);
+            TrieDictionaryForest<String> newDict = newTrieBuilder.build();
+            keep |= newDict.getIdFromValue(testData.get(0));
+            newDictTotalBuildTime += (System.currentTimeMillis() - startTime);
+            System.out.println("times:" + i);
+        }
+
+
+        System.out.println("compare build time.  Old trie : " + oldDictTotalBuildTime / 1000.0 + "s.New trie : " + newDictTotalBuildTime / 1000.0 + "s");
+    }
+
+
+    @Test
+    public void queryTimeBenchmarkTest() throws Exception {
+        int count = (int) (Integer.MAX_VALUE * 0.8 / 640);
+        //int count = (int) (2);
+        benchmarkStringDictionary(new RandomStrings(count));
+    }
+
+
+    private void evaluateDataSize(ArrayList<String> list) {
+        long size = 0;
+        for (String str : list)
+            size += str.getBytes().length;
+        System.out.println("test data size : " + size / (1024 * 1024) + " MB");
+    }
+
+    private void evaluateDataSize(int count) {
+        RandomStrings rs = new RandomStrings(count);
+        Iterator<String> itr = rs.iterator();
+        long bytesCount = 0;
+        while (itr.hasNext())
+            bytesCount += itr.next().getBytes().length;
+        System.out.println("test data size : " + bytesCount / (1024 * 1024) + " MB");
+    }
+
+    private static void benchmarkStringDictionary(Iterable<String> str) throws IOException {
+        //System.out.println("test values:");
+        Iterator<String> itr = str.iterator();
+        ArrayList<String> testData = new ArrayList<>();
+        while (itr.hasNext())
+            testData.add(itr.next());
+        Collections.sort(testData);
+        TrieDictionaryForestBuilder<String> b = newDictBuilder(testData, 0);
+        TrieDictionaryForest<String> dict = b.build();
+        System.out.println("tree size:" + dict.getTrees().size());
+        BytesConverter<String> converter = new StringBytesConverter();
+        TreeSet<String> set = new TreeSet<String>();
+        for (String s : testData) {
+            set.add(s);
+        }
+        //System.out.println("print set");
+        //System.out.println(set);
+        //dict.dump(System.out);
+        // prepare id==>value array and value==>id map
+        HashMap<String, Integer> map = new HashMap<String, Integer>();
+        String[] strArray = new String[set.size()];
+        byte[][] array = new byte[set.size()][];
+        Iterator<String> it = set.iterator();
+        for (int id = 0; it.hasNext(); id++) {
+            String value = it.next();
+            map.put(value, id);
+            strArray[id] = value;
+            //array[id] = value.getBytes("UTF-8");
+            array[id] = converter.convertToBytes(value);
+        }
+
+
+        // System.out.println("Dict size in bytes:  " +
+        //MemoryUtil.deepMemoryUsageOf(dict));
+        // System.out.println("Map size in bytes:   " +
+        // MemoryUtil.deepMemoryUsageOf(map));
+        // System.out.println("Array size in bytes: " +
+        // MemoryUtil.deepMemoryUsageOf(strArray));
+
+        // warm-up, said that code only got JIT after run 1k-10k times,
+        // following jvm options may help
+        // -XX:CompileThreshold=1500
+        // -XX:+PrintCompilation
+        System.out.println("Benchmark awaitig...");
+        benchmark("Warm up", dict, set, map, strArray, array);
+        benchmark("Benchmark", dict, set, map, strArray, array);
+    }
+
+    private static int benchmark(String msg, TrieDictionaryForest<String> dict, TreeSet<String> set, HashMap<String, Integer> map, String[] strArray, byte[][] array) {
+        int n = set.size();
+        int times = Math.max(10 * 1000 * 1000 / n, 1); // run 10 million lookups
+        int keep = 0; // make sure JIT don't OPT OUT function calls under test
+        byte[] valueBytes = new byte[dict.getSizeOfValue()];
+        long start;
+
+        // benchmark value==>id, via HashMap
+        System.out.println(msg + " HashMap lookup value==>id");
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                keep |= map.get(strArray[j]);
+            }
+        }
+        long timeValueToIdByMap = System.currentTimeMillis() - start;
+        System.out.println(timeValueToIdByMap);
+
+        // benchmark value==>id, via Dict
+        System.out.println(msg + " Dictionary lookup value==>id");
+        //dict.dump(System.out);
+
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                //System.out.println("looking for value:"+new String(array[j]));
+                keep |= dict.getIdFromValueBytes(array[j], 0, array[j].length);
+            }
+        }
+        long timeValueToIdByDict = System.currentTimeMillis() - start;
+        System.out.println(timeValueToIdByDict);
+        /*System.out.println("detail time.  get index time"+dict.getValueIndexTime.get()+" get value time"+
+        dict.getValueTime.get() +"  binary search time:"+dict.binarySearchTime.get() + " copy time:"+
+        dict.copyTime.get());*/
+
+        // benchmark id==>value, via Array
+        System.out.println(msg + " Array lookup id==>value");
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                keep |= strArray[j].length();
+            }
+        }
+        long timeIdToValueByArray = System.currentTimeMillis() - start;
+        System.out.println(timeIdToValueByArray);
+
+        // benchmark id==>value, via Dict
+        System.out.println(msg + " Dictionary lookup id==>value");
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                keep |= dict.getValueBytesFromId(j, valueBytes, 0);
+            }
+        }
+        long timeIdToValueByDict = System.currentTimeMillis() - start;
+        System.out.println(timeIdToValueByDict);
+        /*System.out.println("detail time.  get index time"+dict.getValueIndexTime2.get()+" get value time"+
+                dict.getValueTime2.get());*/
+
+        return keep;
+    }
+
+    private static void testStringDictionary(ArrayList<String> str, ArrayList<String> notFound) {
+        int baseId = new Random().nextInt(100);
+        TrieDictionaryForestBuilder<String> b = newDictBuilder(str, baseId, 2);
+        TrieDictionaryForest<String> dict = b.build();
+        //dict.dump(System.out);
+        TreeSet<String> set = new TreeSet<String>();
+        for (String s : str) {
+            set.add(s);
+        }
+
+        // test serialize
+        //dict = testSerialize(dict);
+
+        // test basic id<==>value
+        Iterator<String> it = set.iterator();
+        int id = baseId;
+        for (; it.hasNext(); id++) {
+            String value = it.next();
+            // System.out.println("checking " + id + " <==> " + value);
+
+            assertEquals(id, dict.getIdFromValue(value));
+            assertEquals(value, dict.getValueFromId(id));
+        }
+
+        //test not found value
+        if (notFound != null) {
+            for (String s : notFound) {
+                try {
+                    int nullId = dict.getIdFromValue(s);
+                    System.out.println("null value id:" + nullId);
+                    fail("For not found value '" + s + "', IllegalArgumentException is expected");
+                } catch (IllegalArgumentException e) {
+                    // good
+                }
+            }
+        }
+        int maxId = dict.getMaxId();
+        int[] notExistIds = {-10, -20, -Integer.MIN_VALUE, -Integer.MAX_VALUE, maxId + 1, maxId + 2};
+        for (Integer i : notExistIds) {
+            try {
+                dict.getValueFromId(i);
+                fail("For not found id '" + i + "', IllegalArgumentException is expected");
+            } catch (IllegalArgumentException e) {
+                // good
+            }
+        }
+
+        // test null value
+        int nullId = dict.getIdFromValue(null);
+        assertNull(dict.getValueFromId(nullId));
+        int nullId2 = dict.getIdFromValueBytes(null, 0, 0);
+        assertEquals(dict.getValueBytesFromId(nullId2, null, 0), -1);
+        assertEquals(nullId, nullId2);
+    }
+
+    private Map<String, Integer> rightIdMap(int baseId, ArrayList<String> strs) {
+        Map<String, Integer> result = new HashMap<>();
+        int expectId = baseId;
+        for (String str : strs) {
+            result.put(str, expectId);
+            expectId++;
+        }
+        return result;
+    }
+
+    private static TrieDictionaryForestBuilder<String> newDictBuilder(Iterable<String> strs, int baseId) {
+        TrieDictionaryForestBuilder<String> b = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), baseId);
+        for (String s : strs)
+            b.addValue(s);
+        return b;
+    }
+
+    private static TrieDictionaryForestBuilder<String> newDictBuilder(Iterable<String> strs, int baseId, int treeSize) {
+        final int previousLimit = TrieDictionaryForestBuilder.MaxTrieTreeSize;
+        TrieDictionaryForestBuilder.MaxTrieTreeSize = treeSize; // static: affects every builder until restored
+        try {
+            return newDictBuilder(strs, baseId); // adds all values while the temporary limit is active
+        } finally { TrieDictionaryForestBuilder.MaxTrieTreeSize = previousLimit; } // never leak into later tests
+    }
+
+    /**
+     * Iterable whose iterators each yield {@code size} random strings; every string has
+     * 64 characters taken from 0-9 and a-f.
+     */
+    private static class RandomStrings implements Iterable<String> {
+        final private int size;
+
+        public RandomStrings(int size) {
+            this.size = size;
+        }
+
+        // Each call returns an independent iterator with its own generator and counter.
+        @Override
+        public Iterator<String> iterator() {
+            return new Iterator<String>() {
+                // seeded from the current time in milliseconds
+                Random rand = new Random(System.currentTimeMillis());
+                int produced = 0;
+
+                @Override
+                public boolean hasNext() {
+                    return produced < size;
+                }
+
+                @Override
+                public String next() {
+                    if (!hasNext())
+                        throw new NoSuchElementException();
+
+                    produced++;
+                    return nextString();
+                }
+
+                // 64 characters, each one a random hexadecimal digit
+                private String nextString() {
+                    StringBuilder hex = new StringBuilder(64);
+                    for (int pos = 0; pos < 64; pos++) {
+                        int digit = rand.nextInt(16);
+                        // forDigit maps 0..9 to '0'..'9' and 10..15 to 'a'..'f'
+                        hex.append(Character.forDigit(digit, 16));
+                    }
+                    return hex.toString();
+                }
+
+                // removal is not supported
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+    }
+
+    private static ArrayList<String> loadStrings(InputStream is) throws Exception {
+        ArrayList<String> r = new ArrayList<String>();
+        BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+        try {
+            String word;
+            while ((word = reader.readLine()) != null) {
+                word = word.trim();
+                if (word.isEmpty() == false)
+                    r.add(word);
+            }
+        } finally {
+            reader.close();
+            is.close();
+        }
+        return r;
+    }
+
+
+    private ArrayList<String> getTestData(int count) {
+        RandomStrings rs = new RandomStrings(count);
+        Iterator<String> itr = rs.iterator();
+        ArrayList<String> testData = new ArrayList<>();
+        while (itr.hasNext())
+            testData.add(itr.next());
+        Collections.sort(testData, new ByteComparator<String>(new StringBytesConverter()));
+        evaluateDataSize(testData);
+        return testData;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
new file mode 100644
index 0000000..dfc6b2c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Chooses the reducer for a map output record by looking at the first byte of
+ * the key's text:
+ * <ul>
+ * <li>{@code MARK_FOR_HLL}: the last reducer ({@code numReduceTasks - 1})</li>
+ * <li>{@code MARK_FOR_PARTITION_COL}: the second to last reducer ({@code numReduceTasks - 2})</li>
+ * <li>anything else: the first byte, read as an unsigned value, is the reducer number
+ * (presumably the index of the dictionary column -- confirm against the mapper)</li>
+ * </ul>
+ */
+public class FactDistinctColumnPartitioner2 extends Partitioner<SelfDefineSortableKey, Text> {
+
+    /**
+     * @param key map output key; the first byte of its text selects the reducer
+     * @param value map output value, not inspected
+     * @param numReduceTasks total number of reducers of the job
+     * @return the reducer number for this record
+     */
+    @Override
+    public int getPartition(SelfDefineSortableKey key, Text value, int numReduceTasks) {
+        // fetch the key bytes once; every branch only looks at the first byte
+        byte[] keyBytes = key.getText().getBytes();
+        byte mark = keyBytes[0];
+
+        if (mark == FactDistinctHiveColumnsMapper2.MARK_FOR_HLL) {
+            // the last reducer is for merging hll
+            return numReduceTasks - 1;
+        }
+        if (mark == FactDistinctHiveColumnsMapper2.MARK_FOR_PARTITION_COL) {
+            // the second to last reducer receives the records marked for the partition column
+            return numReduceTasks - 2;
+        }
+        // unmarked key: the leading byte itself names the target reducer
+        return BytesUtil.readUnsigned(keyBytes, 0, 1);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
new file mode 100644
index 0000000..6ff07f0
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.KylinReducer;
+
+import java.io.IOException;
+
+/**
+ * Map-side combiner of the fact distinct columns step: for every key it forwards
+ * exactly one record, dropping the duplicates produced by the same map task.
+ * <p>
+ * The output key/value types are the same as the input types because a Hadoop
+ * combiner must emit the map output types ({@link SelfDefineSortableKey} /
+ * {@link Text} for this job); emitting a plain {@code Text} key is rejected at
+ * runtime with a "wrong key class" error.
+ *
+ * @author yangli9
+ */
+public class FactDistinctColumnsCombiner2 extends KylinReducer<SelfDefineSortableKey, Text, SelfDefineSortableKey, Text> {
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+    }
+
+    /**
+     * Writes the key once, paired with the first of its values.
+     *
+     * @param key the sortable key emitted by the mapper, passed through unchanged
+     * @param values all values collected for the key; only the first one is written
+     * @param context the combiner context the record is written to
+     */
+    @Override
+    public void reduce(SelfDefineSortableKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
+        // for hll, each key only has one output, no need to do local combine;
+        // for normal col, values are empty text
+        context.write(key, values.iterator().next());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
new file mode 100644
index 0000000..4d26402
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Hadoop job that runs the fact distinct columns step with
+ * {@link FactDistinctHiveColumnsMapper2}, {@link FactDistinctColumnsCombiner2} and
+ * {@link FactDistinctColumnPartitioner2}, using {@link SelfDefineSortableKey} as the
+ * map output key.
+ * <p>
+ * One reducer is configured per dictionary column on the fact table; two more
+ * reducers are added when statistics are enabled.
+ */
+public class FactDistinctColumnsJob2 extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsJob2.class);
+
+    /**
+     * Parses the command line, configures the job and waits for it to finish.
+     *
+     * @param args command line arguments of the job
+     * @return the value returned by {@code waitForCompletion}
+     * @throws IllegalStateException if the requested segment does not exist in the cube
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_STATISTICS_ENABLED);
+            options.addOption(OPTION_STATISTICS_OUTPUT);
+            options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String job_id = getOptionValue(OPTION_CUBING_JOB_ID);
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id);
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
+            String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
+            String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+
+            // ----------------------------------------------------------------------------
+            // add metadata to distributed cache
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+            logger.info("Starting: " + job.getJobName());
+            logger.info("using FactDistinctColumnsJob2");
+
+            setJobClasspath(job, cube.getConfig());
+
+            CubeSegment segment = cube.getSegmentById(segmentID);
+            if (segment == null) {
+                logger.error("Failed to find {} in cube {}", segmentID, cube);
+                // plain string concatenation here: System.out does not expand {} placeholders
+                System.out.println("Failed to find " + segmentID + " in cube " + cube);
+                // list the segments that do exist to help diagnose the wrong id
+                for (CubeSegment s : cube.getSegments()) {
+                    logger.error(s.getName() + " with status " + s.getStatus());
+                    System.out.println(s.getName() + " with status " + s.getStatus());
+                }
+                throw new IllegalStateException("Failed to find segment " + segmentID + " in cube " + cube);
+            } else {
+                logger.info("Found segment: " + segment);
+                System.out.println("Found segment " + segment);
+            }
+            // reuse the segment that was just looked up instead of fetching it again
+            setupMapper(segment);
+            // one reducer per dictionary column, plus two extra reducers when statistics are enabled
+            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
+
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            return waitForCompletion(job);
+
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+
+    }
+
+    /**
+     * Configures the input format of the segment's flat table and the map side
+     * classes (mapper, combiner, map output key/value types).
+     *
+     * @param cubeSeg the segment whose flat table is the job input
+     */
+    private void setupMapper(CubeSegment cubeSeg) throws IOException {
+        IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+        flatTableInputFormat.configureJob(job);
+
+        job.setMapperClass(FactDistinctHiveColumnsMapper2.class);
+        job.setCombinerClass(FactDistinctColumnsCombiner2.class);
+        job.setMapOutputKeyClass(SelfDefineSortableKey.class);
+        job.setMapOutputValueClass(Text.class);
+    }
+
+    /**
+     * Configures the reduce side of the job and its output location. Any existing
+     * content at the output path is deleted.
+     *
+     * @param output the job output path
+     * @param numberOfReducers the number of reduce tasks to run
+     */
+    private void setupReducer(Path output, int numberOfReducers) throws IOException {
+        // the reducer of the original fact distinct columns step is reused unchanged
+        // NOTE(review): confirm it accepts SelfDefineSortableKey as its input key type
+        job.setReducerClass(FactDistinctColumnsReducer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setPartitionerClass(FactDistinctColumnPartitioner2.class);
+        job.setNumReduceTasks(numberOfReducers);
+
+        FileOutputFormat.setOutputPath(job, output);
+        job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    /**
+     * Command line entry point; exits the JVM with the exit code of the job.
+     */
+    public static void main(String[] args) throws Exception {
+        FactDistinctColumnsJob2 job = new FactDistinctColumnsJob2();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java
new file mode 100644
index 0000000..2e9a2dc
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Common base of the fact distinct columns mappers that emit
+ * {@link SelfDefineSortableKey} keys. During setup it resolves the cube, the
+ * segment and the dictionary columns on the fact table, and records where each
+ * of those columns sits in the joined flat table.
+ */
+public class FactDistinctColumnsMapperBase2<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> {
+
+    protected String cubeName;
+    protected CubeInstance cube;
+    protected CubeSegment cubeSeg;
+    protected CubeDesc cubeDesc;
+    protected long baseCuboidId;
+    protected List<TblColRef> factDictCols;
+    protected IMRTableInputFormat flatTableInputFormat;
+
+    protected Text outputKey = new Text();
+    protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+    protected Text outputValue = new Text();
+    protected int errorRecordCounter = 0;
+
+    protected CubeJoinedFlatTableEnrich intermediateTableDesc;
+    // dictionaryColumnIndex[i] is the flat table column index of factDictCols.get(i)
+    protected int[] dictionaryColumnIndex;
+
+    /**
+     * Loads the Kylin metadata and fills the cube related fields from the job
+     * configuration (cube name and segment id).
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        final Configuration hadoopConf = context.getConfiguration();
+        bindCurrentConfiguration(hadoopConf);
+        final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cubeName = hadoopConf.get(BatchConstants.CFG_CUBE_NAME);
+        final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        cube = cubeManager.getCube(cubeName);
+        cubeSeg = cube.getSegmentById(hadoopConf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
+        cubeDesc = cube.getDescriptor();
+        baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        factDictCols = cubeManager.getAllDictColumnsOnFact(cubeDesc);
+
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+
+        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
+
+        // map every dictionary column to its position in the flat table
+        final int dictColCount = factDictCols.size();
+        dictionaryColumnIndex = new int[dictColCount];
+        for (int pos = 0; pos < dictColCount; pos++) {
+            dictionaryColumnIndex[pos] = intermediateTableDesc.getColumnIndex(factDictCols.get(pos));
+        }
+    }
+
+    /**
+     * Reports a record that could not be processed on stderr and counts it. Once
+     * the count exceeds {@code BatchConstants.ERROR_RECORD_LOG_THRESHOLD} the
+     * exception is rethrown, wrapped in a {@link RuntimeException} if it is
+     * neither an {@link IOException} nor a {@link RuntimeException}.
+     *
+     * @param record the fields of the offending record
+     * @param ex the failure raised while handling the record
+     */
+    protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
+        System.err.println("Insane record: " + Arrays.toString(record));
+        ex.printStackTrace(System.err);
+
+        errorRecordCounter++;
+        if (errorRecordCounter <= BatchConstants.ERROR_RECORD_LOG_THRESHOLD) {
+            // still within the tolerated number of bad records
+            return;
+        }
+
+        if (ex instanceof IOException) {
+            throw (IOException) ex;
+        }
+        if (ex instanceof RuntimeException) {
+            throw (RuntimeException) ex;
+        }
+        throw new RuntimeException("", ex);
+    }
+}