You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ka...@apache.org on 2017/04/20 12:02:08 UTC

[3/4] kylin git commit: KYLIN-2506 Refactor Global Dictionary

http://git-wip-us.apache.org/repos/asf/kylin/blob/9b36e268/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index cda3c2b..8b41d58 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -18,9 +18,21 @@
 
 package org.apache.kylin.dict;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.dict.global.AppendTrieDictionaryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments.
@@ -28,29 +40,118 @@ import org.apache.kylin.common.util.Dictionary;
  * Created by sunyerui on 16/5/24.
  */
 public class GlobalDictionaryBuilder implements IDictionaryBuilder {
-    AppendTrieDictionary.Builder<String> builder;
-    int baseId;
+    private AppendTrieDictionaryBuilder builder;
+    private int baseId;
+
+    // Distributed lock handle obtained from KylinConfig in lock().
+    private DistributedLock lock;
+    // "<sourceTable>_<sourceColumn>" of the dictionary being built; also used to derive the lock path.
+    private String sourceColumn;
+    // Identity written with the lock: "<host name>_<name of the thread that created this builder>".
+    private final String lockData = getServerName() + "_" + Thread.currentThread().getName();
+    // Number of addValue() calls so far (null values included).
+    private int counter;
+
+    // The logger is never reassigned, so it is declared final.
+    private static final Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class);
 
     @Override
     public void init(DictionaryInfo dictInfo, int baseId) throws IOException {
         if (dictInfo == null) {
             throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo");
         }
-        this.builder = AppendTrieDictionary.Builder.getInstance(dictInfo.getResourceDir());
+
+        // The lock is taken per "<table>_<column>" before the builder touches the dictionary resources.
+        sourceColumn = dictInfo.getSourceTable() + "_" + dictInfo.getSourceColumn();
+        lock(sourceColumn);
+
+        // If the builder cannot be created, nothing else would ever release the lock taken above,
+        // so release it here; the original exception still propagates unchanged.
+        boolean builderCreated = false;
+        try {
+            int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
+            this.builder = new AppendTrieDictionaryBuilder(dictInfo.getResourceDir(), maxEntriesPerSlice);
+            builderCreated = true;
+        } finally {
+            if (!builderCreated) {
+                lock.unlockPath(getLockPath(sourceColumn), lockData);
+            }
+        }
         this.baseId = baseId;
     }
-    
+
     @Override
     public boolean addValue(String value) {
-        if (value == null)
+        // Every 1,000,000 calls (null values are counted too) lockPath() is called again with this
+        // client's lockData; a false result is treated as "this client no longer holds the lock"
+        // and aborts the build.
+        if (++counter % 1_000_000 == 0) {
+            if (lock.lockPath(getLockPath(sourceColumn), lockData)) {
+                logger.info("processed {} values for {}", counter, sourceColumn);
+            } else {
+                throw new RuntimeException("Failed to create global dictionary on " + sourceColumn + " This client doesn't keep the lock");
+            }
+        }
+
+        // Null values are not added to the dictionary; the caller is told so via the false return.
+        if (value == null) {
             return false;
-        
-        builder.addValue(value);
+        }
+
+        try {
+            builder.addValue(value);
+        } catch (Throwable e) {
+            // Any failure ends the build, so the lock is released before the error is rethrown.
+            lock.unlockPath(getLockPath(sourceColumn), lockData);
+            throw new RuntimeException(String.format("Failed to create global dictionary on %s ", sourceColumn), e);
+        }
+
         return true;
     }
-    
+
     @Override
     public Dictionary<String> build() throws IOException {
-        return builder.build(baseId);
+        // Builds the dictionary only if lockPath() confirms the lock for this client's lockData.
+        // The lock is released in every case, including when builder.build() throws.
+        // NOTE(review): when lockPath() returns false this silently returns an empty
+        // AppendTrieDictionary (and still calls unlockPath), whereas addValue() throws in the
+        // same situation -- confirm that an empty result is what callers expect here.
+        try {
+            if (lock.lockPath(getLockPath(sourceColumn), lockData)) {
+                return builder.build(baseId);
+            }
+        } finally {
+            lock.unlockPath(getLockPath(sourceColumn), lockData);
+        }
+        return new AppendTrieDictionary<>();
+    }
+
+    /**
+     * Acquires the distributed lock for the given column, blocking until it is obtained.
+     *
+     * If the first lockPath() attempt fails, a watcher is registered on the column's watch path; each
+     * notification whose payload differs from this client's lockData triggers another lockPath() attempt,
+     * and the first successful attempt wakes up the waiting caller.
+     *
+     * NOTE(review): a release that happens between the failed lockPath() and the watchPath() registration
+     * may not be observed, depending on the DistributedLock implementation -- confirm.
+     *
+     * @param sourceColumn "<table>_<column>" name used to derive the lock and watch paths
+     * @throws IOException if closing the watch fails
+     */
+    private void lock(final String sourceColumn) throws IOException {
+        lock = KylinConfig.getInstanceFromEnv().getDistributedLock();
+
+        if (!lock.lockPath(getLockPath(sourceColumn), lockData)) {
+            logger.info("{} will wait the lock for {} ", lockData, sourceColumn);
+
+            final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(1);
+
+            Closeable watch = lock.watchPath(getWatchPath(sourceColumn), MoreExecutors.sameThreadExecutor(), new DistributedLock.Watcher() {
+                @Override
+                public void process(String path, String data) {
+                    // lockData is never null, so comparing from its side tolerates a notification
+                    // that carries no data instead of failing with a NullPointerException.
+                    if (!lockData.equalsIgnoreCase(data) && lock.lockPath(getLockPath(sourceColumn), lockData)) {
+                        try {
+                            bq.put("getLock");
+                        } catch (InterruptedException e) {
+                            // restore the interrupt flag so code further up can still see it
+                            Thread.currentThread().interrupt();
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            });
+
+            long start = System.currentTimeMillis();
+
+            try {
+                bq.take();
+            } catch (InterruptedException e) {
+                // restore the interrupt flag so code further up can still see it
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            } finally {
+                watch.close();
+            }
+
+            logger.info("{} has waited the lock {} ms for {} ", lockData, (System.currentTimeMillis() - start), sourceColumn);
+        }
+    }
+
+    private static final String GLOBAL_DICT_LOCK_PATH = "/kylin/dict/lock";
+
+    /** Path of the lock node for the given column: the column's watch path followed by "/lock". */
+    private String getLockPath(String pathName) {
+        final String columnPath = getWatchPath(pathName);
+        return columnPath + "/lock";
+    }
+
+    /** Path that is watched for the given column: lock root, then the metadata URL prefix, then the column name. */
+    private String getWatchPath(String pathName) {
+        final String metadataPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+        final String prefixedRoot = GLOBAL_DICT_LOCK_PATH + "/" + metadataPrefix;
+        return prefixedRoot + "/" + pathName;
+    }
+
+    /**
+     * Resolves the local host name, used as part of this client's lock identity.
+     *
+     * @return the host name, or null when it cannot be resolved (the failure is logged)
+     */
+    private static String getServerName() {
+        String serverName = null;
+        try {
+            serverName = InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            // pass the exception along so the log shows why the lookup failed
+            logger.error("fail to get the serverName", e);
+        }
+        return serverName;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9b36e268/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictNode.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictNode.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictNode.java
new file mode 100644
index 0000000..ee3a2c2
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictNode.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict.global;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.dict.AppendTrieDictionary;
+import org.apache.kylin.dict.TrieDictionary;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+
+public class AppendDictNode {
+    // Bytes of the value fragment stored at this node.
+    public byte[] part;
+    // Id written next to the value when this node ends a value; -1 until one is assigned.
+    public int id = -1;
+    // True when a complete value ends at this node.
+    public boolean isEndOfValue;
+    public ArrayList<AppendDictNode> children = new ArrayList<>();
+
+    // Number of end-of-value nodes in this subtree (this node included); recomputed by Stats.stats().
+    public int nValuesBeneath;
+    public AppendDictNode parent;
+    // Number of nodes in this subtree counting this node itself, hence the initial 1.
+    public int childrenCount = 1;
+
+    AppendDictNode(byte[] value, boolean isEndOfValue) {
+        reset(value, isEndOfValue);
+    }
+
+    AppendDictNode(byte[] value, boolean isEndOfValue, ArrayList<AppendDictNode> children) {
+        reset(value, isEndOfValue, children);
+    }
+
+    /** Re-initializes this node with a new value fragment and no children. */
+    void reset(byte[] value, boolean isEndOfValue) {
+        reset(value, isEndOfValue, new ArrayList<AppendDictNode>());
+    }
+
+    /**
+     * Re-initializes this node: replaces the value fragment and end-of-value flag, drops the current
+     * children, adopts every node of the given list as a child and resets the id to -1.
+     *
+     * The current children are cleared before the given list is iterated, so passing this node's own
+     * children list would leave the node without children.
+     */
+    void reset(byte[] value, boolean isEndOfValue, ArrayList<AppendDictNode> children) {
+        this.part = value;
+        this.isEndOfValue = isEndOfValue;
+        clearChild();
+        for (AppendDictNode child : children) {
+            addChild(child);
+        }
+        this.id = -1;
+    }
+
+    /** Drops all children and subtracts their node count from this node and each of its ancestors. */
+    void clearChild() {
+        this.children.clear();
+        // childrenCount includes this node itself, so only the surplus above 1 is removed
+        final int removedNodes = this.childrenCount - 1;
+        AppendDictNode ancestor = this;
+        while (ancestor != null) {
+            ancestor.childrenCount -= removedNodes;
+            ancestor = ancestor.parent;
+        }
+    }
+
+    /** Appends the child after the existing children. */
+    void addChild(AppendDictNode child) {
+        addChild(-1, child);
+    }
+
+    /**
+     * Attaches the child at the given position (a negative index appends it) and adds the size of the
+     * child's subtree to the node count of this node and each of its ancestors.
+     */
+    void addChild(int index, AppendDictNode child) {
+        child.parent = this;
+        if (index >= 0) {
+            this.children.add(index, child);
+        } else {
+            this.children.add(child);
+        }
+        AppendDictNode ancestor = this;
+        while (ancestor != null) {
+            ancestor.childrenCount += child.childrenCount;
+            ancestor = ancestor.parent;
+        }
+    }
+
+    /**
+     * Detaches the child at the given position, subtracts the size of its subtree from the node count
+     * of this node and each of its ancestors, and returns the detached child.
+     */
+    private AppendDictNode removeChild(int index) {
+        final AppendDictNode detached = children.remove(index);
+        detached.parent = null;
+        AppendDictNode ancestor = this;
+        while (ancestor != null) {
+            ancestor.childrenCount -= detached.childrenCount;
+            ancestor = ancestor.parent;
+        }
+        return detached;
+    }
+
+    /**
+     * Creates a childless node with the same value fragment as this one (never marked end-of-value)
+     * and, when this node has a parent, inserts it into the parent right after this node.
+     */
+    private AppendDictNode duplicateNode() {
+        AppendDictNode newChild = new AppendDictNode(part, false);
+        newChild.parent = parent;
+        if (parent != null) {
+            int index = parent.children.indexOf(this);
+            // addChild also sets newChild.parent and updates the node counts up the tree
+            parent.addChild(index + 1, newChild);
+        }
+        return newChild;
+    }
+
+    /**
+     * Concatenates the value fragments along the first-child path starting at this node, stopping at
+     * the first node that ends a value or has no children.
+     */
+    public byte[] firstValue() {
+        final ByteArrayOutputStream collected = new ByteArrayOutputStream();
+        AppendDictNode cursor = this;
+        collected.write(cursor.part, 0, cursor.part.length);
+        while (!cursor.isEndOfValue && cursor.children.size() != 0) {
+            cursor = cursor.children.get(0);
+            collected.write(cursor.part, 0, cursor.part.length);
+        }
+        return collected.toByteArray();
+    }
+
+    /**
+     * Splits the tree containing splitNode in two along the path from splitNode up to the root.
+     *
+     * At every ancestor on that path a duplicate of the ancestor is created (inserted right after the
+     * ancestor in the ancestor's own parent, if it has one), and the ancestor's children from the path
+     * child onwards are moved into the duplicate in their original order.
+     *
+     * @return the duplicate created for the topmost ancestor; splitNode itself when it has no parent;
+     *         null when splitNode is null
+     */
+    public static AppendDictNode splitNodeTree(final AppendDictNode splitNode) {
+        if (splitNode == null) {
+            return null;
+        }
+        AppendDictNode current = splitNode;
+        AppendDictNode p = current.parent;
+        while (p != null) {
+            int index = p.children.indexOf(current);
+            assert index != -1;
+            AppendDictNode newParent = p.duplicateNode();
+            // move children [index, size) from p to newParent; removing from the tail and inserting
+            // at the head keeps their relative order
+            for (int i = p.children.size() - 1; i >= index; i--) {
+                AppendDictNode child = p.removeChild(i);
+                newParent.addChild(0, child);
+            }
+            // the duplicate becomes the path child one level up
+            current = newParent;
+            p = p.parent;
+        }
+        return current;
+    }
+
+    /**
+     * Serializes the trie rooted at this node into a byte array.
+     *
+     * The result is a head (magic, head size, body size, number of values, width of the child-offset
+     * field, width of the id field) followed by the nodes: the root first, then the children of each
+     * node with children as one contiguous run, in the order those nodes were written.
+     *
+     * @return the serialized head and body
+     * @throws IllegalStateException if the number of bytes written differs from the size computed up front
+     */
+    public byte[] buildTrieBytes() {
+        Stats stats = Stats.stats(this);
+        int sizeChildOffset = stats.mbpn_sizeChildOffset;
+        int sizeId = stats.mbpn_sizeId;
+
+        // write head
+        byte[] head;
+        try {
+            ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+            DataOutputStream headOut = new DataOutputStream(byteBuf);
+            headOut.write(AppendTrieDictionary.HEAD_MAGIC);
+            headOut.writeShort(0); // head size, will back fill
+            headOut.writeInt(stats.mbpn_footprint); // body size
+            headOut.writeInt(stats.nValues);
+            headOut.write(sizeChildOffset);
+            headOut.write(sizeId);
+            headOut.close();
+            head = byteBuf.toByteArray();
+            BytesUtil.writeUnsigned(head.length, head, AppendTrieDictionary.HEAD_SIZE_I, 2);
+        } catch (IOException e) {
+            // shall not happen, as we are writing to an in-memory buffer
+            throw new RuntimeException(e);
+        }
+
+        byte[] trieBytes = new byte[stats.mbpn_footprint + head.length];
+        System.arraycopy(head, 0, trieBytes, 0, head.length);
+
+        // nodes whose children still have to be written, in the order the nodes themselves were written
+        LinkedList<AppendDictNode> open = new LinkedList<AppendDictNode>();
+        // absolute offset in trieBytes at which each node was written
+        IdentityHashMap<AppendDictNode, Integer> offsetMap = new IdentityHashMap<AppendDictNode, Integer>();
+
+        // write body
+        int o = head.length;
+        offsetMap.put(this, o);
+        o = build_writeNode(this, o, true, sizeChildOffset, sizeId, trieBytes);
+        if (!this.children.isEmpty()) {
+            open.addLast(this);
+        }
+
+        while (!open.isEmpty()) {
+            AppendDictNode parent = open.removeFirst();
+            // the parent's child offset is relative to the start of the body, hence "- head.length"
+            build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
+            for (int i = 0; i < parent.children.size(); i++) {
+                AppendDictNode c = parent.children.get(i);
+                boolean isLastChild = (i == parent.children.size() - 1);
+                offsetMap.put(c, o);
+                o = build_writeNode(c, o, isLastChild, sizeChildOffset, sizeId, trieBytes);
+                if (!c.children.isEmpty()) {
+                    open.addLast(c);
+                }
+            }
+        }
+
+        // the footprint computed by Stats must match what was actually written
+        if (o != trieBytes.length) {
+            throw new IllegalStateException("Trie serialization wrote " + o + " bytes but " + trieBytes.length + " were expected");
+        }
+        return trieBytes;
+    }
+
+    /**
+     * Writes childOffset into the child-offset field of the node at parentOffset while keeping the
+     * last-child / end-of-value flag bits already stored in the first byte of that field.
+     */
+    private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
+        // remember the flag bits, since writing the offset overwrites the whole field
+        int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
+        BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
+        trieBytes[parentOffset] |= flags;
+    }
+
+    /**
+     * Writes one node at the given offset and returns the offset just past it.
+     *
+     * Node layout: child-offset field of sizeChildOffset bytes (only the flag bits are set here, the
+     * offset itself is filled in later by build_overwriteChildOffset), one byte holding the fragment
+     * length, the fragment bytes, and, for end-of-value nodes only, the id in sizeId bytes.
+     *
+     * @throws RuntimeException if the fragment is longer than 255 bytes
+     * @throws IllegalArgumentException if an end-of-value node carries the id 0 or -1
+     */
+    private int build_writeNode(AppendDictNode n, int offset, boolean isLastChild, int sizeChildOffset, int sizeId, byte[] trieBytes) {
+        int o = offset;
+
+        // childOffset
+        if (isLastChild)
+            trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
+        if (n.isEndOfValue)
+            trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
+        o += sizeChildOffset;
+
+        // nValueBytes
+        // the length is stored in a single byte, so a fragment cannot exceed 255 bytes
+        if (n.part.length > 255)
+            throw new RuntimeException("Value length is " + n.part.length + " and larger than 255: " + Bytes.toStringBinary(n.part));
+        BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
+        o++;
+
+        // valueBytes
+        System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
+        o += n.part.length;
+
+        // id, present only on nodes that end a value
+        if (n.isEndOfValue) {
+            checkValidId(n.id);
+            BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId);
+            o += sizeId;
+        }
+
+        return o;
+    }
+
+    // Valid dict ids run from 1 up to 2147483647 and then from -2147483648 up to -2 (the int read as unsigned);
+    // 0 and -1 are reserved for the uninitialized state and are rejected here.
+    private void checkValidId(int id) {
+        final boolean reserved = (id == 0) || (id == -1);
+        if (reserved) {
+            throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
+        }
+    }
+
+    @Override
+    public String toString() {
+        // summary only: this node's fragment, the size of its subtree and the first value beneath it
+        final String rootPart = Bytes.toStringBinary(part);
+        final int nodes = childrenCount;
+        final String first = Bytes.toStringBinary(firstValue());
+        return String.format("DictNode[root=%s, nodes=%d, firstValue=%s]", rootPart, nodes, first);
+    }
+
+    static class Stats {
+        public interface Visitor {
+            void visit(AppendDictNode n, int level);
+        }
+
+        /** Recursive pre-order walk: visits the node first, then each child subtree at level + 1. */
+        private static void traverseR(AppendDictNode node, Visitor visitor, int level) {
+            visitor.visit(node, level);
+            for (AppendDictNode c : node.children)
+                traverseR(c, visitor, level + 1);
+        }
+
+        /** Recursive post-order walk: visits each child subtree at level + 1 first, then the node itself. */
+        private static void traversePostOrderR(AppendDictNode node, Visitor visitor, int level) {
+            for (AppendDictNode c : node.children)
+                traversePostOrderR(c, visitor, level + 1);
+            visitor.visit(node, level);
+        }
+
+        public int nValues; // number of values in total
+        public int nValueBytesPlain; // number of bytes for all values
+        // uncompressed
+        public int nValueBytesCompressed; // number of values bytes in Trie
+        // (compressed)
+        public int maxValueLength; // size of longest value in bytes
+
+        // the trie is multi-byte-per-node
+        public int mbpn_nNodes; // number of nodes in trie
+        public int mbpn_trieDepth; // depth of trie
+        public int mbpn_maxFanOut; // the maximum no. children
+        public int mbpn_nChildLookups; // number of child lookups during lookup
+        // every value once
+        public int mbpn_nTotalFanOut; // the sum of fan outs during lookup every
+        // value once
+        public int mbpn_sizeValueTotal; // the sum of value space in all nodes
+        public int mbpn_sizeNoValueBytes; // size of field noValueBytes
+        public int mbpn_sizeChildOffset; // size of field childOffset, points to
+        // first child in flattened array
+        public int mbpn_sizeId; // size of id value, always be 4
+        public int mbpn_footprint; // MBPN footprint in bytes
+
+        /**
+         * Computes statistics of the trie rooted at the given node and the byte footprint of its
+         * serialized form. Nothing is printed; as a side effect nValuesBeneath is recomputed on
+         * every node of the trie.
+         */
+        public static Stats stats(AppendDictNode root) {
+            // calculate nEndValueBeneath
+            // post-order, so every child's count is final before its parent sums it up
+            traversePostOrderR(root, new Visitor() {
+                @Override
+                public void visit(AppendDictNode n, int level) {
+                    n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
+                    for (AppendDictNode c : n.children)
+                        n.nValuesBeneath += c.nValuesBeneath;
+                }
+            }, 0);
+
+            // run stats
+            final Stats s = new Stats();
+            // fragment length of the node most recently visited at each level, i.e. along the current path
+            final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
+            traverseR(root, new Visitor() {
+                @Override
+                public void visit(AppendDictNode n, int level) {
+                    if (n.isEndOfValue)
+                        s.nValues++;
+                    s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
+                    s.nValueBytesCompressed += n.part.length;
+                    s.mbpn_nNodes++;
+                    if (s.mbpn_trieDepth < level + 1)
+                        s.mbpn_trieDepth = level + 1;
+                    if (n.children.size() > 0) {
+                        if (s.mbpn_maxFanOut < n.children.size())
+                            s.mbpn_maxFanOut = n.children.size();
+                        int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
+                        s.mbpn_nChildLookups += childLookups;
+                        s.mbpn_nTotalFanOut += childLookups * n.children.size();
+                    }
+
+                    if (level < lenAtLvl.size())
+                        lenAtLvl.set(level, n.part.length);
+                    else
+                        lenAtLvl.add(n.part.length);
+                    // total length of the fragments from the root down to this node
+                    int lenSoFar = 0;
+                    for (int i = 0; i <= level; i++)
+                        lenSoFar += lenAtLvl.get(i);
+                    if (lenSoFar > s.maxValueLength)
+                        s.maxValueLength = lenSoFar;
+                }
+            }, 0);
+
+            // flatten trie footprint calculation, case of Multi-Byte-Per-DictNode
+            s.mbpn_sizeId = 4;
+            s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId;
+            s.mbpn_sizeNoValueBytes = 1;
+            // start from the widest child-offset field (5 bytes) and shrink it while the body still fits
+            s.mbpn_sizeChildOffset = 5;
+            s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset);
+            while (true) { // minimize the offset size to match the footprint
+                int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1);
+                // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
+                // expand t to long before *4, avoiding exceed Integer.MAX_VALUE
+                if (BytesUtil.sizeForValue((long) t * 4) <= s.mbpn_sizeChildOffset - 1) {
+                    s.mbpn_sizeChildOffset--;
+                    s.mbpn_footprint = t;
+                } else
+                    break;
+            }
+
+            return s;
+        }
+
+        /**
+         * out print trie for debug
+         */
+        public void print(AppendDictNode root) {
+            print(root, System.out);
+        }
+
+        /**
+         * Prints the trie to the given stream for debugging, one node per line, indented two spaces per
+         * level: the fragment decoded as UTF-8, " - ", the node's nValuesBeneath when positive, and
+         * "* [id]" when the node ends a value.
+         */
+        public void print(AppendDictNode root, final PrintStream out) {
+            traverseR(root, new Visitor() {
+                @Override
+                public void visit(AppendDictNode n, int level) {
+                    try {
+                        for (int i = 0; i < level; i++)
+                            out.print("  ");
+                        out.print(new String(n.part, "UTF-8"));
+                        out.print(" - ");
+                        if (n.nValuesBeneath > 0)
+                            out.print(n.nValuesBeneath);
+                        if (n.isEndOfValue)
+                            out.print("* [" + n.id + "]");
+                        out.print("\n");
+                    } catch (UnsupportedEncodingException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }, 0);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/9b36e268/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSlice.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSlice.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSlice.java
new file mode 100644
index 0000000..4e820e0
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSlice.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict.global;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+public class AppendDictSlice {
+    // NOTE(review): the fifth byte is 0x63 ('c'), not 0x6E ('n'), so these bytes actually spell
+    // "AppecdTrieDict". Presumably already-written slices carry exactly this magic, so do not
+    // "correct" the byte without confirming compatibility with existing data and with
+    // AppendTrieDictionary.HEAD_MAGIC, which the serializer in AppendDictNode writes.
+    static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // "AppendTrieDict"
+    // Offset of the 2-byte head-size field, which directly follows the magic.
+    static final int HEAD_SIZE_I = HEAD_MAGIC.length;
+    // Flag bits kept in the first byte of a node's child-offset field.
+    static final int BIT_IS_LAST_CHILD = 0x80;
+    static final int BIT_IS_END_OF_VALUE = 0x40;
+
+    private byte[] trieBytes;
+
+    // non-persistent part
+    transient private int headSize;
+    transient private int bodyLen;
+    transient private int sizeChildOffset;
+
+    transient private int nValues;
+    transient private int sizeOfId;
+    // mask MUST be long, since childOffset maybe 5 bytes at most
+    transient private long childOffsetMask;
+    transient private int firstByteOffset;
+
+    public AppendDictSlice(byte[] bytes) {
+        this.trieBytes = bytes;
+        init();
+    }
+
+    /**
+     * Verifies the magic and reads the head fields (head size, body length, number of values, width
+     * of the child-offset field, width of the id field) out of trieBytes, then derives the mask that
+     * strips the two flag bits from a child offset and the offset of a node's first value byte.
+     */
+    private void init() {
+        final boolean magicMatches = BytesUtil.compareBytes(HEAD_MAGIC, 0, trieBytes, 0, HEAD_MAGIC.length) == 0;
+        if (!magicMatches) {
+            throw new IllegalArgumentException("Wrong file type (magic does not match)");
+        }
+
+        final int afterMagicLen = trieBytes.length - HEAD_SIZE_I;
+        final DataInputStream headIn = new DataInputStream(new ByteArrayInputStream(trieBytes, HEAD_SIZE_I, afterMagicLen));
+        try {
+            this.headSize = headIn.readShort();
+            this.bodyLen = headIn.readInt();
+            this.nValues = headIn.readInt();
+            this.sizeChildOffset = headIn.read();
+            this.sizeOfId = headIn.read();
+        } catch (IOException e) {
+            // unchecked exceptions pass through untouched; only the checked one needs wrapping
+            throw new RuntimeException(e);
+        }
+
+        // the flags sit in the most significant byte of the child-offset field
+        final int flagShift = (sizeChildOffset - 1) * 8;
+        final long flagBits = (long) (BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE);
+        this.childOffsetMask = ~(flagBits << flagShift);
+        this.firstByteOffset = sizeChildOffset + 1; // the offset from begin of node to its first value byte
+    }
+
+    /**
+     * Reads one slice from the input: first the fixed prefix (magic, 2-byte head size, 4-byte body
+     * length), then the remaining head and body bytes as announced by those two sizes.
+     *
+     * @throws IllegalArgumentException if the magic does not match
+     * @throws IOException if the input cannot supply the announced number of bytes
+     */
+    public static AppendDictSlice deserializeFrom(DataInput in) throws IOException {
+        final int prefixLen = HEAD_MAGIC.length + Short.SIZE / Byte.SIZE + Integer.SIZE / Byte.SIZE;
+        final byte[] prefix = new byte[prefixLen];
+        in.readFully(prefix);
+
+        final boolean magicMatches = BytesUtil.compareBytes(HEAD_MAGIC, 0, prefix, 0, HEAD_MAGIC.length) == 0;
+        if (!magicMatches) {
+            throw new IllegalArgumentException("Wrong file type (magic does not match)");
+        }
+
+        // the two size fields follow the magic inside the prefix
+        final DataInputStream sizes = new DataInputStream(new ByteArrayInputStream(prefix, HEAD_SIZE_I, prefix.length - HEAD_SIZE_I));
+        final int headSize = sizes.readShort();
+        final int bodyLen = sizes.readInt();
+        sizes.close();
+
+        final byte[] whole = new byte[headSize + bodyLen];
+        System.arraycopy(prefix, 0, whole, 0, prefix.length);
+        final int remaining = whole.length - prefix.length;
+        in.readFully(whole, prefix.length, remaining);
+
+        return new AppendDictSlice(whole);
+    }
+
+    /**
+     * Concatenates the value fragments found by following first-child offsets from the root node,
+     * stopping at the first node that ends a value or has no children.
+     */
+    public byte[] getFirstValue() {
+        int nodeOffset = headSize;
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        while (true) {
+            // the fragment length is the single byte right before the fragment
+            int valueLen = BytesUtil.readUnsigned(trieBytes, nodeOffset + firstByteOffset - 1, 1);
+            bytes.write(trieBytes, nodeOffset + firstByteOffset, valueLen);
+            if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) {
+                break;
+            }
+            // child offsets are relative to the start of the body; a stored offset of 0 means "no children"
+            nodeOffset = headSize + (int) (BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask);
+            if (nodeOffset == headSize) {
+                break;
+            }
+        }
+        return bytes.toByteArray();
+    }
+
+    /**
+     * Looks up a value in the serialized trie and returns the id stored with it, or -1 when the
+     * value is not present in this slice.
+     *
+     * @param n            -- the offset of current node
+     * @param inp          -- input value bytes to lookup
+     * @param o            -- offset in the input value bytes matched so far
+     * @param inpEnd       -- end of input
+     * @param roundingFlag -- accepted but never read by this implementation: every miss
+     *                     returns -1, no closest smaller/bigger value is looked for
+     */
+    private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) {
+        while (true) {
+            // match the current node
+            int p = n + firstByteOffset; // start of node's value
+            int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value
+            for (; p < end && o < inpEnd; p++, o++) { // note matching start from [0]
+                if (trieBytes[p] != inp[o]) {
+                    return -1; // mismatch
+                }
+            }
+
+            // node completely matched, is input all consumed?
+            boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+            if (o == inpEnd) {
+                // a hit only if the node's fragment was consumed too and a value ends here;
+                // the id is stored right after the fragment
+                return p == end && isEndOfValue ? BytesUtil.readUnsigned(trieBytes, end, sizeOfId) : -1;
+            }
+
+            // find a child to continue
+            int c = headSize + (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
+            if (c == headSize) // has no children
+                return -1;
+            byte inpByte = inp[o];
+            int comp;
+            while (true) {
+                p = c + firstByteOffset;
+                comp = BytesUtil.compareByteUnsigned(trieBytes[p], inpByte);
+                if (comp == 0) { // continue in the matching child, reset n and loop again
+                    n = c;
+                    break;
+                } else if (comp < 0) { // try next child
+                    if (checkFlag(c, BIT_IS_LAST_CHILD))
+                        return -1;
+                    // siblings are stored back to back: skip this child's fragment and, if present, its id
+                    c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1) + (checkFlag(c, BIT_IS_END_OF_VALUE) ? sizeOfId : 0);
+                } else { // children are ordered by their first value byte
+                    return -1;
+                }
+            }
+        }
+    }
+
+    /** Tells whether the given flag bit is set in the byte at the given offset of trieBytes. */
+    private boolean checkFlag(int offset, int bit) {
+        final int masked = trieBytes[offset] & bit;
+        return masked > 0;
+    }
+
+    /**
+     * Looks up the id of the value held in value[offset, offset + len), starting at the root node.
+     * The rounding flag is passed through to the lookup unchanged.
+     */
+    public int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
+        final int valueEnd = offset + len;
+        return lookupSeqNoFromValue(headSize, value, offset, valueEnd, roundingFlag);
+    }
+
+    /** Rebuilds the in-memory node tree from the serialized bytes, starting at the first node of the body. */
+    public AppendDictNode rebuildTrieTree() {
+        return rebuildTrieTreeR(headSize, null);
+    }
+
+    /**
+     * Rebuilds the run of sibling nodes starting at offset n, attaching each to the given parent and
+     * recursing into each node's children.
+     *
+     * @param n      offset in trieBytes of the first sibling
+     * @param parent node to attach the siblings to, or null for the top-level call
+     * @return the node built in the top-level call (parent == null); null for every nested call
+     */
+    private AppendDictNode rebuildTrieTreeR(int n, AppendDictNode parent) {
+        AppendDictNode root = null;
+        while (true) {
+            int p = n + firstByteOffset;
+            int childOffset = (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
+            int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+            boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+
+            byte[] value = new byte[parLen];
+            System.arraycopy(trieBytes, p, value, 0, parLen);
+
+            AppendDictNode node = new AppendDictNode(value, isEndOfValue);
+            if (isEndOfValue) {
+                // the id is stored right after the fragment
+                int id = BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
+                node.id = id;
+            }
+
+            // NOTE(review): with parent == null every sibling in the run would overwrite root;
+            // presumably the top-level node is always flagged as last child -- confirm.
+            if (parent == null) {
+                root = node;
+            } else {
+                parent.addChild(node);
+            }
+
+            // a stored child offset of 0 means the node has no children
+            if (childOffset != 0) {
+                rebuildTrieTreeR(childOffset + headSize, node);
+            }
+
+            if (checkFlag(n, BIT_IS_LAST_CHILD)) {
+                break;
+            } else {
+                // advance to the next sibling, which is stored directly after this node
+                n += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
+            }
+        }
+        return root;
+    }
+
+    /**
+     * Structural sanity check of the serialized trie.
+     *
+     * Walks the nodes in storage order and verifies that every node fits inside trieBytes and that
+     * every run of siblings starts exactly where some earlier node's child offset points, with no
+     * child offset left unmatched at the end.
+     *
+     * Truncated data is reported as corruption (false) rather than surfacing as an
+     * ArrayIndexOutOfBoundsException from reading past the end of the array.
+     *
+     * @return true when the structure is consistent, false when the data is corrupted
+     */
+    public boolean doCheck() {
+        int offset = headSize;
+        // child offsets announced by nodes already seen, waiting for their run of children to start
+        HashSet<Integer> parentSet = new HashSet<>();
+        boolean lastChild = false;
+
+        while (offset < trieBytes.length) {
+            if (lastChild) {
+                // a new run of siblings starts here, so some earlier node must point at it
+                boolean contained = parentSet.remove(offset - headSize);
+                // Can't find parent, the data is corrupted
+                if (!contained) {
+                    return false;
+                }
+                lastChild = false;
+            }
+            int p = offset + firstByteOffset;
+
+            // Node header (child offset field + length byte) is truncated, the data is corrupted
+            if (trieBytes.length < p) {
+                return false;
+            }
+
+            int childOffset = (int) (BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask);
+            int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+            boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE);
+            int idLen = isEndOfValue ? sizeOfId : 0;
+
+            // Value or id bytes overflow the array, the data is corrupted
+            if (trieBytes.length < p + parLen + idLen) {
+                return false;
+            }
+
+            // Record it if has children
+            if (childOffset != 0) {
+                parentSet.add(childOffset);
+            }
+
+            // brothers done, move to next parent
+            if (checkFlag(offset, BIT_IS_LAST_CHILD)) {
+                lastChild = true;
+            }
+
+            // move to next node
+            offset = p + parLen + idLen;
+        }
+
+        // parentSet is empty, meaning every announced run of children was found, the data is correct
+        return parentSet.isEmpty();
+    }
+
+    @Override
+    public String toString() {
+        // summary only: first value of the slice, number of values and body length
+        final String first = Bytes.toStringBinary(getFirstValue());
+        final int values = nValues;
+        final int bytes = bodyLen;
+        return String.format("DictSlice[firstValue=%s, values=%d, bytes=%d]", first, values, bytes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(trieBytes);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        // two slices are equal when their serialized bytes are equal
+        if (o == this) {
+            return true;
+        }
+        if (o instanceof AppendDictSlice) {
+            final AppendDictSlice other = (AppendDictSlice) o;
+            return Arrays.equals(this.trieBytes, other.trieBytes);
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9b36e268/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSliceKey.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSliceKey.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSliceKey.java
new file mode 100644
index 0000000..323fe6b
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendDictSliceKey.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict.global;
+
+import org.apache.kylin.common.util.Bytes;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Key of a dictionary slice: a thin wrapper around a byte array that is ordered with
+ * {@code Bytes.compareTo} and can be written to / read from a data stream as
+ * {@code <int length><bytes>}.
+ */
+public class AppendDictSliceKey implements Comparable<AppendDictSliceKey> {
+    // key wrapping an empty byte array
+    static final AppendDictSliceKey START_KEY = AppendDictSliceKey.wrap(new byte[0]);
+
+    byte[] key;
+
+    /** Creates a key backed by the given array; the array is referenced, not copied. */
+    public static AppendDictSliceKey wrap(byte[] key) {
+        final AppendDictSliceKey wrapped = new AppendDictSliceKey();
+        wrapped.key = key;
+        return wrapped;
+    }
+
+    /** Text form of the key bytes as produced by {@code Bytes.toStringBinary}. */
+    @Override
+    public String toString() {
+        return Bytes.toStringBinary(this.key);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(this.key);
+    }
+
+    /** Keys are equal when their byte arrays have identical content. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof AppendDictSliceKey)) {
+            return false;
+        }
+        return Arrays.equals(this.key, ((AppendDictSliceKey) o).key);
+    }
+
+    @Override
+    public int compareTo(AppendDictSliceKey that) {
+        return Bytes.compareTo(this.key, that.key);
+    }
+
+    /** Writes the key as a 4-byte length followed by the raw bytes. */
+    public void write(DataOutput out) throws IOException {
+        final byte[] bytes = this.key;
+        out.writeInt(bytes.length);
+        out.write(bytes);
+    }
+
+    /** Replaces the key content with a length-prefixed byte array read from {@code in}. */
+    public void readFields(DataInput in) throws IOException {
+        final int length = in.readInt();
+        this.key = new byte[length];
+        in.readFully(this.key);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9b36e268/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java
new file mode 100644
index 0000000..90d65b6
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryBuilder.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict.global;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.dict.AppendTrieDictionary;
+import org.apache.kylin.dict.BytesConverter;
+import org.apache.kylin.dict.StringBytesConverter;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeMap;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class AppendTrieDictionaryBuilder {
+
+    private final String baseDir;
+    private final String workingDir;
+    private final int maxEntriesPerSlice;
+
+    private GlobalDictStore store;
+    private int maxId;
+    private int maxValueLength;
+    private int nValues;
+    private BytesConverter bytesConverter;
+    private TreeMap<AppendDictSliceKey, String> sliceFileMap = new TreeMap<>(); // slice key -> slice file name
+
+    private AppendDictSliceKey curKey;
+    private AppendDictNode curNode;
+
+    /**
+     * Creates a builder whose dictionary lives under
+     * {@code <hdfs-working-dir>resources/GlobalDict<resourceDir>/} and immediately calls {@link #init()}.
+     *
+     * @param resourceDir resource sub-path appended directly after "resources/GlobalDict"
+     * @param maxEntriesPerSlice threshold compared against the current node's {@code childrenCount} to trigger a split
+     * @throws IOException if {@link #init()} fails
+     */
+    public AppendTrieDictionaryBuilder(String resourceDir, int maxEntriesPerSlice) throws IOException {
+        this.baseDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourceDir + "/";
+        // baseDir already ends with "/", so no extra separator is needed (avoids a "//working" path)
+        this.workingDir = this.baseDir + "working";
+        this.maxEntriesPerSlice = maxEntriesPerSlice;
+        init();
+    }
+
+    /**
+     * (Re)initializes the builder state: opens the HDFS store on {@code baseDir}, prepares the
+     * working directory, then either loads counters, converter and slice map from the last
+     * version reported by the store, or starts from zero when no version exists.
+     *
+     * @throws IOException if the store cannot be opened, prepared or read
+     */
+    public synchronized void init() throws IOException {
+        this.store = new GlobalDictHDFSStore(baseDir);
+        store.prepareForWrite(workingDir);
+
+        final Long[] allVersions = store.listAllVersions();
+
+        if (allVersions.length > 0) {
+            // append values to last version
+            final GlobalDictMetadata latest = store.getMetadata(allVersions[allVersions.length - 1]);
+            this.maxId = latest.maxId;
+            this.maxValueLength = latest.maxValueLength;
+            this.nValues = latest.nValues;
+            this.bytesConverter = latest.bytesConverter;
+            this.sliceFileMap = new TreeMap<>(latest.sliceFileMap);
+            return;
+        }
+
+        // build dict for the first time
+        this.maxId = 0;
+        this.maxValueLength = 0;
+        this.nValues = 0;
+        this.bytesConverter = new StringBytesConverter();
+    }
+
+    /**
+     * Adds one value to the dictionary being built.
+     * <p>
+     * The value is converted to bytes and routed to the slice whose key is the greatest key not
+     * larger than the value ({@code sliceFileMap.floorKey}). If another slice is currently held in
+     * memory it is flushed first and the target slice is read back from the working directory.
+     * After insertion, if the current node's {@code childrenCount} exceeds
+     * {@code maxEntriesPerSlice}, the tree is split: the current node is flushed under the current
+     * key and the node returned by {@code splitNodeTree} becomes the current one under a new key.
+     *
+     * @param value the value to add
+     * @throws IOException if flushing or reading a slice fails
+     */
+    @SuppressWarnings("unchecked")
+    public void addValue(String value) throws IOException {
+        byte[] valueBytes = bytesConverter.convertToBytes(value);
+
+        // very first value of a brand-new dictionary: start with an empty root under the start key
+        if (sliceFileMap.isEmpty()) {
+            curNode = new AppendDictNode(new byte[0], false);
+            sliceFileMap.put(AppendDictSliceKey.START_KEY, null);
+        }
+        checkState(sliceFileMap.firstKey().equals(AppendDictSliceKey.START_KEY), "first key should be \"\", but got \"%s\"", sliceFileMap.firstKey());
+
+        // slice responsible for this value: greatest key <= value
+        AppendDictSliceKey nextKey = sliceFileMap.floorKey(AppendDictSliceKey.wrap(valueBytes));
+
+        if (curKey != null && !nextKey.equals(curKey)) {
+            // you may suppose nextKey>=curKey, but nextKey<curKey could happen when a node splits.
+            // for example, suppose we have curNode [1-10], and add value "2" triggers split:
+            //   first half [1-5] is flushed out, make second half [6-10] curNode and "6" curKey.
+            // then value "3" is added, now we got nextKey "1" smaller than curKey "6", surprise!
+            // in this case, we need to flush [6-10] and read [1-5] back.
+            flushCurrentNode();
+            curNode = null;
+        }
+        if (curNode == null) { // read next slice
+            AppendDictSlice slice = store.readSlice(workingDir, sliceFileMap.get(nextKey));
+            curNode = slice.rebuildTrieTree();
+        }
+        curKey = nextKey;
+
+        addValueR(curNode, valueBytes, 0);
+
+        // split slice if it's too large
+        if (curNode.childrenCount > maxEntriesPerSlice) {
+            AppendDictNode newRoot = splitNodeTree(curNode);
+            flushCurrentNode();
+            curNode = newRoot;
+            curKey = AppendDictSliceKey.wrap(newRoot.firstValue());
+            // the new slice has no file yet; it is written on the next flush
+            sliceFileMap.put(curKey, null);
+        }
+
+        maxValueLength = Math.max(maxValueLength, valueBytes.length);
+    }
+
+    /**
+     * Finishes the build: flushes the slice still held in memory (if any), commits the working
+     * directory together with the collected metadata through the store, and returns a dictionary
+     * initialized on {@code baseDir}.
+     *
+     * @param baseId base id recorded in the metadata
+     * @throws IOException if flushing, committing or initializing the dictionary fails
+     */
+    public AppendTrieDictionary build(int baseId) throws IOException {
+        final boolean hasPendingSlice = curNode != null;
+        if (hasPendingSlice) {
+            flushCurrentNode();
+        }
+
+        store.commit(workingDir, new GlobalDictMetadata(baseId, this.maxId, this.maxValueLength, this.nValues, this.bytesConverter, sliceFileMap));
+
+        final AppendTrieDictionary result = new AppendTrieDictionary();
+        result.init(this.baseDir);
+        return result;
+    }
+
+    private void flushCurrentNode() throws IOException {
+        String newSliceFile = store.writeSlice(workingDir, curKey, curNode);
+        String oldSliceFile = sliceFileMap.put(curKey, newSliceFile);
+        if (oldSliceFile != null) {
+            store.deleteSlice(workingDir, oldSliceFile);
+        }
+    }
+
+    /**
+     * Recursively inserts the bytes {@code value[start..]} into the subtree rooted at {@code node}.
+     * <p>
+     * The node's {@code part} is compared byte by byte (unsigned) with the value. Depending on
+     * where the comparison stops, the node is marked as end of value, split in two, split three
+     * ways with a new sibling, or the insertion continues in (or creates) a child selected by
+     * binary search on the next value byte. A new id is allocated whenever a new end-of-value
+     * node is produced.
+     *
+     * @param node  subtree root to insert into
+     * @param value full value bytes
+     * @param start offset in {@code value} that corresponds to the start of {@code node.part}
+     */
+    private void addValueR(AppendDictNode node, byte[] value, int start) {
+        // match the value part of current node
+        int i = 0, j = start;
+        int n = node.part.length, nn = value.length;
+        int comp = 0;
+        for (; i < n && j < nn; i++, j++) {
+            comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
+            if (comp != 0)
+                break;
+        }
+
+        if (j == nn) {
+            // if value fully matched within the current node
+            if (i == n) {
+                // on first match, mark end of value and assign an ID
+                if (!node.isEndOfValue) {
+                    node.id = createNextId();
+                    node.isEndOfValue = true;
+                }
+            } else {
+                // otherwise, split the current node into two
+                AppendDictNode c = new AppendDictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+                c.id = node.id;
+                node.reset(BytesUtil.subarray(node.part, 0, i), true);
+                node.addChild(c);
+                node.id = createNextId();
+            }
+            return;
+        }
+
+        // if partially matched the current, split the current node, add the new
+        // value, make a 3-way
+        if (i < n) {
+            AppendDictNode c1 = new AppendDictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+            c1.id = node.id;
+            AppendDictNode c2 = addNodeMaybeOverflow(value, j, nn);
+            node.reset(BytesUtil.subarray(node.part, 0, i), false);
+            // keep the two children ordered by the byte at which they diverged
+            if (comp < 0) {
+                node.addChild(c1);
+                node.addChild(c2);
+            } else {
+                node.addChild(c2);
+                node.addChild(c1);
+            }
+            return;
+        }
+
+        // the whole part of the current node matched; binary search the next byte
+        // for a child node to continue
+        byte lookfor = value[j];
+        int lo = 0;
+        int hi = node.children.size() - 1;
+        int mid = 0;
+        boolean found = false;
+        comp = 0;
+        while (!found && lo <= hi) {
+            mid = lo + (hi - lo) / 2;
+            AppendDictNode c = node.children.get(mid);
+            comp = BytesUtil.compareByteUnsigned(lookfor, c.part[0]);
+            if (comp < 0)
+                hi = mid - 1;
+            else if (comp > 0)
+                lo = mid + 1;
+            else
+                found = true;
+        }
+        if (found) {
+            // found a child node matching the first byte, continue in that child
+            addValueR(node.children.get(mid), value, j);
+        } else {
+            // otherwise, make the value a new child, inserted before or after the last probed
+            // position according to the last comparison
+            AppendDictNode c = addNodeMaybeOverflow(value, j, nn);
+            node.addChild(comp <= 0 ? mid : mid + 1, c);
+        }
+    }
+
+    private int createNextId() {
+        int id = ++maxId;
+        checkValidId(id);
+        nValues++;
+        return id;
+    }
+
+    // The dict id starts from 1 to 2147483647 and 2147483648 to -2, leave 0 and -1 used for uninitialized state
+    private void checkValidId(int id) {
+        if (id == 0 || id == -1) {
+            throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
+        }
+    }
+
+    // When add a new node, the value part maybe over 255 bytes, need split it into a sub tree
+    private AppendDictNode addNodeMaybeOverflow(byte[] value, int start, int end) {
+        AppendDictNode head = null;
+        AppendDictNode current = null;
+        for (; start + 255 < end; start += 255) {
+            AppendDictNode c = new AppendDictNode(BytesUtil.subarray(value, start, start + 255), false);
+            if (head == null) {
+                head = c;
+                current = c;
+            } else {
+                current.addChild(c);
+                current = c;
+            }
+        }
+        AppendDictNode last = new AppendDictNode(BytesUtil.subarray(value, start, end), true);
+        last.id = createNextId();
+        if (head == null) {
+            head = last;
+        } else {
+            current.addChild(last);
+        }
+        return head;
+    }
+
+    /**
+     * Chooses the node at which the tree under {@code root} is cut and delegates the actual split
+     * to {@code AppendDictNode.splitNodeTree}.
+     * <p>
+     * Starting with a budget of half of {@code maxEntriesPerSlice}, the method walks down from the
+     * root. On each level with several children it scans them from last to first, subtracting each
+     * child's {@code childrenCount} from the budget while the budget is larger; the first child
+     * whose count is not smaller than the remaining budget is descended into. The walk ends at a
+     * node without children.
+     *
+     * @param root root of the tree to split
+     * @return whatever {@code AppendDictNode.splitNodeTree} returns for the chosen node; the caller
+     *         uses it as the root of the new slice
+     */
+    private AppendDictNode splitNodeTree(AppendDictNode root) {
+        AppendDictNode parent = root;
+        // budget of entries for the split; equals maxEntriesPerSlice / 2 truncated to int
+        int childCountToSplit = (int) (maxEntriesPerSlice * 1.0 / 2);
+        while (true) {
+            List<AppendDictNode> children = parent.children;
+            if (children.size() == 0) {
+                break;
+            }
+            if (children.size() == 1) {
+                // single child: nothing to choose, just go deeper
+                parent = children.get(0);
+            } else {
+                for (int i = children.size() - 1; i >= 0; i--) {
+                    parent = children.get(i);
+                    if (childCountToSplit > children.get(i).childrenCount) {
+                        // whole child still fits in the budget, consume it and look at the previous sibling
+                        childCountToSplit -= children.get(i).childrenCount;
+                    } else {
+                        // budget ends inside this child: descend into it
+                        childCountToSplit--;
+                        break;
+                    }
+                }
+            }
+        }
+        return AppendDictNode.splitNodeTree(parent);
+    }
+
+    /** Overrides the id counter. Only used for test. */
+    void setMaxId(int id) {
+        maxId = id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9b36e268/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryChecker.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryChecker.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryChecker.java
new file mode 100644
index 0000000..94b6e9d
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/AppendTrieDictionaryChecker.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict.global;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+
+import static org.apache.kylin.dict.global.GlobalDictHDFSStore.BUFFER_SIZE;
+
+/**
+ * Created by sunyerui on 16/11/15.
+ */
+public class AppendTrieDictionaryChecker {
+
+    /**
+     * Checks every slice file found recursively under {@code baseDir} and prints a per-slice
+     * verdict plus a summary to standard output.
+     *
+     * @param baseDir directory to scan
+     * @return {@code true} when no slice failed the check, {@code false} otherwise
+     * @throws IOException if the directory tree cannot be listed
+     */
+    public boolean runChecker(String baseDir) throws IOException {
+        final Path root = new Path(baseDir);
+        final FileSystem fs = HadoopUtil.getFileSystem(root);
+        final List<Path> slices = new ArrayList<>();
+        final List<Path> corrupted = new ArrayList<>();
+        listDictSlicePath(fs, fs.getFileStatus(root), slices);
+
+        for (Path slice : slices) {
+            if (doCheck(fs, slice)) {
+                System.out.println("AppendDict Slice " + slice + " is right");
+            } else {
+                System.out.println("AppendDict Slice " + slice + " corrupted");
+                corrupted.add(slice);
+            }
+        }
+
+        if (!corrupted.isEmpty()) {
+            System.out.println("Some AppendDict Slice(s) corrupted: ");
+            for (Path slice : corrupted) {
+                System.out.println(slice.toString());
+            }
+            return false;
+        }
+
+        System.out.println("ALL AppendDict Slices is right");
+        return true;
+    }
+
+    /**
+     * Recursively collects into {@code list} the paths of all files under {@code path} whose name
+     * starts with the slice file prefix.
+     */
+    public void listDictSlicePath(FileSystem fs, FileStatus path, List<Path> list) throws IOException {
+        if (!path.isDirectory()) {
+            final Path file = path.getPath();
+            if (file.getName().startsWith(GlobalDictHDFSStore.IndexFormatV1.SLICE_PREFIX)) {
+                list.add(file);
+            }
+            return;
+        }
+
+        for (FileStatus child : fs.listStatus(path.getPath())) {
+            listDictSlicePath(fs, child, list);
+        }
+    }
+
+    public boolean doCheck(FileSystem fs, Path filePath) {
+        try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) {
+            AppendDictSlice slice = AppendDictSlice.deserializeFrom(input);
+            return slice.doCheck();
+        } catch (Exception e) {
+            return false;
+        } catch (Error e) {
+            return false;
+        }
+    }
+
+    /**
+     * Command line entry point. Checks the path given as first argument, or the default global
+     * dictionary directory under the Kylin HDFS working directory, and exits with status 0 when
+     * all slices are fine and -1 otherwise.
+     */
+    public static void main(String[] args) throws IOException {
+        final String defaultPath = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/";
+        final String path = args.length > 0 ? args[0] : defaultPath;
+        System.out.println("Recursive Check AppendTrieDictionary Slices in path " + path);
+
+        final boolean healthy = new AppendTrieDictionaryChecker().runChecker(path);
+        System.exit(healthy ? 0 : -1);
+    }

http://git-wip-us.apache.org/repos/asf/kylin/blob/9b36e268/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java
new file mode 100644
index 0000000..b30d5b9
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictHDFSStore.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict.global;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.dict.BytesConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+public class GlobalDictHDFSStore extends GlobalDictStore {
+
+    static final Logger logger = LoggerFactory.getLogger(GlobalDictHDFSStore.class);
+    static final String V1_INDEX_NAME = ".index";
+    static final String V2_INDEX_NAME = ".index_v2";
+    static final String VERSION_PREFIX = "version_";
+    static final int BUFFER_SIZE = 8 * 1024 * 1024;
+
+    private final Path basePath;
+    private final Configuration conf;
+    private final FileSystem fileSystem;
+
+    public GlobalDictHDFSStore(String baseDir) throws IOException {
+        super(baseDir);
+        this.basePath = new Path(baseDir);
+        this.conf = HadoopUtil.getCurrentConfiguration();
+        this.fileSystem = HadoopUtil.getFileSystem(baseDir);
+
+        if (!fileSystem.exists(basePath)) {
+            logger.info("Global dict at {} doesn't exist, create a new one", basePath);
+            fileSystem.mkdirs(basePath);
+        }
+
+        migrateOldLayout();
+    }
+
+    /**
+     * Migrates a dictionary stored in the old flat layout to the new versioned layout.
+     * <p>
+     * Previously slice files and the index file were put directly in the base directory. When both
+     * a v1 index file and at least one slice file are found there, they are copied into a temporary
+     * directory which is then renamed to a new version directory. Only after the rename has
+     * succeeded are the originals deleted; the temporary directory is removed in any case.
+     *
+     * @throws IOException if a file system operation fails, including a rename that reports failure
+     */
+    private void migrateOldLayout() throws IOException {
+        FileStatus[] sliceFiles = fileSystem.listStatus(basePath, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return path.getName().startsWith(IndexFormatV1.SLICE_PREFIX);
+            }
+        });
+        Path indexFile = new Path(basePath, V1_INDEX_NAME);
+
+        if (fileSystem.exists(indexFile) && sliceFiles.length > 0) { // old layout
+            final long version = System.currentTimeMillis();
+            Path tempDir = new Path(basePath, "tmp_" + VERSION_PREFIX + version);
+            Path versionDir = getVersionDir(version);
+
+            logger.info("Convert global dict at {} to new layout with version {}", basePath, version);
+
+            fileSystem.mkdirs(tempDir);
+            // convert to new layout
+            try {
+                // copy index and slice files to temp
+                FileUtil.copy(fileSystem, indexFile, fileSystem, tempDir, false, conf);
+                for (FileStatus sliceFile : sliceFiles) {
+                    FileUtil.copy(fileSystem, sliceFile.getPath(), fileSystem, tempDir, false, conf);
+                }
+                // rename; FileSystem.rename can signal failure by returning false instead of throwing,
+                // and the originals must not be deleted unless the versioned copy is really in place
+                if (!fileSystem.rename(tempDir, versionDir)) {
+                    throw new IOException("Failed to rename " + tempDir + " to " + versionDir + " while migrating global dict at " + basePath);
+                }
+                // delete index and slices files in base dir
+                fileSystem.delete(indexFile, false);
+                for (FileStatus sliceFile : sliceFiles) {
+                    fileSystem.delete(sliceFile.getPath(), true);
+                }
+
+            } finally {
+                // after a successful rename tempDir no longer exists; this only cleans up after a failure
+                if (fileSystem.exists(tempDir)) {
+                    fileSystem.delete(tempDir, true);
+                }
+            }
+        }
+    }
+
+    /**
+     * Prepares a clean working directory for a build: a leftover working directory is removed,
+     * then it is seeded with a copy of the latest version, or created empty when no version exists.
+     *
+     * @param workingDir path of the working directory
+     * @throws IOException if a file system operation fails
+     */
+    @Override
+    void prepareForWrite(String workingDir) throws IOException {
+        // TODO create lock file
+        Path working = new Path(workingDir);
+
+        if (fileSystem.exists(working)) {
+            // announce the deletion before doing it, so the log also covers a delete that fails
+            logger.info("Working directory {} exists, delete it first", working);
+            fileSystem.delete(working, true);
+        }
+
+        // when build dict, copy all data into working dir and work on it, avoiding suddenly server crash made data corrupt
+        Long[] versions = listAllVersions();
+        if (versions.length > 0) {
+            Path latestVersion = getVersionDir(versions[versions.length - 1]);
+            FileUtil.copy(fileSystem, latestVersion, fileSystem, working, false, true, conf);
+        } else {
+            fileSystem.mkdirs(working);
+        }
+    }
+
+    /**
+     * Lists the versions present in the base directory, i.e. the numeric suffixes of all entries
+     * whose name starts with the version prefix.
+     *
+     * @return the versions in ascending order
+     * @throws IOException if the base directory cannot be listed
+     */
+    @Override
+    public Long[] listAllVersions() throws IOException {
+        final PathFilter versionDirFilter = new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return path.getName().startsWith(VERSION_PREFIX);
+            }
+        };
+
+        final TreeSet<Long> sorted = new TreeSet<>();
+        for (FileStatus dir : fileSystem.listStatus(basePath, versionDirFilter)) {
+            final String dirName = dir.getPath().getName();
+            sorted.add(Long.parseLong(dirName.substring(VERSION_PREFIX.length())));
+        }
+        return sorted.toArray(new Long[sorted.size()]);
+    }
+
+    /** Returns the directory of the given version: the version prefix plus the number, under the base path. */
+    @Override
+    public Path getVersionDir(long version) {
+        final String dirName = VERSION_PREFIX + version;
+        return new Path(basePath, dirName);
+    }
+
+    /**
+     * Reads the metadata of one version. The version directory must contain exactly one index
+     * file; its name decides whether it is parsed as v1 or v2 format.
+     *
+     * @throws IllegalStateException if zero or more than one index file is found
+     * @throws RuntimeException if the index file name is not a known one
+     */
+    @Override
+    public GlobalDictMetadata getMetadata(long version) throws IOException {
+        final Path versionDir = getVersionDir(version);
+        // the v1 name is a prefix of the v2 name, so this filter accepts both
+        final FileStatus[] candidates = fileSystem.listStatus(versionDir, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return path.getName().startsWith(V1_INDEX_NAME);
+            }
+        });
+        checkState(candidates.length == 1, "zero or more than one index file found: %s", Arrays.toString(candidates));
+
+        final String indexName = candidates[0].getPath().getName();
+        if (V2_INDEX_NAME.equals(indexName)) {
+            return new IndexFormatV2(fileSystem, conf).readIndexFile(versionDir);
+        }
+        if (V1_INDEX_NAME.equals(indexName)) {
+            return new IndexFormatV1(fileSystem, conf).readIndexFile(versionDir);
+        }
+        throw new RuntimeException("Unknown index file: " + indexName);
+    }
+
+    /** Opens the named slice file in {@code directory} and deserializes it. */
+    @Override
+    public AppendDictSlice readSlice(String directory, String sliceFileName) throws IOException {
+        final Path sliceFile = new Path(directory, sliceFileName);
+        logger.info("read slice from {}", sliceFile);
+        try (FSDataInputStream in = fileSystem.open(sliceFile, BUFFER_SIZE)) {
+            final AppendDictSlice loaded = AppendDictSlice.deserializeFrom(in);
+            return loaded;
+        }
+    }
+
+    /**
+     * Serializes the trie rooted at {@code slice} into a new file in the working directory.
+     *
+     * @return the name of the written slice file
+     */
+    @Override
+    public String writeSlice(String workingDir, AppendDictSliceKey key, AppendDictNode slice) throws IOException {
+        final String fileName = IndexFormatV2.sliceFileName(key);
+        final Path target = new Path(workingDir, fileName);
+
+        logger.info("write slice with key {} into file {}", key, target);
+        try (FSDataOutputStream stream = fileSystem.create(target, true, BUFFER_SIZE)) {
+            stream.write(slice.buildTrieBytes());
+        }
+        return fileName;
+    }
+
+    /** Deletes the named slice file from the working directory when it exists. */
+    @Override
+    public void deleteSlice(String workingDir, String sliceFileName) throws IOException {
+        final Path target = new Path(workingDir, sliceFileName);
+        logger.info("delete slice at {}", target);
+        if (!fileSystem.exists(target)) {
+            return;
+        }
+        fileSystem.delete(target, false);
+    }
+
+    /**
+     * Finalizes a build: a v1 index file in the working directory is replaced by a freshly written
+     * v2 index, which is sanity-checked; the working directory is then published by renaming it to
+     * a new version directory named with the current time in milliseconds, and expired versions
+     * are removed.
+     *
+     * @param workingDir working directory holding the slices of this build
+     * @param metadata   metadata to write into the index file
+     * @throws IOException on any file system failure, including a rename that reports failure
+     */
+    @Override
+    public void commit(String workingDir, GlobalDictMetadata metadata) throws IOException {
+        Path workingPath = new Path(workingDir);
+
+        // delete v1 index file
+        Path oldIndexFile = new Path(workingPath, V1_INDEX_NAME);
+        if (fileSystem.exists(oldIndexFile)) {
+            fileSystem.delete(oldIndexFile, false);
+        }
+        // write v2 index file
+        IndexFormat index = new IndexFormatV2(fileSystem, conf);
+        index.writeIndexFile(workingPath, metadata);
+        index.sanityCheck(workingPath, metadata);
+
+        // publish working dir as the new version dir; FileSystem.rename can signal failure by
+        // returning false, which must not be mistaken for a successful commit
+        Path newVersionPath = new Path(basePath, VERSION_PREFIX + System.currentTimeMillis());
+        if (!fileSystem.rename(workingPath, newVersionPath)) {
+            throw new IOException("Failed to rename " + workingPath + " to " + newVersionPath);
+        }
+
+        cleanUp();
+    }
+
+    // Check versions count, delete expired versions
+    private void cleanUp() throws IOException {
+        Long[] versions = listAllVersions();
+        long timestamp = System.currentTimeMillis();
+        for (int i = 0; i < versions.length - maxVersions; i++) {
+            if (versions[i] + versionTTL < timestamp) {
+                fileSystem.delete(getVersionDir(versions[i]), true);
+            }
+        }
+    }
+
+    /**
+     * Copies the latest version of this dictionary to the corresponding location under the HDFS
+     * working directory of another Kylin configuration.
+     *
+     * @param srcConfig configuration whose HDFS working directory currently contains this dictionary
+     * @param dstConfig configuration whose HDFS working directory receives the copy
+     * @return the base directory of the dictionary under the destination working directory
+     * @throws IllegalArgumentException if this store's base directory is not under the source working directory
+     * @throws IOException if listing, deleting or copying fails
+     */
+    @Override
+    public String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
+        final String srcWorkingDir = srcConfig.getHdfsWorkingDirectory();
+        // Guava Preconditions only substitutes %s placeholders
+        checkArgument(baseDir.startsWith(srcWorkingDir), "Please check why current directory %s doesn't belong to source working directory %s", baseDir, srcWorkingDir);
+
+        // plain prefix swap; String.replaceFirst would interpret the directories as regex / replacement patterns
+        final String dstBaseDir = dstConfig.getHdfsWorkingDirectory() + baseDir.substring(srcWorkingDir.length());
+
+        Long[] versions = listAllVersions();
+        if (versions.length == 0) { // empty dict, nothing to copy
+            return dstBaseDir;
+        }
+
+        Path srcVersionDir = getVersionDir(versions[versions.length - 1]);
+        // same version directory name, under the destination base directory
+        Path dstVersionDir = new Path(dstBaseDir, srcVersionDir.getName());
+        FileSystem dstFS = dstVersionDir.getFileSystem(conf);
+        if (dstFS.exists(dstVersionDir)) {
+            dstFS.delete(dstVersionDir, true);
+        }
+        FileUtil.copy(fileSystem, srcVersionDir, dstFS, dstVersionDir, false, true, conf);
+
+        return dstBaseDir;
+    }
+
+    /**
+     * Strategy for reading and writing the index file that stores a dictionary's metadata inside
+     * one directory.
+     */
+    public interface IndexFormat {
+        /** Reads the index file located in {@code dir} and returns the metadata it describes. */
+        GlobalDictMetadata readIndexFile(Path dir) throws IOException;
+
+        /** Writes {@code metadata} as the index file in {@code dir}. */
+        void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException;
+
+        /** Verifies the content of {@code dir} against {@code metadata}; what is verified is up to the implementation. */
+        void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException;
+    }
+
+    /**
+     * Index format v1. The index file holds, in this order: baseId, maxId, maxValueLength and
+     * nValues as ints, the class name of the bytes converter as a UTF string, the number of slices
+     * as an int, and then one serialized slice key per slice. Slice file names are not stored; they
+     * are derived from the key as {@code SLICE_PREFIX + key}.
+     */
+    public static class IndexFormatV1 implements IndexFormat {
+        // file name prefix of slice files
+        static final String SLICE_PREFIX = "cached_";
+
+        protected final FileSystem fs;
+        protected final Configuration conf;
+
+        protected IndexFormatV1(FileSystem fs, Configuration conf) {
+            this.fs = fs;
+            this.conf = conf;
+        }
+
+        /**
+         * Reads the v1 index file in {@code dir}. The field order here must mirror
+         * {@code writeIndexFile}. The smallest key read is re-registered under the start key.
+         *
+         * @throws RuntimeException if the bytes converter class cannot be instantiated
+         */
+        @Override
+        public GlobalDictMetadata readIndexFile(Path dir) throws IOException {
+            Path indexFile = new Path(dir, V1_INDEX_NAME);
+            try (FSDataInputStream in = fs.open(indexFile)) {
+                int baseId = in.readInt();
+                int maxId = in.readInt();
+                int maxValueLength = in.readInt();
+                int nValues = in.readInt();
+                String converterName = in.readUTF();
+                BytesConverter converter;
+                try {
+                    converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+                } catch (Exception e) {
+                    throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e);
+                }
+
+                int nSlices = in.readInt();
+                TreeMap<AppendDictSliceKey, String> sliceFileMap = new TreeMap<>();
+                for (int i = 0; i < nSlices; i++) {
+                    AppendDictSliceKey key = new AppendDictSliceKey();
+                    key.readFields(in);
+                    // v1 does not store file names, derive them from the key
+                    sliceFileMap.put(key, sliceFileName(key));
+                }
+                // make sure first key is always ""
+                // NOTE(review): firstKey() throws NoSuchElementException when nSlices is 0 - confirm an index without slices cannot occur
+                String firstFile = sliceFileMap.remove(sliceFileMap.firstKey());
+                sliceFileMap.put(AppendDictSliceKey.START_KEY, firstFile);
+
+                return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, sliceFileMap);
+            }
+        }
+
+        /**
+         * Writes the v1 index file into {@code dir}, overwriting an existing one. Only the slice
+         * keys are written, not the file names. Only for test.
+         */
+        //only for test
+        @Override
+        public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException {
+            Path indexFile = new Path(dir, V1_INDEX_NAME);
+            try (FSDataOutputStream out = fs.create(indexFile, true)) {
+                out.writeInt(metadata.baseId);
+                out.writeInt(metadata.maxId);
+                out.writeInt(metadata.maxValueLength);
+                out.writeInt(metadata.nValues);
+                out.writeUTF(metadata.bytesConverter.getClass().getName());
+                out.writeInt(metadata.sliceFileMap.size());
+                for (Map.Entry<AppendDictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+                    entry.getKey().write(out);
+                }
+            }
+        }
+
+        /** Not available for v1: always throws {@link UnsupportedOperationException}. */
+        @Override
+        public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException {
+            throw new UnsupportedOperationException("sanityCheck V1 format is no longer supported");
+        }
+
+        /** Returns the v1 slice file name for a key: the slice prefix followed by the key's string form. */
+        public static String sliceFileName(AppendDictSliceKey key) {
+            return SLICE_PREFIX + key;
+        }
+    }
+
+    /**
+     * Reads and writes the V2 index file ({@code V2_INDEX_NAME}) of a global dictionary directory.
+     * <p>
+     * Compared with the V1 layout, a V2 index starts with a one-byte minor version and
+     * stores the file name of every slice next to its key.
+     */
+    public static class IndexFormatV2 extends IndexFormatV1 {
+        // hides IndexFormatV1.SLICE_PREFIX; both currently hold the same value
+        static final String SLICE_PREFIX = "cached_";
+        static final int MINOR_VERSION_V1 = 0x01;
+
+        protected IndexFormatV2(FileSystem fs, Configuration conf) {
+            super(fs, conf);
+        }
+
+        /**
+         * Loads the metadata stored in the V2 index file under {@code dir}.
+         * <p>
+         * Fields are read in this order: minor version (byte), baseId, maxId, maxValueLength,
+         * nValues (ints), the converter class name (UTF), the slice count (int), then for
+         * every slice its key followed by its file name (UTF).
+         *
+         * @param dir directory containing the V2 index file
+         * @return the metadata described by the index file
+         * @throws IOException on I/O error
+         * @throws RuntimeException if the minor version is not {@code MINOR_VERSION_V1} or the
+         *         recorded BytesConverter class cannot be instantiated
+         */
+        @Override
+        public GlobalDictMetadata readIndexFile(Path dir) throws IOException {
+            Path indexFile = new Path(dir, V2_INDEX_NAME);
+            try (FSDataInputStream in = fs.open(indexFile)) {
+                byte minorVersion = in.readByte(); // include a header to allow minor format changes
+                if (minorVersion != MINOR_VERSION_V1) {
+                    throw new RuntimeException("Unsupported minor version " + minorVersion);
+                }
+                int baseId = in.readInt();
+                int maxId = in.readInt();
+                int maxValueLength = in.readInt();
+                int nValues = in.readInt();
+                String converterName = in.readUTF();
+                BytesConverter converter;
+                try {
+                    converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+                } catch (Exception e) {
+                    throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e);
+                }
+
+                int nSlices = in.readInt();
+                TreeMap<AppendDictSliceKey, String> sliceFileMap = new TreeMap<>();
+                for (int i = 0; i < nSlices; i++) {
+                    AppendDictSliceKey key = new AppendDictSliceKey();
+                    key.readFields(in);
+                    String sliceFileName = in.readUTF();
+                    sliceFileMap.put(key, sliceFileName);
+                }
+
+                return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, sliceFileMap);
+            }
+        }
+
+        /**
+         * Writes {@code metadata} as a V2 index file under {@code dir}, overwriting any existing one.
+         * The field order mirrors {@link #readIndexFile(Path)}.
+         *
+         * @param dir directory that receives the index file
+         * @param metadata metadata to persist
+         * @throws IOException on I/O error
+         */
+        @Override
+        public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException {
+            Path indexFile = new Path(dir, V2_INDEX_NAME);
+            try (FSDataOutputStream out = fs.create(indexFile, true)) {
+                out.writeByte(MINOR_VERSION_V1);
+                out.writeInt(metadata.baseId);
+                out.writeInt(metadata.maxId);
+                out.writeInt(metadata.maxValueLength);
+                out.writeInt(metadata.nValues);
+                out.writeUTF(metadata.bytesConverter.getClass().getName());
+                out.writeInt(metadata.sliceFileMap.size());
+                for (Map.Entry<AppendDictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+                    entry.getKey().write(out);
+                    out.writeUTF(entry.getValue());
+                }
+            }
+        }
+
+        /**
+         * Verifies that every slice file referenced by {@code metadata} exists under {@code dir}.
+         *
+         * @param dir directory expected to contain the slice files
+         * @param metadata metadata whose slice file names are checked
+         * @throws IOException on I/O error
+         * @throws RuntimeException if a referenced slice file is missing
+         */
+        @Override
+        public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException {
+            for (Map.Entry<AppendDictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+                if (!fs.exists(new Path(dir, entry.getValue()))) {
+                    throw new RuntimeException("The slice file " + entry.getValue() + " for the key: " + entry.getKey() + " must be existed!");
+                }
+            }
+        }
+
+        /**
+         * Generates a slice file name of the form {@code <SLICE_PREFIX><currentTimeMillis>_<key.hashCode()>}.
+         * <p>
+         * The name is assembled by concatenation instead of {@code String.format("%d")} because
+         * the latter localizes digits according to the default locale; a file name should
+         * always consist of the same ASCII digits whatever locale the JVM runs with.
+         *
+         * @param key slice key
+         * @return a file name for a newly written slice
+         */
+        public static String sliceFileName(AppendDictSliceKey key) {
+            return SLICE_PREFIX + System.currentTimeMillis() + "_" + key.hashCode();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9b36e268/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictMetadata.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictMetadata.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictMetadata.java
new file mode 100644
index 0000000..7c89ea2
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictMetadata.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict.global;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.dict.BytesConverter;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Immutable-reference holder for the metadata of one version of a global dictionary.
+ * The values normally originate from the index file kept with that version.
+ */
+public class GlobalDictMetadata {
+    public final int baseId;
+    public final int maxId;
+    public final int maxValueLength;
+    public final int nValues;
+    public final BytesConverter bytesConverter;
+    // maps each slice key to the name of the file holding that slice
+    public final TreeMap<AppendDictSliceKey, String> sliceFileMap;
+
+    /**
+     * Creates the metadata; {@code sliceFileMap} is copied into a new {@link TreeMap}.
+     *
+     * @throws NullPointerException if {@code bytesConverter} or {@code sliceFileMap} is null
+     */
+    public GlobalDictMetadata(int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, NavigableMap<AppendDictSliceKey, String> sliceFileMap) {
+        // reference arguments are validated first, converter before map
+        this.bytesConverter = Preconditions.checkNotNull(bytesConverter, "bytesConverter");
+        Preconditions.checkNotNull(sliceFileMap, "sliceFileMap");
+        this.sliceFileMap = new TreeMap<>(sliceFileMap);
+
+        this.nValues = nValues;
+        this.maxValueLength = maxValueLength;
+        this.maxId = maxId;
+        this.baseId = baseId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/9b36e268/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictStore.java
new file mode 100644
index 0000000..eaf0729
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/GlobalDictStore.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict.global;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+
+import java.io.IOException;
+
+/**
+ * Storage contract for a global dictionary: listing versions, reading and writing
+ * slices, committing a working directory and copying the dictionary to another meta.
+ */
+public abstract class GlobalDictStore {
+
+    /** Base directory containing all versions of this global dict. */
+    protected final String baseDir;
+    /** Value of {@code KylinConfig#getAppendDictMaxVersions()} taken at construction. */
+    protected final int maxVersions;
+    /** Value of {@code KylinConfig#getAppendDictVersionTTL()} taken at construction. */
+    protected final int versionTTL;
+
+    /**
+     * @param baseDir base directory of this global dict, must not be null
+     * @throws NullPointerException if {@code baseDir} is null
+     */
+    protected GlobalDictStore(String baseDir) {
+        Preconditions.checkNotNull(baseDir, "baseDir");
+        this.baseDir = baseDir;
+        this.maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
+        this.versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
+    }
+
+    /**
+     * Prepare a working directory for writing.
+     * @param workingDir an absolute path; it is created when it does not exist yet
+     * @throws IOException on I/O error
+     */
+    abstract void prepareForWrite(String workingDir) throws IOException;
+
+    /**
+     * List every version of this dictionary.
+     * @return the versions, sorted in ascending order
+     * @throws IOException on I/O error
+     */
+    public abstract Long[] listAllVersions() throws IOException;
+
+    /**
+     * Resolve the directory of a version.
+     * @param version version number
+     * @return path of the directory of the specified version
+     */
+    public abstract Path getVersionDir(long version);
+
+    /**
+     * Load the metadata that belongs to one version of the dictionary.
+     * @param version version number
+     * @return the <i>GlobalDictMetadata</i> of that version
+     * @throws IOException on I/O error
+     */
+    public abstract GlobalDictMetadata getMetadata(long version) throws IOException;
+
+    /**
+     * Load a <i>DictSlice</i> from its slice file.
+     * @param workingDir directory in which the slice file lives
+     * @param sliceFileName name of the slice file
+     * @return the <i>DictSlice</i> that was read
+     * @throws IOException on I/O error
+     */
+    public abstract AppendDictSlice readSlice(String workingDir, String sliceFileName) throws IOException;
+
+    /**
+     * Store a slice under the given key in the specified directory.
+     * @param workingDir target directory, should exist
+     * @param key key of the slice
+     * @param slice the slice to store
+     * @return name of the file the slice was written to
+     * @throws IOException on I/O error
+     */
+    public abstract String writeSlice(String workingDir, AppendDictSliceKey key, AppendDictNode slice) throws IOException;
+
+    /**
+     * Remove the slice stored under the given file name.
+     * @param workingDir directory of the slice file, should exist
+     * @param sliceFileName name of the slice file, should exist
+     * @throws IOException on I/O error
+     */
+    public abstract void deleteSlice(String workingDir, String sliceFileName) throws IOException;
+
+    /**
+     * Commit the <i>DictSlice</i> files and the <i>GlobalDictMetadata</i> of workingDir to a new versionDir.
+     * @param workingDir directory holding the tmp slices and index, should exist
+     * @param globalDictMetadata metadata of the global dict
+     * @throws IOException on I/O error
+     */
+    public abstract void commit(String workingDir, GlobalDictMetadata globalDictMetadata) throws IOException;
+
+    /**
+     * Copy the latest version of this dict to another meta, leaving the source unchanged.
+     * @param srcConfig config of the source meta
+     * @param dstConfig config of the destination meta
+     * @return the new base directory in the destination meta
+     * @throws IOException on I/O error
+     */
+    public abstract String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException;
+}