You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/23 09:09:01 UTC

[5/9] kylin git commit: KYLIN-2195 All code changes, ready for test

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
index 916c369..598865b 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
@@ -1,512 +1,512 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- * Builds a dictionary using Trie structure. All values are taken in byte[] form
- * and organized in a Trie with ordering. Then numeric IDs are assigned in
- * sequence.
- * 
- * @author yangli9
- */
-public class TrieDictionaryBuilder<T> {
-    
-    private static final int _2GB = 2000000000;
-
-    public static class Node {
-        public byte[] part;
-        public boolean isEndOfValue;
-        public ArrayList<Node> children;
-
-        public int nValuesBeneath; // only present after stats()
-
-        Node(byte[] value, boolean isEndOfValue) {
-            reset(value, isEndOfValue);
-        }
-
-        Node(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
-            reset(value, isEndOfValue, children);
-        }
-
-        void reset(byte[] value, boolean isEndOfValue) {
-            reset(value, isEndOfValue, new ArrayList<Node>());
-        }
-
-        void reset(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
-            this.part = value;
-            this.isEndOfValue = isEndOfValue;
-            this.children = children;
-        }
-    }
-
-    public static interface Visitor {
-        void visit(Node n, int level);
-    }
-
-    // ============================================================================
-
-    private Node root;
-    private BytesConverter<T> bytesConverter;
-
-    public TrieDictionaryBuilder(BytesConverter<T> bytesConverter) {
-        this.root = new Node(new byte[0], false);
-        this.bytesConverter = bytesConverter;
-    }
-
-    public void addValue(T value) {
-        addValue(bytesConverter.convertToBytes(value));
-    }
-
-    // add a converted value (given in byte[] format), use with care, for internal only
-    void addValue(byte[] value) {
-        addValueR(root, value, 0);
-    }
-
-    private void addValueR(Node node, byte[] value, int start) {
-        // match the value part of current node
-        int i = 0, j = start;
-        int n = node.part.length, nn = value.length;
-        int comp = 0;
-        for (; i < n && j < nn; i++, j++) {
-            comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
-            if (comp != 0)
-                break;
-        }
-
-        if (j == nn) {
-            // if value fully matched within the current node
-            if (i == n) {
-                // if equals to current node, just mark end of value
-                node.isEndOfValue = true;
-            } else {
-                // otherwise, split the current node into two
-                Node c = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
-                node.reset(BytesUtil.subarray(node.part, 0, i), true);
-                node.children.add(c);
-            }
-            return;
-        }
-
-        // if partially matched the current, split the current node, add the new value, make a 3-way
-        if (i < n) {
-            Node c1 = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
-            Node c2 = new Node(BytesUtil.subarray(value, j, nn), true);
-            node.reset(BytesUtil.subarray(node.part, 0, i), false);
-            if (comp < 0) {
-                node.children.add(c1);
-                node.children.add(c2);
-            } else {
-                node.children.add(c2);
-                node.children.add(c1);
-            }
-            return;
-        }
-
-        // out matched the current, binary search the next byte for a child node to continue
-        byte lookfor = value[j];
-        int lo = 0;
-        int hi = node.children.size() - 1;
-        int mid = 0;
-        boolean found = false;
-        comp = 0;
-        while (!found && lo <= hi) {
-            mid = lo + (hi - lo) / 2;
-            comp = BytesUtil.compareByteUnsigned(lookfor, node.children.get(mid).part[0]);
-            if (comp < 0)
-                hi = mid - 1;
-            else if (comp > 0)
-                lo = mid + 1;
-            else
-                found = true;
-        }
-        if (found) {
-            // found a child node matching the first byte, continue in that child
-            addValueR(node.children.get(mid), value, j);
-        } else {
-            // otherwise, make the value a new child
-            Node c = new Node(BytesUtil.subarray(value, j, nn), true);
-            node.children.add(comp <= 0 ? mid : mid + 1, c);
-        }
-    }
-
-    public void traverse(Visitor visitor) {
-        traverseR(root, visitor, 0);
-    }
-
-    private void traverseR(Node node, Visitor visitor, int level) {
-        visitor.visit(node, level);
-        for (Node c : node.children)
-            traverseR(c, visitor, level + 1);
-    }
-
-    public void traversePostOrder(Visitor visitor) {
-        traversePostOrderR(root, visitor, 0);
-    }
-
-    private void traversePostOrderR(Node node, Visitor visitor, int level) {
-        for (Node c : node.children)
-            traversePostOrderR(c, visitor, level + 1);
-        visitor.visit(node, level);
-    }
-
-    public static class Stats {
-        public int nValues; // number of values in total
-        public int nValueBytesPlain; // number of bytes for all values
-                                     // uncompressed
-        public int nValueBytesCompressed; // number of values bytes in Trie
-                                          // (compressed)
-        public int maxValueLength; // size of longest value in bytes
-
-        // the trie is multi-byte-per-node
-        public int mbpn_nNodes; // number of nodes in trie
-        public int mbpn_trieDepth; // depth of trie
-        public int mbpn_maxFanOut; // the maximum no. children
-        public long mbpn_nChildLookups; // number of child lookups during lookup every value once
-        public long mbpn_nTotalFanOut; // the sum of fan outs during lookup every value once
-        public int mbpn_sizeValueTotal; // the sum of value space in all nodes
-        public int mbpn_sizeNoValueBytes; // size of field noValueBytes
-        public int mbpn_sizeNoValueBeneath; // size of field noValuesBeneath, depends on cardinality
-        public int mbpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array
-        public long mbpn_footprint; // MBPN footprint in bytes
-
-        // stats for one-byte-per-node as well, so there's comparison
-        public int obpn_sizeValue; // size of value per node, always 1
-        public int obpn_sizeNoValuesBeneath; // size of field noValuesBeneath, depends on cardinality
-        public int obpn_sizeChildCount; // size of field childCount, enables binary search among children
-        public int obpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array
-        public int obpn_nNodes; // no. nodes in OBPN trie
-        public long obpn_footprint; // OBPN footprint in bytes
-
-        public void print() {
-            PrintStream out = System.out;
-            out.println("============================================================================");
-            out.println("No. values:             " + nValues);
-            out.println("No. bytes raw:          " + nValueBytesPlain);
-            out.println("No. bytes in trie:      " + nValueBytesCompressed);
-            out.println("Longest value length:   " + maxValueLength);
-
-            // flatten trie footprint calculation, case of One-Byte-Per-Node
-            out.println("----------------------------------------------------------------------------");
-            out.println("OBPN node size:  " + (obpn_sizeValue + obpn_sizeNoValuesBeneath + obpn_sizeChildCount + obpn_sizeChildOffset) + " = " + obpn_sizeValue + " + " + obpn_sizeNoValuesBeneath + " + " + obpn_sizeChildCount + " + " + obpn_sizeChildOffset);
-            out.println("OBPN no. nodes:  " + obpn_nNodes);
-            out.println("OBPN trie depth: " + maxValueLength);
-            out.println("OBPN footprint:  " + obpn_footprint + " in bytes");
-
-            // flatten trie footprint calculation, case of Multi-Byte-Per-Node
-            out.println("----------------------------------------------------------------------------");
-            out.println("MBPN max fan out:       " + mbpn_maxFanOut);
-            out.println("MBPN no. child lookups: " + mbpn_nChildLookups);
-            out.println("MBPN total fan out:     " + mbpn_nTotalFanOut);
-            out.println("MBPN average fan out:   " + (double) mbpn_nTotalFanOut / mbpn_nChildLookups);
-            out.println("MBPN values size total: " + mbpn_sizeValueTotal);
-            out.println("MBPN node size:         " + (mbpn_sizeNoValueBytes + mbpn_sizeNoValueBeneath + mbpn_sizeChildOffset) + " = " + mbpn_sizeNoValueBytes + " + " + mbpn_sizeNoValueBeneath + " + " + mbpn_sizeChildOffset);
-            out.println("MBPN no. nodes:         " + mbpn_nNodes);
-            out.println("MBPN trie depth:        " + mbpn_trieDepth);
-            out.println("MBPN footprint:         " + mbpn_footprint + " in bytes");
-        }
-    }
-
-    /** out print some statistics of the trie and the dictionary built from it */
-    public Stats stats() {
-        // calculate nEndValueBeneath
-        traversePostOrder(new Visitor() {
-            @Override
-            public void visit(Node n, int level) {
-                n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
-                for (Node c : n.children)
-                    n.nValuesBeneath += c.nValuesBeneath;
-            }
-        });
-
-        // run stats
-        final Stats s = new Stats();
-        final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
-        traverse(new Visitor() {
-            @Override
-            public void visit(Node n, int level) {
-                if (n.isEndOfValue)
-                    s.nValues++;
-                s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
-                s.nValueBytesCompressed += n.part.length;
-                s.mbpn_nNodes++;
-                if (s.mbpn_trieDepth < level + 1)
-                    s.mbpn_trieDepth = level + 1;
-                if (n.children.size() > 0) {
-                    if (s.mbpn_maxFanOut < n.children.size())
-                        s.mbpn_maxFanOut = n.children.size();
-                    int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
-                    s.mbpn_nChildLookups += childLookups;
-                    s.mbpn_nTotalFanOut += childLookups * n.children.size();
-                }
-
-                if (level < lenAtLvl.size())
-                    lenAtLvl.set(level, n.part.length);
-                else
-                    lenAtLvl.add(n.part.length);
-                int lenSoFar = 0;
-                for (int i = 0; i <= level; i++)
-                    lenSoFar += lenAtLvl.get(i);
-                if (lenSoFar > s.maxValueLength)
-                    s.maxValueLength = lenSoFar;
-            }
-        });
-
-        // flatten trie footprint calculation, case of One-Byte-Per-Node
-        s.obpn_sizeValue = 1;
-        s.obpn_sizeNoValuesBeneath = BytesUtil.sizeForValue(s.nValues);
-        s.obpn_sizeChildCount = 1;
-        s.obpn_sizeChildOffset = 5; // MSB used as isEndOfValue flag
-        s.obpn_nNodes = s.nValueBytesCompressed; // no. nodes is the total number of compressed bytes in OBPN
-        s.obpn_footprint = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset);
-        while (true) { // minimize the offset size to match the footprint
-            long t = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset - 1);
-            if (BytesUtil.sizeForValue(t * 2) <= s.obpn_sizeChildOffset - 1) { // *2 because MSB of offset is used for isEndOfValue flag
-                s.obpn_sizeChildOffset--;
-                s.obpn_footprint = t;
-            } else
-                break;
-        }
-
-        // flatten trie footprint calculation, case of Multi-Byte-Per-Node
-        s.mbpn_sizeValueTotal = s.nValueBytesCompressed;
-        s.mbpn_sizeNoValueBytes = 1;
-        s.mbpn_sizeNoValueBeneath = BytesUtil.sizeForValue(s.nValues);
-        s.mbpn_sizeChildOffset = 5;
-        s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset);
-        while (true) { // minimize the offset size to match the footprint
-            long t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset - 1);
-            if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) { // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
-                s.mbpn_sizeChildOffset--;
-                s.mbpn_footprint = t;
-            } else
-                break;
-        }
-
-        return s;
-    }
-
-    /** out print trie for debug */
-    public void print() {
-        print(System.out);
-    }
-
-    public void print(final PrintStream out) {
-        traverse(new Visitor() {
-            @Override
-            public void visit(Node n, int level) {
-                try {
-                    for (int i = 0; i < level; i++)
-                        out.print("  ");
-                    out.print(new String(n.part, "UTF-8"));
-                    out.print(" - ");
-                    if (n.nValuesBeneath > 0)
-                        out.print(n.nValuesBeneath);
-                    if (n.isEndOfValue)
-                        out.print("*");
-                    out.print("\n");
-                } catch (UnsupportedEncodingException e) {
-                    e.printStackTrace();
-                }
-            }
-        });
-    }
-
-    private CompleteParts completeParts = new CompleteParts();
-
-    private class CompleteParts {
-        byte[] data = new byte[4096];
-        int current = 0;
-
-        public void append(byte[] part) {
-            while (current + part.length > data.length)
-                expand();
-
-            System.arraycopy(part, 0, data, current, part.length);
-            current += part.length;
-        }
-
-        public void withdraw(int size) {
-            current -= size;
-        }
-
-        public byte[] retrieve() {
-            return Arrays.copyOf(data, current);
-        }
-
-        private void expand() {
-            byte[] temp = new byte[2 * data.length];
-            System.arraycopy(data, 0, temp, 0, data.length);
-            data = temp;
-        }
-    }
-
-    // there is a 255 limitation of length for each node's part.
-    // we interpolate nodes to satisfy this when a node's part becomes
-    // too long(overflow)
-    private void checkOverflowParts(Node node) {
-        LinkedList<Node> childrenCopy = new LinkedList<Node>(node.children);
-        for (Node child : childrenCopy) {
-            if (child.part.length > 255) {
-                byte[] first255 = Arrays.copyOf(child.part, 255);
-
-                completeParts.append(node.part);
-                completeParts.append(first255);
-                byte[] visited = completeParts.retrieve();
-                this.addValue(visited);
-                completeParts.withdraw(255);
-                completeParts.withdraw(node.part.length);
-            }
-        }
-
-        completeParts.append(node.part); // by here the node.children may have been changed
-        for (Node child : node.children) {
-            checkOverflowParts(child);
-        }
-        completeParts.withdraw(node.part.length);
-    }
-
-    /**
-     * Flatten the trie into a byte array for a minimized memory footprint.
-     * Lookup remains fast. Cost is inflexibility to modify (becomes immutable).
-     * 
-     * Flattened node structure is HEAD + NODEs, for each node:
-     * - o byte, offset to child node, o = stats.mbpn_sizeChildOffset
-     *    - 1 bit, isLastChild flag, the 1st MSB of o
-     *    - 1 bit, isEndOfValue flag, the 2nd MSB of o
-     * - c byte, number of values beneath, c = stats.mbpn_sizeNoValueBeneath
-     * - 1 byte, number of value bytes
-     * - n byte, value bytes
-     */
-    public TrieDictionary<T> build(int baseId) {
-        byte[] trieBytes = buildTrieBytes(baseId);
-        TrieDictionary<T> r = new TrieDictionary<T>(trieBytes);
-        return r;
-    }
-
-    protected byte[] buildTrieBytes(int baseId) {
-        checkOverflowParts(this.root);
-
-        Stats stats = stats();
-        int sizeNoValuesBeneath = stats.mbpn_sizeNoValueBeneath;
-        int sizeChildOffset = stats.mbpn_sizeChildOffset;
-        
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Builds a dictionary using Trie structure. All values are taken in byte[] form
+ * and organized in a Trie with ordering. Then numeric IDs are assigned in
+ * sequence.
+ * 
+ * @author yangli9
+ */
+public class TrieDictionaryBuilder<T> {
+    
+    private static final int _2GB = 2000000000;
+
+    public static class Node { // one trie node: a byte segment plus the child nodes that continue it
+        public byte[] part; // the bytes this node contributes to every value that passes through it
+        public boolean isEndOfValue; // true when a value ends exactly at the end of this node's part
+        public ArrayList<Node> children; // child nodes; addValueR keeps them ordered by unsigned first byte
+
+        public int nValuesBeneath; // number of values ending at or below this node; only present after stats()
+
+        Node(byte[] value, boolean isEndOfValue) { // creates a node with a new, empty child list
+            reset(value, isEndOfValue);
+        }
+
+        Node(byte[] value, boolean isEndOfValue, ArrayList<Node> children) { // creates a node that adopts the given child list
+            reset(value, isEndOfValue, children);
+        }
+
+        void reset(byte[] value, boolean isEndOfValue) { // re-initialises the node and replaces its children with an empty list
+            reset(value, isEndOfValue, new ArrayList<Node>());
+        }
+
+        void reset(byte[] value, boolean isEndOfValue, ArrayList<Node> children) { // overwrites part, flag and children
+            this.part = value;
+            this.isEndOfValue = isEndOfValue;
+            this.children = children; // the list is stored as given, not copied
+        }
+    }
+
+    public static interface Visitor { // callback invoked once per node by traverse() and traversePostOrder()
+        void visit(Node n, int level); // level is the depth of n; the root is visited with level 0
+    }
+
+    // ============================================================================
+
+    private Node root;
+    private BytesConverter<T> bytesConverter;
+
+    public TrieDictionaryBuilder(BytesConverter<T> bytesConverter) { // creates a builder holding an empty trie
+        this.root = new Node(new byte[0], false); // the root has an empty part and is not itself a value
+        this.bytesConverter = bytesConverter; // used by addValue(T) to turn a value into its byte[] form
+    }
+
+    public void addValue(T value) { // converts the value to bytes with the bytesConverter and inserts it into the trie
+        addValue(bytesConverter.convertToBytes(value));
+    }
+
+    // adds an already converted value (given in byte[] form); for internal use only, use with care
+    void addValue(byte[] value) {
+        addValueR(root, value, 0); // insertion always starts at the root with nothing matched yet
+    }
+
+    private void addValueR(Node node, byte[] value, int start) { // inserts value[start..] into the subtree rooted at node
+        // match the remaining value bytes against the part of the current node, byte by byte
+        int i = 0, j = start;
+        int n = node.part.length, nn = value.length;
+        int comp = 0;
+        for (; i < n && j < nn; i++, j++) {
+            comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
+            if (comp != 0)
+                break;
+        }
+
+        if (j == nn) {
+            // the value is exhausted within the current node (every compared byte matched)
+            if (i == n) {
+                // the value ends exactly where the current node ends, just mark end of value
+                node.isEndOfValue = true;
+            } else {
+                // the value ends inside the part: split the node in two, the tail keeps the old flag and children
+                Node c = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+                node.reset(BytesUtil.subarray(node.part, 0, i), true);
+                node.children.add(c);
+            }
+            return;
+        }
+
+        // the value diverges inside the part: split the current node and add the rest of the value, making a 3-way
+        if (i < n) {
+            Node c1 = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+            Node c2 = new Node(BytesUtil.subarray(value, j, nn), true);
+            node.reset(BytesUtil.subarray(node.part, 0, i), false);
+            if (comp < 0) { // comp compares the old part byte with the new value byte; the smaller child goes first
+                node.children.add(c1);
+                node.children.add(c2);
+            } else {
+                node.children.add(c2);
+                node.children.add(c1);
+            }
+            return;
+        }
+
+        // the value extends past the current node's part: binary search the children by the next byte to continue
+        byte lookfor = value[j];
+        int lo = 0;
+        int hi = node.children.size() - 1;
+        int mid = 0;
+        boolean found = false;
+        comp = 0;
+        while (!found && lo <= hi) {
+            mid = lo + (hi - lo) / 2;
+            comp = BytesUtil.compareByteUnsigned(lookfor, node.children.get(mid).part[0]);
+            if (comp < 0)
+                hi = mid - 1;
+            else if (comp > 0)
+                lo = mid + 1;
+            else
+                found = true;
+        }
+        if (found) {
+            // found a child node matching the first byte, continue in that child
+            addValueR(node.children.get(mid), value, j);
+        } else {
+            // otherwise, make the rest of the value a new child
+            Node c = new Node(BytesUtil.subarray(value, j, nn), true);
+            node.children.add(comp <= 0 ? mid : mid + 1, c); // insert next to the last probed child so the order is kept
+        }
+    }
+
+    public void traverse(Visitor visitor) { // pre-order walk of the whole trie, starting at the root with level 0
+        traverseR(root, visitor, 0);
+    }
+
+    private void traverseR(Node node, Visitor visitor, int level) { // recursive step of traverse()
+        visitor.visit(node, level); // the node is visited before any of its children
+        for (Node c : node.children)
+            traverseR(c, visitor, level + 1);
+    }
+
+    public void traversePostOrder(Visitor visitor) { // post-order walk of the whole trie, starting at the root with level 0
+        traversePostOrderR(root, visitor, 0);
+    }
+
+    private void traversePostOrderR(Node node, Visitor visitor, int level) { // recursive step of traversePostOrder()
+        for (Node c : node.children)
+            traversePostOrderR(c, visitor, level + 1);
+        visitor.visit(node, level); // the node is visited only after all of its children
+    }
+
+    public static class Stats { // plain holder for the figures computed by stats()
+        public int nValues; // number of values in total
+        public int nValueBytesPlain; // total number of bytes of all values,
+                                     // uncompressed
+        public int nValueBytesCompressed; // number of value bytes stored in the trie
+                                          // (compressed)
+        public int maxValueLength; // size of longest value in bytes
+
+        // figures for the multi-byte-per-node (MBPN) trie, the layout this builder maintains
+        public int mbpn_nNodes; // number of nodes in trie
+        public int mbpn_trieDepth; // depth of trie
+        public int mbpn_maxFanOut; // the maximum no. children
+        public long mbpn_nChildLookups; // number of child lookups when looking up every value once
+        public long mbpn_nTotalFanOut; // the sum of fan outs when looking up every value once
+        public int mbpn_sizeValueTotal; // the sum of value space in all nodes
+        public int mbpn_sizeNoValueBytes; // size of field noValueBytes
+        public int mbpn_sizeNoValueBeneath; // size of field noValuesBeneath, depends on cardinality
+        public int mbpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array
+        public long mbpn_footprint; // MBPN footprint in bytes
+
+        // figures for a one-byte-per-node (OBPN) trie as well, so there is a comparison
+        public int obpn_sizeValue; // size of value per node, always 1
+        public int obpn_sizeNoValuesBeneath; // size of field noValuesBeneath, depends on cardinality
+        public int obpn_sizeChildCount; // size of field childCount, enables binary search among children
+        public int obpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array
+        public int obpn_nNodes; // no. nodes in OBPN trie
+        public long obpn_footprint; // OBPN footprint in bytes
+
+        public void print() { // writes all figures to System.out as a human-readable report
+            PrintStream out = System.out;
+            out.println("============================================================================");
+            out.println("No. values:             " + nValues);
+            out.println("No. bytes raw:          " + nValueBytesPlain);
+            out.println("No. bytes in trie:      " + nValueBytesCompressed);
+            out.println("Longest value length:   " + maxValueLength);
+
+            // flattened trie footprint figures, case of One-Byte-Per-Node
+            out.println("----------------------------------------------------------------------------");
+            out.println("OBPN node size:  " + (obpn_sizeValue + obpn_sizeNoValuesBeneath + obpn_sizeChildCount + obpn_sizeChildOffset) + " = " + obpn_sizeValue + " + " + obpn_sizeNoValuesBeneath + " + " + obpn_sizeChildCount + " + " + obpn_sizeChildOffset);
+            out.println("OBPN no. nodes:  " + obpn_nNodes);
+            out.println("OBPN trie depth: " + maxValueLength); // reported as the longest value length
+            out.println("OBPN footprint:  " + obpn_footprint + " in bytes");
+
+            // flattened trie footprint figures, case of Multi-Byte-Per-Node
+            out.println("----------------------------------------------------------------------------");
+            out.println("MBPN max fan out:       " + mbpn_maxFanOut);
+            out.println("MBPN no. child lookups: " + mbpn_nChildLookups);
+            out.println("MBPN total fan out:     " + mbpn_nTotalFanOut);
+            out.println("MBPN average fan out:   " + (double) mbpn_nTotalFanOut / mbpn_nChildLookups); // NaN when both counters are 0
+            out.println("MBPN values size total: " + mbpn_sizeValueTotal);
+            out.println("MBPN node size:         " + (mbpn_sizeNoValueBytes + mbpn_sizeNoValueBeneath + mbpn_sizeChildOffset) + " = " + mbpn_sizeNoValueBytes + " + " + mbpn_sizeNoValueBeneath + " + " + mbpn_sizeChildOffset);
+            out.println("MBPN no. nodes:         " + mbpn_nNodes);
+            out.println("MBPN trie depth:        " + mbpn_trieDepth);
+            out.println("MBPN footprint:         " + mbpn_footprint + " in bytes");
+        }
+    }
+
+    /** Computes statistics of the trie and of the dictionary built from it; also sets nValuesBeneath on every node. */
+    public Stats stats() {
+        // calculate nValuesBeneath bottom-up: post-order visits the children before their parent
+        traversePostOrder(new Visitor() {
+            @Override
+            public void visit(Node n, int level) {
+                n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
+                for (Node c : n.children)
+                    n.nValuesBeneath += c.nValuesBeneath;
+            }
+        });
+
+        // run stats in a single pre-order pass
+        final Stats s = new Stats();
+        final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>(); // part length of the current path, per level
+        traverse(new Visitor() {
+            @Override
+            public void visit(Node n, int level) {
+                if (n.isEndOfValue)
+                    s.nValues++;
+                s.nValueBytesPlain += n.part.length * n.nValuesBeneath; // NOTE(review): int arithmetic into an int field, may overflow on very large tries
+                s.nValueBytesCompressed += n.part.length;
+                s.mbpn_nNodes++;
+                if (s.mbpn_trieDepth < level + 1)
+                    s.mbpn_trieDepth = level + 1;
+                if (n.children.size() > 0) {
+                    if (s.mbpn_maxFanOut < n.children.size())
+                        s.mbpn_maxFanOut = n.children.size();
+                    int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0); // values that continue below this node
+                    s.mbpn_nChildLookups += childLookups;
+                    s.mbpn_nTotalFanOut += (long) childLookups * n.children.size(); // widen before multiplying, the int product can overflow
+                }
+
+                if (level < lenAtLvl.size())
+                    lenAtLvl.set(level, n.part.length);
+                else
+                    lenAtLvl.add(n.part.length);
+                int lenSoFar = 0; // total bytes on the path from the root down to this node
+                for (int i = 0; i <= level; i++)
+                    lenSoFar += lenAtLvl.get(i);
+                if (lenSoFar > s.maxValueLength)
+                    s.maxValueLength = lenSoFar;
+            }
+        });
+
+        // flatten trie footprint calculation, case of One-Byte-Per-Node
+        s.obpn_sizeValue = 1;
+        s.obpn_sizeNoValuesBeneath = BytesUtil.sizeForValue(s.nValues);
+        s.obpn_sizeChildCount = 1;
+        s.obpn_sizeChildOffset = 5; // MSB used as isEndOfValue flag
+        s.obpn_nNodes = s.nValueBytesCompressed; // no. nodes is the total number of compressed bytes in OBPN
+        s.obpn_footprint = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset);
+        while (true) { // shrink the offset size one byte at a time while the resulting footprint is still addressable
+            long t = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset - 1);
+            if (BytesUtil.sizeForValue(t * 2) <= s.obpn_sizeChildOffset - 1) { // *2 because MSB of offset is used for isEndOfValue flag
+                s.obpn_sizeChildOffset--;
+                s.obpn_footprint = t;
+            } else
+                break;
+        }
+
+        // flatten trie footprint calculation, case of Multi-Byte-Per-Node
+        s.mbpn_sizeValueTotal = s.nValueBytesCompressed;
+        s.mbpn_sizeNoValueBytes = 1;
+        s.mbpn_sizeNoValueBeneath = BytesUtil.sizeForValue(s.nValues);
+        s.mbpn_sizeChildOffset = 5;
+        s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset);
+        while (true) { // shrink the offset size one byte at a time while the resulting footprint is still addressable
+            long t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset - 1);
+            if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) { // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
+                s.mbpn_sizeChildOffset--;
+                s.mbpn_footprint = t;
+            } else
+                break;
+        }
+
+        return s;
+    }
+
+    /** out print trie for debug */
+    public void print() {
+        print(System.out);
+    }
+
+    public void print(final PrintStream out) {
+        traverse(new Visitor() {
+            @Override
+            public void visit(Node n, int level) {
+                try {
+                    for (int i = 0; i < level; i++)
+                        out.print("  ");
+                    out.print(new String(n.part, "UTF-8"));
+                    out.print(" - ");
+                    if (n.nValuesBeneath > 0)
+                        out.print(n.nValuesBeneath);
+                    if (n.isEndOfValue)
+                        out.print("*");
+                    out.print("\n");
+                } catch (UnsupportedEncodingException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+
+    private CompleteParts completeParts = new CompleteParts();
+
+    private class CompleteParts {
+        byte[] data = new byte[4096];
+        int current = 0;
+
+        public void append(byte[] part) {
+            while (current + part.length > data.length)
+                expand();
+
+            System.arraycopy(part, 0, data, current, part.length);
+            current += part.length;
+        }
+
+        public void withdraw(int size) {
+            current -= size;
+        }
+
+        public byte[] retrieve() {
+            return Arrays.copyOf(data, current);
+        }
+
+        private void expand() {
+            byte[] temp = new byte[2 * data.length];
+            System.arraycopy(data, 0, temp, 0, data.length);
+            data = temp;
+        }
+    }
+
+    // there is a 255 limitation of length for each node's part.
+    // we interpolate nodes to satisfy this when a node's part becomes
+    // too long(overflow)
+    private void checkOverflowParts(Node node) {
+        LinkedList<Node> childrenCopy = new LinkedList<Node>(node.children);
+        for (Node child : childrenCopy) {
+            if (child.part.length > 255) {
+                byte[] first255 = Arrays.copyOf(child.part, 255);
+
+                completeParts.append(node.part);
+                completeParts.append(first255);
+                byte[] visited = completeParts.retrieve();
+                this.addValue(visited);
+                completeParts.withdraw(255);
+                completeParts.withdraw(node.part.length);
+            }
+        }
+
+        completeParts.append(node.part); // by here the node.children may have been changed
+        for (Node child : node.children) {
+            checkOverflowParts(child);
+        }
+        completeParts.withdraw(node.part.length);
+    }
+
+    /**
+     * Flatten the trie into a byte array for a minimized memory footprint.
+     * Lookup remains fast. Cost is inflexibility to modify (becomes immutable).
+     * 
+     * Flattened node structure is HEAD + NODEs, for each node:
+     * - o byte, offset to child node, o = stats.mbpn_sizeChildOffset
+     *    - 1 bit, isLastChild flag, the 1st MSB of o
+     *    - 1 bit, isEndOfValue flag, the 2nd MSB of o
+     * - c byte, number of values beneath, c = stats.mbpn_sizeNoValueBeneath
+     * - 1 byte, number of value bytes
+     * - n byte, value bytes
+     */
+    public TrieDictionary<T> build(int baseId) {
+        byte[] trieBytes = buildTrieBytes(baseId);
+        TrieDictionary<T> r = new TrieDictionary<T>(trieBytes);
+        return r;
+    }
+
+    protected byte[] buildTrieBytes(int baseId) {
+        checkOverflowParts(this.root);
+
+        Stats stats = stats();
+        int sizeNoValuesBeneath = stats.mbpn_sizeNoValueBeneath;
+        int sizeChildOffset = stats.mbpn_sizeChildOffset;
+        
         if (stats.mbpn_footprint <= 0) // must never happen, but let us be cautious
             throw new IllegalStateException("Too big dictionary, dictionary cannot be bigger than 2GB");
-        if (stats.mbpn_footprint > _2GB)
-            throw new RuntimeException("Too big dictionary, dictionary cannot be bigger than 2GB");
-
-        // write head
-        byte[] head;
-        try {
-            ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
-            DataOutputStream headOut = new DataOutputStream(byteBuf);
-            headOut.write(TrieDictionary.MAGIC);
-            headOut.writeShort(0); // head size, will back fill
-            headOut.writeInt((int) stats.mbpn_footprint); // body size
-            headOut.write(sizeChildOffset);
-            headOut.write(sizeNoValuesBeneath);
-            headOut.writeShort(baseId);
-            headOut.writeShort(stats.maxValueLength);
-            headOut.writeUTF(bytesConverter == null ? "" : bytesConverter.getClass().getName());
-            headOut.close();
-            head = byteBuf.toByteArray();
-            BytesUtil.writeUnsigned(head.length, head, TrieDictionary.MAGIC_SIZE_I, 2);
-        } catch (IOException e) {
-            throw new RuntimeException(e); // shall not happen, as we are writing in memory
-        }
-
-        byte[] trieBytes = new byte[(int) stats.mbpn_footprint + head.length];
-        System.arraycopy(head, 0, trieBytes, 0, head.length);
-
-        LinkedList<Node> open = new LinkedList<Node>();
-        IdentityHashMap<Node, Integer> offsetMap = new IdentityHashMap<Node, Integer>();
-
-        // write body
-        int o = head.length;
-        offsetMap.put(root, o);
-        o = build_writeNode(root, o, true, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
-        if (root.children.isEmpty() == false)
-            open.addLast(root);
-
-        while (open.isEmpty() == false) {
-            Node parent = open.removeFirst();
-            build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
-            for (int i = 0; i < parent.children.size(); i++) {
-                Node c = parent.children.get(i);
-                boolean isLastChild = (i == parent.children.size() - 1);
-                offsetMap.put(c, o);
-                o = build_writeNode(c, o, isLastChild, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
-                if (c.children.isEmpty() == false)
-                    open.addLast(c);
-            }
-        }
-
-        if (o != trieBytes.length)
-            throw new RuntimeException();
-        return trieBytes;
-    }
-
-    private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
-        int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
-        BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
-        trieBytes[parentOffset] |= flags;
-    }
-
-    private int build_writeNode(Node n, int offset, boolean isLastChild, int sizeNoValuesBeneath, int sizeChildOffset, byte[] trieBytes) {
-        int o = offset;
-        if (o > _2GB)
-            throw new IllegalStateException();
-
-        // childOffset
-        if (isLastChild)
-            trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
-        if (n.isEndOfValue)
-            trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
-        o += sizeChildOffset;
-
-        // nValuesBeneath
-        BytesUtil.writeUnsigned(n.nValuesBeneath, trieBytes, o, sizeNoValuesBeneath);
-        o += sizeNoValuesBeneath;
-
-        // nValueBytes
-        if (n.part.length > 255)
-            throw new RuntimeException();
-        BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
-        o++;
-
-        // valueBytes
-        System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
-        o += n.part.length;
-
-        return o;
-    }
-
-}
+        if (stats.mbpn_footprint > _2GB)
+            throw new RuntimeException("Too big dictionary, dictionary cannot be bigger than 2GB");
+
+        // write head
+        byte[] head;
+        try {
+            ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+            DataOutputStream headOut = new DataOutputStream(byteBuf);
+            headOut.write(TrieDictionary.MAGIC);
+            headOut.writeShort(0); // head size, will back fill
+            headOut.writeInt((int) stats.mbpn_footprint); // body size
+            headOut.write(sizeChildOffset);
+            headOut.write(sizeNoValuesBeneath);
+            headOut.writeShort(baseId);
+            headOut.writeShort(stats.maxValueLength);
+            headOut.writeUTF(bytesConverter == null ? "" : bytesConverter.getClass().getName());
+            headOut.close();
+            head = byteBuf.toByteArray();
+            BytesUtil.writeUnsigned(head.length, head, TrieDictionary.MAGIC_SIZE_I, 2);
+        } catch (IOException e) {
+            throw new RuntimeException(e); // shall not happen, as we are writing in memory
+        }
+
+        byte[] trieBytes = new byte[(int) stats.mbpn_footprint + head.length];
+        System.arraycopy(head, 0, trieBytes, 0, head.length);
+
+        LinkedList<Node> open = new LinkedList<Node>();
+        IdentityHashMap<Node, Integer> offsetMap = new IdentityHashMap<Node, Integer>();
+
+        // write body
+        int o = head.length;
+        offsetMap.put(root, o);
+        o = build_writeNode(root, o, true, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
+        if (root.children.isEmpty() == false)
+            open.addLast(root);
+
+        while (open.isEmpty() == false) {
+            Node parent = open.removeFirst();
+            build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
+            for (int i = 0; i < parent.children.size(); i++) {
+                Node c = parent.children.get(i);
+                boolean isLastChild = (i == parent.children.size() - 1);
+                offsetMap.put(c, o);
+                o = build_writeNode(c, o, isLastChild, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
+                if (c.children.isEmpty() == false)
+                    open.addLast(c);
+            }
+        }
+
+        if (o != trieBytes.length)
+            throw new RuntimeException();
+        return trieBytes;
+    }
+
+    private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
+        int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
+        BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
+        trieBytes[parentOffset] |= flags;
+    }
+
+    private int build_writeNode(Node n, int offset, boolean isLastChild, int sizeNoValuesBeneath, int sizeChildOffset, byte[] trieBytes) {
+        int o = offset;
+        if (o > _2GB)
+            throw new IllegalStateException();
+
+        // childOffset
+        if (isLastChild)
+            trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
+        if (n.isEndOfValue)
+            trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
+        o += sizeChildOffset;
+
+        // nValuesBeneath
+        BytesUtil.writeUnsigned(n.nValuesBeneath, trieBytes, o, sizeNoValuesBeneath);
+        o += sizeNoValuesBeneath;
+
+        // nValueBytes
+        if (n.part.length > 255)
+            throw new RuntimeException();
+        BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
+        o++;
+
+        // valueBytes
+        System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
+        o += n.part.length;
+
+        return o;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
index 56c1c98..4b96622 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
@@ -1,112 +1,112 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.IOException;
-import java.util.Comparator;
-
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-
-/**
- * @author yangli9
- * 
- */
-public class LookupStringTable extends LookupTable<String> {
-
-    private static final Comparator<String> dateStrComparator = new Comparator<String>() {
-        @Override
-        public int compare(String o1, String o2) {
-            long l1 = Long.parseLong(o1);
-            long l2 = Long.parseLong(o2);
-            return Long.compare(l1, l2);
-        }
-    };
-
-    private static final Comparator<String> numStrComparator = new Comparator<String>() {
-        @Override
-        public int compare(String o1, String o2) {
-            double d1 = Double.parseDouble(o1);
-            double d2 = Double.parseDouble(o2);
-            return Double.compare(d1, d2);
-        }
-    };
-
-    private static final Comparator<String> defaultStrComparator = new Comparator<String>() {
-        @Override
-        public int compare(String o1, String o2) {
-            return o1.compareTo(o2);
-        }
-    };
-
-    boolean[] colIsDateTime;
-    boolean[] colIsNumber;
-
-    public LookupStringTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
-        super(tableDesc, keyColumns, table);
-    }
-
-    @Override
-    protected void init() throws IOException {
-        ColumnDesc[] cols = tableDesc.getColumns();
-        colIsDateTime = new boolean[cols.length];
-        colIsNumber = new boolean[cols.length];
-        for (int i = 0; i < cols.length; i++) {
-            DataType t = cols[i].getType();
-            colIsDateTime[i] = t.isDateTimeFamily();
-            colIsNumber[i] = t.isNumberFamily();
-        }
-
-        super.init();
-    }
-
-    @Override
-    protected String[] convertRow(String[] cols) {
-        for (int i = 0; i < cols.length; i++) {
-            if (colIsDateTime[i]) {
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class LookupStringTable extends LookupTable<String> {
+
+    private static final Comparator<String> dateStrComparator = new Comparator<String>() {
+        @Override
+        public int compare(String o1, String o2) {
+            long l1 = Long.parseLong(o1);
+            long l2 = Long.parseLong(o2);
+            return Long.compare(l1, l2);
+        }
+    };
+
+    private static final Comparator<String> numStrComparator = new Comparator<String>() {
+        @Override
+        public int compare(String o1, String o2) {
+            double d1 = Double.parseDouble(o1);
+            double d2 = Double.parseDouble(o2);
+            return Double.compare(d1, d2);
+        }
+    };
+
+    private static final Comparator<String> defaultStrComparator = new Comparator<String>() {
+        @Override
+        public int compare(String o1, String o2) {
+            return o1.compareTo(o2);
+        }
+    };
+
+    boolean[] colIsDateTime;
+    boolean[] colIsNumber;
+
+    public LookupStringTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
+        super(tableDesc, keyColumns, table);
+    }
+
+    @Override
+    protected void init() throws IOException {
+        ColumnDesc[] cols = tableDesc.getColumns();
+        colIsDateTime = new boolean[cols.length];
+        colIsNumber = new boolean[cols.length];
+        for (int i = 0; i < cols.length; i++) {
+            DataType t = cols[i].getType();
+            colIsDateTime[i] = t.isDateTimeFamily();
+            colIsNumber[i] = t.isNumberFamily();
+        }
+
+        super.init();
+    }
+
+    @Override
+    protected String[] convertRow(String[] cols) {
+        for (int i = 0; i < cols.length; i++) {
+            if (colIsDateTime[i]) {
                 if (cols[i] != null)
                     cols[i] = String.valueOf(DateFormat.stringToMillis(cols[i]));
-            }
-        }
-        return cols;
-    }
-
-    @Override
-    protected Comparator<String> getComparator(int idx) {
-        if (colIsDateTime[idx])
-            return dateStrComparator;
-        else if (colIsNumber[idx])
-            return numStrComparator;
-        else
-            return defaultStrComparator;
-    }
-
-    @Override
-    protected String toString(String cell) {
-        return cell;
-    }
-
-    public Class<?> getType() {
-        return String.class;
-    }
-
-}
+            }
+        }
+        return cols;
+    }
+
+    @Override
+    protected Comparator<String> getComparator(int idx) {
+        if (colIsDateTime[idx])
+            return dateStrComparator;
+        else if (colIsNumber[idx])
+            return numStrComparator;
+        else
+            return defaultStrComparator;
+    }
+
+    @Override
+    protected String toString(String cell) {
+        return cell;
+    }
+
+    public Class<?> getType() {
+        return String.class;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index 9aae755..cd700e9 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -1,180 +1,180 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.commons.io.IOUtils;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableReader;
-
-import com.google.common.collect.Sets;
-
-/**
- * An in-memory lookup table, in which each cell is an object of type T. The
- * table is indexed by specified PK for fast lookup.
- *
- * @author yangli9
- */
-abstract public class LookupTable<T> {
-
-    protected TableDesc tableDesc;
-    protected String[] keyColumns;
-    protected ReadableTable table;
-    protected ConcurrentHashMap<Array<T>, T[]> data;
-
-    public LookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
-        this.tableDesc = tableDesc;
-        this.keyColumns = keyColumns;
-        this.table = table;
-        this.data = new ConcurrentHashMap<Array<T>, T[]>();
-        init();
-    }
-
-    protected void init() throws IOException {
-        int[] keyIndex = new int[keyColumns.length];
-        for (int i = 0; i < keyColumns.length; i++) {
-            keyIndex[i] = tableDesc.findColumnByName(keyColumns[i]).getZeroBasedIndex();
-        }
-
-        TableReader reader = table.getReader();
-        try {
-            while (reader.next()) {
-                initRow(reader.getRow(), keyIndex);
-            }
-        } finally {
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
+
+import com.google.common.collect.Sets;
+
+/**
+ * An in-memory lookup table, in which each cell is an object of type T. The
+ * table is indexed by specified PK for fast lookup.
+ *
+ * @author yangli9
+ */
+abstract public class LookupTable<T> {
+
+    protected TableDesc tableDesc;
+    protected String[] keyColumns;
+    protected ReadableTable table;
+    protected ConcurrentHashMap<Array<T>, T[]> data;
+
+    public LookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
+        this.tableDesc = tableDesc;
+        this.keyColumns = keyColumns;
+        this.table = table;
+        this.data = new ConcurrentHashMap<Array<T>, T[]>();
+        init();
+    }
+
+    protected void init() throws IOException {
+        int[] keyIndex = new int[keyColumns.length];
+        for (int i = 0; i < keyColumns.length; i++) {
+            keyIndex[i] = tableDesc.findColumnByName(keyColumns[i]).getZeroBasedIndex();
+        }
+
+        TableReader reader = table.getReader();
+        try {
+            while (reader.next()) {
+                initRow(reader.getRow(), keyIndex);
+            }
+        } finally {
             IOUtils.closeQuietly(reader);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    private void initRow(String[] cols, int[] keyIndex) {
-        T[] value = convertRow(cols);
-        T[] keyCols = (T[]) java.lang.reflect.Array.newInstance(getType(), keyIndex.length);
-        for (int i = 0; i < keyCols.length; i++)
-            keyCols[i] = value[keyIndex[i]];
-
-        Array<T> key = new Array<T>(keyCols);
-
-        if (data.containsKey(key))
-            throw new IllegalStateException("Dup key found, key=" + toString(keyCols) + ", value1=" + toString(data.get(key)) + ", value2=" + toString(value));
-
-        data.put(key, value);
-    }
-
-    abstract protected T[] convertRow(String[] cols);
-
-    public T[] getRow(Array<T> key) {
-        return data.get(key);
-    }
-
-    public Collection<T[]> getAllRows() {
-        return data.values();
-    }
-
-    public List<T> scan(String col, List<T> values, String returnCol) {
-        ArrayList<T> result = new ArrayList<T>();
-        int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
-        int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
-        for (T[] row : data.values()) {
-            if (values.contains(row[colIdx]))
-                result.add(row[returnIdx]);
-        }
-        return result;
-    }
-
-    public Pair<T, T> mapRange(String col, T beginValue, T endValue, String returnCol) {
-        int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
-        int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
-        Comparator<T> colComp = getComparator(colIdx);
-        Comparator<T> returnComp = getComparator(returnIdx);
-
-        T returnBegin = null;
-        T returnEnd = null;
-        for (T[] row : data.values()) {
-            if (between(beginValue, row[colIdx], endValue, colComp)) {
-                T returnValue = row[returnIdx];
-                if (returnBegin == null || returnComp.compare(returnValue, returnBegin) < 0) {
-                    returnBegin = returnValue;
-                }
-                if (returnEnd == null || returnComp.compare(returnValue, returnEnd) > 0) {
-                    returnEnd = returnValue;
-                }
-            }
-        }
-        if (returnBegin == null && returnEnd == null)
-            return null;
-        else
-            return Pair.newPair(returnBegin, returnEnd);
-    }
-
-    public Set<T> mapValues(String col, Set<T> values, String returnCol) {
-        int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
-        int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
-        Set<T> result = Sets.newHashSetWithExpectedSize(values.size());
-        for (T[] row : data.values()) {
-            if (values.contains(row[colIdx])) {
-                result.add(row[returnIdx]);
-            }
-        }
-        return result;
-    }
-
-    private boolean between(T beginValue, T v, T endValue, Comparator<T> comp) {
-        return (beginValue == null || comp.compare(beginValue, v) <= 0) && (endValue == null || comp.compare(v, endValue) <= 0);
-    }
-
-    abstract protected Comparator<T> getComparator(int colIdx);
-
-    public String toString() {
-        return "LookupTable [path=" + table + "]";
-    }
-
-    protected String toString(T[] cols) {
-        StringBuilder b = new StringBuilder();
-        b.append("[");
-        for (int i = 0; i < cols.length; i++) {
-            if (i > 0)
-                b.append(",");
-            b.append(toString(cols[i]));
-        }
-        b.append("]");
-        return b.toString();
-    }
-
-    abstract protected String toString(T cell);
-
-    abstract public Class<?> getType();
-
-    public void dump() {
-        for (Array<T> key : data.keySet()) {
-            System.out.println(toString(key.data) + " => " + toString(data.get(key)));
-        }
-    }
-
-}
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void initRow(String[] cols, int[] keyIndex) {
+        T[] value = convertRow(cols);
+        T[] keyCols = (T[]) java.lang.reflect.Array.newInstance(getType(), keyIndex.length);
+        for (int i = 0; i < keyCols.length; i++)
+            keyCols[i] = value[keyIndex[i]];
+
+        Array<T> key = new Array<T>(keyCols);
+
+        if (data.containsKey(key))
+            throw new IllegalStateException("Dup key found, key=" + toString(keyCols) + ", value1=" + toString(data.get(key)) + ", value2=" + toString(value));
+
+        data.put(key, value);
+    }
+
+    abstract protected T[] convertRow(String[] cols);
+
+    public T[] getRow(Array<T> key) {
+        return data.get(key);
+    }
+
+    public Collection<T[]> getAllRows() {
+        return data.values();
+    }
+
+    public List<T> scan(String col, List<T> values, String returnCol) {
+        ArrayList<T> result = new ArrayList<T>();
+        int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
+        int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
+        for (T[] row : data.values()) {
+            if (values.contains(row[colIdx]))
+                result.add(row[returnIdx]);
+        }
+        return result;
+    }
+
+    public Pair<T, T> mapRange(String col, T beginValue, T endValue, String returnCol) {
+        int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
+        int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
+        Comparator<T> colComp = getComparator(colIdx);
+        Comparator<T> returnComp = getComparator(returnIdx);
+
+        T returnBegin = null;
+        T returnEnd = null;
+        for (T[] row : data.values()) {
+            if (between(beginValue, row[colIdx], endValue, colComp)) {
+                T returnValue = row[returnIdx];
+                if (returnBegin == null || returnComp.compare(returnValue, returnBegin) < 0) {
+                    returnBegin = returnValue;
+                }
+                if (returnEnd == null || returnComp.compare(returnValue, returnEnd) > 0) {
+                    returnEnd = returnValue;
+                }
+            }
+        }
+        if (returnBegin == null && returnEnd == null)
+            return null;
+        else
+            return Pair.newPair(returnBegin, returnEnd);
+    }
+
+    public Set<T> mapValues(String col, Set<T> values, String returnCol) {
+        int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
+        int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
+        Set<T> result = Sets.newHashSetWithExpectedSize(values.size());
+        for (T[] row : data.values()) {
+            if (values.contains(row[colIdx])) {
+                result.add(row[returnIdx]);
+            }
+        }
+        return result;
+    }
+
+    private boolean between(T beginValue, T v, T endValue, Comparator<T> comp) {
+        return (beginValue == null || comp.compare(beginValue, v) <= 0) && (endValue == null || comp.compare(v, endValue) <= 0);
+    }
+
+    abstract protected Comparator<T> getComparator(int colIdx);
+
+    public String toString() {
+        return "LookupTable [path=" + table + "]";
+    }
+
+    protected String toString(T[] cols) {
+        StringBuilder b = new StringBuilder();
+        b.append("[");
+        for (int i = 0; i < cols.length; i++) {
+            if (i > 0)
+                b.append(",");
+            b.append(toString(cols[i]));
+        }
+        b.append("]");
+        return b.toString();
+    }
+
+    abstract protected String toString(T cell);
+
+    abstract public Class<?> getType();
+
+    public void dump() {
+        for (Array<T> key : data.keySet()) {
+            System.out.println(toString(key.data) + " => " + toString(data.get(key)));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 0d3848c..085158a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -1,81 +1,81 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.IOException;
-import java.util.NavigableSet;
-import java.util.concurrent.ConcurrentHashMap;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableSignature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 
-/**
- * @author yangli9
- */
-public class SnapshotManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
-
-    // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>();
-
-    public static SnapshotManager getInstance(KylinConfig config) {
-        SnapshotManager r = SERVICE_CACHE.get(config);
-        if (r == null) {
-            synchronized (SnapshotManager.class) {
-                r = SERVICE_CACHE.get(config);
-                if (r == null) {
-                    r = new SnapshotManager(config);
-                    SERVICE_CACHE.put(config, r);
-                    if (SERVICE_CACHE.size() > 1) {
-                        logger.warn("More than one singleton exist");
-                    }
-                }
-            }
-        }
-        return r;
-    }
-
-    // ============================================================================
-
-    private KylinConfig config;
+/**
+ * @author yangli9
+ */
+public class SnapshotManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
+
+    // static cached instances
+    private static final ConcurrentHashMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>();
+
+    /**
+     * Returns the SnapshotManager held in SERVICE_CACHE for the given config,
+     * creating and caching one when none exists yet. A warning is logged when
+     * a creation leaves more than one manager in the cache.
+     */
+    public static SnapshotManager getInstance(KylinConfig config) {
+        SnapshotManager cached = SERVICE_CACHE.get(config);
+        if (cached != null) {
+            return cached;
+        }
+
+        synchronized (SnapshotManager.class) {
+            // look again under the class lock before creating a new manager
+            SnapshotManager manager = SERVICE_CACHE.get(config);
+            if (manager == null) {
+                manager = new SnapshotManager(config);
+                SERVICE_CACHE.put(config, manager);
+                if (SERVICE_CACHE.size() > 1) {
+                    logger.warn("More than one singleton exist");
+                }
+            }
+            return manager;
+        }
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
     private LoadingCache<String, SnapshotTable> snapshotCache; // resource
-
-    // path ==>
-    // SnapshotTable
-
-    private SnapshotManager(KylinConfig config) {
-        this.config = config;
+
+    // path ==>
+    // SnapshotTable
+
+    private SnapshotManager(KylinConfig config) {
+        this.config = config;
         this.snapshotCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, SnapshotTable>() {
             @Override
             public void onRemoval(RemovalNotification<String, SnapshotTable> notification) {
@@ -89,13 +89,13 @@ public class SnapshotManager {
                         return snapshotTable;
                     }
                 });
-    }
-
-    public void wipeoutCache() {
+    }
+
+    public void wipeoutCache() {
         snapshotCache.invalidateAll();
-    }
-
-    public SnapshotTable getSnapshotTable(String resourcePath) throws IOException {
+    }
+
+    public SnapshotTable getSnapshotTable(String resourcePath) throws IOException {
         try {
             SnapshotTable r = snapshotCache.get(resourcePath);
             if (r == null) {
@@ -105,114 +105,114 @@ public class SnapshotManager {
             return r;
         } catch (ExecutionException e) {
             throw new RuntimeException(e.getCause());
-        }
-    }
-
-    public void removeSnapshot(String resourcePath) throws IOException {
-        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
-        store.deleteResource(resourcePath);
+        }
+    }
+
+    public void removeSnapshot(String resourcePath) throws IOException {
+        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+        store.deleteResource(resourcePath);
         snapshotCache.invalidate(resourcePath);
-    }
-
-    public SnapshotTable buildSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException {
-        SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity());
-        snapshot.updateRandomUuid();
-
-        String dup = checkDupByInfo(snapshot);
-        if (dup != null) {
-            logger.info("Identical input " + table.getSignature() + ", reuse existing snapshot at " + dup);
-            return getSnapshotTable(dup);
-        }
-
-        if (snapshot.getSignature().getSize() / 1024 / 1024 > config.getTableSnapshotMaxMB()) {
-            throw new IllegalStateException("Table snapshot should be no greater than " + config.getTableSnapshotMaxMB() //
-                    + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
-        }
-
-        snapshot.takeSnapshot(table, tableDesc);
-
-        return trySaveNewSnapshot(snapshot);
-    }
-
-    public SnapshotTable rebuildSnapshot(ReadableTable table, TableDesc tableDesc, String overwriteUUID) throws IOException {
-        SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity());
-        snapshot.setUuid(overwriteUUID);
-
-        snapshot.takeSnapshot(table, tableDesc);
-
-        SnapshotTable existing = getSnapshotTable(snapshot.getResourcePath());
-        snapshot.setLastModified(existing.getLastModified());
-
-        save(snapshot);
-        snapshotCache.put(snapshot.getResourcePath(), snapshot);
-
-        return snapshot;
-    }
-
-    public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable) throws IOException {
-
-        String dupTable = checkDupByContent(snapshotTable);
-        if (dupTable != null) {
-            logger.info("Identical snapshot content " + snapshotTable + ", reuse existing snapshot at " + dupTable);
-            return getSnapshotTable(dupTable);
-        }
-
-        save(snapshotTable);
-        snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable);
-
-        return snapshotTable;
-    }
-
-    private String checkDupByInfo(SnapshotTable snapshot) throws IOException {
-        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
-        String resourceDir = snapshot.getResourceDir();
-        NavigableSet<String> existings = store.listResources(resourceDir);
-        if (existings == null)
-            return null;
-
-        TableSignature sig = snapshot.getSignature();
-        for (String existing : existings) {
-            SnapshotTable existingTable = load(existing, false); // skip cache,
-            // direct load from store
-            if (existingTable != null && sig.equals(existingTable.getSignature()))
-                return existing;
-        }
-
-        return null;
-    }
-
-    private String checkDupByContent(SnapshotTable snapshot) throws IOException {
-        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
-        String resourceDir = snapshot.getResourceDir();
-        NavigableSet<String> existings = store.listResources(resourceDir);
-        if (existings == null)
-            return null;
-
-        for (String existing : existings) {
-            SnapshotTable existingTable = load(existing, true); // skip cache, direct load from store
-            if (existingTable != null && existingTable.equals(snapshot))
-                return existing;
-        }
-
-        return null;
-    }
-
-    private void save(SnapshotTable snapshot) throws IOException {
-        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
-        String path = snapshot.getResourcePath();
-        store.putResource(path, snapshot, SnapshotTableSerializer.FULL_SERIALIZER);
-    }
-
-    private SnapshotTable load(String resourcePath, boolean loadData) throws IOException {
-        logger.info("Loading snapshotTable from " + resourcePath + ", with loadData: " + loadData);
-        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
-
-        SnapshotTable table = store.getResource(resourcePath, SnapshotTable.class, loadData ? SnapshotTableSerializer.FULL_SERIALIZER : SnapshotTableSerializer.INFO_SERIALIZER);
-
-        if (loadData)
-            logger.debug("Loaded snapshot at " + resourcePath);
-
-        return table;
-    }
-
-}
+    }
+
+    /**
+     * Builds a snapshot of the given table, reusing a stored snapshot when one
+     * matches.
+     *
+     * Steps, in order: look for a stored snapshot with an equal signature and
+     * return it if found; otherwise enforce the size limit, take the snapshot
+     * and hand it to trySaveNewSnapshot.
+     *
+     * @throws IllegalStateException when the signature size, in whole MB
+     *         (size / 1024 / 1024), exceeds config.getTableSnapshotMaxMB()
+     */
+    public SnapshotTable buildSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException {
+        SnapshotTable candidate = new SnapshotTable(table, tableDesc.getIdentity());
+        candidate.updateRandomUuid();
+
+        // signature-only comparison against what is already stored
+        String existingPath = checkDupByInfo(candidate);
+        if (existingPath != null) {
+            logger.info("Identical input " + table.getSignature() + ", reuse existing snapshot at " + existingPath);
+            return getSnapshotTable(existingPath);
+        }
+
+        boolean tooLarge = candidate.getSignature().getSize() / 1024 / 1024 > config.getTableSnapshotMaxMB();
+        if (tooLarge) {
+            String message = "Table snapshot should be no greater than " + config.getTableSnapshotMaxMB() //
+                    + " MB, but " + tableDesc + " size is " + candidate.getSignature().getSize();
+            throw new IllegalStateException(message);
+        }
+
+        candidate.takeSnapshot(table, tableDesc);
+        return trySaveNewSnapshot(candidate);
+    }
+
+    /**
+     * Takes a fresh snapshot of the table under the given UUID and saves it
+     * over the snapshot stored at the same resource path, then puts the new
+     * snapshot into the cache.
+     *
+     * The last-modified value of the currently stored snapshot is copied onto
+     * the new one before saving.
+     * NOTE(review): presumably the store checks that value on overwrite -- confirm.
+     */
+    public SnapshotTable rebuildSnapshot(ReadableTable table, TableDesc tableDesc, String overwriteUUID) throws IOException {
+        SnapshotTable rebuilt = new SnapshotTable(table, tableDesc.getIdentity());
+        rebuilt.setUuid(overwriteUUID);
+        rebuilt.takeSnapshot(table, tableDesc);
+
+        // carry over the stored copy's last-modified value
+        rebuilt.setLastModified(getSnapshotTable(rebuilt.getResourcePath()).getLastModified());
+
+        save(rebuilt);
+        snapshotCache.put(rebuilt.getResourcePath(), rebuilt);
+        return rebuilt;
+    }
+
+    /**
+     * Saves the given snapshot unless a stored snapshot with equal content
+     * already exists. Returns the stored duplicate when there is one, otherwise
+     * the given snapshot after it has been saved and put into the cache.
+     */
+    public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable) throws IOException {
+        String duplicatePath = checkDupByContent(snapshotTable);
+        if (duplicatePath == null) {
+            save(snapshotTable);
+            snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable);
+            return snapshotTable;
+        }
+
+        logger.info("Identical snapshot content " + snapshotTable + ", reuse existing snapshot at " + duplicatePath);
+        return getSnapshotTable(duplicatePath);
+    }
+
+    /**
+     * Looks in the snapshot's resource directory for a stored snapshot whose
+     * signature equals the given snapshot's signature. Stored snapshots are
+     * loaded through load(path, false).
+     *
+     * @return the resource path of the first match, or null when the directory
+     *         listing is null or nothing matches
+     */
+    private String checkDupByInfo(SnapshotTable snapshot) throws IOException {
+        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+        NavigableSet<String> candidates = store.listResources(snapshot.getResourceDir());
+        if (candidates == null) {
+            return null;
+        }
+
+        TableSignature wanted = snapshot.getSignature();
+        for (String path : candidates) {
+            // skip cache, direct load from store
+            SnapshotTable stored = load(path, false);
+            if (stored == null) {
+                continue;
+            }
+            if (wanted.equals(stored.getSignature())) {
+                return path;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Looks in the snapshot's resource directory for a stored snapshot that
+     * equals the given one. Stored snapshots are loaded through
+     * load(path, true) and compared with equals.
+     *
+     * @return the resource path of the first match, or null when the directory
+     *         listing is null or nothing matches
+     */
+    private String checkDupByContent(SnapshotTable snapshot) throws IOException {
+        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+        NavigableSet<String> candidates = store.listResources(snapshot.getResourceDir());
+        if (candidates == null) {
+            return null;
+        }
+
+        for (String path : candidates) {
+            // skip cache, direct load from store
+            SnapshotTable stored = load(path, true);
+            if (stored == null) {
+                continue;
+            }
+            if (stored.equals(snapshot)) {
+                return path;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Writes the snapshot to the metadata resource store at its own resource
+     * path, using SnapshotTableSerializer.FULL_SERIALIZER.
+     */
+    private void save(SnapshotTable snapshot) throws IOException {
+        MetadataManager.getInstance(this.config).getStore() //
+                .putResource(snapshot.getResourcePath(), snapshot, SnapshotTableSerializer.FULL_SERIALIZER);
+    }
+
+    /**
+     * Reads a snapshot from the metadata resource store.
+     *
+     * @param resourcePath path of the snapshot resource
+     * @param loadData true to read with FULL_SERIALIZER, false to read with
+     *        INFO_SERIALIZER
+     * @return whatever the store returns for that path
+     */
+    private SnapshotTable load(String resourcePath, boolean loadData) throws IOException {
+        logger.info("Loading snapshotTable from " + resourcePath + ", with loadData: " + loadData);
+        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+
+        SnapshotTable loaded;
+        if (loadData) {
+            loaded = store.getResource(resourcePath, SnapshotTable.class, SnapshotTableSerializer.FULL_SERIALIZER);
+            logger.debug("Loaded snapshot at " + resourcePath);
+        } else {
+            loaded = store.getResource(resourcePath, SnapshotTable.class, SnapshotTableSerializer.INFO_SERIALIZER);
+        }
+        return loaded;
+    }
+
+}