You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 06:13:15 UTC

[25/47] incubator-kylin git commit: KYLIN-875 rename modules: core-common, core-cube, core-dictionary, core-cube

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
new file mode 100644
index 0000000..af602a0
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Builds a dictionary using Trie structure. All values are taken in byte[] form
+ * and organized in a Trie with ordering. Then numeric IDs are assigned in
+ * sequence.
+ * 
+ * @author yangli9
+ */
+public class TrieDictionaryBuilder<T> {
+
+    /**
+     * A node of the in-memory trie. Each node carries a run of bytes
+     * ({@link #part}) shared by all values stored at or beneath it.
+     */
+    public static class Node {
+        /** The bytes this node contributes to the values passing through it. */
+        public byte[] part;
+        /** True when the bytes from the root down to this node form a complete value. */
+        public boolean isEndOfValue;
+        /** Child nodes of this node. */
+        public ArrayList<Node> children;
+
+        /** Number of values at or beneath this node; only present after stats(). */
+        public int nValuesBeneath;
+
+        /** Creates a node with a fresh, empty children list. */
+        Node(byte[] value, boolean isEndOfValue) {
+            this(value, isEndOfValue, new ArrayList<Node>());
+        }
+
+        /** Creates a node that adopts the given children list (the list is not copied). */
+        Node(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
+            this.part = value;
+            this.isEndOfValue = isEndOfValue;
+            this.children = children;
+        }
+
+        /** Re-initializes this node in place and gives it a fresh, empty children list. */
+        void reset(byte[] value, boolean isEndOfValue) {
+            ArrayList<Node> noChildren = new ArrayList<Node>();
+            reset(value, isEndOfValue, noChildren);
+        }
+
+        /** Re-initializes this node in place, adopting the given children list. */
+        void reset(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
+            this.children = children;
+            this.isEndOfValue = isEndOfValue;
+            this.part = value;
+        }
+    }
+
+    /** Callback invoked once per node by the traversal methods of this builder. */
+    public interface Visitor {
+        /**
+         * @param n     the node being visited
+         * @param level depth of the node in the trie, the root being level 0
+         */
+        void visit(Node n, int level);
+    }
+
+    // ============================================================================
+
+    /** Root of the trie; its part is empty. */
+    private Node root;
+    /** Converts values of type T to their byte[] form. */
+    private BytesConverter<T> bytesConverter;
+
+    /**
+     * Creates a builder holding an empty trie.
+     *
+     * @param bytesConverter converter used by {@code addValue(T)}; its class name
+     *                       is recorded in the header of the built dictionary
+     */
+    public TrieDictionaryBuilder(BytesConverter<T> bytesConverter) {
+        this.bytesConverter = bytesConverter;
+        this.root = new Node(new byte[0], false);
+    }
+
+    public void addValue(T value) {
+        addValue(bytesConverter.convertToBytes(value));
+    }
+
+    /** Adds a value, given in byte form, to the trie, starting the match at the root. */
+    public void addValue(byte[] value) {
+        final int firstByte = 0;
+        addValueR(root, value, firstByte);
+    }
+
+    private void addValueR(Node node, byte[] value, int start) {
+        // match the value part of current node
+        int i = 0, j = start;
+        int n = node.part.length, nn = value.length;
+        int comp = 0;
+        for (; i < n && j < nn; i++, j++) {
+            comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
+            if (comp != 0)
+                break;
+        }
+
+        // if value fully matched within the current node
+        if (j == nn) {
+            // if equals to current node, just mark end of value
+            if (i == n) {
+                node.isEndOfValue = true;
+            }
+            // otherwise, split the current node into two
+            else {
+                Node c = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+                node.reset(BytesUtil.subarray(node.part, 0, i), true);
+                node.children.add(c);
+            }
+            return;
+        }
+
+        // if partially matched the current, split the current node, add the new
+        // value, make a 3-way
+        if (i < n) {
+            Node c1 = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+            Node c2 = new Node(BytesUtil.subarray(value, j, nn), true);
+            node.reset(BytesUtil.subarray(node.part, 0, i), false);
+            if (comp < 0) {
+                node.children.add(c1);
+                node.children.add(c2);
+            } else {
+                node.children.add(c2);
+                node.children.add(c1);
+            }
+            return;
+        }
+
+        // out matched the current, binary search the next byte for a child node
+        // to continue
+        byte lookfor = value[j];
+        int lo = 0;
+        int hi = node.children.size() - 1;
+        int mid = 0;
+        boolean found = false;
+        comp = 0;
+        while (!found && lo <= hi) {
+            mid = lo + (hi - lo) / 2;
+            comp = BytesUtil.compareByteUnsigned(lookfor, node.children.get(mid).part[0]);
+            if (comp < 0)
+                hi = mid - 1;
+            else if (comp > 0)
+                lo = mid + 1;
+            else
+                found = true;
+        }
+        // found a child node matching the first byte, continue in that child
+        if (found) {
+            addValueR(node.children.get(mid), value, j);
+        }
+        // otherwise, make the value a new child
+        else {
+            Node c = new Node(BytesUtil.subarray(value, j, nn), true);
+            node.children.add(comp <= 0 ? mid : mid + 1, c);
+        }
+    }
+
+    /** Visits every node in pre-order (a node before its children), the root at level 0. */
+    public void traverse(Visitor visitor) {
+        final int rootLevel = 0;
+        traverseR(root, visitor, rootLevel);
+    }
+
+    private void traverseR(Node node, Visitor visitor, int level) {
+        visitor.visit(node, level);
+        for (Node c : node.children)
+            traverseR(c, visitor, level + 1);
+    }
+
+    /** Visits every node in post-order (children before their parent), the root at level 0. */
+    public void traversePostOrder(Visitor visitor) {
+        final int rootLevel = 0;
+        traversePostOrderR(root, visitor, rootLevel);
+    }
+
+    private void traversePostOrderR(Node node, Visitor visitor, int level) {
+        for (Node c : node.children)
+            traversePostOrderR(c, visitor, level + 1);
+        visitor.visit(node, level);
+    }
+
+    /**
+     * Statistics of a trie, as filled in by {@code stats()}. Fields prefixed
+     * {@code mbpn_} describe the multi-byte-per-node layout; fields prefixed
+     * {@code obpn_} describe a one-byte-per-node layout, kept for comparison.
+     */
+    public static class Stats {
+        // number of values in total
+        public int nValues;
+        // number of bytes for all values uncompressed
+        public int nValueBytesPlain;
+        // number of values bytes in Trie (compressed)
+        public int nValueBytesCompressed;
+        // size of longest value in bytes
+        public int maxValueLength;
+
+        // the trie is multi-byte-per-node
+
+        // number of nodes in trie
+        public int mbpn_nNodes;
+        // depth of trie
+        public int mbpn_trieDepth;
+        // the maximum no. children
+        public int mbpn_maxFanOut;
+        // number of child lookups during lookup every value once
+        public int mbpn_nChildLookups;
+        // the sum of fan outs during lookup every value once
+        public int mbpn_nTotalFanOut;
+        // the sum of value space in all nodes
+        public int mbpn_sizeValueTotal;
+        // size of field noValueBytes
+        public int mbpn_sizeNoValueBytes;
+        // size of field noValuesBeneath, depends on cardinality
+        public int mbpn_sizeNoValueBeneath;
+        // size of field childOffset, points to first child in flattened array
+        public int mbpn_sizeChildOffset;
+        // MBPN footprint in bytes
+        public int mbpn_footprint;
+
+        // stats for one-byte-per-node as well, so there's comparison
+
+        // size of value per node, always 1
+        public int obpn_sizeValue;
+        // size of field noValuesBeneath, depends on cardinality
+        public int obpn_sizeNoValuesBeneath;
+        // size of field childCount, enables binary search among children
+        public int obpn_sizeChildCount;
+        // size of field childOffset, points to first child in flattened array
+        public int obpn_sizeChildOffset;
+        // no. nodes in OBPN trie
+        public int obpn_nNodes;
+        // OBPN footprint in bytes
+        public int obpn_footprint;
+
+        /** Prints all statistics to standard output in three sections: general, OBPN, MBPN. */
+        public void print() {
+            final PrintStream out = System.out;
+            final int obpnNodeSize = obpn_sizeValue + obpn_sizeNoValuesBeneath + obpn_sizeChildCount + obpn_sizeChildOffset;
+            final int mbpnNodeSize = mbpn_sizeNoValueBytes + mbpn_sizeNoValueBeneath + mbpn_sizeChildOffset;
+            final double avgFanOut = (double) mbpn_nTotalFanOut / mbpn_nChildLookups;
+
+            out.println("============================================================================");
+            out.println("No. values:             " + nValues);
+            out.println("No. bytes raw:          " + nValueBytesPlain);
+            out.println("No. bytes in trie:      " + nValueBytesCompressed);
+            out.println("Longest value length:   " + maxValueLength);
+
+            // flatten trie footprint calculation, case of One-Byte-Per-Node
+            out.println("----------------------------------------------------------------------------");
+            out.println("OBPN node size:  " + obpnNodeSize + " = " + obpn_sizeValue + " + " + obpn_sizeNoValuesBeneath + " + " + obpn_sizeChildCount + " + " + obpn_sizeChildOffset);
+            out.println("OBPN no. nodes:  " + obpn_nNodes);
+            out.println("OBPN trie depth: " + maxValueLength);
+            out.println("OBPN footprint:  " + obpn_footprint + " in bytes");
+
+            // flatten trie footprint calculation, case of Multi-Byte-Per-Node
+            out.println("----------------------------------------------------------------------------");
+            out.println("MBPN max fan out:       " + mbpn_maxFanOut);
+            out.println("MBPN no. child lookups: " + mbpn_nChildLookups);
+            out.println("MBPN total fan out:     " + mbpn_nTotalFanOut);
+            out.println("MBPN average fan out:   " + avgFanOut);
+            out.println("MBPN values size total: " + mbpn_sizeValueTotal);
+            out.println("MBPN node size:         " + mbpnNodeSize + " = " + mbpn_sizeNoValueBytes + " + " + mbpn_sizeNoValueBeneath + " + " + mbpn_sizeChildOffset);
+            out.println("MBPN no. nodes:         " + mbpn_nNodes);
+            out.println("MBPN trie depth:        " + mbpn_trieDepth);
+            out.println("MBPN footprint:         " + mbpn_footprint + " in bytes");
+        }
+    }
+
+    /**
+     * Computes statistics of the trie and of the dictionary that would be built
+     * from it. As a side effect, {@code nValuesBeneath} is recomputed on every node.
+     *
+     * @return the collected statistics, including the minimized child-offset
+     *         sizes and footprints for both the OBPN and the MBPN layout
+     */
+    public Stats stats() {
+        // calculate nValuesBeneath bottom-up: children are visited before their parent
+        traversePostOrder(new Visitor() {
+            @Override
+            public void visit(Node n, int level) {
+                n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
+                for (Node c : n.children)
+                    n.nValuesBeneath += c.nValuesBeneath;
+            }
+        });
+
+        // run stats
+        final Stats s = new Stats();
+        // lenAtLvl.get(i) holds the part length of the level-i node on the path to
+        // the node currently being visited
+        final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
+        traverse(new Visitor() {
+            @Override
+            public void visit(Node n, int level) {
+                if (n.isEndOfValue)
+                    s.nValues++;
+                s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
+                s.nValueBytesCompressed += n.part.length;
+                s.mbpn_nNodes++;
+                if (s.mbpn_trieDepth < level + 1)
+                    s.mbpn_trieDepth = level + 1;
+                if (n.children.size() > 0) {
+                    if (s.mbpn_maxFanOut < n.children.size())
+                        s.mbpn_maxFanOut = n.children.size();
+                    // every value strictly beneath this node needs one child lookup here
+                    int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
+                    s.mbpn_nChildLookups += childLookups;
+                    s.mbpn_nTotalFanOut += childLookups * n.children.size();
+                }
+
+                if (level < lenAtLvl.size())
+                    lenAtLvl.set(level, n.part.length);
+                else
+                    lenAtLvl.add(n.part.length);
+                int lenSoFar = 0;
+                for (int i = 0; i <= level; i++)
+                    lenSoFar += lenAtLvl.get(i);
+                if (lenSoFar > s.maxValueLength)
+                    s.maxValueLength = lenSoFar;
+            }
+        });
+
+        // flatten trie footprint calculation, case of One-Byte-Per-Node
+        s.obpn_sizeValue = 1;
+        s.obpn_sizeNoValuesBeneath = BytesUtil.sizeForValue(s.nValues);
+        s.obpn_sizeChildCount = 1;
+        s.obpn_sizeChildOffset = 4; // MSB used as isEndOfValue flag
+        // no. nodes is the total number of compressed bytes in OBPN
+        s.obpn_nNodes = s.nValueBytesCompressed;
+        s.obpn_footprint = s.obpn_nNodes * (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset);
+        while (true) { // minimize the offset size to match the footprint
+            int t = s.obpn_nNodes * (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset - 1);
+            // *2 because MSB of offset is used for isEndOfValue flag.
+            // If t * 2 would overflow int, the offset cannot be narrowed, so stop
+            // instead of sizing a wrapped-around value (assumes sizeForValue returns
+            // the number of bytes needed to hold its argument -- TODO confirm).
+            if (t > Integer.MAX_VALUE / 2 || BytesUtil.sizeForValue(t * 2) > s.obpn_sizeChildOffset - 1)
+                break;
+            s.obpn_sizeChildOffset--;
+            s.obpn_footprint = t;
+        }
+
+        // flatten trie footprint calculation, case of Multi-Byte-Per-Node
+        s.mbpn_sizeValueTotal = s.nValueBytesCompressed;
+        s.mbpn_sizeNoValueBytes = 1;
+        s.mbpn_sizeNoValueBeneath = BytesUtil.sizeForValue(s.nValues);
+        s.mbpn_sizeChildOffset = 4;
+        s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset);
+        while (true) { // minimize the offset size to match the footprint
+            int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset - 1);
+            // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag.
+            // Same overflow guard as above: a product beyond int range cannot fit
+            // a narrower offset, so stop rather than size a wrapped-around value.
+            if (t > Integer.MAX_VALUE / 4 || BytesUtil.sizeForValue(t * 4) > s.mbpn_sizeChildOffset - 1)
+                break;
+            s.mbpn_sizeChildOffset--;
+            s.mbpn_footprint = t;
+        }
+
+        return s;
+    }
+
+    /** Prints the trie to standard output, for debugging. */
+    public void print() {
+        final PrintStream stdout = System.out;
+        print(stdout);
+    }
+
+    /**
+     * Prints the trie to the given stream in pre-order, one node per line. Each
+     * line is indented two spaces per level and shows the node's part decoded
+     * as UTF-8, then {@code " - "}, then {@code nValuesBeneath} when it is
+     * positive, then {@code "*"} when the node ends a value.
+     *
+     * @param out the stream to print to
+     */
+    public void print(final PrintStream out) {
+        traverse(new Visitor() {
+            @Override
+            public void visit(Node n, int level) {
+                for (int i = 0; i < level; i++)
+                    out.print("  ");
+                // the Charset overload cannot throw UnsupportedEncodingException,
+                // so there is no impossible exception to catch and print
+                out.print(new String(n.part, StandardCharsets.UTF_8));
+                out.print(" - ");
+                if (n.nValuesBeneath > 0)
+                    out.print(n.nValuesBeneath);
+                if (n.isEndOfValue)
+                    out.print("*");
+                out.print("\n");
+            }
+        });
+    }
+
+    /** Scratch buffer used by checkOverflowParts to hold the bytes of the path being walked. */
+    private CompleteParts completeParts = new CompleteParts();
+
+    /**
+     * A growable byte stack: parts are appended at the end and withdrawn from
+     * the end. Declared static because it never touches the enclosing builder.
+     */
+    private static class CompleteParts {
+        byte[] data = new byte[4096];
+        // number of valid bytes at the start of data
+        int current = 0;
+
+        /** Appends the bytes of {@code part}, doubling the buffer until they fit. */
+        public void append(byte[] part) {
+            while (current + part.length > data.length)
+                expand();
+
+            System.arraycopy(part, 0, data, current, part.length);
+            current += part.length;
+        }
+
+        /** Drops the last {@code size} bytes; the caller must not withdraw more than was appended. */
+        public void withdraw(int size) {
+            current -= size;
+        }
+
+        /** Returns a copy of the bytes currently held. */
+        public byte[] retrieve() {
+            return Arrays.copyOf(data, current);
+        }
+
+        /** Doubles the capacity of the buffer, keeping its content. */
+        private void expand() {
+            data = Arrays.copyOf(data, 2 * data.length);
+        }
+    }
+
+    // A node's part may be at most 255 bytes long. When a child's part is longer,
+    // the path down to the first 255 bytes of that part is re-added as a value,
+    // which makes addValueR split the child at that point. The check then
+    // recurses, so a remainder that is still too long gets split in turn.
+    // NOTE(review): the split point is added through addValue, which marks it as
+    // an end of value -- confirm that this extra dictionary entry is intended.
+    private void checkOverflowParts(Node node) {
+        // work on a snapshot of the children, since addValue modifies the trie
+        ArrayList<Node> snapshot = new ArrayList<Node>(node.children);
+        for (Node kid : snapshot) {
+            if (kid.part.length <= 255)
+                continue;
+
+            byte[] head = Arrays.copyOf(kid.part, 255);
+
+            // completeParts holds the ancestors' bytes; extend it temporarily with
+            // this node's part and the child's first 255 bytes
+            completeParts.append(node.part);
+            completeParts.append(head);
+            this.addValue(completeParts.retrieve());
+            completeParts.withdraw(255);
+            completeParts.withdraw(node.part.length);
+        }
+
+        // by here the node.children may have been changed
+        completeParts.append(node.part);
+        for (Node kid : node.children) {
+            checkOverflowParts(kid);
+        }
+        completeParts.withdraw(node.part.length);
+    }
+
+    /**
+     * Flatten the trie into a byte array for a minimized memory footprint.
+     * Lookup remains fast. Cost is inflexibility to modify (becomes immutable).
+     *
+     * Flattened node structure is HEAD + NODEs, for each node:
+     * <ul>
+     * <li>o byte, offset to child node, o = stats.mbpn_sizeChildOffset
+     *   <ul>
+     *   <li>1 bit, isLastChild flag, the 1st MSB of o</li>
+     *   <li>1 bit, isEndOfValue flag, the 2nd MSB of o</li>
+     *   </ul>
+     * </li>
+     * <li>c byte, number of values beneath, c = stats.mbpn_sizeNoValueBeneath</li>
+     * <li>1 byte, number of value bytes</li>
+     * <li>n byte, value bytes</li>
+     * </ul>
+     *
+     * @param baseId the base id written into the head of the dictionary
+     * @return a dictionary backed by the flattened trie bytes
+     */
+    public TrieDictionary<T> build(int baseId) {
+        return new TrieDictionary<T>(buildTrieBytes(baseId));
+    }
+
+    /**
+     * Serializes the trie into its flattened form: a head followed by all nodes,
+     * written breadth first so that the children of a node are contiguous.
+     * Over-long node parts are split first, then the statistics are recomputed.
+     *
+     * @param baseId the base id written into the head (as a 2-byte short)
+     * @return the head and body bytes of the flattened trie
+     * @throws IllegalStateException if the number of bytes written does not match
+     *                               the footprint computed by stats()
+     */
+    protected byte[] buildTrieBytes(int baseId) {
+        checkOverflowParts(this.root);
+
+        Stats stats = stats();
+        int sizeNoValuesBeneath = stats.mbpn_sizeNoValueBeneath;
+        int sizeChildOffset = stats.mbpn_sizeChildOffset;
+
+        // write head
+        byte[] head;
+        try {
+            ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+            DataOutputStream headOut = new DataOutputStream(byteBuf);
+            headOut.write(TrieDictionary.HEAD_MAGIC);
+            headOut.writeShort(0); // head size, will back fill
+            headOut.writeInt(stats.mbpn_footprint); // body size
+            headOut.write(sizeChildOffset);
+            headOut.write(sizeNoValuesBeneath);
+            headOut.writeShort(baseId);
+            headOut.writeShort(stats.maxValueLength);
+            headOut.writeUTF(bytesConverter == null ? "" : bytesConverter.getClass().getName());
+            headOut.close();
+            head = byteBuf.toByteArray();
+            // back fill the head size now that it is known
+            BytesUtil.writeUnsigned(head.length, head, TrieDictionary.HEAD_SIZE_I, 2);
+        } catch (IOException e) {
+            // shall not happen, as we are writing in memory
+            throw new RuntimeException(e);
+        }
+
+        byte[] trieBytes = new byte[stats.mbpn_footprint + head.length];
+        System.arraycopy(head, 0, trieBytes, 0, head.length);
+
+        // nodes that have children still to be written, in FIFO order
+        LinkedList<Node> open = new LinkedList<Node>();
+        // absolute offset in trieBytes at which each node was written
+        IdentityHashMap<Node, Integer> offsetMap = new IdentityHashMap<Node, Integer>();
+
+        // write body
+        int o = head.length;
+        offsetMap.put(root, o);
+        o = build_writeNode(root, o, true, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
+        if (!root.children.isEmpty())
+            open.addLast(root);
+
+        while (!open.isEmpty()) {
+            Node parent = open.removeFirst();
+            // the children start at the current write position; child offsets are
+            // relative to the start of the body
+            build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
+            for (int i = 0; i < parent.children.size(); i++) {
+                Node c = parent.children.get(i);
+                boolean isLastChild = (i == parent.children.size() - 1);
+                offsetMap.put(c, o);
+                o = build_writeNode(c, o, isLastChild, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
+                if (!c.children.isEmpty())
+                    open.addLast(c);
+            }
+        }
+
+        if (o != trieBytes.length)
+            throw new IllegalStateException("Trie size mismatch: wrote " + o + " bytes but expected " + trieBytes.length);
+        return trieBytes;
+    }
+
+    private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
+        int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
+        BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
+        trieBytes[parentOffset] |= flags;
+    }
+
+    /**
+     * Writes one node into {@code trieBytes} at {@code offset}: the child-offset
+     * field (only its flag bits are set here, the offset itself is filled in
+     * later by build_overwriteChildOffset), the number of values beneath, the
+     * one-byte length of the part, and the part bytes.
+     *
+     * @return the offset just past the written node
+     * @throws IllegalStateException if the node's part is longer than 255 bytes
+     */
+    private int build_writeNode(Node n, int offset, boolean isLastChild, int sizeNoValuesBeneath, int sizeChildOffset, byte[] trieBytes) {
+        int o = offset;
+
+        // childOffset
+        if (isLastChild)
+            trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
+        if (n.isEndOfValue)
+            trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
+        o += sizeChildOffset;
+
+        // nValuesBeneath
+        BytesUtil.writeUnsigned(n.nValuesBeneath, trieBytes, o, sizeNoValuesBeneath);
+        o += sizeNoValuesBeneath;
+
+        // nValueBytes, stored in a single byte hence the 255 limit
+        if (n.part.length > 255)
+            throw new IllegalStateException("Node part is " + n.part.length + " bytes long, the limit is 255");
+        BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
+        o++;
+
+        // valueBytes
+        System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
+        o += n.part.length;
+
+        return o;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
new file mode 100644
index 0000000..59eca4a
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ * A {@link ReadableTable} backed by a path that is either a single file or a
+ * directory of files, with a configurable column delimiter.
+ */
+public class FileTable implements ReadableTable {
+
+    public static final String DELIM_AUTO = "auto";
+    public static final String DELIM_COMMA = ",";
+
+    String path;
+    String delim;
+    int nColumns;
+
+    /** Creates a table over {@code path} using the {@link #DELIM_AUTO} delimiter. */
+    public FileTable(String path, int nColumns) {
+        this(path, DELIM_AUTO, nColumns);
+    }
+
+    /**
+     * @param path     location of the file or directory
+     * @param delim    column delimiter, handed to the reader as is
+     * @param nColumns number of columns, handed to the reader as is
+     */
+    public FileTable(String path, String delim, int nColumns) {
+        this.nColumns = nColumns;
+        this.delim = delim;
+        this.path = path;
+    }
+
+    /** Returns the column delimiter this table was created with. */
+    public String getColumnDelimeter() {
+        return this.delim;
+    }
+
+    /** Creates a new {@code FileTableReader} over this table's path. */
+    @Override
+    public TableReader getReader() throws IOException {
+        return new FileTableReader(this.path, this.delim, this.nColumns);
+    }
+
+    /**
+     * Builds a signature from the path, its total size and its latest
+     * modification time.
+     *
+     * @return the signature, or null when the path does not exist
+     */
+    @Override
+    public TableSignature getSignature() throws IOException {
+        final Pair<Long, Long> sizeAndLastModified;
+        try {
+            sizeAndLastModified = getSizeAndLastModified(path);
+        } catch (FileNotFoundException ex) {
+            return null;
+        }
+        return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond());
+    }
+
+    @Override
+    public String toString() {
+        return path;
+    }
+
+    /**
+     * Sums the length and finds the latest modification time of the file at
+     * {@code path}, or of the entries directly listed under it when it is not a
+     * file.
+     *
+     * @return a pair of (total size, latest modification time)
+     */
+    public static Pair<Long, Long> getSizeAndLastModified(String path) throws IOException {
+        FileSystem fs = HadoopUtil.getFileSystem(path);
+
+        // a plain file stands for itself; otherwise take the listed entries
+        Path target = new Path(path);
+        FileStatus status = fs.getFileStatus(target);
+        FileStatus[] entries = status.isFile() ? new FileStatus[] { status } : fs.listStatus(target);
+
+        long totalSize = 0;
+        long latest = 0;
+        for (FileStatus entry : entries) {
+            totalSize += entry.getLen();
+            latest = Math.max(latest, entry.getModificationTime());
+        }
+
+        return new Pair<Long, Long>(totalSize, latest);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
new file mode 100644
index 0000000..4e04c93
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.source.ReadableTable.TableReader;
+
+/**
+ * Row-by-row reader over a table file. Tables are typically CSV or SEQ file.
+ * <p>
+ * The file is first opened as a Hadoop SequenceFile. When that attempt fails
+ * with an error indicating the file is not a SequenceFile, the file is read
+ * as UTF-8 text instead, one line per row.
+ * 
+ * @author yangli9
+ */
+public class FileTableReader implements TableReader {
+
+    private static final Logger logger = LoggerFactory.getLogger(FileTableReader.class);
+    private static final char CSV_QUOTE = '"';
+    // candidate delimiters, tried in this order during auto detection
+    private static final String[] DETECT_DELIMS = new String[] { "\177", "|", "\t", "," };
+
+    private String filePath;
+    private String delim;
+    private RowReader reader;
+
+    private String curLine;
+    private String[] curColumns; // lazily computed split of curLine, reset on every next()
+    private int expectedColumnNumber = -1; // helps delimiter detection
+
+    /**
+     * Opens the file with automatic delimiter detection.
+     *
+     * @param filePath             path of the table file
+     * @param expectedColumnNumber number of columns a correctly split line should have
+     * @throws IOException if the file cannot be opened
+     */
+    public FileTableReader(String filePath, int expectedColumnNumber) throws IOException {
+        this(filePath, FileTable.DELIM_AUTO, expectedColumnNumber);
+    }
+
+    /**
+     * Opens the file, trying the SequenceFile format first and falling back to
+     * plain text when the file turns out not to be a SequenceFile.
+     *
+     * @param filePath             path of the table file
+     * @param delim                column delimiter, or {@code FileTable.DELIM_AUTO} to detect it
+     * @param expectedColumnNumber number of columns a correctly split line should have
+     * @throws IOException if the file cannot be opened in either format
+     */
+    public FileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException {
+        filePath = HadoopUtil.fixWindowsPath(filePath);
+        this.filePath = filePath;
+        this.delim = delim;
+        this.expectedColumnNumber = expectedColumnNumber;
+
+        FileSystem fs = HadoopUtil.getFileSystem(filePath);
+
+        try {
+            this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
+
+        } catch (IOException e) {
+            // only the "this is not a SequenceFile" failure triggers the text fallback
+            if (!isExceptionSayingNotSeqFile(e))
+                throw e;
+
+            this.reader = new CsvRowReader(fs, filePath);
+        }
+    }
+
+    /** Tells whether the exception means the file simply is not a SequenceFile. */
+    private boolean isExceptionSayingNotSeqFile(IOException e) {
+        if (e.getMessage() != null && e.getMessage().contains("not a SequenceFile"))
+            return true;
+
+        if (e instanceof EOFException) // in case the file is very very small
+            return true;
+
+        return false;
+    }
+
+    /**
+     * Advances to the next row.
+     *
+     * @return {@code true} if a row was read, {@code false} at end of file
+     */
+    @Override
+    public boolean next() throws IOException {
+        curLine = reader.nextLine();
+        curColumns = null;
+        return curLine != null;
+    }
+
+    /** Returns the current row as the raw, unsplit line. */
+    public String getLine() {
+        return curLine;
+    }
+
+    /**
+     * Returns the current row split into columns. The split is computed once per
+     * row and cached. In auto mode the delimiter is detected from the first row
+     * requested and then kept for the following rows.
+     */
+    @Override
+    public String[] getRow() {
+        if (curColumns == null) {
+            if (FileTable.DELIM_AUTO.equals(delim))
+                delim = autoDetectDelim(curLine);
+
+            if (delim == null)
+                curColumns = new String[] { curLine };
+            else
+                curColumns = split(curLine, delim);
+        }
+        return curColumns;
+    }
+
+    /** Splits a line on the delimiter; comma separated values are additionally un-escaped. */
+    private String[] split(String line, String delim) {
+        // FIXME CSV line should be parsed considering escapes
+        String str[] = StringSplitter.split(line, delim);
+
+        // un-escape CSV
+        if (FileTable.DELIM_COMMA.equals(delim)) {
+            for (int i = 0; i < str.length; i++) {
+                str[i] = unescapeCsv(str[i]);
+            }
+        }
+
+        return str;
+    }
+
+    /**
+     * Un-escapes a single CSV field and strips a remaining pair of enclosing
+     * quotes. Values that are null or shorter than two characters are returned
+     * untouched.
+     */
+    private String unescapeCsv(String str) {
+        if (str == null || str.length() < 2)
+            return str;
+
+        str = StringEscapeUtils.unescapeCsv(str);
+
+        // unescapeCsv may not remove the outer most quotes. The length has to be
+        // re-checked here: un-escaping can shrink the value to a single quote
+        // character, and stripping "both ends" of that would fail in substring().
+        if (str.length() >= 2 && str.charAt(0) == CSV_QUOTE && str.charAt(str.length() - 1) == CSV_QUOTE)
+            str = str.substring(1, str.length() - 1);
+
+        return str;
+    }
+
+    /** Closes the underlying row reader, if one was opened. */
+    @Override
+    public void close() throws IOException {
+        if (reader != null)
+            reader.close();
+    }
+
+    /**
+     * Picks the first candidate delimiter that splits the line into exactly the
+     * expected number of columns.
+     *
+     * @return the detected delimiter, or {@code null} when none matches, in which
+     *         case the whole line is treated as a single value
+     */
+    private String autoDetectDelim(String line) {
+        if (expectedColumnNumber > 0) {
+            for (String delim : DETECT_DELIMS) {
+                if (StringSplitter.split(line, delim).length == expectedColumnNumber) {
+                    logger.info("Auto detect delim to be '" + delim + "', split line to " + expectedColumnNumber + " columns -- " + line);
+                    return delim;
+                }
+            }
+        }
+
+        logger.info("Auto detect delim to be null, will take THE-WHOLE-LINE as a single value, for " + filePath);
+        return null;
+    }
+
+    // ============================================================================
+
+    /** Format specific source of raw rows. */
+    private interface RowReader extends Closeable {
+        String nextLine() throws IOException; // return null on EOF
+    }
+
+    /** Reads rows from a SequenceFile; each record's value is returned as one line. */
+    private static class SeqRowReader implements RowReader {
+        Reader reader;
+        Writable key;
+        Text value;
+
+        SeqRowReader(Configuration hconf, FileSystem fs, String path) throws IOException {
+            reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
+            key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
+            value = new Text();
+        }
+
+        @Override
+        public String nextLine() throws IOException {
+            boolean hasNext = reader.next(key, value);
+            if (hasNext)
+                // decode only the first getLength() bytes of the value's buffer
+                return Bytes.toString(value.getBytes(), 0, value.getLength());
+            else
+                return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+    }
+
+    /** Reads rows from a text file, decoding it as UTF-8 and returning one line per row. */
+    private static class CsvRowReader implements RowReader {
+        BufferedReader reader;
+
+        CsvRowReader(FileSystem fs, String path) throws IOException {
+            FSDataInputStream in = fs.open(new Path(path));
+            reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+        }
+
+        @Override
+        public String nextLine() throws IOException {
+            return reader.readLine();
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
new file mode 100644
index 0000000..c5a75f5
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ * A {@link LookupTable} whose cells are {@link ByteArray} values built from the
+ * bytes of the source strings.
+ * 
+ * @author yangli9
+ */
+public class LookupBytesTable extends LookupTable<ByteArray> {
+
+    public LookupBytesTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
+        super(tableDesc, keyColumns, table);
+    }
+
+    /**
+     * Converts every column of a row to a {@link ByteArray}; null columns stay null.
+     *
+     * @param cols the row as strings
+     * @return an array of the same length holding the converted cells
+     */
+    @Override
+    protected ByteArray[] convertRow(String[] cols) {
+        ByteArray[] r = new ByteArray[cols.length];
+        for (int i = 0; i < cols.length; i++) {
+            r[i] = cols[i] == null ? null : new ByteArray(Bytes.toBytes(cols[i]));
+        }
+        return r;
+    }
+
+    /**
+     * Renders a cell for messages and dumps.
+     *
+     * @param cell the cell, may be null because {@link #convertRow} keeps null columns
+     * @return the cell's string form, or {@code "null"} for a null cell
+     */
+    @Override
+    protected String toString(ByteArray cell) {
+        // String.valueOf tolerates null, which cell.toString() would not
+        return String.valueOf(cell);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
new file mode 100644
index 0000000..2d92d68
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ * A {@link LookupTable} whose cells are plain strings; rows and cells are used
+ * exactly as they are read, without any conversion.
+ * 
+ * @author yangli9
+ * 
+ */
+public class LookupStringTable extends LookupTable<String> {
+
+    public LookupStringTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
+        super(tableDesc, keyColumns, table);
+    }
+
+    // the row is returned as is: the very same array instance, not a copy
+    @Override
+    protected String[] convertRow(String[] cols) {
+        return cols;
+    }
+
+    // a string cell is its own string form (a null cell yields null)
+    @Override
+    protected String toString(String cell) {
+        return cell;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
new file mode 100644
index 0000000..fd0c37f
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.common.util.Pair;
+
+import com.google.common.collect.Sets;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
+
+/**
+ * An in-memory lookup table, in which each cell is an object of type T. The
+ * table is indexed by specified PK for fast lookup.
+ * 
+ * @author yangli9
+ */
+abstract public class LookupTable<T extends Comparable<T>> {
+
+    protected TableDesc tableDesc;
+    protected String[] keyColumns;
+    protected ReadableTable table;
+    protected ConcurrentHashMap<Array<T>, T[]> data;
+
+    public LookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
+        this.tableDesc = tableDesc;
+        this.keyColumns = keyColumns;
+        this.table = table;
+        this.data = new ConcurrentHashMap<Array<T>, T[]>();
+        init();
+    }
+
+    protected void init() throws IOException {
+        int[] keyIndex = new int[keyColumns.length];
+        for (int i = 0; i < keyColumns.length; i++) {
+            keyIndex[i] = tableDesc.findColumnByName(keyColumns[i]).getZeroBasedIndex();
+        }
+
+        TableReader reader = table.getReader();
+        try {
+            while (reader.next()) {
+                initRow(reader.getRow(), keyIndex);
+            }
+        } finally {
+            reader.close();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void initRow(String[] cols, int[] keyIndex) {
+        T[] value = convertRow(cols);
+        T[] keyCols = (T[]) java.lang.reflect.Array.newInstance(value[0].getClass(), keyIndex.length);
+        for (int i = 0; i < keyCols.length; i++)
+            keyCols[i] = value[keyIndex[i]];
+
+        Array<T> key = new Array<T>(keyCols);
+
+        if (data.containsKey(key))
+            throw new IllegalStateException("Dup key found, key=" + toString(keyCols) + ", value1=" + toString(data.get(key)) + ", value2=" + toString(value));
+
+        data.put(key, value);
+    }
+
+    abstract protected T[] convertRow(String[] cols);
+
+    public T[] getRow(Array<T> key) {
+        return data.get(key);
+    }
+
+    public Collection<T[]> getAllRows() {
+        return data.values();
+    }
+
+    public List<T> scan(String col, List<T> values, String returnCol) {
+        ArrayList<T> result = new ArrayList<T>();
+        int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
+        int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
+        for (T[] row : data.values()) {
+            if (values.contains(row[colIdx]))
+                result.add(row[returnIdx]);
+        }
+        return result;
+    }
+
+    public Pair<T, T> mapRange(String col, T beginValue, T endValue, String returnCol) {
+        int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
+        int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
+        T returnBegin = null;
+        T returnEnd = null;
+        for (T[] row : data.values()) {
+            if (between(beginValue, row[colIdx], endValue)) {
+                T returnValue = row[returnIdx];
+                if (returnBegin == null || returnValue.compareTo(returnBegin) < 0) {
+                    returnBegin = returnValue;
+                }
+                if (returnEnd == null || returnValue.compareTo(returnEnd) > 0) {
+                    returnEnd = returnValue;
+                }
+            }
+        }
+        if (returnBegin == null && returnEnd == null)
+            return null;
+        else
+            return new Pair<T, T>(returnBegin, returnEnd);
+    }
+
+    public Set<T> mapValues(String col, Set<T> values, String returnCol) {
+        int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
+        int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
+        Set<T> result = Sets.newHashSetWithExpectedSize(values.size());
+        for (T[] row : data.values()) {
+            if (values.contains(row[colIdx])) {
+                result.add(row[returnIdx]);
+            }
+        }
+        return result;
+    }
+
+    private boolean between(T beginValue, T v, T endValue) {
+        return (beginValue == null || beginValue.compareTo(v) <= 0) && (endValue == null || v.compareTo(endValue) <= 0);
+    }
+
+    public String toString() {
+        return "LookupTable [path=" + table + "]";
+    }
+
+    protected String toString(T[] cols) {
+        StringBuilder b = new StringBuilder();
+        b.append("[");
+        for (int i = 0; i < cols.length; i++) {
+            if (i > 0)
+                b.append(",");
+            b.append(toString(cols[i]));
+        }
+        b.append("]");
+        return b.toString();
+    }
+
+    abstract protected String toString(T cell);
+
+    public void dump() {
+        for (Array<T> key : data.keySet()) {
+            System.out.println(toString(key.data) + " => " + toString(data.get(key)));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
new file mode 100644
index 0000000..01c4fbd
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
@@ -0,0 +1,29 @@
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.TableSourceFactory;
+
+/**
+ * Command line entry point for snapshot maintenance.
+ * <p>
+ * Usage: {@code SnapshotCLI rebuild <table> <overwriteUUID>}
+ */
+public class SnapshotCLI {
+
+    private static final String USAGE = "Usage: SnapshotCLI rebuild <table> <overwriteUUID>";
+
+    /**
+     * Runs the requested command. Only {@code rebuild} is handled; any other
+     * command is ignored.
+     *
+     * @param args the command followed by its arguments
+     * @throws IOException              if rebuilding the snapshot fails
+     * @throws IllegalArgumentException if the command or its arguments are missing
+     */
+    public static void main(String[] args) throws IOException {
+        // fail with the usage text instead of a bare ArrayIndexOutOfBoundsException
+        if (args.length == 0)
+            throw new IllegalArgumentException(USAGE);
+
+        if ("rebuild".equals(args[0])) {
+            if (args.length < 3)
+                throw new IllegalArgumentException(USAGE);
+
+            rebuild(args[1], args[2]);
+        }
+    }
+
+    /**
+     * Takes a new snapshot of the table and saves it under the given UUID, then
+     * prints the resource path of the snapshot.
+     */
+    private static void rebuild(String table, String overwriteUUID) throws IOException {
+        KylinConfig conf = KylinConfig.getInstanceFromEnv();
+        MetadataManager metaMgr = MetadataManager.getInstance(conf);
+        SnapshotManager snapshotMgr = SnapshotManager.getInstance(conf);
+
+        TableDesc tableDesc = metaMgr.getTableDesc(table);
+        if (tableDesc == null)
+            throw new IllegalArgumentException("Not table found by " + table);
+
+        SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(TableSourceFactory.createReadableTable(tableDesc), tableDesc, overwriteUUID);
+        System.out.println("resource path updated: " + snapshot.getResourcePath());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
new file mode 100644
index 0000000..7822154
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableSignature;
+
+/**
+ * Builds, stores and caches table snapshots. One manager is kept per
+ * {@link KylinConfig}; snapshots are persisted in the resource store obtained
+ * from the {@link MetadataManager} of that config.
+ * 
+ * @author yangli9
+ */
+public class SnapshotManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
+
+    // static cached instances
+    private static final ConcurrentHashMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>();
+
+    /**
+     * Returns the manager for the given config, creating it on first use.
+     *
+     * @param config the config the manager is bound to
+     * @return the manager registered for that config
+     */
+    public static SnapshotManager getInstance(KylinConfig config) {
+        SnapshotManager r = SERVICE_CACHE.get(config);
+        if (r == null) {
+            SnapshotManager created = new SnapshotManager(config);
+            // register atomically, so concurrent first callers all end up with the
+            // same instance instead of overwriting each other's entry
+            r = SERVICE_CACHE.putIfAbsent(config, created);
+            if (r == null) {
+                r = created;
+                if (SERVICE_CACHE.size() > 1) {
+                    logger.warn("More than one singleton exist");
+                }
+            }
+        }
+        return r;
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
+    private ConcurrentHashMap<String, SnapshotTable> snapshotCache; // resource path ==> SnapshotTable
+
+    private SnapshotManager(KylinConfig config) {
+        this.config = config;
+        snapshotCache = new ConcurrentHashMap<String, SnapshotTable>();
+    }
+
+    /** Drops all cached snapshots; the resource store is not touched. */
+    public void wipeoutCache() {
+        snapshotCache.clear();
+    }
+
+    /**
+     * Returns the snapshot at the resource path, from the cache when present,
+     * otherwise fully loaded from the store and then cached.
+     *
+     * @param resourcePath resource path of the snapshot
+     * @return the snapshot, or {@code null} when the store yields none for the path
+     * @throws IOException if loading from the store fails
+     */
+    public SnapshotTable getSnapshotTable(String resourcePath) throws IOException {
+        SnapshotTable r = snapshotCache.get(resourcePath);
+        if (r == null) {
+            r = load(resourcePath, true);
+            // ConcurrentHashMap rejects null values, so only cache what was actually found
+            if (r != null)
+                snapshotCache.put(resourcePath, r);
+        }
+        return r;
+    }
+
+    /** Deletes the snapshot from the store and evicts it from the cache. */
+    public void removeSnapshot(String resourcePath) throws IOException {
+        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+        store.deleteResource(resourcePath);
+        snapshotCache.remove(resourcePath);
+    }
+
+    /**
+     * Builds a snapshot of the table, reusing an already stored snapshot when one
+     * with an identical signature or identical content exists.
+     *
+     * @param table     the table to snapshot
+     * @param tableDesc description of that table
+     * @return the new snapshot, or the existing one that was reused
+     * @throws IOException           if reading the table or accessing the store fails
+     * @throws IllegalStateException if the table exceeds the configured snapshot size limit
+     */
+    public SnapshotTable buildSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException {
+        SnapshotTable snapshot = new SnapshotTable(table);
+        snapshot.updateRandomUuid();
+
+        // cheap check first: same signature means no need to read the table at all
+        String dup = checkDupByInfo(snapshot);
+        if (dup != null) {
+            logger.info("Identical input " + table.getSignature() + ", reuse existing snapshot at " + dup);
+            return getSnapshotTable(dup);
+        }
+
+        if (snapshot.getSignature().getSize() / 1024 / 1024 > config.getTableSnapshotMaxMB()) {
+            throw new IllegalStateException("Table snapshot should be no greater than " + config.getTableSnapshotMaxMB() //
+                    + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
+        }
+
+        snapshot.takeSnapshot(table, tableDesc);
+
+        return trySaveNewSnapshot(snapshot);
+    }
+
+    /**
+     * Takes a fresh snapshot of the table and saves it over the existing snapshot
+     * that carries the given UUID.
+     *
+     * @param table         the table to snapshot
+     * @param tableDesc     description of that table
+     * @param overwriteUUID UUID of the snapshot to overwrite
+     * @return the newly saved snapshot
+     * @throws IOException           if reading the table or accessing the store fails
+     * @throws IllegalStateException if there is no existing snapshot to overwrite
+     */
+    public SnapshotTable rebuildSnapshot(ReadableTable table, TableDesc tableDesc, String overwriteUUID) throws IOException {
+        SnapshotTable snapshot = new SnapshotTable(table);
+        snapshot.setUuid(overwriteUUID);
+
+        snapshot.takeSnapshot(table, tableDesc);
+
+        SnapshotTable existing = getSnapshotTable(snapshot.getResourcePath());
+        if (existing == null)
+            throw new IllegalStateException("No existing snapshot found at " + snapshot.getResourcePath() + " to overwrite");
+
+        // carry over the last modified stamp of the snapshot being replaced
+        snapshot.setLastModified(existing.getLastModified());
+
+        save(snapshot);
+        snapshotCache.put(snapshot.getResourcePath(), snapshot);
+
+        return snapshot;
+    }
+
+    /**
+     * Saves the snapshot unless a stored snapshot with identical content exists,
+     * in which case that existing snapshot is returned instead.
+     */
+    public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable) throws IOException {
+
+        String dupTable = checkDupByContent(snapshotTable);
+        if (dupTable != null) {
+            logger.info("Identical snapshot content " + snapshotTable + ", reuse existing snapshot at " + dupTable);
+            return getSnapshotTable(dupTable);
+        }
+
+        save(snapshotTable);
+        snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable);
+
+        return snapshotTable;
+    }
+
+    /**
+     * Looks for a stored snapshot in the same resource directory whose signature
+     * equals the given snapshot's signature.
+     *
+     * @return the resource path of the match, or {@code null} if there is none
+     */
+    private String checkDupByInfo(SnapshotTable snapshot) throws IOException {
+        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+        String resourceDir = snapshot.getResourceDir();
+        ArrayList<String> existings = store.listResources(resourceDir);
+        if (existings == null)
+            return null;
+
+        TableSignature sig = snapshot.getSignature();
+        for (String existing : existings) {
+            // skip cache, direct load from store; the info alone is enough here
+            SnapshotTable existingTable = load(existing, false);
+            if (existingTable != null && sig.equals(existingTable.getSignature()))
+                return existing;
+        }
+
+        return null;
+    }
+
+    /**
+     * Looks for a stored snapshot in the same resource directory that equals the
+     * given snapshot.
+     *
+     * @return the resource path of the match, or {@code null} if there is none
+     */
+    private String checkDupByContent(SnapshotTable snapshot) throws IOException {
+        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+        String resourceDir = snapshot.getResourceDir();
+        ArrayList<String> existings = store.listResources(resourceDir);
+        if (existings == null)
+            return null;
+
+        for (String existing : existings) {
+            SnapshotTable existingTable = load(existing, true); // skip cache, direct load from store
+            if (existingTable != null && existingTable.equals(snapshot))
+                return existing;
+        }
+
+        return null;
+    }
+
+    /** Writes the snapshot, including its data, to the store under its resource path. */
+    private void save(SnapshotTable snapshot) throws IOException {
+        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+        String path = snapshot.getResourcePath();
+        store.putResource(path, snapshot, SnapshotTableSerializer.FULL_SERIALIZER);
+    }
+
+    /**
+     * Reads a snapshot from the store, bypassing the cache.
+     *
+     * @param loadData {@code true} to read with the full serializer, {@code false}
+     *                 to read with the info serializer only
+     * @return whatever the store returns for the path; callers check it for null
+     */
+    private SnapshotTable load(String resourcePath, boolean loadData) throws IOException {
+        logger.info("Loading snapshotTable from " + resourcePath + ", with loadData: " + loadData);
+        ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+
+        SnapshotTable table = store.getResource(resourcePath, SnapshotTable.class, loadData ? SnapshotTableSerializer.FULL_SERIALIZER : SnapshotTableSerializer.INFO_SERIALIZER);
+
+        if (loadData)
+            logger.debug("Loaded snapshot at " + resourcePath);
+
+        return table;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
new file mode 100644
index 0000000..e2205b9
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionary;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * A snapshot of a {@link ReadableTable} kept in memory. Every cell is stored as an integer id
+ * into a single string dictionary shared by the whole table; {@code rowIndices} holds one id
+ * array per row.
+ * <p>
+ * Only {@code signature} and {@code useDictionary} are JSON properties. The rows and the
+ * dictionary are written and read separately through {@link #writeData(DataOutput)} and
+ * {@link #readData(DataInput)}; until one of {@link #takeSnapshot(ReadableTable, TableDesc)} or
+ * {@link #readData(DataInput)} has run, the table carries no row data and the row-based methods
+ * ({@code getReader}, {@code equals}, {@code hashCode}, {@code writeData}) must not be used.
+ *
+ * @author yangli9
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class SnapshotTable extends RootPersistentEntity implements ReadableTable {
+
+    // a special placeholder to indicate a NULL; 0, 9, 127, 255 are a few invisible ASCII characters
+    private static final String NULL_STR;
+    static {
+        try {
+            NULL_STR = new String(new byte[] { 0, 9, 127, (byte) 255 }, "ISO-8859-1");
+        } catch (UnsupportedEncodingException e) {
+            // ISO-8859-1 is a charset every Java platform is required to provide
+            throw new IllegalStateException("ISO-8859-1 is not supported", e);
+        }
+    }
+
+    @JsonProperty("signature")
+    private TableSignature signature;
+    @JsonProperty("useDictionary")
+    private boolean useDictionary;
+
+    // one array of dictionary ids per row
+    private ArrayList<int[]> rowIndices;
+    // maps the ids in rowIndices back to cell values
+    private Dictionary<String> dict;
+
+    // default constructor for JSON serialization
+    public SnapshotTable() {
+    }
+
+    /**
+     * Creates an empty snapshot that remembers the signature of the given table and will be
+     * stored in dictionary form. No rows are read here; call
+     * {@link #takeSnapshot(ReadableTable, TableDesc)} for that.
+     */
+    SnapshotTable(ReadableTable table) throws IOException {
+        this.signature = table.getSignature();
+        this.useDictionary = true;
+    }
+
+    /**
+     * Reads the whole table twice: the first pass validates the column count of every row and
+     * collects all non-null cells into a dictionary, the second pass encodes every row as
+     * dictionary ids. Both readers are closed before returning, also on failure.
+     *
+     * @param table     the table to copy
+     * @param tableDesc description of the table, used for the expected column count
+     * @throws IOException           if reading the table fails
+     * @throws IllegalStateException if a row has fewer columns than the table description requires
+     */
+    public void takeSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException {
+        this.signature = table.getSignature();
+
+        int maxIndex = tableDesc.getMaxColumnIndex();
+
+        TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter());
+
+        // pass 1: validate rows and feed every non-null cell to the dictionary builder
+        TableReader reader = table.getReader();
+        try {
+            while (reader.next()) {
+                String[] row = reader.getRow();
+                if (row.length <= maxIndex) {
+                    throw new IllegalStateException("Bad hive table row, " + tableDesc + " expect " + (maxIndex + 1) + " columns, but got " + Arrays.toString(row));
+                }
+
+                for (String cell : row) {
+                    if (cell != null)
+                        b.addValue(cell);
+                }
+            }
+        } finally {
+            reader.close();
+        }
+
+        this.dict = b.build(0);
+
+        // pass 2: translate every cell (null cells included) into its dictionary id
+        ArrayList<int[]> allRowIndices = new ArrayList<int[]>();
+        reader = table.getReader();
+        try {
+            while (reader.next()) {
+                String[] row = reader.getRow();
+                int[] rowIndex = new int[row.length];
+                for (int i = 0; i < row.length; i++) {
+                    rowIndex[i] = dict.getIdFromValue(row[i]);
+                }
+                allRowIndices.add(rowIndex);
+            }
+        } finally {
+            reader.close();
+        }
+        this.rowIndices = allRowIndices;
+    }
+
+    /** @return the store path of this snapshot: snapshot root / last segment of the source path / uuid + ".snapshot" */
+    public String getResourcePath() {
+        return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + new Path(signature.getPath()).getName() + "/" + uuid + ".snapshot";
+    }
+
+    /** @return the store directory shared by all snapshots of the same source path */
+    public String getResourceDir() {
+        return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + new Path(signature.getPath()).getName();
+    }
+
+    /**
+     * Returns a reader that walks the snapshot rows in order and decodes each id back to its
+     * string value. Closing the reader does nothing.
+     */
+    @Override
+    public TableReader getReader() throws IOException {
+        return new TableReader() {
+
+            // index of the current row; -1 until next() is called for the first time
+            int i = -1;
+
+            @Override
+            public boolean next() throws IOException {
+                i++;
+                return i < rowIndices.size();
+            }
+
+            @Override
+            public String[] getRow() {
+                int[] rowIndex = rowIndices.get(i);
+                String[] row = new String[rowIndex.length];
+                for (int x = 0; x < row.length; x++) {
+                    row[x] = dict.getValueFromId(rowIndex[x]);
+                }
+                return row;
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+        };
+    }
+
+    @Override
+    public TableSignature getSignature() throws IOException {
+        return signature;
+    }
+
+    /**
+     * A naive implementation that hashes the id matrix only. It stays consistent with
+     * {@link #equals(Object)} because equal snapshots always have equal id matrices.
+     *
+     * @return hash of all row id arrays
+     */
+    @Override
+    public int hashCode() {
+        int[] parts = new int[this.rowIndices.size()];
+        for (int i = 0; i < parts.length; ++i)
+            parts[i] = Arrays.hashCode(this.rowIndices.get(i));
+        return Arrays.hashCode(parts);
+    }
+
+    /**
+     * Two snapshots are equal when they hold the same rows in the same order. The id matrices are
+     * compared first because that is cheap; since ids only have a meaning relative to their
+     * dictionary, matching ids are then confirmed against the dictionaries.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if ((o instanceof SnapshotTable) == false)
+            return false;
+        SnapshotTable that = (SnapshotTable) o;
+
+        // compare row by row
+        if (this.rowIndices.size() != that.rowIndices.size())
+            return false;
+        for (int i = 0; i < this.rowIndices.size(); ++i) {
+            if (!Arrays.equals(this.rowIndices.get(i), that.rowIndices.get(i)))
+                return false;
+        }
+
+        // without rows there are no ids to interpret, so the dictionaries are irrelevant
+        if (this.rowIndices.isEmpty())
+            return true;
+
+        return sameCellValues(that);
+    }
+
+    /**
+     * Confirms that identical ids decode to identical values in both snapshots. Must only be
+     * called once the id matrices are known to be equal, so that every id looked up in
+     * {@code that.dict} is an id that snapshot itself stores.
+     */
+    private boolean sameCellValues(SnapshotTable that) {
+        if (this.dict == that.dict)
+            return true;
+        if (this.dict == null || that.dict == null)
+            return false;
+        // NOTE(review): only a shortcut; assumes a Dictionary that reports equality maps ids identically
+        if (this.dict.equals(that.dict))
+            return true;
+
+        for (int[] row : this.rowIndices) {
+            for (int id : row) {
+                String mine = this.dict.getValueFromId(id);
+                String theirs = that.dict.getValueFromId(id);
+                if (mine == null ? theirs != null : !mine.equals(theirs))
+                    return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Writes the row data: the row count; then, only if there is at least one row, the column
+     * count (taken from the first row) followed by either the dictionary and all ids
+     * ({@code useDictionary}) or all cell values as UTF strings with {@code NULL_STR} standing in
+     * for null.
+     */
+    void writeData(DataOutput out) throws IOException {
+        out.writeInt(rowIndices.size());
+        if (rowIndices.size() > 0) {
+            int n = rowIndices.get(0).length;
+            out.writeInt(n);
+
+            if (this.useDictionary) {
+                dict.write(out);
+                for (int i = 0; i < rowIndices.size(); i++) {
+                    int[] row = rowIndices.get(i);
+                    for (int j = 0; j < n; j++) {
+                        out.writeInt(row[j]);
+                    }
+                }
+
+            } else {
+                for (int i = 0; i < rowIndices.size(); i++) {
+                    int[] row = rowIndices.get(i);
+                    for (int j = 0; j < n; j++) {
+                        // NULL_STR is tricky, but we don't want to break the current snapshots
+                        String value = dict.getValueFromId(row[j]);
+                        out.writeUTF(value == null ? NULL_STR : value);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Reads what {@link #writeData(DataOutput)} wrote. {@code useDictionary} must already be set
+     * because it selects the format. The row list is always created, so a snapshot stored with
+     * zero rows comes back as an empty table rather than one without row data.
+     */
+    void readData(DataInput in) throws IOException {
+        int rowNum = in.readInt();
+        rowIndices = new ArrayList<int[]>(Math.max(rowNum, 0));
+        if (rowNum > 0) {
+            int n = in.readInt();
+
+            if (this.useDictionary) {
+                this.dict = new TrieDictionary<String>();
+                dict.readFields(in);
+
+                for (int i = 0; i < rowNum; i++) {
+                    int[] row = new int[n];
+                    this.rowIndices.add(row);
+                    for (int j = 0; j < n; j++) {
+                        row[j] = in.readInt();
+                    }
+                }
+            } else {
+                List<String[]> rows = new ArrayList<String[]>(rowNum);
+                TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter());
+
+                for (int i = 0; i < rowNum; i++) {
+                    String[] row = new String[n];
+                    rows.add(row);
+                    for (int j = 0; j < n; j++) {
+                        row[j] = in.readUTF();
+                        // NULL_STR is tricky, but we don't want to break the current snapshots
+                        if (row[j].equals(NULL_STR))
+                            row[j] = null; // nulls stay out of the dictionary, same as in takeSnapshot()
+                        else
+                            b.addValue(row[j]);
+                    }
+                }
+                this.dict = b.build(0);
+                for (String[] row : rows) {
+                    int[] rowIndex = new int[n];
+                    for (int i = 0; i < n; i++) {
+                        rowIndex[i] = dict.getIdFromValue(row[i]);
+                    }
+                    this.rowIndices.add(rowIndex);
+                }
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java
new file mode 100644
index 0000000..f5663a5
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTableSerializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.JsonUtil;
+
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Serializer for {@link SnapshotTable}. The stored form is the JSON text of the object written
+ * as one UTF string, followed by the table data unless the serializer is info-only.
+ *
+ * @author yangli9
+ */
+public class SnapshotTableSerializer implements Serializer<SnapshotTable> {
+
+    /** Reads and writes the JSON header plus the table data. */
+    public static final SnapshotTableSerializer FULL_SERIALIZER = new SnapshotTableSerializer(false);
+    /** Reads and writes the JSON header only. */
+    public static final SnapshotTableSerializer INFO_SERIALIZER = new SnapshotTableSerializer(true);
+
+    private final boolean infoOnly;
+
+    SnapshotTableSerializer(boolean infoOnly) {
+        this.infoOnly = infoOnly;
+    }
+
+    @Override
+    public void serialize(SnapshotTable obj, DataOutputStream out) throws IOException {
+        out.writeUTF(JsonUtil.writeValueAsIndentString(obj));
+
+        if (infoOnly)
+            return;
+        obj.writeData(out);
+    }
+
+    @Override
+    public SnapshotTable deserialize(DataInputStream in) throws IOException {
+        final SnapshotTable table = JsonUtil.readValue(in.readUTF(), SnapshotTable.class);
+
+        if (!infoOnly) {
+            table.readData(in);
+        }
+        return table;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/test/java/org/apache/kylin/dict/DateStrDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DateStrDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DateStrDictionaryTest.java
new file mode 100644
index 0000000..81db5df
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DateStrDictionaryTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import static org.junit.Assert.*;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link DateStrDictionary}: id range limits, NULL handling, value/id round trips and
+ * rejection of illegal arguments.
+ */
+public class DateStrDictionaryTest {
+
+    DateStrDictionary dict;
+
+    @Before
+    public void setup() {
+        dict = new DateStrDictionary();
+    }
+
+    @Test
+    public void testMinMaxId() {
+        assertEquals(0, dict.getIdFromValue("0000-01-01"));
+        assertEquals(DateStrDictionary.ID_9999_12_31, dict.getIdFromValue("9999-12-31"));
+
+        assertValueLookupRejected(-2); // -1 is id for NULL
+        assertValueLookupRejected(DateStrDictionary.ID_9999_12_31 + 1);
+        assertIdLookupRejected("10000-1-1");
+    }
+
+    @Test
+    public void testNull() {
+        int nullId = dict.getIdFromValue(null);
+        assertNull(dict.getValueFromId(nullId));
+        int nullId2 = dict.getIdFromValueBytes(null, 0, 0);
+        // expected value goes first so that a failure message reads the right way round
+        assertEquals(-1, dict.getValueBytesFromId(nullId2, null, 0));
+        assertEquals(nullId, nullId2);
+    }
+
+    @Test
+    public void test() {
+        checkPair("0001-01-01");
+        checkPair("1970-01-02");
+        checkPair("1975-06-24");
+        checkPair("2024-10-04");
+        checkPair("9999-12-31");
+    }
+
+    @Test
+    public void testIllegalArgument() {
+        assertIdLookupRejected("abcd");
+        assertValueLookupRejected(-2);
+    }
+
+    /** Asserts that looking up the id of {@code value} throws IllegalArgumentException. */
+    private void assertIdLookupRejected(String value) {
+        try {
+            dict.getIdFromValue(value);
+            fail("IllegalArgumentException expected");
+        } catch (IllegalArgumentException e) {
+            // good
+        }
+    }
+
+    /** Asserts that looking up the value of {@code id} throws IllegalArgumentException. */
+    private void assertValueLookupRejected(int id) {
+        try {
+            dict.getValueFromId(id);
+            fail("IllegalArgumentException expected");
+        } catch (IllegalArgumentException e) {
+            // good
+        }
+    }
+
+    /** Asserts that {@code dateStr} survives a value -> id -> value round trip unchanged. */
+    private void checkPair(String dateStr) {
+        int id = dict.getIdFromValue(dateStr);
+        String dateStrBack = dict.getValueFromId(id);
+        assertEquals(dateStr, dateStrBack);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java
new file mode 100644
index 0000000..86fa635
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import java.io.File;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.dict.lookup.FileTable;
+import org.apache.kylin.dict.lookup.LookupBytesTable;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * @author yangli9
+ */
+public class LookupTableTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testBasic() throws Exception {
+        TableDesc siteTable = MetadataManager.getInstance(getTestConfig()).getTableDesc("EDW.TEST_SITES");
+        TableDesc categoryTable = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.test_category_groupings");
+        LookupBytesTable lookup;
+
+        System.out.println("============================================================================");
+
+        File f = new File(LOCALMETA_TEST_DATA + "/data/EDW.TEST_SITES.csv");
+        lookup = new LookupBytesTable(siteTable, new String[] { "SITE_ID" }, new FileTable("file://" + f.getAbsolutePath(), 10));
+        lookup.dump();
+
+        System.out.println("============================================================================");
+
+        f = new File(LOCALMETA_TEST_DATA + "/data/DEFAULT.TEST_CATEGORY_GROUPINGS.csv");
+        lookup = new LookupBytesTable(categoryTable, new String[] { "leaf_categ_id", "site_id" }, new FileTable("file://" + f.getAbsolutePath(), 36));
+        lookup.dump();
+
+        System.out.println("============================================================================");
+
+        ByteArray k1 = new ByteArray(Bytes.toBytes("533"));
+        ByteArray k2 = new ByteArray(Bytes.toBytes("0"));
+        Array<ByteArray> key = new Array<ByteArray>(new ByteArray[] { k1, k2 });
+        System.out.println(lookup.getRow(key));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
new file mode 100644
index 0000000..2bc8c4f
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.metadata.model.DataType;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link NumberDictionary}: empty-string handling, the number byte codec, and exact
+ * as well as rounding id lookups against randomly generated numbers.
+ *
+ * @author yangli9
+ */
+public class NumberDictionaryTest {
+
+    NumberDictionary.NumberBytesCodec codec = new NumberDictionary.NumberBytesCodec();
+    Random rand = new Random();
+
+
+    @Test
+    public void testEmptyInput() {
+        String[] ints = new String[] { "", "0", "5", "100", "13" };
+        Collection<byte[]> intBytes = new ArrayList<byte[]>();
+        for (String s : ints) {
+            intBytes.add((s == null) ? null : Bytes.toBytes(s));
+        }
+
+        // check "" is treated as NULL, not a code of dictionary
+        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance("integer"), intBytes);
+        assertEquals(4, dict.getSize());
+
+        final int id = ((NumberDictionary<String>) dict).getIdFromValue("");
+        assertEquals(id, dict.nullId());
+    }
+
+
+    @Test
+    public void testNumberEncode() {
+        checkCodec("12345", "00000000000012345");
+        checkCodec("12345.123", "00000000000012345.123");
+        checkCodec("-12345", "-9999999999987654;");
+        checkCodec("-12345.123", "-9999999999987654.876;");
+        checkCodec("0", "00000000000000000");
+        checkCodec("0.0", "00000000000000000.0");
+    }
+
+    /** Asserts that {@code number} encodes to {@code code} and that {@code code} decodes back. */
+    private void checkCodec(String number, String code) {
+        assertEquals(code, encodeNumber(number));
+        assertEquals(number, decodeNumber(code));
+    }
+
+    private String decodeNumber(String code) {
+        byte[] buf = Bytes.toBytes(code);
+        System.arraycopy(buf, 0, codec.buf, 0, buf.length);
+        codec.bufOffset = 0;
+        codec.bufLen = buf.length;
+        int len = codec.decodeNumber(buf, 0);
+        return Bytes.toString(buf, 0, len);
+    }
+
+    private String encodeNumber(String number) {
+        byte[] num1 = Bytes.toBytes(number);
+        codec.encodeNumber(num1, 0, num1.length);
+        return Bytes.toString(codec.buf, codec.bufOffset, codec.bufLen);
+    }
+
+    @Test
+    public void testDictionary() {
+        int n = 100;
+
+        Set<BigDecimal> set = Sets.newHashSet();
+        NumberDictionaryBuilder<String> builder = new NumberDictionaryBuilder<String>(new StringBytesConverter());
+        for (int i = 0; i < n; i++) {
+            String num = randNumber();
+            if (set.add(new BigDecimal(num))) {
+                builder.addValue(num);
+            }
+        }
+
+        List<BigDecimal> sorted = Lists.newArrayList();
+        sorted.addAll(set);
+        Collections.sort(sorted);
+
+        // test exact match
+        NumberDictionary<String> dict = builder.build(0);
+        for (int i = 0; i < sorted.size(); i++) {
+            String dictNum = dict.getValueFromId(i);
+            System.out.println(sorted.get(i) + "\t" + dictNum);
+        }
+
+        for (int i = 0; i < sorted.size(); i++) {
+            String dictNum = dict.getValueFromId(i);
+            assertEquals(sorted.get(i), new BigDecimal(dictNum));
+        }
+
+        // test rounding
+        for (int i = 0; i < n * 50; i++) {
+            String randStr = randNumber();
+            BigDecimal probe = new BigDecimal(randStr);
+            int binarySearch = Collections.binarySearch(sorted, probe);
+            if (binarySearch >= 0)
+                continue;
+            int insertion = -(binarySearch + 1);
+            int expectedLowerId = insertion - 1;
+            int expectedHigherId = insertion;
+            // System.out.println("-- " + randStr + ", " + expectedLowerId +
+            // ", " + expectedHigherId);
+
+            if (expectedLowerId < 0) {
+                // nothing smaller exists, rounding down must be rejected
+                try {
+                    dict.getIdFromValue(randStr, -1);
+                    fail();
+                } catch (IllegalArgumentException ex) {
+                    // expect
+                }
+            } else {
+                assertEquals(expectedLowerId, dict.getIdFromValue(randStr, -1));
+            }
+
+            if (expectedHigherId >= sorted.size()) {
+                // nothing bigger exists, rounding up must be rejected
+                try {
+                    dict.getIdFromValue(randStr, 1);
+                    fail();
+                } catch (IllegalArgumentException ex) {
+                    // expect
+                }
+            } else {
+                assertEquals(expectedHigherId, dict.getIdFromValue(randStr, 1));
+            }
+        }
+    }
+
+    /**
+     * Generates a random decimal string with 0-9 integer digits, 0-2 fraction digits and an
+     * optional minus sign; at least one digit is always present. Fraction digits are drawn from
+     * 1-9 so that the text never ends in a zero.
+     */
+    private String randNumber() {
+        int digits1;
+        int digits2;
+        int sign;
+        // draw again until there is at least one digit to emit
+        do {
+            digits1 = rand.nextInt(10);
+            digits2 = rand.nextInt(3);
+            sign = rand.nextInt(2);
+        } while (digits1 == 0 && digits2 == 0);
+
+        StringBuilder buf = new StringBuilder();
+        if (sign == 1)
+            buf.append("-");
+        for (int i = 0; i < digits1; i++)
+            buf.append(rand.nextInt(10));
+        if (digits2 > 0) {
+            buf.append(".");
+            // BigDecimal thinks 4.5 != 4.50, so avoid 0 as a fraction digit; adding 1 numerically
+            // (not by string concatenation) yields exactly one digit in 1..9
+            for (int i = 0; i < digits2; i++)
+                buf.append(rand.nextInt(9) + 1);
+        }
+        return buf.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
new file mode 100644
index 0000000..cfecaee
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.kylin.dict.lookup.FileTable;
+import org.apache.kylin.dict.lookup.FileTableReader;
+import org.junit.Test;
+
+/**
+ * Checks that {@link FileTableReader} parses the first row of the DW_SITES test file.
+ *
+ * @author yangli9
+ */
+public class TableReaderTest {
+
+    @Test
+    public void testBasicReader() throws IOException {
+        File f = new File("src/test/resources/dict/DW_SITES");
+        FileTableReader reader = new FileTableReader("file://" + f.getAbsolutePath(), FileTable.DELIM_AUTO, 10);
+        try {
+            // an empty or unreadable file must fail the test instead of passing without any check
+            assertTrue("expected at least one row in " + f, reader.next());
+            assertEquals("[-1, Korea Auction.co.kr, S, 48, 0, 111, 2009-02-11, , DW_OFFPLAT, ]", Arrays.toString(reader.getRow()));
+        } finally {
+            // release the reader even when an assertion fails
+            reader.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java b/core-dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java
new file mode 100644
index 0000000..1e381d4
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/TimeStrDictionaryTests.java
@@ -0,0 +1,58 @@
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.DateFormat;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link TimeStrDictionary}: equivalent spellings of one instant share an id, values
+ * survive an encode/decode round trip, and a value past the supported range maps to -1.
+ */
+public class TimeStrDictionaryTests {
+    TimeStrDictionary dict;
+
+    @Before
+    public void setup() {
+        dict = new TimeStrDictionary();
+    }
+
+    @Test
+    public void basicTest() {
+        String[] spellings = { "1999-01-01", "1999-01-01 00:00:00", "1999-01-01 00:00:00.000", "1999-01-01 00:00:00.022" };
+
+        // resolve every id first, then compare each one against the date-only spelling
+        int[] ids = new int[spellings.length];
+        for (int i = 0; i < spellings.length; i++) {
+            ids[i] = dict.getIdFromValue(spellings[i]);
+        }
+        for (int i = 1; i < ids.length; i++) {
+            Assert.assertEquals(ids[0], ids[i]);
+        }
+    }
+
+    @Test
+    public void testEncodeDecode() {
+        String[] dates = { "1999-01-12", "2038-01-09", "2038-01-08", "1970-01-01", "1970-01-02" };
+        for (String date : dates) {
+            encodeDecode(date);
+        }
+
+        String[] timestamps = { "1999-01-12 11:00:01", "2038-01-09 01:01:02", "2038-01-19 03:14:07", "1970-01-01 23:22:11", "1970-01-02 23:22:11" };
+        for (String timestamp : timestamps) {
+            encodeDecode(timestamp);
+        }
+    }
+
+    @Test
+    public void testIllegal() {
+        Assert.assertEquals(-1, dict.getIdFromValue("2038-01-19 03:14:08"));
+    }
+
+    /**
+     * Encodes {@code origin}, decodes the id again and expects the result to equal
+     * {@code origin} reformatted as a time string without milliseconds.
+     */
+    public void encodeDecode(String origin) {
+        String decoded = dict.getValueFromId(dict.getIdFromValue(origin));
+
+        long originMillis = DateFormat.stringToMillis(origin);
+        Assert.assertEquals(DateFormat.formatToTimeWithoutMilliStr(originMillis), decoded);
+    }
+
+}