Posted to commits@kylin.apache.org by ka...@apache.org on 2017/03/13 13:36:21 UTC

[1/3] kylin git commit: KYLIN-2506 Refactor Global Dictionary

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-2506 [created] ee3f8e6e0


http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
deleted file mode 100644
index 3c29d9c..0000000
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/CachedTreeMapTest.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.dict;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.util.UUID;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Test;
-
-/**
- * Created by sunyerui on 16/7/12.
- */
-public class CachedTreeMapTest {
-
-    public static class Key implements WritableComparable {
-        int keyInt;
-
-        public static Key of(int keyInt) {
-            Key newKey = new Key();
-            newKey.keyInt = keyInt;
-            return newKey;
-        }
-
-        @Override
-        public int compareTo(Object o) {
-            return keyInt - ((Key)o).keyInt;
-        }
-
-        @Override
-        public void write(DataOutput out) throws IOException {
-            out.writeInt(keyInt);
-        }
-
-        @Override
-        public void readFields(DataInput in) throws IOException {
-            keyInt = in.readInt();
-        }
-
-        @Override
-        public String toString() {
-            return String.valueOf(keyInt);
-        }
-    }
-
-    public static boolean VALUE_WRITE_ERROR_TOGGLE = false;
-    public static class Value implements Writable {
-        String valueStr;
-
-        public static Value of(String valueStr) {
-            Value newValue = new Value();
-            newValue.valueStr = valueStr;
-            return newValue;
-        }
-
-        @Override
-        public void write(DataOutput out) throws IOException {
-            if (VALUE_WRITE_ERROR_TOGGLE) {
-                out.write(new byte[0]);
-                return;
-            }
-            out.writeUTF(valueStr);
-        }
-
-        @Override
-        public void readFields(DataInput in) throws IOException {
-            valueStr = in.readUTF();
-        }
-    }
-
-    public static class CachedFileFilter implements FileFilter {
-        @Override
-        public boolean accept(File pathname) {
-            return pathname.getName().startsWith(CachedTreeMap.CACHED_PREFIX);
-        }
-    }
-
-    public static class VersionFilter implements FileFilter {
-        @Override
-        public boolean accept(File pathname) {
-            return pathname.getName().startsWith(CachedTreeMap.VERSION_PREFIX);
-        }
-    }
-
-
-    static final UUID uuid = UUID.randomUUID();
-    static final String baseDir = "/tmp/kylin_cachedtreemap_test/" + uuid;
-    static final String workingDir = baseDir + "/working";
-
-    private static void cleanup() {
-        Path basePath = new Path(baseDir);
-        try {
-            HadoopUtil.getFileSystem(basePath).delete(basePath, true);
-        } catch (IOException e) {}
-        VALUE_WRITE_ERROR_TOGGLE = false;
-    }
-
-    @After
-    public void afterTest() {
-        cleanup();
-    }
-
-    @AfterClass
-    public static void tearDown() {
-        cleanup();
-    }
-
-    @Test
-    public void testCachedTreeMap() throws IOException {
-        CachedTreeMap map = createMutableMap();
-        map.put(Key.of(1), Value.of("a"));
-        map.put(Key.of(2), Value.of("b"));
-        map.put(Key.of(3), Value.of("c"));
-        map.put(Key.of(4), Value.of("d"));
-        map.put(Key.of(5), Value.of("e"));
-
-        File dir = new File(workingDir);
-        assertEquals(3, dir.listFiles(new CachedFileFilter()).length);
-
-        flushAndCommit(map, true, true, false);
-        assertFalse(new File(workingDir).exists());
-
-        dir = new File(map.getLatestVersion());
-        assertEquals(5, dir.listFiles(new CachedFileFilter()).length);
-
-        CachedTreeMap map2 = createImmutableMap();
-        assertEquals(5, map2.size());
-        assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr);
-
-        try {
-            map2.put(Key.of(6), Value.of("f"));
-            fail("Should be error when put value into immutable map");
-        } catch (AssertionError error) {}
-    }
-
-    @Test
-    public void testMultiVersions() throws IOException, InterruptedException {
-        CachedTreeMap map = createMutableMap();
-        Thread.sleep(3000);
-        map.put(Key.of(1), Value.of("a"));
-        map.put(Key.of(2), Value.of("b"));
-        map.put(Key.of(3), Value.of("c"));
-        flushAndCommit(map, true, true, false);
-
-        CachedTreeMap map2 = createImmutableMap();
-        assertEquals("b", ((Value)map2.get(Key.of(2))).valueStr);
-
-        // re-open dict, append new data
-        map = createMutableMap();
-        map.put(Key.of(4), Value.of("d"));
-        flushAndCommit(map, true, true, true);
-
-        // new data is not visible to map2
-        assertNull(map2.get(Key.of(4)));
-
-        // append more data; it becomes visible to a newly created immutable map
-        map.put(Key.of(5), Value.of("e"));
-        flushAndCommit(map, true, true, true);
-
-        CachedTreeMap map3 = createImmutableMap();
-        assertEquals("d", ((Value)map3.get(Key.of(4))).valueStr);
-        assertEquals("e", ((Value)map3.get(Key.of(5))).valueStr);
-
-        // Check versions retention
-        File dir = new File(baseDir);
-        assertEquals(3, dir.listFiles(new VersionFilter()).length);
-    }
-
-    @Test
-    public void testKeepAppend() throws IOException {
-        CachedTreeMap map = createMutableMap();
-        map.put(Key.of(1), Value.of("a"));
-        map.put(Key.of(2), Value.of("b"));
-        map.put(Key.of(3), Value.of("c"));
-        map.put(Key.of(4), Value.of("d"));
-        map.put(Key.of(5), Value.of("e"));
-
-        // flush with keepAppend false; the map can no longer be appended to
-        flushAndCommit(map, true, true, false);
-        // appending to a map that has been closed
-        try {
-            map.put(Key.of(6), Value.of("f"));
-            fail();
-        } catch (AssertionError e) {
-            assertEquals("Only support put method with immutable false and keepAppend true", e.getMessage());
-        }
-
-        CachedTreeMap map2 = createImmutableMap();
-        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-        assertEquals("d", ((Value)map2.get(Key.of(4))).valueStr);
-        assertEquals("e", ((Value)map2.get(Key.of(5))).valueStr);
-
-        map = createMutableMap();
-        map.put(Key.of(6), Value.of("f"));
-        map.put(Key.of(7), Value.of("g"));
-        map.put(Key.of(8), Value.of("h"));
-        // flush with keepAppend true
-        flushAndCommit(map, true, true, true);
-        map.put(Key.of(9), Value.of("i"));
-        // can still append data
-        flushAndCommit(map, true, true, false);
-
-        map2 = createImmutableMap();
-        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-        assertEquals("d", ((Value)map2.get(Key.of(4))).valueStr);
-        assertEquals("f", ((Value)map2.get(Key.of(6))).valueStr);
-        assertEquals("i", ((Value)map2.get(Key.of(9))).valueStr);
-    }
-
-    @Test
-    public void testVersionRetention() throws IOException, InterruptedException {
-        File dir = new File(baseDir);
-        // TTL for 3s and keep 3 versions
-        CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
-            .immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class)
-            .maxVersions(3).versionTTL(1000 * 3).build();
-        map.put(Key.of(1), Value.of("a"));
-
-        // version 0 exists as soon as the map is created
-        assertEquals(1, dir.listFiles(new VersionFilter()).length);
-        Thread.sleep(2500);
-
-        // flush version 1
-        flushAndCommit(map, true, true, true);
-        assertEquals(2, dir.listFiles(new VersionFilter()).length);
-
-        // flush version 2
-        flushAndCommit(map, true, true, true);
-        assertEquals(3, dir.listFiles(new VersionFilter()).length);
-
-        // flush version 3
-        flushAndCommit(map, true, true, true);
-        // no version is deleted yet because of the 3s TTL
-        assertEquals(4, dir.listFiles(new VersionFilter()).length);
-
-        // sleep to let version 0 expire
-        Thread.sleep(500);
-        // flush version 4
-        flushAndCommit(map, true, true, false);
-        assertEquals(4, dir.listFiles(new VersionFilter()).length);
-
-        // TTL for 100ms and keep 2 versions
-        map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
-            .immutable(false).maxSize(2).keyClazz(Key.class).valueClazz(Value.class)
-            .maxVersions(2).versionTTL(100).build();
-        flushAndCommit(map, true, true, false);
-        assertEquals(2, dir.listFiles(new VersionFilter()).length);
-    }
-
-    @Test
-    public void testWithOldFormat() throws IOException {
-        File dir = new File(baseDir);
-        CachedTreeMap map = createMutableMap();
-        map.put(Key.of(1), Value.of("a"));
-        map.put(Key.of(2), Value.of("b"));
-        map.put(Key.of(3), Value.of("c"));
-        map.put(Key.of(4), Value.of("d"));
-        map.put(Key.of(5), Value.of("e"));
-        flushAndCommit(map, true, true, true);
-
-        // move version dir to base dir, to simulate the older format
-        Path versionPath = new Path(map.getLatestVersion());
-        Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName());
-        FileSystem fs = HadoopUtil.getFileSystem(versionPath);
-        fs.rename(versionPath, tmpVersionPath);
-        fs.delete(new Path(baseDir), true);
-        fs.rename(tmpVersionPath, new Path(baseDir));
-        assertEquals(0, dir.listFiles(new VersionFilter()).length);
-        assertEquals(5, dir.listFiles(new CachedFileFilter()).length);
-
-        CachedTreeMap map2 = createImmutableMap();
-        assertEquals(5, map2.size());
-        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-        assertEquals("e", ((Value)map2.get(Key.of(5))).valueStr);
-
-        assertEquals(1, dir.listFiles(new VersionFilter()).length);
-        assertEquals(0, dir.listFiles(new CachedFileFilter()).length);
-    }
-
-    @Test
-    public void testWriteFailed() throws IOException {
-        // normal case
-        CachedTreeMap map = createMutableMap();
-        map.put(Key.of(1), Value.of("a"));
-        map.put(Key.of(2), Value.of("b"));
-        map.put(Key.of(3), Value.of("c"));
-        map.remove(Key.of(3));
-        map.put(Key.of(4), Value.of("d"));
-
-        flushAndCommit(map, true, true, false);
-
-        CachedTreeMap map2 = createImmutableMap();
-        assertEquals(3, map2.size());
-        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-
-        // simulate a failed value write
-        map = createMutableMap();
-        VALUE_WRITE_ERROR_TOGGLE = true;
-        map.put(Key.of(1), Value.of("aa"));
-        map.put(Key.of(2), Value.of("bb"));
-        VALUE_WRITE_ERROR_TOGGLE = false;
-        map.put(Key.of(3), Value.of("cc"));
-        map.put(Key.of(4), Value.of("dd"));
-        // the write failed above, so flush without committing the data
-        flushAndCommit(map, true, false, false);
-
-        // data visible to readers should not have been modified
-        map2 = createImmutableMap();
-        assertEquals(3, map2.size());
-        assertEquals("a", ((Value)map2.get(Key.of(1))).valueStr);
-
-        assertTrue(new File(workingDir).exists());
-    }
-
-    private CachedTreeMap createImmutableMap() throws IOException {
-        CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
-            .immutable(true).maxSize(2).keyClazz(Key.class).valueClazz(Value.class).build();
-        try (DataInputStream in = map.openIndexInput()) {
-            map.readFields(in);
-        }
-        return map;
-    }
-
-    private CachedTreeMap createMutableMap() throws IOException {
-        CachedTreeMap map = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir)
-            .immutable(false).maxSize(2).maxVersions(3).versionTTL(1000 * 3).keyClazz(Key.class).valueClazz(Value.class).build();
-        try (DataInputStream in = map.openIndexInput()) {
-            map.readFields(in);
-        } catch (IOException e) {}
-        return map;
-    }
-
-    private void flushAndCommit(CachedTreeMap map, boolean doFlush, boolean doCommit, boolean keepAppend) throws IOException {
-        if (doFlush) {
-            try (DataOutputStream out = map.openIndexOutput()) {
-                map.write(out);
-            }
-        }
-
-        if (doCommit) {
-            map.commit(keepAppend);
-        }
-    }
-}
-
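
The deleted test above exercises Hadoop's Writable contract: Key and Value round-trip through write(DataOutput) and readFields(DataInput). A minimal standalone sketch of that round-trip, using the same method pairs (the in-memory stream setup is illustrative, not taken from the test):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class WritableRoundTripSketch {
        public static void main(String[] args) throws IOException {
            // Serialize the way Key.write(out) and Value.write(out) do above.
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buf)) {
                out.writeInt(2);   // Key.write: out.writeInt(keyInt)
                out.writeUTF("b"); // Value.write: out.writeUTF(valueStr)
            }
            // Deserialize the way Key.readFields(in) and Value.readFields(in) do.
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buf.toByteArray()))) {
                int keyInt = in.readInt();
                String valueStr = in.readUTF();
                System.out.println(keyInt + " -> " + valueStr); // prints: 2 -> b
            }
        }
    }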

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index daa1053..08a8cb0 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -156,9 +156,6 @@ public class KylinKryoRegistrator implements KryoRegistrator {
         kyroClasses.add(org.apache.kylin.cube.model.SelectRule.class);
         kyroClasses.add(org.apache.kylin.cube.model.v1_4_0.CubeDesc.class);
         kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.class);
-        kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictNode.class);
-        kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictSlice.class);
-        kyroClasses.add(org.apache.kylin.dict.AppendTrieDictionary.DictSliceKey.class);
         kyroClasses.add(org.apache.kylin.dict.CacheDictionary.class);
         kyroClasses.add(org.apache.kylin.dict.DateStrDictionary.class);
         kyroClasses.add(org.apache.kylin.dict.DictionaryInfo.class);
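
The hunk above drops the Kryo registrations for AppendTrieDictionary's inner classes DictNode, DictSlice and DictSliceKey; the commit promotes them to top-level classes (see the file list in [3/3] below), so the old inner-class names no longer exist. For context, a registrator like KylinKryoRegistrator is wired into a Spark job through standard Spark configuration keys; a minimal sketch (the two keys are Spark's own, everything else is illustrative):

    import org.apache.spark.SparkConf;

    public class KryoWiringSketch {
        public static SparkConf sparkConfWithKryo() {
            // spark.serializer and spark.kryo.registrator are standard Spark
            // keys; the registrator class is the one patched in the hunk above.
            return new SparkConf()
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
        }
    }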


[3/3] kylin git commit: KYLIN-2506 Refactor Global Dictionary

Posted by ka...@apache.org.
KYLIN-2506 Refactor Global Dictionary


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ee3f8e6e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ee3f8e6e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ee3f8e6e

Branch: refs/heads/KYLIN-2506
Commit: ee3f8e6e0d202ea3aacf24caf75762f1e6048ebb
Parents: 3a3e967
Author: kangkaisen <ka...@163.com>
Authored: Mon Feb 20 21:06:44 2017 +0800
Committer: kangkaisen <ka...@163.com>
Committed: Mon Mar 13 21:27:46 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/dict/AppendTrieDictionary.java | 1197 +-----------------
 .../kylin/dict/AppendTrieDictionaryBuilder.java |  289 +++++
 .../kylin/dict/AppendTrieDictionaryChecker.java |    9 +-
 .../org/apache/kylin/dict/CachedTreeMap.java    |  481 -------
 .../java/org/apache/kylin/dict/DictNode.java    |  376 ++++++
 .../java/org/apache/kylin/dict/DictSlice.java   |  283 +++++
 .../org/apache/kylin/dict/DictSliceKey.java     |   75 ++
 .../apache/kylin/dict/GlobalDictHDFSStore.java  |  419 ++++++
 .../apache/kylin/dict/GlobalDictMetadata.java   |   50 +
 .../org/apache/kylin/dict/GlobalDictStore.java  |  102 ++
 .../kylin/dict/GlobalDictionaryBuilder.java     |   12 +-
 .../kylin/dict/AppendTrieDictionaryTest.java    |  324 ++++-
 .../apache/kylin/dict/CachedTreeMapTest.java    |  378 ------
 .../engine/spark/KylinKryoRegistrator.java      |    3 -
 14 files changed, 1943 insertions(+), 2055 deletions(-)
----------------------------------------------------------------------
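
In the AppendTrieDictionary diff below, the hand-rolled CachedTreeMap gives way to a Guava LoadingCache over dictionary slices (see the added com.google.common.cache imports and the new dictCache field). A minimal sketch of that caching pattern, assuming slices are fetched from the backing dictionary store on a miss (loadSliceFromStore is a hypothetical stand-in, not the Kylin API):

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.concurrent.ExecutionException;

    public class SliceCacheSketch {
        // Hypothetical loader; the real code reads a DictSlice from the
        // global dictionary store when it is absent from the cache.
        static byte[] loadSliceFromStore(String sliceKey) {
            return new byte[0];
        }

        public static void main(String[] args) throws ExecutionException {
            LoadingCache<String, byte[]> dictCache = CacheBuilder.newBuilder()
                    .maximumSize(8) // bound on in-memory slices; illustrative value
                    .build(new CacheLoader<String, byte[]>() {
                        @Override
                        public byte[] load(String sliceKey) {
                            return loadSliceFromStore(sliceKey); // invoked once per missing key
                        }
                    });
            byte[] slice = dictCache.get("first-slice"); // loads on first access, cached afterwards
            System.out.println(slice.length);
        }
    }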


http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
index 962686d..1205c92 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
@@ -15,1173 +15,125 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
 */
-
 package org.apache.kylin.dict;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NavigableSet;
 import java.util.Objects;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.MetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.concurrent.ExecutionException;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
 * A dictionary based on a Trie data structure that maps enumerations of byte[]
 * to int IDs, used for the global dictionary.
- *
- * Trie data is split into sub trees, called {@link DictSlice}, and stored in a {@link CachedTreeMap} with a configurable cache size.
- *
+ * <p>
+ * Trie data is split into sub trees, called {@link DictSlice}.
+ * <p>
 * With a Trie, the memory footprint of the mapping is minimized at the cost of
 * CPU, compared to a HashMap of ID arrays. Performance tests show the Trie is
 * roughly 10 times slower, so a cache layer overlays the Trie and gracefully
 * falls back to it through a weak reference.
- *
+ * <p>
  * The implementation is NOT thread-safe for now.
- *
+ * <p>
  * TODO making it thread-safe
  *
  * @author sunyerui
  */
 @SuppressWarnings({ "rawtypes", "unchecked", "serial" })
 public class AppendTrieDictionary<T> extends CacheDictionary<T> {
-
     public static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // "AppendTrieDict"
     public static final int HEAD_SIZE_I = HEAD_MAGIC.length;
-
-    public static final int BIT_IS_LAST_CHILD = 0x80;
-    public static final int BIT_IS_END_OF_VALUE = 0x40;
-
     private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionary.class);
 
     transient private String baseDir;
-    transient private int maxId;
-    transient private int maxValueLength;
-    transient private int nValues;
-
-    volatile private TreeMap<DictSliceKey, DictSlice> dictSliceMap;
-
-    // Constructor for both build and deserialize
-    public AppendTrieDictionary() {
-        enableCache();
-    }
+    transient private GlobalDictMetadata metadata;
+    transient private LoadingCache<DictSliceKey, DictSlice> dictCache;
 
-    public void initParams(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter) throws IOException {
+    public void init(String baseDir) throws IOException {
         this.baseDir = baseDir;
-        this.baseId = baseId;
-        this.maxId = maxId;
-        this.maxValueLength = maxValueLength;
-        this.nValues = nValues;
-        this.bytesConvert = bytesConverter;
-    }
-
-    public void initDictSliceMap(CachedTreeMap dictMap) throws IOException {
-        int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
-        long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
-        CachedTreeMap newDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(1).baseDir(baseDir).immutable(true).maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
-        newDictSliceMap.loadEntry(dictMap);
-        this.dictSliceMap = newDictSliceMap;
-    }
-
-    public byte[] writeDictMap() throws IOException {
-        ByteArrayOutputStream buf = new ByteArrayOutputStream();
-        DataOutputStream out = new DataOutputStream(buf);
-        ((Writable) dictSliceMap).write(out);
-        byte[] dictMapBytes = buf.toByteArray();
-        buf.close();
-        out.close();
-
-        return dictMapBytes;
-    }
-
-    // Dict ids run from 1 to 2147483647 and then wrap to the signed range -2147483648..-2 (unsigned 2147483648..4294967294); 0 and -1 are reserved for the uninitialized state
-    public static void checkValidId(int id) {
-        if (id == 0 || id == -1) {
-            throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
-        }
-    }
-
-    public static class DictSliceKey implements WritableComparable, java.io.Serializable {
-        byte[] key;
-
-        public static DictSliceKey wrap(byte[] key) {
-            DictSliceKey dictKey = new DictSliceKey();
-            dictKey.key = key;
-            return dictKey;
-        }
-
-        @Override
-        public String toString() {
-            return Bytes.toStringBinary(key);
-        }
-
-        @Override
-        public int hashCode() {
-            return Arrays.hashCode(key);
-        }
-
-        @Override
-        public int compareTo(Object o) {
-            if (!(o instanceof DictSliceKey)) {
-                return -1;
-            }
-            DictSliceKey other = (DictSliceKey) o;
-            return Bytes.compareTo(key, other.key);
-        }
-
-        @Override
-        public void write(DataOutput out) throws IOException {
-            out.writeInt(key.length);
-            out.write(key);
-        }
-
-        @Override
-        public void readFields(DataInput in) throws IOException {
-            key = new byte[in.readInt()];
-            in.readFully(key);
-        }
-    }
-
-    public static class DictSlice<T> implements Writable, java.io.Serializable {
-        public DictSlice() {
-        }
-
-        public DictSlice(byte[] trieBytes) {
-            init(trieBytes);
-        }
-
-        private byte[] trieBytes;
-
-        // non-persistent part
-        transient private int headSize;
-        transient private int bodyLen;
-        transient private int sizeChildOffset;
-
-        transient private int nValues;
-        transient private int sizeOfId;
-        // mask MUST be long, since childOffset may be up to 5 bytes
-        transient private long childOffsetMask;
-        transient private int firstByteOffset;
-
-        private void init(byte[] trieBytes) {
-            this.trieBytes = trieBytes;
-            if (BytesUtil.compareBytes(HEAD_MAGIC, 0, trieBytes, 0, HEAD_MAGIC.length) != 0)
-                throw new IllegalArgumentException("Wrong file type (magic does not match)");
-
-            try {
-                DataInputStream headIn = new DataInputStream(new ByteArrayInputStream(trieBytes, HEAD_SIZE_I, trieBytes.length - HEAD_SIZE_I));
-                this.headSize = headIn.readShort();
-                this.bodyLen = headIn.readInt();
-                this.nValues = headIn.readInt();
-                this.sizeChildOffset = headIn.read();
-                this.sizeOfId = headIn.read();
-
-                this.childOffsetMask = ~(((long) (BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE)) << ((sizeChildOffset - 1) * 8));
-                this.firstByteOffset = sizeChildOffset + 1; // the offset from the beginning of a node to its first value byte
-            } catch (Exception e) {
-                if (e instanceof RuntimeException)
-                    throw (RuntimeException) e;
-                else
-                    throw new RuntimeException(e);
-            }
-        }
-
-        public byte[] getFirstValue() {
-            int nodeOffset = headSize;
-            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-            while (true) {
-                int valueLen = BytesUtil.readUnsigned(trieBytes, nodeOffset + firstByteOffset - 1, 1);
-                bytes.write(trieBytes, nodeOffset + firstByteOffset, valueLen);
-                if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) {
-                    break;
-                }
-                nodeOffset = headSize + (int) (BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask);
-                if (nodeOffset == headSize) {
-                    break;
-                }
-            }
-            return bytes.toByteArray();
-        }
-
-        /**
-         * returns a code point from [0, nValues), preserving order of value
-         *
-         * @param n
-         *            -- the offset of current node
-         * @param inp
-         *            -- input value bytes to lookup
-         * @param o
-         *            -- offset in the input value bytes matched so far
-         * @param inpEnd
-         *            -- end of input
-         * @param roundingFlag
-         *            -- =0: return -1 if not found
-         *            -- <0: return the closest smaller value if not found; -1 if none exists
-         *            -- >0: return the closest bigger value if not found; nValues if none exists
-         */
-        private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) {
-            while (true) {
-                // match the current node
-                int p = n + firstByteOffset; // start of node's value
-                int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value
-                for (; p < end && o < inpEnd; p++, o++) { // note matching starts from [0]
-                    if (trieBytes[p] != inp[o]) {
-                        return -1; // mismatch
-                    }
-                }
-
-                // node completely matched, is input all consumed?
-                boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
-                if (o == inpEnd) {
-                    return p == end && isEndOfValue ? BytesUtil.readUnsigned(trieBytes, end, sizeOfId) : -1;
-                }
-
-                // find a child to continue
-                int c = headSize + (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
-                if (c == headSize) // has no children
-                    return -1;
-                byte inpByte = inp[o];
-                int comp;
-                while (true) {
-                    p = c + firstByteOffset;
-                    comp = BytesUtil.compareByteUnsigned(trieBytes[p], inpByte);
-                    if (comp == 0) { // continue in the matching child, reset n and loop again
-                        n = c;
-                        break;
-                    } else if (comp < 0) { // try next child
-                        if (checkFlag(c, BIT_IS_LAST_CHILD))
-                            return -1;
-                        c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1) + (checkFlag(c, BIT_IS_END_OF_VALUE) ? sizeOfId : 0);
-                    } else { // children are ordered by their first value byte
-                        return -1;
-                    }
-                }
-            }
-        }
-
-        private boolean checkFlag(int offset, int bit) {
-            return (trieBytes[offset] & bit) > 0;
-        }
-
-        public int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
-            int id = lookupSeqNoFromValue(headSize, value, offset, offset + len, roundingFlag);
-            return id;
-        }
-
-        private DictNode rebuildTrieTree() {
-            return rebuildTrieTreeR(headSize, null);
-        }
-
-        private DictNode rebuildTrieTreeR(int n, DictNode parent) {
-            DictNode root = null;
-            while (true) {
-                int p = n + firstByteOffset;
-                int childOffset = (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
-                int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
-                boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
-
-                byte[] value = new byte[parLen];
-                System.arraycopy(trieBytes, p, value, 0, parLen);
-
-                DictNode node = new DictNode(value, isEndOfValue);
-                if (isEndOfValue) {
-                    int id = BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
-                    node.id = id;
-                }
-
-                if (parent == null) {
-                    root = node;
-                } else {
-                    parent.addChild(node);
-                }
-
-                if (childOffset != 0) {
-                    rebuildTrieTreeR(childOffset + headSize, node);
-                }
-
-                if (checkFlag(n, BIT_IS_LAST_CHILD)) {
-                    break;
-                } else {
-                    n += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
-                }
-            }
-            return root;
-        }
-
-        public boolean doCheck() {
-            int offset = headSize;
-            HashSet<Integer> parentSet = new HashSet<>();
-            boolean lastChild = false;
-
-            while (offset < trieBytes.length) {
-                if (lastChild) {
-                    boolean contained = parentSet.remove(offset - headSize);
-                    // Can't find parent, the data is corrupted
-                    if (!contained) {
-                        return false;
-                    }
-                    lastChild = false;
-                }
-                int p = offset + firstByteOffset;
-                int childOffset = (int) (BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask);
-                int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
-                boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE);
-
-                // Value copy would overflow; the data is corrupted
-                if (trieBytes.length < p + parLen) {
-                    return false;
-                }
-
-                // Check that the id can be read
-                if (isEndOfValue) {
-                    BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
-                }
-
-                // Record it if it has children
-                if (childOffset != 0) {
-                    parentSet.add(childOffset);
-                }
-
-                // siblings done, move to the next parent
-                if (checkFlag(offset, BIT_IS_LAST_CHILD)) {
-                    lastChild = true;
-                }
-
-                // move to next node
-                offset += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
-            }
-
-            // parentSet is empty, meaning every node has a parent, so the data is correct
-            return parentSet.isEmpty();
-        }
-
-        public void write(DataOutput out) throws IOException {
-            out.write(trieBytes);
-        }
-
-        public void readFields(DataInput in) throws IOException {
-            byte[] headPartial = new byte[HEAD_MAGIC.length + Short.SIZE / Byte.SIZE + Integer.SIZE / Byte.SIZE];
-            in.readFully(headPartial);
-
-            if (BytesUtil.compareBytes(HEAD_MAGIC, 0, headPartial, 0, HEAD_MAGIC.length) != 0)
-                throw new IllegalArgumentException("Wrong file type (magic does not match)");
-
-            DataInputStream headIn = new DataInputStream(//
-                    new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I));
-            int headSize = headIn.readShort();
-            int bodyLen = headIn.readInt();
-            headIn.close();
-
-            byte[] all = new byte[headSize + bodyLen];
-            System.arraycopy(headPartial, 0, all, 0, headPartial.length);
-            in.readFully(all, headPartial.length, all.length - headPartial.length);
-
-            init(all);
-        }
-
-        public static DictNode rebuildNodeByDeserialize(DataInput in) throws IOException {
-            DictSlice slice = new DictSlice();
-            slice.readFields(in);
-            return slice.rebuildTrieTree();
-        }
-
-        @Override
-        public String toString() {
-            return String.format("DictSlice[firstValue=%s, values=%d, bytes=%d]", Bytes.toStringBinary(getFirstValue()), nValues, bodyLen);
-        }
-
-        @Override
-        public int hashCode() {
-            return Arrays.hashCode(trieBytes);
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if ((o instanceof AppendTrieDictionary.DictSlice) == false) {
-                logger.info("Equals return false because it's not DictInfo");
-                return false;
-            }
-            DictSlice that = (DictSlice) o;
-            return Arrays.equals(this.trieBytes, that.trieBytes);
-        }
-    }
-
-    public static class DictNode implements Writable, java.io.Serializable {
-        public byte[] part;
-        public int id = -1;
-        public boolean isEndOfValue;
-        public ArrayList<DictNode> children = new ArrayList<>();
-
-        public int nValuesBeneath;
-        public DictNode parent;
-        public int childrenCount = 1;
-
-        public DictNode() {
-        }
-
-        public void clone(DictNode o) {
-            this.part = o.part;
-            this.id = o.id;
-            this.isEndOfValue = o.isEndOfValue;
-            this.children = o.children;
-            for (DictNode child : o.children) {
-                child.parent = this;
-            }
-            this.nValuesBeneath = o.nValuesBeneath;
-            this.parent = o.parent;
-            this.childrenCount = o.childrenCount;
-        }
-
-        DictNode(byte[] value, boolean isEndOfValue) {
-            reset(value, isEndOfValue);
-        }
-
-        DictNode(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
-            reset(value, isEndOfValue, children);
-        }
-
-        void reset(byte[] value, boolean isEndOfValue) {
-            reset(value, isEndOfValue, new ArrayList<DictNode>());
-        }
-
-        void reset(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
-            this.part = value;
-            this.isEndOfValue = isEndOfValue;
-            clearChild();
-            for (DictNode child : children) {
-                addChild(child);
-            }
-            this.id = -1;
-        }
-
-        void clearChild() {
-            this.children.clear();
-            int childrenCountDelta = this.childrenCount - 1;
-            for (DictNode p = this; p != null; p = p.parent) {
-                p.childrenCount -= childrenCountDelta;
-            }
-        }
-
-        void addChild(DictNode child) {
-            addChild(-1, child);
-        }
-
-        void addChild(int index, DictNode child) {
-            child.parent = this;
-            if (index < 0) {
-                this.children.add(child);
-            } else {
-                this.children.add(index, child);
-            }
-            for (DictNode p = this; p != null; p = p.parent) {
-                p.childrenCount += child.childrenCount;
-            }
-        }
-
-        public DictNode removeChild(int index) {
-            DictNode child = children.remove(index);
-            child.parent = null;
-            for (DictNode p = this; p != null; p = p.parent) {
-                p.childrenCount -= child.childrenCount;
-            }
-            return child;
-        }
-
-        public DictNode duplicateNode() {
-            DictNode newChild = new DictNode(part, false);
-            newChild.parent = parent;
-            if (parent != null) {
-                int index = parent.children.indexOf(this);
-                parent.addChild(index + 1, newChild);
-            }
-            return newChild;
-        }
-
-        public byte[] firstValue() {
-            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-            DictNode p = this;
-            while (true) {
-                bytes.write(p.part, 0, p.part.length);
-                if (p.isEndOfValue || p.children.size() == 0) {
-                    break;
-                }
-                p = p.children.get(0);
-            }
-            return bytes.toByteArray();
-        }
-
-        public static DictNode splitNodeTree(DictNode splitNode) {
-            if (splitNode == null) {
-                return null;
-            }
-            DictNode current = splitNode;
-            DictNode p = current.parent;
-            while (p != null) {
-                int index = p.children.indexOf(current);
-                assert index != -1;
-                DictNode newParent = p.duplicateNode();
-                for (int i = p.children.size() - 1; i >= index; i--) {
-                    DictNode child = p.removeChild(i);
-                    newParent.addChild(0, child);
-                }
-                current = newParent;
-                p = p.parent;
-            }
-            return current;
-        }
-
-        public static void mergeSingleByteNode(DictNode root, int leftOrRight) {
-            DictNode current = root;
-            DictNode child;
-            while (!current.children.isEmpty()) {
-                child = leftOrRight == 0 ? current.children.get(0) : current.children.get(current.children.size() - 1);
-                if (current.children.size() > 1 || current.isEndOfValue) {
-                    current = child;
-                    continue;
-                }
-                byte[] newValue = new byte[current.part.length + child.part.length];
-                System.arraycopy(current.part, 0, newValue, 0, current.part.length);
-                System.arraycopy(child.part, 0, newValue, current.part.length, child.part.length);
-                current.reset(newValue, child.isEndOfValue, child.children);
-                current.id = child.id;
-            }
-        }
-
-        @Override
-        public void write(DataOutput out) throws IOException {
-            byte[] bytes = buildTrieBytes();
-            out.write(bytes);
-        }
-
-        @Override
-        public void readFields(DataInput in) throws IOException {
-            DictNode root = DictSlice.rebuildNodeByDeserialize(in);
-            this.clone(root);
-        }
-
-        protected byte[] buildTrieBytes() {
-            Stats stats = Stats.stats(this);
-            int sizeChildOffset = stats.mbpn_sizeChildOffset;
-            int sizeId = stats.mbpn_sizeId;
-
-            // write head
-            byte[] head;
-            try {
-                ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
-                DataOutputStream headOut = new DataOutputStream(byteBuf);
-                headOut.write(AppendTrieDictionary.HEAD_MAGIC);
-                headOut.writeShort(0); // head size, back-filled below
-                headOut.writeInt(stats.mbpn_footprint); // body size
-                headOut.writeInt(stats.nValues);
-                headOut.write(sizeChildOffset);
-                headOut.write(sizeId);
-                headOut.close();
-                head = byteBuf.toByteArray();
-                BytesUtil.writeUnsigned(head.length, head, AppendTrieDictionary.HEAD_SIZE_I, 2);
-            } catch (IOException e) {
-                throw new RuntimeException(e); // shall not happen, as we are writing in memory
-            }
-
-            byte[] trieBytes = new byte[stats.mbpn_footprint + head.length];
-            System.arraycopy(head, 0, trieBytes, 0, head.length);
-
-            LinkedList<DictNode> open = new LinkedList<DictNode>();
-            IdentityHashMap<DictNode, Integer> offsetMap = new IdentityHashMap<DictNode, Integer>();
-
-            // write body
-            int o = head.length;
-            offsetMap.put(this, o);
-            o = build_writeNode(this, o, true, sizeChildOffset, sizeId, trieBytes);
-            if (this.children.isEmpty() == false)
-                open.addLast(this);
-
-            while (open.isEmpty() == false) {
-                DictNode parent = open.removeFirst();
-                build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
-                for (int i = 0; i < parent.children.size(); i++) {
-                    DictNode c = parent.children.get(i);
-                    boolean isLastChild = (i == parent.children.size() - 1);
-                    offsetMap.put(c, o);
-                    o = build_writeNode(c, o, isLastChild, sizeChildOffset, sizeId, trieBytes);
-                    if (c.children.isEmpty() == false)
-                        open.addLast(c);
-                }
-            }
-
-            if (o != trieBytes.length)
-                throw new RuntimeException();
-            return trieBytes;
-        }
-
-        private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
-            int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
-            BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
-            trieBytes[parentOffset] |= flags;
-        }
-
-        private int build_writeNode(DictNode n, int offset, boolean isLastChild, int sizeChildOffset, int sizeId, byte[] trieBytes) {
-            int o = offset;
-
-            // childOffset
-            if (isLastChild)
-                trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
-            if (n.isEndOfValue)
-                trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
-            o += sizeChildOffset;
-
-            // nValueBytes
-            if (n.part.length > 255)
-                throw new RuntimeException("Value length is " + n.part.length + " and larger than 255: " + Bytes.toStringBinary(n.part));
-            BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
-            o++;
-
-            // valueBytes
-            System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
-            o += n.part.length;
-
-            if (n.isEndOfValue) {
-                checkValidId(n.id);
-                BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId);
-                o += sizeId;
-            }
-
-            return o;
-        }
-
-        @Override
-        public String toString() {
-            return String.format("DictNode[root=%s, nodes=%d, firstValue=%s]", Bytes.toStringBinary(part), childrenCount, Bytes.toStringBinary(firstValue()));
-        }
-    }
-
-    public static class Stats {
-        public interface Visitor {
-            void visit(DictNode n, int level);
-        }
-
-        private static void traverseR(DictNode node, Visitor visitor, int level) {
-            visitor.visit(node, level);
-            for (DictNode c : node.children)
-                traverseR(c, visitor, level + 1);
-        }
-
-        private static void traversePostOrderR(DictNode node, Visitor visitor, int level) {
-            for (DictNode c : node.children)
-                traversePostOrderR(c, visitor, level + 1);
-            visitor.visit(node, level);
-        }
-
-        public int nValues; // number of values in total
-        public int nValueBytesPlain; // number of bytes for all values
-        // uncompressed
-        public int nValueBytesCompressed; // number of values bytes in Trie
-        // (compressed)
-        public int maxValueLength; // size of longest value in bytes
-
-        // the trie is multi-byte-per-node
-        public int mbpn_nNodes; // number of nodes in trie
-        public int mbpn_trieDepth; // depth of trie
-        public int mbpn_maxFanOut; // the maximum no. children
-        public int mbpn_nChildLookups; // number of child lookups during lookup
-        // every value once
-        public int mbpn_nTotalFanOut; // the sum of fan outs during lookup every
-        // value once
-        public int mbpn_sizeValueTotal; // the sum of value space in all nodes
-        public int mbpn_sizeNoValueBytes; // size of field noValueBytes
-        public int mbpn_sizeChildOffset; // size of field childOffset, points to
-        // first child in flattened array
-        public int mbpn_sizeId; // size of id value, always 4
-        public int mbpn_footprint; // MBPN footprint in bytes
-
-        /**
-         * Compute some statistics of the trie and the dictionary built from it
-         */
-        public static Stats stats(DictNode root) {
-            // calculate nEndValueBeneath
-            traversePostOrderR(root, new Visitor() {
-                @Override
-                public void visit(DictNode n, int level) {
-                    n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
-                    for (DictNode c : n.children)
-                        n.nValuesBeneath += c.nValuesBeneath;
-                }
-            }, 0);
-
-            // run stats
-            final Stats s = new Stats();
-            final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
-            traverseR(root, new Visitor() {
-                @Override
-                public void visit(DictNode n, int level) {
-                    if (n.isEndOfValue)
-                        s.nValues++;
-                    s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
-                    s.nValueBytesCompressed += n.part.length;
-                    s.mbpn_nNodes++;
-                    if (s.mbpn_trieDepth < level + 1)
-                        s.mbpn_trieDepth = level + 1;
-                    if (n.children.size() > 0) {
-                        if (s.mbpn_maxFanOut < n.children.size())
-                            s.mbpn_maxFanOut = n.children.size();
-                        int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
-                        s.mbpn_nChildLookups += childLookups;
-                        s.mbpn_nTotalFanOut += childLookups * n.children.size();
-                    }
-
-                    if (level < lenAtLvl.size())
-                        lenAtLvl.set(level, n.part.length);
-                    else
-                        lenAtLvl.add(n.part.length);
-                    int lenSoFar = 0;
-                    for (int i = 0; i <= level; i++)
-                        lenSoFar += lenAtLvl.get(i);
-                    if (lenSoFar > s.maxValueLength)
-                        s.maxValueLength = lenSoFar;
-                }
-            }, 0);
-
-            // flatten trie footprint calculation, case of Multi-Byte-Per-DictNode
-            s.mbpn_sizeId = 4;
-            s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId;
-            s.mbpn_sizeNoValueBytes = 1;
-            s.mbpn_sizeChildOffset = 5;
-            s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset);
-            while (true) { // minimize the offset size to match the footprint
-                int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1);
-                // *4 because the 2 MSBs of the offset are used for the isEndOfValue & isEndChild flags
-                // expand t to long before *4, to avoid exceeding Integer.MAX_VALUE
-                if (BytesUtil.sizeForValue((long) t * 4) <= s.mbpn_sizeChildOffset - 1) {
-                    s.mbpn_sizeChildOffset--;
-                    s.mbpn_footprint = t;
-                } else
-                    break;
-            }
-
-            return s;
-        }
-
-        /**
-         * Print the trie for debugging
-         */
-        public void print(DictNode root) {
-            print(root, System.out);
-        }
-
-        public void print(DictNode root, final PrintStream out) {
-            traverseR(root, new Visitor() {
-                @Override
-                public void visit(DictNode n, int level) {
-                    try {
-                        for (int i = 0; i < level; i++)
-                            out.print("  ");
-                        out.print(new String(n.part, "UTF-8"));
-                        out.print(" - ");
-                        if (n.nValuesBeneath > 0)
-                            out.print(n.nValuesBeneath);
-                        if (n.isEndOfValue)
-                            out.print("* [" + n.id + "]");
-                        out.print("\n");
-                    } catch (UnsupportedEncodingException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }, 0);
-        }
-    }
-
-    public static class Builder<T> {
-        private static ConcurrentHashMap<String, Pair<Integer, Builder>> builderInstanceAndCountMap = new ConcurrentHashMap();
-
-        public static Builder getInstance(String resourcePath) throws IOException {
-            return getInstance(resourcePath, null);
-        }
-
-        public synchronized static Builder getInstance(String resourcePath, AppendTrieDictionary dict) throws IOException {
-            Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath);
-            if (entry == null) {
-                entry = new Pair<>(0, createNewBuilder(resourcePath, dict));
-                builderInstanceAndCountMap.put(resourcePath, entry);
-            }
-            entry.setFirst(entry.getFirst() + 1);
-            return entry.getSecond();
-        }
-
-        // return true if entry still in map
-        private synchronized static boolean releaseInstance(String resourcePath) {
-            Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath);
-            if (entry != null) {
-                entry.setFirst(entry.getFirst() - 1);
-                if (entry.getFirst() <= 0) {
-                    builderInstanceAndCountMap.remove(resourcePath);
-                    return false;
-                }
-                return true;
-            }
-            return false;
-        }
-
-        public static Builder createNewBuilder(String resourcePath, AppendTrieDictionary existDict) throws IOException {
-            String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourcePath + "/";
-
-            AppendTrieDictionary dictToUse = existDict;
-            if (dictToUse == null) {
-                // Try to load the existing dict from the cache, making sure only one object exists in memory for the same dict
-                NavigableSet<String> dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(resourcePath);
-                ArrayList<String> appendDicts = new ArrayList<>();
-                if (dicts != null && !dicts.isEmpty()) {
-                    for (String dict : dicts) {
-                        DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
-                        if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) {
-                            appendDicts.add(dict);
-                        }
-                    }
-                }
-                if (appendDicts.isEmpty()) {
-                    dictToUse = null;
-                } else if (appendDicts.size() == 1) {
-                    dictToUse = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0));
-                } else {
-                    throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", resourcePath, appendDicts.size()));
-                }
-            }
-
-            AppendTrieDictionary.Builder<String> builder;
-            if (dictToUse == null) {
-                logger.info("GlobalDict {} is empty, create new one", resourcePath);
-                builder = new Builder<>(resourcePath, null, dictDir, 0, 0, 0, new StringBytesConverter(), null);
-            } else {
-                logger.info("GlobalDict {} exist, append value", resourcePath);
-                builder = new Builder<>(resourcePath, dictToUse, dictToUse.baseDir, dictToUse.maxId, dictToUse.maxValueLength, dictToUse.nValues, dictToUse.bytesConvert, dictToUse.writeDictMap());
-            }
-
-            return builder;
-        }
-
-        private final String resourcePath;
-        private final String baseDir;
-        private int maxId;
-        private int maxValueLength;
-        private int nValues;
-        private final BytesConverter<T> bytesConverter;
-
-        private final AppendTrieDictionary dict;
-
-        private final TreeMap<DictSliceKey, DictNode> mutableDictSliceMap;
-        private int MAX_ENTRY_IN_SLICE = 10_000_000;
-        private static final double MAX_ENTRY_OVERHEAD_FACTOR = 1.0;
-
-        private int processedCount = 0;
-
-        // Constructor for a new Dict
-        private Builder(String resourcePath, AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter<T> bytesConverter, byte[] dictMapBytes) throws IOException {
-            this.resourcePath = resourcePath;
-            if (dict == null) {
-                this.dict = new AppendTrieDictionary<T>();
-            } else {
-                this.dict = dict;
-            }
-            this.baseDir = baseDir;
-            this.maxId = maxId;
-            this.maxValueLength = maxValueLength;
-            this.nValues = nValues;
-            this.bytesConverter = bytesConverter;
-
-            MAX_ENTRY_IN_SLICE = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
-            int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
-            long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
-            // create a new cached map with baseDir
-            mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(1).baseDir(baseDir).maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).immutable(false).build();
-            if (dictMapBytes != null) {
-                ((Writable) mutableDictSliceMap).readFields(new DataInputStream(new ByteArrayInputStream(dictMapBytes)));
-            }
-        }
-
-        public void addValue(T value) {
-            addValue(bytesConverter.convertToBytes(value));
-        }
-
-        private synchronized void addValue(byte[] value) {
-            if (++processedCount % 1_000_000 == 0) {
-                logger.debug("add value count " + processedCount);
-            }
-            maxValueLength = Math.max(maxValueLength, value.length);
-
-            if (mutableDictSliceMap.isEmpty()) {
-                DictNode root = new DictNode(new byte[0], false);
-                mutableDictSliceMap.put(DictSliceKey.wrap(new byte[0]), root);
-            }
-            DictSliceKey sliceKey = mutableDictSliceMap.floorKey(DictSliceKey.wrap(value));
-            if (sliceKey == null) {
-                sliceKey = mutableDictSliceMap.firstKey();
-            }
-            DictNode root = mutableDictSliceMap.get(sliceKey);
-            addValueR(root, value, 0);
-            if (root.childrenCount > MAX_ENTRY_IN_SLICE * MAX_ENTRY_OVERHEAD_FACTOR) {
-                mutableDictSliceMap.remove(sliceKey);
-                DictNode newRoot = splitNodeTree(root);
-                DictNode.mergeSingleByteNode(root, 1);
-                DictNode.mergeSingleByteNode(newRoot, 0);
-                mutableDictSliceMap.put(DictSliceKey.wrap(root.firstValue()), root);
-                mutableDictSliceMap.put(DictSliceKey.wrap(newRoot.firstValue()), newRoot);
-            }
-        }
-
-        private DictNode splitNodeTree(DictNode root) {
-            DictNode parent = root;
-            DictNode splitNode;
-            int childCountToSplit = (int) (MAX_ENTRY_IN_SLICE * MAX_ENTRY_OVERHEAD_FACTOR / 2);
-            while (true) {
-                List<DictNode> children = parent.children;
-                if (children.size() == 0) {
-                    splitNode = parent;
-                    break;
-                } else if (children.size() == 1) {
-                    parent = children.get(0);
-                    continue;
-                } else {
-                    for (int i = children.size() - 1; i >= 0; i--) {
-                        parent = children.get(i);
-                        if (childCountToSplit > children.get(i).childrenCount) {
-                            childCountToSplit -= children.get(i).childrenCount;
-                        } else {
-                            childCountToSplit--;
-                            break;
-                        }
-                    }
-                }
-            }
-            return DictNode.splitNodeTree(splitNode);
-        }
-
-        private int createNextId() {
-            int id = ++maxId;
-            checkValidId(id);
-            nValues++;
-            return id;
-        }
-
-        // Only used for test
-        public void setMaxId(int id) {
-            this.maxId = id;
-        }
-
-        // When add a new node, the value part maybe over 255 bytes, need split it into a sub tree
-        private DictNode addNodeMaybeOverflow(byte[] value, int start, int end) {
-            DictNode head = null;
-            DictNode current = null;
-            for (; start + 255 < end; start += 255) {
-                DictNode c = new DictNode(BytesUtil.subarray(value, start, start + 255), false);
-                if (head == null) {
-                    head = c;
-                    current = c;
-                } else {
-                    current.addChild(c);
-                    current = c;
-                }
-            }
-            DictNode last = new DictNode(BytesUtil.subarray(value, start, end), true);
-            last.id = createNextId();
-            if (head == null) {
-                head = last;
-            } else {
-                current.addChild(last);
-            }
-            return head;
-        }
-
-        private void addValueR(DictNode node, byte[] value, int start) {
-            // match the value part of current node
-            int i = 0, j = start;
-            int n = node.part.length, nn = value.length;
-            int comp = 0;
-            for (; i < n && j < nn; i++, j++) {
-                comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
-                if (comp != 0)
-                    break;
-            }
-
-            if (j == nn) {
-                // if value fully matched within the current node
-                if (i == n) {
-                    // if equals to current node, just mark end of value
-                    if (!node.isEndOfValue) {
-                        // if the first match, assign an Id to nodt
-                        node.id = createNextId();
-                    }
-                    node.isEndOfValue = true;
-                } else {
-                    // otherwise, split the current node into two
-                    DictNode c = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
-                    c.id = node.id;
-                    node.reset(BytesUtil.subarray(node.part, 0, i), true);
-                    node.addChild(c);
-                    node.id = createNextId();
-                }
-                return;
-            }
-
-            // if partially matched the current, split the current node, add the new
-            // value, make a 3-way
-            if (i < n) {
-                DictNode c1 = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
-                c1.id = node.id;
-                DictNode c2 = addNodeMaybeOverflow(value, j, nn);
-                node.reset(BytesUtil.subarray(node.part, 0, i), false);
-                if (comp < 0) {
-                    node.addChild(c1);
-                    node.addChild(c2);
-                } else {
-                    node.addChild(c2);
-                    node.addChild(c1);
-                }
-                return;
-            }
-
-            // out matched the current, binary search the next byte for a child node
-            // to continue
-            byte lookfor = value[j];
-            int lo = 0;
-            int hi = node.children.size() - 1;
-            int mid = 0;
-            boolean found = false;
-            comp = 0;
-            while (!found && lo <= hi) {
-                mid = lo + (hi - lo) / 2;
-                DictNode c = node.children.get(mid);
-                comp = BytesUtil.compareByteUnsigned(lookfor, c.part[0]);
-                if (comp < 0)
-                    hi = mid - 1;
-                else if (comp > 0)
-                    lo = mid + 1;
-                else
-                    found = true;
-            }
-            if (found) {
-                // found a child node matching the first byte, continue in that child
-                addValueR(node.children.get(mid), value, j);
-            } else {
-                // otherwise, make the value a new child
-                DictNode c = addNodeMaybeOverflow(value, j, nn);
-                node.addChild(comp <= 0 ? mid : mid + 1, c);
-            }
-        }
-
-        public synchronized AppendTrieDictionary<T> build(int baseId) throws IOException {
-            boolean keepAppend = releaseInstance(resourcePath);
-            CachedTreeMap dictSliceMap = (CachedTreeMap) mutableDictSliceMap;
-            dict.initParams(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter);
-            dict.flushIndex(dictSliceMap, keepAppend);
-            dict.initDictSliceMap(dictSliceMap);
-
-            return dict;
-        }
+        final GlobalDictStore globalDictStore = new GlobalDictHDFSStore(baseDir);
+        long[] versions = globalDictStore.listAllVersions();
+        checkState(versions.length > 0, "Global dict at %s is empty", baseDir);
+        final long latestVersion = versions[versions.length - 1];
+        final Path latestVersionPath = globalDictStore.getVersionDir(latestVersion);
+        this.metadata = globalDictStore.getMetadata(latestVersion);
+        this.bytesConvert = metadata.bytesConverter;
+        this.dictCache = CacheBuilder.newBuilder().softValues().removalListener(new RemovalListener<DictSliceKey, DictSlice>() {
+            @Override
+            public void onRemoval(RemovalNotification<DictSliceKey, DictSlice> notification) {
+                logger.info("Evict slice with key {} and value {} caused by {}, size {}/{}", notification.getKey(), notification.getValue(), notification.getCause(), dictCache.size(), metadata.sliceFileMap.size());
+            }
+        }).build(new CacheLoader<DictSliceKey, DictSlice>() {
+            @Override
+            public DictSlice load(DictSliceKey key) throws Exception {
+                DictSlice slice = globalDictStore.readSlice(latestVersionPath.toString(), metadata.sliceFileMap.get(key));
+                logger.info("Load slice with key {} and value {}", key, slice);
+                return slice;
+            }
+        });
     }
 
     @Override
     protected int getIdFromValueBytesWithoutCache(byte[] value, int offset, int len, int roundingFlag) {
-        if (dictSliceMap.isEmpty()) {
-            return -1;
-        }
-        byte[] tempVal = new byte[len];
-        System.arraycopy(value, offset, tempVal, 0, len);
-        DictSliceKey sliceKey = dictSliceMap.floorKey(DictSliceKey.wrap(tempVal));
+        byte[] val = Arrays.copyOfRange(value, offset, offset + len);
+        DictSliceKey sliceKey = metadata.sliceFileMap.floorKey(DictSliceKey.wrap(val));
         if (sliceKey == null) {
-            sliceKey = dictSliceMap.firstKey();
+            sliceKey = metadata.sliceFileMap.firstKey();
         }
-        DictSlice slice = dictSliceMap.get(sliceKey);
-        int id = slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag);
-        return id;
-    }
-
-    @Override
-    protected byte[] getValueBytesFromIdWithoutCache(int id) {
-        throw new UnsupportedOperationException("AppendTrieDictionary can't retrive value from id");
+        DictSlice slice;
+        try {
+            slice = dictCache.get(sliceKey);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Failed to load slice with key " + sliceKey, e.getCause());
+        }
+        return slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag);
     }
 
     @Override
     public int getMinId() {
-        return baseId;
+        return metadata.baseId;
     }
 
     @Override
     public int getMaxId() {
-        return maxId;
+        return metadata.maxId;
     }
 
     @Override
     public int getSizeOfId() {
-        return 4;
+        return Integer.SIZE / Byte.SIZE;
     }
 
     @Override
     public int getSizeOfValue() {
-        return maxValueLength;
+        return metadata.maxValueLength;
     }
 
-    public void flushIndex(CachedTreeMap dictSliceMap, boolean keepAppend) throws IOException {
-        try (FSDataOutputStream indexOut = dictSliceMap.openIndexOutput()) {
-            indexOut.writeInt(baseId);
-            indexOut.writeInt(maxId);
-            indexOut.writeInt(maxValueLength);
-            indexOut.writeInt(nValues);
-            indexOut.writeUTF(bytesConvert.getClass().getName());
-            dictSliceMap.write(indexOut);
-            dictSliceMap.commit(keepAppend);
-        }
+    @Override
+    protected byte[] getValueBytesFromIdWithoutCache(int id) {
+        throw new UnsupportedOperationException("AppendTrieDictionary can't retrieve value from id");
     }
 
     @Override
     public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
-        //copy appendDict
-        Path base = new Path(baseDir);
-        FileSystem srcFs = HadoopUtil.getFileSystem(base);
-        Path srcPath = CachedTreeMap.getLatestVersion(HadoopUtil.getCurrentConfiguration(), srcFs, base);
-        Path dstPath = new Path(srcPath.toString().replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()));
-        logger.info("Copy appendDict from {} to {}", srcPath, dstPath);
-
-        FileSystem dstFs = HadoopUtil.getFileSystem(dstPath);
-        if (dstFs.exists(dstPath)) {
-            logger.info("Delete existing AppendDict {}", dstPath);
-            dstFs.delete(dstPath, true);
-        }
-        FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, true, HadoopUtil.getCurrentConfiguration());
-
-        // init new AppendTrieDictionary
+        GlobalDictStore store = new GlobalDictHDFSStore(baseDir);
+        String dstBaseDir = store.copyToAnotherMeta(srcConfig, dstConfig);
         AppendTrieDictionary newDict = new AppendTrieDictionary();
-        newDict.initParams(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConvert);
-        newDict.initDictSliceMap((CachedTreeMap) dictSliceMap);
-
+        newDict.init(dstBaseDir);
         return newDict;
     }
 
@@ -1192,34 +144,12 @@ public class AppendTrieDictionary<T> extends CacheDictionary<T> {
 
     @Override
     public void readFields(DataInput in) throws IOException {
-        String baseDir = in.readUTF();
-        Configuration conf = HadoopUtil.getCurrentConfiguration();
-        try (FSDataInputStream input = CachedTreeMap.openLatestIndexInput(conf, baseDir)) {
-            int baseId = input.readInt();
-            int maxId = input.readInt();
-            int maxValueLength = input.readInt();
-            int nValues = input.readInt();
-            String converterName = input.readUTF();
-            BytesConverter converter = null;
-            if (converterName.isEmpty() == false) {
-                try {
-                    converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
-                } catch (Exception e) {
-                    throw new IOException(e);
-                }
-            }
-            initParams(baseDir, baseId, maxId, maxValueLength, nValues, converter);
-
-            // Create instance for deserialize data, and update to map in dict
-            CachedTreeMap dictMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().baseDir(baseDir).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
-            dictMap.readFields(input);
-            initDictSliceMap(dictMap);
-        }
+        init(in.readUTF());
     }
 
     @Override
     public void dump(PrintStream out) {
-        out.println("Total " + nValues + " values, " + (dictSliceMap == null ? 0 : dictSliceMap.size()) + " slice");
+        out.println(String.format("Total %d values and %d slices", metadata.nValues, metadata.sliceFileMap.size()));
     }
 
     @Override
@@ -1248,5 +178,4 @@ public class AppendTrieDictionary<T> extends CacheDictionary<T> {
     public boolean contains(Dictionary other) {
         return false;
     }
-
-}
+}
\ No newline at end of file

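The rewritten read path above replaces CachedTreeMap with a plain TreeMap of slice keys plus a Guava LoadingCache: floorKey() selects the slice that may contain a value, and the cache loads that slice from the store on first access, holding it by soft reference so it can be reclaimed under memory pressure. A minimal, self-contained sketch of that pattern follows; SliceStore is a hypothetical stand-in for GlobalDictStore, and String keys stand in for DictSliceKey/DictSlice:

    import java.util.TreeMap;
    import java.util.concurrent.ExecutionException;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    class SliceCacheSketch {
        // hypothetical stand-in for GlobalDictStore
        interface SliceStore {
            byte[] readSlice(String fileName);
        }

        private final TreeMap<String, String> sliceFileMap; // slice key -> slice file name
        private final LoadingCache<String, byte[]> cache;

        SliceCacheSketch(final SliceStore store, final TreeMap<String, String> map) {
            this.sliceFileMap = map;
            this.cache = CacheBuilder.newBuilder()
                    .softValues() // let GC reclaim slices under memory pressure
                    .build(new CacheLoader<String, byte[]>() {
                        @Override
                        public byte[] load(String key) {
                            return store.readSlice(map.get(key)); // load on first access
                        }
                    });
        }

        byte[] sliceFor(String value) throws ExecutionException {
            String key = sliceFileMap.floorKey(value); // greatest slice key <= value
            if (key == null) {
                key = sliceFileMap.firstKey(); // value sorts before all slice keys
            }
            return cache.get(key);
        }
    }
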
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
new file mode 100644
index 0000000..4e86d4c
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryBuilder.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeMap;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class AppendTrieDictionaryBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionaryBuilder.class);
+
+    private final String baseDir;
+    private final String workingDir;
+    private final int maxEntriesPerSlice;
+
+    private GlobalDictStore store;
+    private int maxId;
+    private int maxValueLength;
+    private int nValues;
+    private BytesConverter bytesConverter;
+    private TreeMap<DictSliceKey, String> sliceFileMap = new TreeMap<>(); // slice key -> slice file name
+    private int counter;
+
+    private DictSliceKey curKey;
+    private DictNode curNode;
+
+    public AppendTrieDictionaryBuilder(String resourceDir, int maxEntriesPerSlice) throws IOException {
+        this.baseDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourceDir + "/";
+        this.workingDir = this.baseDir + "working"; // baseDir already ends with "/"
+        this.maxEntriesPerSlice = maxEntriesPerSlice;
+        init();
+    }
+
+    public void init() throws IOException {
+        this.store = new GlobalDictHDFSStore(baseDir);
+        long[] versions = store.listAllVersions();
+
+        if (versions.length == 0) { // build dict for the first time
+            this.maxId = 0;
+            this.maxValueLength = 0;
+            this.nValues = 0;
+            this.bytesConverter = new StringBytesConverter();
+
+        } else { // append values to last version
+            GlobalDictMetadata metadata = store.getMetadata(versions[versions.length - 1]);
+            this.maxId = metadata.maxId;
+            this.maxValueLength = metadata.maxValueLength;
+            this.nValues = metadata.nValues;
+            this.bytesConverter = metadata.bytesConverter;
+            this.sliceFileMap = new TreeMap<>(metadata.sliceFileMap);
+        }
+
+        store.prepareForWrite(workingDir);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void addValue(String value) {
+        Preconditions.checkNotNull(value, "value");
+        byte[] valueBytes = bytesConverter.convertToBytes(value);
+
+        if (sliceFileMap.isEmpty()) {
+            curNode = new DictNode(new byte[0], false);
+            sliceFileMap.put(DictSliceKey.START_KEY, null);
+        }
+        checkState(sliceFileMap.firstKey().equals(DictSliceKey.START_KEY), "first key should be \"\", but got \"%s\"", sliceFileMap.firstKey());
+
+        DictSliceKey nextKey = sliceFileMap.floorKey(DictSliceKey.wrap(valueBytes));
+
+        if (curKey != null && !nextKey.equals(curKey)) {
+            // you might assume nextKey >= curKey, but nextKey < curKey can happen right after a node split.
+            // for example, suppose curNode covers [1-10] and adding value "2" triggers a split:
+            //   the first half [1-5] is flushed out, and the second half [6-10] becomes curNode with curKey "6".
+            // when value "3" is added next, nextKey is "1", which is smaller than curKey "6".
+            // in that case we must flush [6-10] and read [1-5] back.
+            flushCurrentNode();
+            curNode = null;
+        }
+        if (curNode == null) { // read next slice
+            DictSlice slice = store.readSlice(workingDir, sliceFileMap.get(nextKey));
+            curNode = slice.rebuildTrieTree();
+        }
+        curKey = nextKey;
+
+        addValueR(curNode, valueBytes, 0);
+
+        // split slice if it's too large
+        if (curNode.childrenCount > maxEntriesPerSlice) {
+            DictNode newRoot = splitNodeTree(curNode);
+            flushCurrentNode();
+            curNode = newRoot;
+            curKey = DictSliceKey.wrap(newRoot.firstValue());
+            sliceFileMap.put(curKey, null);
+        }
+
+        maxValueLength = Math.max(maxValueLength, valueBytes.length);
+        if (++counter % 1_000_000 == 0) {
+            logger.info("processed {} values", counter);
+        }
+    }
+
+    public AppendTrieDictionary build(int baseId) throws IOException {
+        if (curNode != null) {
+            flushCurrentNode();
+        }
+
+        GlobalDictMetadata metadata = new GlobalDictMetadata(baseId, this.maxId, this.maxValueLength, this.nValues, this.bytesConverter, sliceFileMap);
+        store.commit(workingDir, metadata);
+
+        AppendTrieDictionary dict = new AppendTrieDictionary();
+        dict.init(this.baseDir);
+        return dict;
+    }
+
+    private void flushCurrentNode() {
+        String newSliceFile = store.writeSlice(workingDir, curKey, curNode);
+        String oldSliceFile = sliceFileMap.put(curKey, newSliceFile);
+        if (oldSliceFile != null) {
+            store.deleteSlice(workingDir, oldSliceFile);
+        }
+    }
+
+    private void addValueR(DictNode node, byte[] value, int start) {
+        // match the value part of current node
+        int i = 0, j = start;
+        int n = node.part.length, nn = value.length;
+        int comp = 0;
+        for (; i < n && j < nn; i++, j++) {
+            comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
+            if (comp != 0)
+                break;
+        }
+
+        if (j == nn) {
+            // if value fully matched within the current node
+            if (i == n) {
+                // on first match, mark end of value and assign an ID
+                if (!node.isEndOfValue) {
+                    node.id = createNextId();
+                    node.isEndOfValue = true;
+                }
+            } else {
+                // otherwise, split the current node into two
+                DictNode c = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+                c.id = node.id;
+                node.reset(BytesUtil.subarray(node.part, 0, i), true);
+                node.addChild(c);
+                node.id = createNextId();
+            }
+            return;
+        }
+
+        // the value partially matched the current node: split the node and add
+        // the new value, making a 3-way split
+        if (i < n) {
+            DictNode c1 = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+            c1.id = node.id;
+            DictNode c2 = addNodeMaybeOverflow(value, j, nn);
+            node.reset(BytesUtil.subarray(node.part, 0, i), false);
+            if (comp < 0) {
+                node.addChild(c1);
+                node.addChild(c2);
+            } else {
+                node.addChild(c2);
+                node.addChild(c1);
+            }
+            return;
+        }
+
+        // the value extends beyond the current node; binary search the next byte
+        // for a child node to continue with
+        byte lookfor = value[j];
+        int lo = 0;
+        int hi = node.children.size() - 1;
+        int mid = 0;
+        boolean found = false;
+        comp = 0;
+        while (!found && lo <= hi) {
+            mid = lo + (hi - lo) / 2;
+            DictNode c = node.children.get(mid);
+            comp = BytesUtil.compareByteUnsigned(lookfor, c.part[0]);
+            if (comp < 0)
+                hi = mid - 1;
+            else if (comp > 0)
+                lo = mid + 1;
+            else
+                found = true;
+        }
+        if (found) {
+            // found a child node matching the first byte, continue in that child
+            addValueR(node.children.get(mid), value, j);
+        } else {
+            // otherwise, make the value a new child
+            DictNode c = addNodeMaybeOverflow(value, j, nn);
+            node.addChild(comp <= 0 ? mid : mid + 1, c);
+        }
+    }
+
+    private int createNextId() {
+        int id = ++maxId;
+        checkValidId(id);
+        nValues++;
+        return id;
+    }
+
+    // Dict ids run from 1 to 2147483647 and then, read as unsigned, from 2147483648 to 4294967294 (-2 signed); 0 and -1 are reserved for the uninitialized state
+    private void checkValidId(int id) {
+        if (id == 0 || id == -1) {
+            throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
+        }
+    }
+
+    // When adding a new node, the value part may exceed 255 bytes and must be split into a subtree
+    private DictNode addNodeMaybeOverflow(byte[] value, int start, int end) {
+        DictNode head = null;
+        DictNode current = null;
+        for (; start + 255 < end; start += 255) {
+            DictNode c = new DictNode(BytesUtil.subarray(value, start, start + 255), false);
+            if (head == null) {
+                head = c;
+                current = c;
+            } else {
+                current.addChild(c);
+                current = c;
+            }
+        }
+        DictNode last = new DictNode(BytesUtil.subarray(value, start, end), true);
+        last.id = createNextId();
+        if (head == null) {
+            head = last;
+        } else {
+            current.addChild(last);
+        }
+        return head;
+    }
+
+    private DictNode splitNodeTree(DictNode root) {
+        DictNode parent = root;
+        int childCountToSplit = (int) (maxEntriesPerSlice * 1.0 / 2);
+        while (true) {
+            List<DictNode> children = parent.children;
+            if (children.size() == 0) {
+                break;
+            }
+            if (children.size() == 1) {
+                parent = children.get(0);
+            } else {
+                for (int i = children.size() - 1; i >= 0; i--) {
+                    parent = children.get(i);
+                    if (childCountToSplit > children.get(i).childrenCount) {
+                        childCountToSplit -= children.get(i).childrenCount;
+                    } else {
+                        childCountToSplit--;
+                        break;
+                    }
+                }
+            }
+        }
+        return DictNode.splitNodeTree(parent);
+    }
+
+    // Only used for test
+    void setMaxId(int id) {
+        this.maxId = id;
+    }
+}

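For orientation, here is roughly how the new builder is driven end to end. It assumes a KylinConfig resolvable from the environment with a writable HDFS working directory, since the constructor derives baseDir from KylinConfig.getInstanceFromEnv(); "/DEFAULT/SELLER_ID" and the values are made up:

    import java.io.IOException;

    public class BuilderUsageSketch {
        public static void main(String[] args) throws IOException {
            // a slice is split and flushed once it holds more than maxEntriesPerSlice entries
            AppendTrieDictionaryBuilder builder =
                    new AppendTrieDictionaryBuilder("/DEFAULT/SELLER_ID", 10_000_000);
            builder.addValue("beijing");
            builder.addValue("shanghai");
            builder.addValue("beijing"); // a repeated value keeps its original id
            AppendTrieDictionary dict = builder.build(0); // baseId = 0
            dict.dump(System.out); // e.g. "Total 2 values and 1 slices"
        }
    }
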
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
index 4b3817a..b7c39fa 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 
+import static org.apache.kylin.dict.GlobalDictHDFSStore.BUFFER_SIZE;
+
 /**
  * Created by sunyerui on 16/11/15.
  */
@@ -67,16 +69,15 @@ public class AppendTrieDictionaryChecker {
                 listDictSlicePath(fs, status, list);
             }
         } else {
-            if (path.getPath().getName().startsWith(CachedTreeMap.CACHED_PREFIX)) {
+            if (path.getPath().getName().startsWith(GlobalDictHDFSStore.IndexFormatV1.SLICE_PREFIX)) {
                 list.add(path.getPath());
             }
         }
     }
 
     public boolean doCheck(FileSystem fs, Path filePath) {
-        try (FSDataInputStream input = fs.open(filePath, CachedTreeMap.BUFFER_SIZE)) {
-            AppendTrieDictionary.DictSlice slice = new AppendTrieDictionary.DictSlice();
-            slice.readFields(input);
+        try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) {
+            DictSlice slice = DictSlice.deserializeFrom(input);
             return slice.doCheck();
         } catch (Exception e) {
             return false;

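The builder's checkValidId() treats ids as unsigned 4-byte integers: past Integer.MAX_VALUE the Java int wraps negative, yet the wrapped values still decode to distinct unsigned ids, and only 0 and -1 are reserved for the uninitialized state. A short illustration of that arithmetic:

    public class IdRangeSketch {
        public static void main(String[] args) {
            int id = Integer.MAX_VALUE;           // 2147483647, the last positive id
            id++;                                 // overflows to -2147483648
            System.out.println(id);               // -2147483648
            System.out.println(id & 0xFFFFFFFFL); // 2147483648 when read as unsigned
            System.out.println(-2 & 0xFFFFFFFFL); // 4294967294, the largest usable id
            // 0 and -1 (unsigned 4294967295) are reserved for the uninitialized state
        }
    }
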
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
deleted file mode 100644
index ee69df7..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.AbstractCollection;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-
-/**
- * Created by sunyerui on 16/5/2.
- * TODO Depends on HDFS for now, ideally just depends on storage interface
- */
-public class CachedTreeMap<K extends WritableComparable, V extends Writable> extends TreeMap<K, V> implements Writable {
-    private static final Logger logger = LoggerFactory.getLogger(CachedTreeMap.class);
-
-    private final Class<K> keyClazz;
-    private final Class<V> valueClazz;
-    transient volatile Collection<V> values;
-    private final LoadingCache<K, V> valueCache;
-    private final Configuration conf;
-    private final Path baseDir;
-    private final Path versionDir;
-    private final Path workingDir;
-    private final FileSystem fs;
-    private final boolean immutable;
-    private final int maxVersions;
-    private final long versionTTL;
-    private boolean keepAppend;
-
-    public static final int BUFFER_SIZE = 8 * 1024 * 1024;
-
-    public static final String CACHED_PREFIX = "cached_";
-    public static final String VERSION_PREFIX = "version_";
-
-    public static class CachedTreeMapBuilder<K, V> {
-        private Class<K> keyClazz;
-        private Class<V> valueClazz;
-        private int maxCount = 8;
-        private String baseDir;
-        private boolean immutable;
-        private int maxVersions;
-        private long versionTTL;
-
-        public static CachedTreeMapBuilder newBuilder() {
-            return new CachedTreeMapBuilder();
-        }
-
-        private CachedTreeMapBuilder() {
-        }
-
-        public CachedTreeMapBuilder keyClazz(Class<K> clazz) {
-            this.keyClazz = clazz;
-            return this;
-        }
-
-        public CachedTreeMapBuilder valueClazz(Class<V> clazz) {
-            this.valueClazz = clazz;
-            return this;
-        }
-
-        public CachedTreeMapBuilder<K, V> maxSize(int maxCount) {
-            this.maxCount = maxCount;
-            return this;
-        }
-
-        public CachedTreeMapBuilder<K, V> baseDir(String baseDir) {
-            this.baseDir = baseDir;
-            return this;
-        }
-
-        public CachedTreeMapBuilder<K, V> immutable(boolean immutable) {
-            this.immutable = immutable;
-            return this;
-        }
-
-        public CachedTreeMapBuilder<K, V> maxVersions(int maxVersions) {
-            this.maxVersions = maxVersions;
-            return this;
-        }
-
-        public CachedTreeMapBuilder<K, V> versionTTL(long versionTTL) {
-            this.versionTTL = versionTTL;
-            return this;
-        }
-
-        public CachedTreeMap build() throws IOException {
-            if (baseDir == null) {
-                throw new RuntimeException("CachedTreeMap need a baseDir to cache data");
-            }
-            if (keyClazz == null || valueClazz == null) {
-                throw new RuntimeException("CachedTreeMap need key and value clazz to serialize data");
-            }
-            CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, immutable, maxVersions, versionTTL);
-            return map;
-        }
-    }
-
-    private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String basePath,
-                          boolean immutable, int maxVersions, long versionTTL) throws IOException {
-        super();
-        this.keyClazz = keyClazz;
-        this.valueClazz = valueClazz;
-        this.immutable = immutable;
-        this.keepAppend = true;
-        this.maxVersions = maxVersions;
-        this.versionTTL = versionTTL;
-        this.conf = HadoopUtil.getCurrentConfiguration();
-        if (basePath.endsWith("/")) {
-            basePath = basePath.substring(0, basePath.length()-1);
-        }
-        this.baseDir = new Path(basePath);
-        this.fs = HadoopUtil.getFileSystem(baseDir, conf);
-        if (!fs.exists(baseDir)) {
-            fs.mkdirs(baseDir);
-        }
-        this.versionDir = getLatestVersion(conf, fs, baseDir);
-        this.workingDir = new Path(baseDir, "working");
-        if (!this.immutable) {
-            // For mutable map, copy all data into working dir and work on it, avoiding suddenly server crash made data corrupt
-            if (fs.exists(workingDir)) {
-                fs.delete(workingDir, true);
-            }
-            FileUtil.copy(fs, versionDir, fs, workingDir, false, true, conf);
-        }
-        CacheBuilder builder = CacheBuilder.newBuilder().removalListener(new RemovalListener<K, V>() {
-            @Override
-            public void onRemoval(RemovalNotification<K, V> notification) {
-                logger.info(String.format("Evict cache key %s(%d) with value %s caused by %s, size %d/%d ", notification.getKey(), notification.getKey().hashCode(), notification.getValue(), notification.getCause(), size(), valueCache.size()));
-                switch (notification.getCause()) {
-                case SIZE:
-                    writeValue(notification.getKey(), notification.getValue());
-                    break;
-                case EXPLICIT:
-                    deleteValue(notification.getKey());
-                    break;
-                default:
-                }
-            }
-        });
-        if (this.immutable) {
-            // For immutable values, load all values as much as possible, and evict by soft reference to free memory when gc
-            builder.softValues();
-        } else {
-            builder.maximumSize(maxCount);
-        }
-        this.valueCache = builder.build(new CacheLoader<K, V>() {
-            @Override
-            public V load(K key) throws Exception {
-                V value = readValue(key);
-                logger.info(String.format("Load cache by key %s(%d) with value %s", key, key.hashCode(), value));
-                return value;
-            }
-        });
-    }
-
-    private String generateFileName(K key) {
-        String file = getCurrentDir() + "/" + CACHED_PREFIX + key.toString();
-        return file;
-    }
-
-    private String getCurrentDir() {
-        return immutable ? versionDir.toString() : workingDir.toString();
-    }
-
-    private static String[] listAllVersions(FileSystem fs, Path baseDir) throws IOException {
-        FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() {
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith(VERSION_PREFIX)) {
-                    return true;
-                }
-                return false;
-            }
-        });
-        TreeSet<String> versions = new TreeSet<>();
-        for (FileStatus status : fileStatus) {
-            versions.add(status.getPath().toString());
-        }
-        return versions.toArray(new String[versions.size()]);
-    }
-
-    // only for test
-    public String getLatestVersion() throws IOException {
-        return getLatestVersion(conf, fs, baseDir).toUri().getPath();
-    }
-
-    public static Path getLatestVersion(Configuration conf, FileSystem fs, Path baseDir) throws IOException {
-        String[] versions = listAllVersions(fs, baseDir);
-        if (versions.length > 0) {
-            return new Path(versions[versions.length - 1]);
-        } else {
-            // Old format, directly use base dir, convert to new format
-            Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis());
-            Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis());
-            Path indexFile = new Path(baseDir, ".index");
-            FileStatus[] cachedFiles;
-            try {
-                cachedFiles = fs.listStatus(baseDir, new PathFilter() {
-                    @Override
-                    public boolean accept(Path path) {
-                        if (path.getName().startsWith(CACHED_PREFIX)) {
-                            return true;
-                        }
-                        return false;
-                    }
-                });
-                fs.mkdirs(tmpNewVersionDir);
-                if (fs.exists(indexFile) && cachedFiles.length > 0) {
-                    FileUtil.copy(fs, indexFile, fs, tmpNewVersionDir, false, true, conf);
-                    for (FileStatus file : cachedFiles) {
-                        FileUtil.copy(fs, file.getPath(), fs, tmpNewVersionDir, false, true, conf);
-                    }
-                }
-                fs.rename(tmpNewVersionDir, newVersionDir);
-                if (fs.exists(indexFile) && cachedFiles.length > 0) {
-                    fs.delete(indexFile, true);
-                    for (FileStatus file : cachedFiles) {
-                        fs.delete(file.getPath(), true);
-                    }
-                }
-            } finally {
-                if (fs.exists(tmpNewVersionDir)) {
-                    fs.delete(tmpNewVersionDir, true);
-                }
-            }
-            return newVersionDir;
-        }
-    }
-
-    public void commit(boolean keepAppend) throws IOException {
-        assert this.keepAppend && !immutable : "Only support commit method with immutable false and keepAppend true";
-
-        Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis());
-        if (keepAppend) {
-            // Copy to tmp dir, and rename to new version, make sure it's complete when be visible
-            Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis());
-            try {
-                FileUtil.copy(fs, workingDir, fs, tmpNewVersionDir, false, true, conf);
-                fs.rename(tmpNewVersionDir, newVersionDir);
-            } finally {
-                if (fs.exists(tmpNewVersionDir)) {
-                    fs.delete(tmpNewVersionDir, true);
-                }
-            }
-        } else {
-            fs.rename(workingDir, newVersionDir);
-        }
-        this.keepAppend = keepAppend;
-
-        // Check versions count, delete expired versions
-        String[] versions = listAllVersions(fs, baseDir);
-        long timestamp = System.currentTimeMillis();
-        for (int i = 0; i < versions.length - maxVersions; i++) {
-            String versionString = versions[i].substring(versions[i].lastIndexOf(VERSION_PREFIX) + VERSION_PREFIX.length());
-            long version = Long.parseLong(versionString);
-            if (version + versionTTL < timestamp) {
-                fs.delete(new Path(versions[i]), true);
-            }
-        }
-    }
-
-    public void loadEntry(CachedTreeMap other) {
-        for (Object key : other.keySet()) {
-            super.put((K)key, null);
-        }
-    }
-
-    private void writeValue(K key, V value) {
-        if (immutable) {
-            return;
-        }
-        String fileName = generateFileName(key);
-        Path filePath = new Path(fileName);
-        try (FSDataOutputStream out = fs.create(filePath, true, BUFFER_SIZE, (short) 5, BUFFER_SIZE * 8L)) {
-            value.write(out);
-        } catch (Exception e) {
-            logger.error(String.format("write value into %s exception: %s", fileName, e), e);
-            throw new RuntimeException(e.getCause());
-        }
-    }
-
-    private V readValue(K key) throws Exception {
-        String fileName = generateFileName(key);
-        Path filePath = new Path(fileName);
-        try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) {
-            V value = valueClazz.newInstance();
-            value.readFields(input);
-            return value;
-        } catch (Exception e) {
-            logger.error(String.format("read value from %s exception: %s", fileName, e), e);
-            return null;
-        }
-    }
-
-    private void deleteValue(K key) {
-        if (immutable) {
-            return;
-        }
-        String fileName = generateFileName(key);
-        Path filePath = new Path(fileName);
-        try {
-            if (fs.exists(filePath)) {
-                fs.delete(filePath, true);
-            }
-        } catch (Exception e) {
-            logger.error(String.format("delete value file %s exception: %s", fileName, e), e);
-        }
-    }
-
-    @Override
-    public V put(K key, V value) {
-        assert keepAppend && !immutable : "Only support put method with immutable false and keepAppend true";
-        super.put(key, null);
-        valueCache.put(key, value);
-        return null;
-    }
-
-    @Override
-    public V get(Object key) {
-        if (super.containsKey(key)) {
-            try {
-                return valueCache.get((K) key);
-            } catch (ExecutionException e) {
-                logger.error(String.format("get value with key %s exception: %s", key, e), e);
-                return null;
-            }
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public V remove(Object key) {
-        assert keepAppend && !immutable : "Only support remove method with immutable false keepAppend true";
-        super.remove(key);
-        valueCache.invalidate(key);
-        return null;
-    }
-
-    @Override
-    public void clear() {
-        super.clear();
-        values = null;
-        valueCache.invalidateAll();
-    }
-
-    public Collection<V> values() {
-        Collection<V> vs = values;
-        return (vs != null) ? vs : (values = new Values());
-    }
-
-    class Values extends AbstractCollection<V> {
-        @Override
-        public Iterator<V> iterator() {
-            return new ValueIterator<>();
-        }
-
-        @Override
-        public int size() {
-            return CachedTreeMap.this.size();
-        }
-    }
-
-    class ValueIterator<V> implements Iterator<V> {
-        Iterator<K> keyIterator;
-        K currentKey;
-
-        public ValueIterator() {
-            keyIterator = CachedTreeMap.this.keySet().iterator();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return keyIterator.hasNext();
-        }
-
-        @Override
-        public V next() {
-            currentKey = keyIterator.next();
-            try {
-                return (V) valueCache.get(currentKey);
-            } catch (ExecutionException e) {
-                logger.error(String.format("get value with key %s exception: %s", currentKey, e), e);
-                return null;
-            }
-        }
-
-        @Override
-        public void remove() {
-            assert keepAppend && !immutable : "Only support remove method with immutable false and keepAppend true";
-            keyIterator.remove();
-            valueCache.invalidate(currentKey);
-        }
-    }
-
-    public FSDataOutputStream openIndexOutput() throws IOException {
-        assert keepAppend && !immutable : "Only support write method with immutable false and keepAppend true";
-        Path indexPath = new Path(getCurrentDir(), ".index");
-        return fs.create(indexPath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8);
-    }
-
-    public FSDataInputStream openIndexInput() throws IOException {
-        Path indexPath = new Path(getCurrentDir(), ".index");
-        return fs.open(indexPath, 8 * 1024 * 1024);
-    }
-
-    public static FSDataInputStream openLatestIndexInput(Configuration conf, String baseDir) throws IOException {
-        Path basePath = new Path(baseDir);
-        FileSystem fs = HadoopUtil.getFileSystem(basePath, conf);
-        Path indexPath = new Path(getLatestVersion(conf, fs, basePath), ".index");
-        return fs.open(indexPath, 8 * 1024 * 1024);
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        out.writeInt(size());
-        for (K key : keySet()) {
-            key.write(out);
-            V value = valueCache.getIfPresent(key);
-            if (null != value) {
-                writeValue(key, value);
-            }
-        }
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        int size = in.readInt();
-        try {
-            for (int i = 0; i < size; i++) {
-                K key = keyClazz.newInstance();
-                key.readFields(in);
-                super.put(key, null);
-            }
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-}

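The deleted commit() above encodes CachedTreeMap's version-retention rule: a version directory is removed only when it is both beyond the newest maxVersions versions and older than versionTTL. A compact restatement of that rule, with the "version_<millis>" timestamp parsing made explicit:

    public class VersionRetentionSketch {
        // versions[] holds "version_<millis>" paths sorted ascending, as listAllVersions returns them
        static boolean shouldDelete(String[] versions, int i, int maxVersions, long versionTTL, long now) {
            if (i >= versions.length - maxVersions) {
                return false; // one of the newest maxVersions versions, always kept
            }
            String v = versions[i];
            long ts = Long.parseLong(v.substring(v.lastIndexOf("version_") + "version_".length()));
            return ts + versionTTL < now; // superseded and expired
        }
    }
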

[2/3] kylin git commit: KYLIN-2506 Refactor Global Dictionary

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java
new file mode 100644
index 0000000..dd9593a
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+
+public class DictNode {
+    public byte[] part;
+    public int id = -1;
+    public boolean isEndOfValue;
+    public ArrayList<DictNode> children = new ArrayList<>();
+
+    public int nValuesBeneath;
+    public DictNode parent;
+    public int childrenCount = 1;
+
+    DictNode(byte[] value, boolean isEndOfValue) {
+        reset(value, isEndOfValue);
+    }
+
+    DictNode(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
+        reset(value, isEndOfValue, children);
+    }
+
+    void reset(byte[] value, boolean isEndOfValue) {
+        reset(value, isEndOfValue, new ArrayList<DictNode>());
+    }
+
+    void reset(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
+        this.part = value;
+        this.isEndOfValue = isEndOfValue;
+        clearChild();
+        for (DictNode child : children) {
+            addChild(child);
+        }
+        this.id = -1;
+    }
+
+    void clearChild() {
+        this.children.clear();
+        int childrenCountDelta = this.childrenCount - 1;
+        for (DictNode p = this; p != null; p = p.parent) {
+            p.childrenCount -= childrenCountDelta;
+        }
+    }
+
+    void addChild(DictNode child) {
+        addChild(-1, child);
+    }
+
+    void addChild(int index, DictNode child) {
+        child.parent = this;
+        if (index < 0) {
+            this.children.add(child);
+        } else {
+            this.children.add(index, child);
+        }
+        for (DictNode p = this; p != null; p = p.parent) {
+            p.childrenCount += child.childrenCount;
+        }
+    }
+
+    private DictNode removeChild(int index) {
+        DictNode child = children.remove(index);
+        child.parent = null;
+        for (DictNode p = this; p != null; p = p.parent) {
+            p.childrenCount -= child.childrenCount;
+        }
+        return child;
+    }
+
+    private DictNode duplicateNode() {
+        DictNode newChild = new DictNode(part, false);
+        newChild.parent = parent;
+        if (parent != null) {
+            int index = parent.children.indexOf(this);
+            parent.addChild(index + 1, newChild);
+        }
+        return newChild;
+    }
+
+    public byte[] firstValue() {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        DictNode p = this;
+        while (true) {
+            bytes.write(p.part, 0, p.part.length);
+            if (p.isEndOfValue || p.children.size() == 0) {
+                break;
+            }
+            p = p.children.get(0);
+        }
+        return bytes.toByteArray();
+    }
+
+    public static DictNode splitNodeTree(final DictNode splitNode) {
+        if (splitNode == null) {
+            return null;
+        }
+        DictNode current = splitNode;
+        DictNode p = current.parent;
+        while (p != null) {
+            int index = p.children.indexOf(current);
+            assert index != -1;
+            DictNode newParent = p.duplicateNode();
+            for (int i = p.children.size() - 1; i >= index; i--) {
+                DictNode child = p.removeChild(i);
+                newParent.addChild(0, child);
+            }
+            current = newParent;
+            p = p.parent;
+        }
+        return current;
+    }
+
+    public byte[] buildTrieBytes() {
+        Stats stats = Stats.stats(this);
+        int sizeChildOffset = stats.mbpn_sizeChildOffset;
+        int sizeId = stats.mbpn_sizeId;
+
+        // write head
+        byte[] head;
+        try {
+            ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+            DataOutputStream headOut = new DataOutputStream(byteBuf);
+            headOut.write(AppendTrieDictionary.HEAD_MAGIC);
+            headOut.writeShort(0); // head size, to be back-filled below
+            headOut.writeInt(stats.mbpn_footprint); // body size
+            headOut.writeInt(stats.nValues);
+            headOut.write(sizeChildOffset);
+            headOut.write(sizeId);
+            headOut.close();
+            head = byteBuf.toByteArray();
+            BytesUtil.writeUnsigned(head.length, head, AppendTrieDictionary.HEAD_SIZE_I, 2);
+        } catch (IOException e) {
+            throw new RuntimeException(e); // shall not happen, as we are writing to an in-memory buffer
+        }
+
+        byte[] trieBytes = new byte[stats.mbpn_footprint + head.length];
+        System.arraycopy(head, 0, trieBytes, 0, head.length);
+
+        LinkedList<DictNode> open = new LinkedList<DictNode>();
+        IdentityHashMap<DictNode, Integer> offsetMap = new IdentityHashMap<DictNode, Integer>();
+
+        // write body
+        int o = head.length;
+        offsetMap.put(this, o);
+        o = build_writeNode(this, o, true, sizeChildOffset, sizeId, trieBytes);
+        if (this.children.isEmpty() == false)
+            open.addLast(this);
+
+        while (open.isEmpty() == false) {
+            DictNode parent = open.removeFirst();
+            build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
+            for (int i = 0; i < parent.children.size(); i++) {
+                DictNode c = parent.children.get(i);
+                boolean isLastChild = (i == parent.children.size() - 1);
+                offsetMap.put(c, o);
+                o = build_writeNode(c, o, isLastChild, sizeChildOffset, sizeId, trieBytes);
+                if (c.children.isEmpty() == false)
+                    open.addLast(c);
+            }
+        }
+
+        if (o != trieBytes.length)
+            throw new RuntimeException();
+        return trieBytes;
+    }
+
+    private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
+        int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
+        BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
+        trieBytes[parentOffset] |= flags;
+    }
+
+    private int build_writeNode(DictNode n, int offset, boolean isLastChild, int sizeChildOffset, int sizeId, byte[] trieBytes) {
+        int o = offset;
+
+        // childOffset
+        if (isLastChild)
+            trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
+        if (n.isEndOfValue)
+            trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
+        o += sizeChildOffset;
+
+        // nValueBytes
+        if (n.part.length > 255)
+            throw new RuntimeException("Value length is " + n.part.length + " and larger than 255: " + Bytes.toStringBinary(n.part));
+        BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
+        o++;
+
+        // valueBytes
+        System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
+        o += n.part.length;
+
+        if (n.isEndOfValue) {
+            checkValidId(n.id);
+            BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId);
+            o += sizeId;
+        }
+
+        return o;
+    }
+
+    // Dict ids run from 1 to 2147483647 and then, read as unsigned, from 2147483648 to 4294967294 (-2 signed); 0 and -1 are reserved for the uninitialized state
+    private void checkValidId(int id) {
+        if (id == 0 || id == -1) {
+            throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return String.format("DictNode[root=%s, nodes=%d, firstValue=%s]", Bytes.toStringBinary(part), childrenCount, Bytes.toStringBinary(firstValue()));
+    }
+
+    static class Stats {
+        public interface Visitor {
+            void visit(DictNode n, int level);
+        }
+
+        private static void traverseR(DictNode node, Visitor visitor, int level) {
+            visitor.visit(node, level);
+            for (DictNode c : node.children)
+                traverseR(c, visitor, level + 1);
+        }
+
+        private static void traversePostOrderR(DictNode node, Visitor visitor, int level) {
+            for (DictNode c : node.children)
+                traversePostOrderR(c, visitor, level + 1);
+            visitor.visit(node, level);
+        }
+
+        public int nValues; // number of values in total
+        public int nValueBytesPlain; // number of bytes for all values, uncompressed
+        public int nValueBytesCompressed; // number of value bytes in the trie (compressed)
+        public int maxValueLength; // size of longest value in bytes
+
+        // the trie is multi-byte-per-node
+        public int mbpn_nNodes; // number of nodes in trie
+        public int mbpn_trieDepth; // depth of trie
+        public int mbpn_maxFanOut; // the maximum number of children
+        public int mbpn_nChildLookups; // number of child lookups when looking up every value once
+        public int mbpn_nTotalFanOut; // the sum of fan-outs when looking up every value once
+        public int mbpn_sizeValueTotal; // the sum of value space in all nodes
+        public int mbpn_sizeNoValueBytes; // size of field noValueBytes
+        public int mbpn_sizeChildOffset; // size of field childOffset, which points to the first child in the flattened array
+        public int mbpn_sizeId; // size of the id field, always 4
+        public int mbpn_footprint; // MBPN footprint in bytes
+
+        /**
+         * Compute statistics of the trie and the dictionary built from it.
+         */
+        public static Stats stats(DictNode root) {
+            // calculate nEndValueBeneath
+            traversePostOrderR(root, new Visitor() {
+                @Override
+                public void visit(DictNode n, int level) {
+                    n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
+                    for (DictNode c : n.children)
+                        n.nValuesBeneath += c.nValuesBeneath;
+                }
+            }, 0);
+
+            // run stats
+            final Stats s = new Stats();
+            final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
+            traverseR(root, new Visitor() {
+                @Override
+                public void visit(DictNode n, int level) {
+                    if (n.isEndOfValue)
+                        s.nValues++;
+                    s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
+                    s.nValueBytesCompressed += n.part.length;
+                    s.mbpn_nNodes++;
+                    if (s.mbpn_trieDepth < level + 1)
+                        s.mbpn_trieDepth = level + 1;
+                    if (n.children.size() > 0) {
+                        if (s.mbpn_maxFanOut < n.children.size())
+                            s.mbpn_maxFanOut = n.children.size();
+                        int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
+                        s.mbpn_nChildLookups += childLookups;
+                        s.mbpn_nTotalFanOut += childLookups * n.children.size();
+                    }
+
+                    if (level < lenAtLvl.size())
+                        lenAtLvl.set(level, n.part.length);
+                    else
+                        lenAtLvl.add(n.part.length);
+                    int lenSoFar = 0;
+                    for (int i = 0; i <= level; i++)
+                        lenSoFar += lenAtLvl.get(i);
+                    if (lenSoFar > s.maxValueLength)
+                        s.maxValueLength = lenSoFar;
+                }
+            }, 0);
+
+            // flatten trie footprint calculation, case of Multi-Byte-Per-DictNode
+            s.mbpn_sizeId = 4;
+            s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId;
+            s.mbpn_sizeNoValueBytes = 1;
+            s.mbpn_sizeChildOffset = 5;
+            s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset);
+            while (true) { // minimize the offset size to match the footprint
+                int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1);
+                // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
+                // expand t to long before *4, avoiding exceed Integer.MAX_VALUE
+                if (BytesUtil.sizeForValue((long) t * 4) <= s.mbpn_sizeChildOffset - 1) {
+                    s.mbpn_sizeChildOffset--;
+                    s.mbpn_footprint = t;
+                } else
+                    break;
+            }
+
+            return s;
+        }
+
+        /**
+         * Print the trie, for debugging.
+         */
+        public void print(DictNode root) {
+            print(root, System.out);
+        }
+
+        public void print(DictNode root, final PrintStream out) {
+            traverseR(root, new Visitor() {
+                @Override
+                public void visit(DictNode n, int level) {
+                    try {
+                        for (int i = 0; i < level; i++)
+                            out.print("  ");
+                        out.print(new String(n.part, "UTF-8"));
+                        out.print(" - ");
+                        if (n.nValuesBeneath > 0)
+                            out.print(n.nValuesBeneath);
+                        if (n.isEndOfValue)
+                            out.print("* [" + n.id + "]");
+                        out.print("\n");
+                    } catch (UnsupportedEncodingException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }, 0);
+        }
+    }
+}
\ No newline at end of file

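A note on the footprint loop in Stats.stats() above: it starts the child offset at its maximum of 5 bytes and repeatedly tries one byte less, re-checking that the shrunken body still fits, where the `* 4` accounts for the 2 flag bits stolen from the offset field's most significant byte. A minimal standalone sketch of that rule, not part of the patch (sizeForValue() is reimplemented here under the assumption that BytesUtil's version returns the minimal byte count needed to hold a value):

    // Sketch of the child-offset sizing rule from Stats.stats().
    public class OffsetSizingSketch {
        // Assumed behavior of BytesUtil.sizeForValue(): minimal bytes to hold the value.
        static int sizeForValue(long value) {
            int size = 0;
            while (value > 0) {
                size++;
                value >>>= 8;
            }
            return size;
        }

        // Smallest childOffset width (bytes) that can still address the whole body,
        // given the top 2 bits of the offset field hold flags (hence the * 4).
        static int minChildOffsetSize(int sizeValueTotal, int nNodes, int sizeNoValueBytes) {
            int sizeChildOffset = 5; // maximum width to start with
            while (sizeChildOffset > 1) {
                int t = sizeValueTotal + nNodes * (sizeNoValueBytes + sizeChildOffset - 1);
                if (sizeForValue((long) t * 4) <= sizeChildOffset - 1) {
                    sizeChildOffset--; // a smaller offset still fits, keep shrinking
                } else {
                    break;
                }
            }
            return sizeChildOffset;
        }
    }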
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java
new file mode 100644
index 0000000..3ca0d8f
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+public class DictSlice {
+    static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // intended "AppendTrieDict" (bytes actually spell "AppecdTrieDict"; kept as-is to match AppendTrieDictionary.HEAD_MAGIC on disk)
+    static final int HEAD_SIZE_I = HEAD_MAGIC.length;
+    static final int BIT_IS_LAST_CHILD = 0x80;
+    static final int BIT_IS_END_OF_VALUE = 0x40;
+
+    private byte[] trieBytes;
+
+    // non-persistent part
+    transient private int headSize;
+    transient private int bodyLen;
+    transient private int sizeChildOffset;
+
+    transient private int nValues;
+    transient private int sizeOfId;
+    // mask MUST be long, since childOffset maybe 5 bytes at most
+    transient private long childOffsetMask;
+    transient private int firstByteOffset;
+
+    public DictSlice(byte[] bytes) {
+        this.trieBytes = bytes;
+        init();
+    }
+
+    private void init() {
+        if (BytesUtil.compareBytes(HEAD_MAGIC, 0, trieBytes, 0, HEAD_MAGIC.length) != 0)
+            throw new IllegalArgumentException("Wrong file type (magic does not match)");
+
+        try {
+            DataInputStream headIn = new DataInputStream(new ByteArrayInputStream(trieBytes, HEAD_SIZE_I, trieBytes.length - HEAD_SIZE_I));
+            this.headSize = headIn.readShort();
+            this.bodyLen = headIn.readInt();
+            this.nValues = headIn.readInt();
+            this.sizeChildOffset = headIn.read();
+            this.sizeOfId = headIn.read();
+
+            this.childOffsetMask = ~(((long) (BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE)) << ((sizeChildOffset - 1) * 8));
+            this.firstByteOffset = sizeChildOffset + 1; // the offset from the beginning of a node to its first value byte
+        } catch (Exception e) {
+            if (e instanceof RuntimeException)
+                throw (RuntimeException) e;
+            else
+                throw new RuntimeException(e);
+        }
+    }
+
+    public static DictSlice deserializeFrom(DataInput in) throws IOException {
+        byte[] headPartial = new byte[HEAD_MAGIC.length + Short.SIZE / Byte.SIZE + Integer.SIZE / Byte.SIZE];
+        in.readFully(headPartial);
+
+        if (BytesUtil.compareBytes(HEAD_MAGIC, 0, headPartial, 0, HEAD_MAGIC.length) != 0)
+            throw new IllegalArgumentException("Wrong file type (magic does not match)");
+
+        DataInputStream headIn = new DataInputStream(//
+                new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I));
+        int headSize = headIn.readShort();
+        int bodyLen = headIn.readInt();
+        headIn.close();
+
+        byte[] all = new byte[headSize + bodyLen];
+        System.arraycopy(headPartial, 0, all, 0, headPartial.length);
+        in.readFully(all, headPartial.length, all.length - headPartial.length);
+
+        return new DictSlice(all);
+    }
+
+    public byte[] getFirstValue() {
+        int nodeOffset = headSize;
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        while (true) {
+            int valueLen = BytesUtil.readUnsigned(trieBytes, nodeOffset + firstByteOffset - 1, 1);
+            bytes.write(trieBytes, nodeOffset + firstByteOffset, valueLen);
+            if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) {
+                break;
+            }
+            nodeOffset = headSize + (int) (BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask);
+            if (nodeOffset == headSize) {
+                break;
+            }
+        }
+        return bytes.toByteArray();
+    }
+
+    /**
+     * Returns a code point in [0, nValues), preserving the order of values.
+     *
+     * @param n            -- the offset of the current node
+     * @param inp          -- input value bytes to look up
+     * @param o            -- offset in the input value bytes matched so far
+     * @param inpEnd       -- end of input
+     * @param roundingFlag -- intended: =0 return -1 if not found, <0 return the closest
+     *                     smaller, >0 return the closest bigger; note this implementation
+     *                     ignores the flag and returns -1 on any mismatch
+     */
+    private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) {
+        while (true) {
+            // match the current node
+            int p = n + firstByteOffset; // start of node's value
+            int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value
+            for (; p < end && o < inpEnd; p++, o++) { // note matching start from [0]
+                if (trieBytes[p] != inp[o]) {
+                    return -1; // mismatch
+                }
+            }
+
+            // node completely matched, is input all consumed?
+            boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+            if (o == inpEnd) {
+                return p == end && isEndOfValue ? BytesUtil.readUnsigned(trieBytes, end, sizeOfId) : -1;
+            }
+
+            // find a child to continue
+            int c = headSize + (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
+            if (c == headSize) // has no children
+                return -1;
+            byte inpByte = inp[o];
+            int comp;
+            while (true) {
+                p = c + firstByteOffset;
+                comp = BytesUtil.compareByteUnsigned(trieBytes[p], inpByte);
+                if (comp == 0) { // continue in the matching child, reset n and loop again
+                    n = c;
+                    break;
+                } else if (comp < 0) { // try next child
+                    if (checkFlag(c, BIT_IS_LAST_CHILD))
+                        return -1;
+                    c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1) + (checkFlag(c, BIT_IS_END_OF_VALUE) ? sizeOfId : 0);
+                } else { // children are ordered by their first value byte
+                    return -1;
+                }
+            }
+        }
+    }
+
+    private boolean checkFlag(int offset, int bit) {
+        return (trieBytes[offset] & bit) > 0;
+    }
+
+    public int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
+        int id = lookupSeqNoFromValue(headSize, value, offset, offset + len, roundingFlag);
+        return id;
+    }
+
+    public DictNode rebuildTrieTree() {
+        return rebuildTrieTreeR(headSize, null);
+    }
+
+    private DictNode rebuildTrieTreeR(int n, DictNode parent) {
+        DictNode root = null;
+        while (true) {
+            int p = n + firstByteOffset;
+            int childOffset = (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
+            int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+            boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+
+            byte[] value = new byte[parLen];
+            System.arraycopy(trieBytes, p, value, 0, parLen);
+
+            DictNode node = new DictNode(value, isEndOfValue);
+            if (isEndOfValue) {
+                int id = BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
+                node.id = id;
+            }
+
+            if (parent == null) {
+                root = node;
+            } else {
+                parent.addChild(node);
+            }
+
+            if (childOffset != 0) {
+                rebuildTrieTreeR(childOffset + headSize, node);
+            }
+
+            if (checkFlag(n, BIT_IS_LAST_CHILD)) {
+                break;
+            } else {
+                n += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
+            }
+        }
+        return root;
+    }
+
+    public boolean doCheck() {
+        int offset = headSize;
+        HashSet<Integer> parentSet = new HashSet<>();
+        boolean lastChild = false;
+
+        while (offset < trieBytes.length) {
+            if (lastChild) {
+                boolean contained = parentSet.remove(offset - headSize);
+                // Can't find parent, the data is corrupted
+                if (!contained) {
+                    return false;
+                }
+                lastChild = false;
+            }
+            int p = offset + firstByteOffset;
+            int childOffset = (int) (BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask);
+            int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+            boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE);
+
+            // Value bytes would overflow the buffer: the data is corrupted
+            if (trieBytes.length < p + parLen) {
+                return false;
+            }
+
+            // Check that the id can be read
+            if (isEndOfValue) {
+                BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
+            }
+
+            // Record the child offset if this node has children
+            if (childOffset != 0) {
+                parentSet.add(childOffset);
+            }
+
+            // siblings done, move to the next parent
+            if (checkFlag(offset, BIT_IS_LAST_CHILD)) {
+                lastChild = true;
+            }
+
+            // move to next node
+            offset += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
+        }
+
+        // parentSet is empty, meaning every recorded child offset was visited; the data is consistent
+        return parentSet.isEmpty();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("DictSlice[firstValue=%s, values=%d, bytes=%d]", Bytes.toStringBinary(getFirstValue()), nValues, bodyLen);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(trieBytes);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof DictSlice)) {
+            return false;
+        }
+        DictSlice that = (DictSlice) o;
+        return Arrays.equals(this.trieBytes, that.trieBytes);
+    }
+}

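For readers of lookupSeqNoFromValue() and rebuildTrieTreeR(): each node in trieBytes is laid out as [flags + childOffset (sizeChildOffset bytes)] [valueLen (1 byte)] [value bytes] [id (sizeOfId bytes, only when isEndOfValue)], and firstByteOffset = sizeChildOffset + 1 points at the value bytes. A hedged sketch that decodes one node under that layout, for illustration only (readLong() reimplements the big-endian read BytesUtil.readLong is assumed to do):

    // Illustrative decoder for a single trie node, mirroring DictSlice's layout.
    static void dumpNode(byte[] trieBytes, int nodeOffset, int headSize,
                         int sizeChildOffset, int sizeOfId, long childOffsetMask) {
        boolean isLastChild = (trieBytes[nodeOffset] & 0x80) != 0;  // BIT_IS_LAST_CHILD
        boolean isEndOfValue = (trieBytes[nodeOffset] & 0x40) != 0; // BIT_IS_END_OF_VALUE
        // the childOffset shares its first byte with the two flag bits
        long childOffset = readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask;
        int valueLen = trieBytes[nodeOffset + sizeChildOffset] & 0xFF; // 1-byte length
        System.out.printf("node@%d lastChild=%b endOfValue=%b firstChild@%d valueLen=%d%n",
                nodeOffset, isLastChild, isEndOfValue,
                childOffset == 0 ? -1 : headSize + childOffset, valueLen);
    }

    // Assumed behavior of BytesUtil.readLong(): big-endian read of `size` bytes.
    static long readLong(byte[] bytes, int offset, int size) {
        long v = 0;
        for (int i = 0; i < size; i++)
            v = (v << 8) | (bytes[offset + i] & 0xFF);
        return v;
    }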
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java
new file mode 100644
index 0000000..8fc3f78
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+public class DictSliceKey implements Comparable<DictSliceKey> {
+    static final DictSliceKey START_KEY = DictSliceKey.wrap(new byte[0]);
+
+    byte[] key;
+
+    public static DictSliceKey wrap(byte[] key) {
+        DictSliceKey dictKey = new DictSliceKey();
+        dictKey.key = key;
+        return dictKey;
+    }
+
+    @Override
+    public String toString() {
+        return Bytes.toStringBinary(key);
+    }
+
+    @Override
+    public int hashCode() {
+        return Arrays.hashCode(key);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o instanceof DictSliceKey) {
+            DictSliceKey that = (DictSliceKey) o;
+            return Arrays.equals(this.key, that.key);
+        }
+        return false;
+    }
+
+    @Override
+    public int compareTo(DictSliceKey that) {
+        return Bytes.compareTo(key, that.key);
+    }
+
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(key.length);
+        out.write(key);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        key = new byte[in.readInt()];
+        in.readFully(key);
+    }
+}

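DictSliceKey sorts byte-wise, which is what lets the dictionary keep its slices in a TreeMap keyed by each slice's first value and route any lookup with a floor search. A small usage sketch under that assumption (the map shape matches GlobalDictMetadata.sliceFileMap below; the guarantee that the smallest key is START_KEY is the one the V1 index reader enforces):

    import java.util.Map;
    import java.util.TreeMap;

    // Sketch: route a value to the slice whose key range covers it.
    static String findSliceFile(TreeMap<DictSliceKey, String> sliceFileMap, byte[] valueBytes) {
        // floorEntry() never returns null, assuming the first key is always START_KEY ("")
        Map.Entry<DictSliceKey, String> e = sliceFileMap.floorEntry(DictSliceKey.wrap(valueBytes));
        return e.getValue();
    }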
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
new file mode 100644
index 0000000..b0b9908
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+public class GlobalDictHDFSStore extends GlobalDictStore {
+
+    static final Logger logger = LoggerFactory.getLogger(GlobalDictHDFSStore.class);
+    static final String V1_INDEX_NAME = ".index";
+    static final String V2_INDEX_NAME = ".index_v2";
+    static final String VERSION_PREFIX = "version_";
+    static final int BUFFER_SIZE = 8 * 1024 * 1024;
+
+    private final Path basePath;
+    private final Configuration conf;
+    private final FileSystem fileSystem;
+
+    protected GlobalDictHDFSStore(String baseDir) throws IOException {
+        super(baseDir);
+        this.basePath = new Path(baseDir);
+        this.conf = HadoopUtil.getCurrentConfiguration();
+        this.fileSystem = HadoopUtil.getFileSystem(baseDir);
+
+        if (!fileSystem.exists(basePath)) {
+            logger.info("Global dict at {} doesn't exist, create a new one", basePath);
+            fileSystem.mkdirs(basePath);
+        }
+
+        migrateOldLayout();
+    }
+
+    // Previously slice files and the index file were put directly in the base directory;
+    // migrate them to the new versioned layout if found there
+    private void migrateOldLayout() throws IOException {
+        FileStatus[] sliceFiles = fileSystem.listStatus(basePath, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return path.getName().startsWith(IndexFormatV1.SLICE_PREFIX);
+            }
+        });
+        Path indexFile = new Path(basePath, V1_INDEX_NAME);
+
+        if (fileSystem.exists(indexFile) && sliceFiles.length > 0) { // old layout
+            final long version = System.currentTimeMillis();
+            Path tempDir = new Path(basePath, "tmp_" + VERSION_PREFIX + version);
+            Path versionDir = getVersionDir(version);
+
+            logger.info("Convert global dict at {} to new layout with version {}", basePath, version);
+
+            fileSystem.mkdirs(tempDir);
+            // convert to new layout
+            try {
+                // copy index and slice files to temp
+                FileUtil.copy(fileSystem, indexFile, fileSystem, tempDir, false, conf);
+                for (FileStatus sliceFile : sliceFiles) {
+                    FileUtil.copy(fileSystem, sliceFile.getPath(), fileSystem, tempDir, false, conf);
+                }
+                // rename
+                fileSystem.rename(tempDir, versionDir);
+                // delete index and slices files in base dir
+                fileSystem.delete(indexFile, false);
+                for (FileStatus sliceFile : sliceFiles) {
+                    fileSystem.delete(sliceFile.getPath(), true);
+                }
+
+            } finally {
+                if (fileSystem.exists(tempDir)) {
+                    fileSystem.delete(tempDir, true);
+                }
+            }
+        }
+    }
+
+    @Override
+    void prepareForWrite(String workingDir) throws IOException {
+        // TODO create lock file
+        Path working = new Path(workingDir);
+
+        if (fileSystem.exists(working)) {
+            logger.info("Working directory {} exists, delete it first", working);
+            fileSystem.delete(working, true);
+        }
+
+        // when building the dict, copy all data into the working dir and work on it, so a sudden server crash cannot corrupt committed data
+        long[] versions = listAllVersions();
+        if (versions.length > 0) {
+            Path latestVersion = getVersionDir(versions[versions.length - 1]);
+            FileUtil.copy(fileSystem, latestVersion, fileSystem, working, false, true, conf);
+        } else {
+            fileSystem.mkdirs(working);
+        }
+    }
+
+    @Override
+    long[] listAllVersions() throws IOException {
+        FileStatus[] versionDirs = fileSystem.listStatus(basePath, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return path.getName().startsWith(VERSION_PREFIX);
+            }
+        });
+        long[] versions = new long[versionDirs.length];
+        for (int i = 0; i < versionDirs.length; i++) {
+            Path path = versionDirs[i].getPath();
+            versions[i] = Long.parseLong(path.getName().substring(VERSION_PREFIX.length()));
+        }
+        return versions;
+    }
+
+    @Override
+    public Path getVersionDir(long version) {
+        return new Path(basePath, VERSION_PREFIX + version);
+    }
+
+    @Override
+    GlobalDictMetadata getMetadata(long version) throws IOException {
+        Path versionDir = getVersionDir(version);
+        FileStatus[] indexFiles = fileSystem.listStatus(versionDir, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return path.getName().startsWith(V1_INDEX_NAME);
+            }
+        });
+        checkState(indexFiles.length == 1, "zero or more than one index file found: %s", Arrays.toString(indexFiles));
+
+        IndexFormat format;
+        String indexFile = indexFiles[0].getPath().getName();
+        if (V2_INDEX_NAME.equals(indexFile)) {
+            format = new IndexFormatV2(fileSystem, conf);
+        } else if (V1_INDEX_NAME.equals(indexFile)) {
+            format = new IndexFormatV1(fileSystem, conf);
+        } else {
+            throw new RuntimeException("Unknown index file: " + indexFile);
+        }
+
+        return format.readIndexFile(versionDir);
+    }
+
+    @Override
+    DictSlice readSlice(String directory, String sliceFileName) {
+        Path path = new Path(directory, sliceFileName);
+        logger.info("read slice from {}", path);
+        try (FSDataInputStream input = fileSystem.open(path, BUFFER_SIZE)) {
+            return DictSlice.deserializeFrom(input);
+        } catch (IOException e) {
+            throw new RuntimeException(String.format("read slice %s failed", path), e);
+        }
+    }
+
+    @Override
+    String writeSlice(String workingDir, DictSliceKey key, DictNode slice) {
+        //write new slice
+        String sliceFile = IndexFormatV2.sliceFileName(key);
+        Path path = new Path(workingDir, sliceFile);
+
+        logger.info("write slice with key {} into file {}", key, path);
+        try (FSDataOutputStream out = fileSystem.create(path, true, BUFFER_SIZE)) {
+            byte[] bytes = slice.buildTrieBytes();
+            out.write(bytes);
+        } catch (IOException e) {
+            throw new RuntimeException(String.format("write slice with key %s into file %s failed", key, path), e);
+        }
+        return sliceFile;
+    }
+
+    @Override
+    void deleteSlice(String workingDir, String sliceFileName) {
+        Path path = new Path(workingDir, sliceFileName);
+        logger.info("delete slice at {}", path);
+        try {
+            if (fileSystem.exists(path)) {
+                fileSystem.delete(path, false);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(String.format("delete slice at %s failed", path), e);
+        }
+    }
+
+    @Override
+    void commit(String workingDir, GlobalDictMetadata metadata) throws IOException {
+        Path workingPath = new Path(workingDir);
+
+        // delete v1 index file
+        Path oldIndexFile = new Path(workingPath, V1_INDEX_NAME);
+        if (fileSystem.exists(oldIndexFile)) {
+            fileSystem.delete(oldIndexFile, false);
+        }
+        // write v2 index file
+        IndexFormat index = new IndexFormatV2(fileSystem, conf);
+        index.writeIndexFile(workingPath, metadata);
+        index.sanityCheck(workingPath, metadata);
+
+        // rename the working dir to a new version dir
+        Path newVersionPath = new Path(basePath, VERSION_PREFIX + System.currentTimeMillis());
+        fileSystem.rename(workingPath, newVersionPath);
+
+        cleanUp();
+    }
+
+    // Keep at most maxVersions versions; delete older ones once their TTL has expired
+    private void cleanUp() throws IOException {
+        long[] versions = listAllVersions();
+        long timestamp = System.currentTimeMillis();
+        for (int i = 0; i < versions.length - maxVersions; i++) {
+            if (versions[i] + versionTTL < timestamp) {
+                fileSystem.delete(getVersionDir(versions[i]), true);
+            }
+        }
+    }
+
+    @Override
+    String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
+        checkArgument(baseDir.startsWith(srcConfig.getHdfsWorkingDirectory()), "Please check why current directory %s doesn't belong to source working directory %s", baseDir, srcConfig.getHdfsWorkingDirectory());
+
+        final String dstBaseDir = baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory());
+
+        long[] versions = listAllVersions();
+        if (versions.length == 0) { // empty dict, nothing to copy
+            return dstBaseDir;
+        }
+
+        Path srcVersionDir = getVersionDir(versions[versions.length - 1]);
+        Path dstVersionDir = new Path(srcVersionDir.toString().replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()));
+        FileSystem dstFS = dstVersionDir.getFileSystem(conf);
+        if (dstFS.exists(dstVersionDir)) {
+            dstFS.delete(dstVersionDir, true);
+        }
+        FileUtil.copy(fileSystem, srcVersionDir, dstFS, dstVersionDir, false, true, conf);
+
+        return dstBaseDir;
+    }
+
+    public interface IndexFormat {
+        GlobalDictMetadata readIndexFile(Path dir) throws IOException;
+
+        void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException;
+
+        void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException;
+    }
+
+    public static class IndexFormatV1 implements IndexFormat {
+        static final String SLICE_PREFIX = "cached_";
+
+        protected final FileSystem fs;
+        protected final Configuration conf;
+
+        protected IndexFormatV1(FileSystem fs, Configuration conf) {
+            this.fs = fs;
+            this.conf = conf;
+        }
+
+        @Override
+        public GlobalDictMetadata readIndexFile(Path dir) throws IOException {
+            Path indexFile = new Path(dir, V1_INDEX_NAME);
+            try (FSDataInputStream in = fs.open(indexFile)) {
+                int baseId = in.readInt();
+                int maxId = in.readInt();
+                int maxValueLength = in.readInt();
+                int nValues = in.readInt();
+                String converterName = in.readUTF();
+                BytesConverter converter;
+                try {
+                    converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+                } catch (Exception e) {
+                    throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e);
+                }
+
+                int nSlices = in.readInt();
+                TreeMap<DictSliceKey, String> sliceFileMap = new TreeMap<>();
+                for (int i = 0; i < nSlices; i++) {
+                    DictSliceKey key = new DictSliceKey();
+                    key.readFields(in);
+                    sliceFileMap.put(key, sliceFileName(key));
+                }
+                // make sure first key is always ""
+                String firstFile = sliceFileMap.remove(sliceFileMap.firstKey());
+                sliceFileMap.put(DictSliceKey.START_KEY, firstFile);
+
+                return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, sliceFileMap);
+            }
+        }
+
+        // only used by tests
+        @Override
+        public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException {
+            Path indexFile = new Path(dir, V1_INDEX_NAME);
+            try (FSDataOutputStream out = fs.create(indexFile, true)) {
+                out.writeInt(metadata.baseId);
+                out.writeInt(metadata.maxId);
+                out.writeInt(metadata.maxValueLength);
+                out.writeInt(metadata.nValues);
+                out.writeUTF(metadata.bytesConverter.getClass().getName());
+                out.writeInt(metadata.sliceFileMap.size());
+                for (Map.Entry<DictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+                    entry.getKey().write(out);
+                }
+            }
+        }
+
+        @Override
+        public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException {
+            throw new UnsupportedOperationException("sanityCheck V1 format is no longer supported");
+        }
+
+        public static String sliceFileName(DictSliceKey key) {
+            return SLICE_PREFIX + key;
+        }
+    }
+
+    public static class IndexFormatV2 extends IndexFormatV1 {
+        static final String SLICE_PREFIX = "cached_";
+        static final int MINOR_VERSION_V1 = 0x01;
+
+        protected IndexFormatV2(FileSystem fs, Configuration conf) {
+            super(fs, conf);
+        }
+
+        @Override
+        public GlobalDictMetadata readIndexFile(Path dir) throws IOException {
+            Path indexFile = new Path(dir, V2_INDEX_NAME);
+            try (FSDataInputStream in = fs.open(indexFile)) {
+                byte minorVersion = in.readByte(); // include a header to allow minor format changes
+                if (minorVersion != MINOR_VERSION_V1) {
+                    throw new RuntimeException("Unsupported minor version " + minorVersion);
+                }
+                int baseId = in.readInt();
+                int maxId = in.readInt();
+                int maxValueLength = in.readInt();
+                int nValues = in.readInt();
+                String converterName = in.readUTF();
+                BytesConverter converter;
+                try {
+                    converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+                } catch (Exception e) {
+                    throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e);
+                }
+
+                int nSlices = in.readInt();
+                TreeMap<DictSliceKey, String> sliceFileMap = new TreeMap<>();
+                for (int i = 0; i < nSlices; i++) {
+                    DictSliceKey key = new DictSliceKey();
+                    key.readFields(in);
+                    String sliceFileName = in.readUTF();
+                    sliceFileMap.put(key, sliceFileName);
+                }
+
+                return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, sliceFileMap);
+            }
+        }
+
+        @Override
+        public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException {
+            Path indexFile = new Path(dir, V2_INDEX_NAME);
+            try (FSDataOutputStream out = fs.create(indexFile, true)) {
+                out.writeByte(MINOR_VERSION_V1);
+                out.writeInt(metadata.baseId);
+                out.writeInt(metadata.maxId);
+                out.writeInt(metadata.maxValueLength);
+                out.writeInt(metadata.nValues);
+                out.writeUTF(metadata.bytesConverter.getClass().getName());
+                out.writeInt(metadata.sliceFileMap.size());
+                for (Map.Entry<DictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+                    entry.getKey().write(out);
+                    out.writeUTF(entry.getValue());
+                }
+            }
+        }
+
+        @Override
+        public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException {
+            for (Map.Entry<DictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+                if (!fs.exists(new Path(dir, entry.getValue()))) {
+                    throw new RuntimeException("The slice file " + entry.getValue() + " for the key: " + entry.getKey() + " must be existed!");
+                }
+            }
+        }
+
+        public static String sliceFileName(DictSliceKey key) {
+            return String.format("%s%d_%d", SLICE_PREFIX, System.currentTimeMillis(), key.hashCode());
+        }
+    }
+}

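Two retention knobs govern cleanUp() above: a version directory (version_&lt;timestamp&gt;) is deleted only when it is both beyond the newest maxVersions directories and older than versionTTL milliseconds. A self-contained sketch of that rule, pure arithmetic with no HDFS involved (the method name is illustrative):

    import java.util.Arrays;

    // Sketch of GlobalDictHDFSStore.cleanUp()'s retention rule.
    static long[] versionsToDelete(long[] versionsAscending, int maxVersions, long versionTTL, long now) {
        return Arrays.stream(versionsAscending)
                .limit(Math.max(0, versionsAscending.length - maxVersions)) // only the oldest surplus
                .filter(v -> v + versionTTL < now)                          // and only once expired
                .toArray();
    }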
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java
new file mode 100644
index 0000000..65c80ca
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import com.google.common.base.Preconditions;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Encapsulates the metadata for a particular version of the global dictionary.
+ * Usually each version of a global dictionary stores its metadata in an index file.
+ */
+public class GlobalDictMetadata {
+    final int baseId;
+    final int maxId;
+    final int maxValueLength;
+    final int nValues;
+    final BytesConverter bytesConverter;
+    final TreeMap<DictSliceKey, String> sliceFileMap; // slice key -> slice file name
+
+    public GlobalDictMetadata(int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, NavigableMap<DictSliceKey, String> sliceFileMap) {
+
+        Preconditions.checkNotNull(bytesConverter, "bytesConverter");
+        Preconditions.checkNotNull(sliceFileMap, "sliceFileMap");
+
+        this.baseId = baseId;
+        this.maxId = maxId;
+        this.maxValueLength = maxValueLength;
+        this.nValues = nValues;
+        this.bytesConverter = bytesConverter;
+        this.sliceFileMap = new TreeMap<>(sliceFileMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
new file mode 100644
index 0000000..76957f6
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+
+import java.io.IOException;
+
+public abstract class GlobalDictStore {
+
+    protected final String baseDir; // base directory containing all versions of this global dict
+    protected final int maxVersions;
+    protected final int versionTTL;
+
+    protected GlobalDictStore(String baseDir) {
+        this.baseDir = Preconditions.checkNotNull(baseDir, "baseDir");
+        this.maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
+        this.versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
+    }
+
+    // workingDir should be an absolute path; it will be created if it does not exist
+    abstract void prepareForWrite(String workingDir) throws IOException;
+
+    /**
+     * @return all versions of this dictionary in ascending order
+     * @throws IOException on I/O error
+     */
+    abstract long[] listAllVersions() throws IOException;
+
+    // return the path of the specified version dir
+    abstract Path getVersionDir(long version);
+
+    /**
+     * Get the metadata for a particular version of the dictionary.
+     * @param version version number
+     * @return <i>GlobalDictMetadata</i> for the specified version
+     * @throws IOException on I/O error
+     */
+    abstract GlobalDictMetadata getMetadata(long version) throws IOException;
+
+    /**
+     * Read a <i>DictSlice</i> from a slice file.
+     * @param workingDir directory of the slice file
+     * @param sliceFileName file name of the slice
+     * @return a <i>DictSlice</i>
+     * @throws IOException on I/O error
+     */
+    abstract DictSlice readSlice(String workingDir, String sliceFileName);
+
+    /**
+     * Write a slice with the given key to the specified directory.
+     * @param workingDir where to write the slice, should exist
+     * @param key slice key
+     * @param slice slice to write
+     * @return file name of the new written slice
+     * @throws IOException on I/O error
+     */
+    abstract String writeSlice(String workingDir, DictSliceKey key, DictNode slice);
+
+    /**
+     * Delete a slice with the specified file name.
+     * @param workingDir directory of the slice file, should exist
+     * @param sliceFileName file name of the slice, should exist
+     * @throws IOException on I/O error
+     */
+    abstract void deleteSlice(String workingDir, String sliceFileName);
+
+    /**
+     * Commit the <i>DictSlice</i>s and <i>GlobalDictMetadata</i> in workingDir to a new version directory.
+     * @param workingDir where the tmp slices and index are stored; should exist
+     * @param globalDictMetadata the metadata of the global dict
+     * @throws IOException on I/O error
+     */
+    abstract void commit(String workingDir, GlobalDictMetadata globalDictMetadata) throws IOException;
+
+    /**
+     * Copy the latest version of this dict to another meta. The source is unchanged.
+     * @param srcConfig config of source meta
+     * @param dstConfig config of destination meta
+     * @return the new base directory for destination meta
+     * @throws IOException on I/O error
+     */
+    abstract String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException;
+}

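Reading the abstract contract as a whole, a writer drives the store through prepareForWrite, then writeSlice/deleteSlice as slices get rewritten, and finally commit, which publishes everything as a new immutable version. A hedged lifecycle sketch under those assumptions, not part of the patch (same-package access is assumed, since the methods have default visibility; assembling key, slice and metadata is elided):

    import java.io.IOException;

    // Sketch of the expected write lifecycle against GlobalDictStore.
    static void replaceSlice(GlobalDictStore store, String workingDir, DictSliceKey key,
                             String staleSliceFile, DictNode rebuilt, GlobalDictMetadata metadata)
            throws IOException {
        store.prepareForWrite(workingDir);                           // copy the latest version in
        store.deleteSlice(workingDir, staleSliceFile);               // drop the stale slice file
        String newFile = store.writeSlice(workingDir, key, rebuilt); // write its replacement
        metadata.sliceFileMap.put(key, newFile);                     // record it in the index (package access)
        store.commit(workingDir, metadata);                          // rename workingDir to version_<now>
    }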
http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index cda3c2b..7921980 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict;
 
 import java.io.IOException;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Dictionary;
 
 /**
@@ -28,7 +29,7 @@ import org.apache.kylin.common.util.Dictionary;
  * Created by sunyerui on 16/5/24.
  */
 public class GlobalDictionaryBuilder implements IDictionaryBuilder {
-    AppendTrieDictionary.Builder<String> builder;
+    AppendTrieDictionaryBuilder builder;
     int baseId;
 
     @Override
@@ -36,19 +37,20 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
         if (dictInfo == null) {
             throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo");
         }
-        this.builder = AppendTrieDictionary.Builder.getInstance(dictInfo.getResourceDir());
+
+        int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
+        this.builder = new AppendTrieDictionaryBuilder(dictInfo.getResourceDir(), maxEntriesPerSlice);
         this.baseId = baseId;
     }
-    
+
     @Override
     public boolean addValue(String value) {
         if (value == null)
             return false;
-        
         builder.addValue(value);
         return true;
     }
-    
+
     @Override
     public Dictionary<String> build() throws IOException {
         return builder.build(baseId);

http://git-wip-us.apache.org/repos/asf/kylin/blob/ee3f8e6e/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index e2af338..72b251b 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.dict;
 
+import static org.apache.kylin.dict.GlobalDictHDFSStore.V2_INDEX_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -29,62 +30,75 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileFilter;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
-/**
- * Created by sunyerui on 16/4/28.
- */
 public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
 
-    public static final String BASE_DIR = "file:///tmp/kylin_append_dict";
-    public static final String RESOURCE_DIR = "/dict/append_dict_test";
+    private static final UUID uuid = UUID.randomUUID();
+    private static final String BASE_DIR = "file:///tmp/kylin_append_dict/" + uuid;
+    private static final String RESOURCE_DIR = "/dict/append_dict_test";
+    private static final String LOCAL_BASE_DIR = "/tmp/kylin_append_dict/" + uuid + "/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/";
 
     @Before
-    public void setUp() {
+    public void beforeTest() {
         staticCreateTestMetadata();
-        System.setProperty("kylin.dictionary.append-entry-size", "50000");
-        System.setProperty("kylin.env.hdfs-working-dir", BASE_DIR);
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setProperty("kylin.dictionary.append-entry-size", "50000");
+        config.setProperty("kylin.env.hdfs-working-dir", BASE_DIR);
     }
 
     @After
-    public void after() {
+    public void afterTest() {
         cleanup();
         staticCleanupTestMetadata();
     }
 
-    public static void cleanup() {
+    private void cleanup() {
         Path basePath = new Path(BASE_DIR);
         try {
             HadoopUtil.getFileSystem(basePath).delete(basePath, true);
-        } catch (IOException e) {}
+        } catch (IOException e) {
+        }
     }
 
-    public static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "\u5b57\u5178", "\u5b57\u5178\u6811", "\u5b57\u6bcd", // non-ascii characters
+    private static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "\u5b57\u5178", "\u5b57\u5178\u6811", "\u5b57\u6bcd", // non-ascii characters
             "", // empty
-            "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
-            "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
+            "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
             "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
-              + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk",
+                    + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk",
             "paint", "tar", "try", // some dup
     };
 
+    private static AppendTrieDictionaryBuilder createBuilder(String resourceDir) throws IOException {
+        int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
+        return new AppendTrieDictionaryBuilder(resourceDir, maxEntriesPerSlice);
+    }
+
     @Test
     public void testStringRepeatly() throws IOException {
         ArrayList<String> list = new ArrayList<>();
@@ -94,20 +108,22 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         notfound.add("pars");
         notfound.add("tri");
         notfound.add("\u5b57");
-        for (int i = 0; i < 100; i++) {
+        for (int i = 0; i < 50; i++) {
             testStringDictAppend(list, notfound, true);
+            //to speed up the test
+            cleanup();
         }
     }
 
     @Test
-    public void englishWordsTest() throws Exception {
+    public void testEnglishWords() throws Exception {
         InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
         ArrayList<String> str = loadStrings(is);
         testStringDictAppend(str, null, false);
     }
 
     @Test
-    public void categoryNamesTest() throws Exception {
+    public void testCategoryNames() throws Exception {
         InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat");
         ArrayList<String> str = loadStrings(is);
         testStringDictAppend(str, null, true);
@@ -133,7 +149,8 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
     @Ignore("need huge key set")
     @Test
     public void testHugeKeySet() throws IOException {
-        AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+
         AppendTrieDictionary<String> dict = null;
 
         InputStream is = new FileInputStream("src/test/resources/dict/huge_key");
@@ -143,13 +160,13 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
             while ((word = reader.readLine()) != null) {
                 word = word.trim();
                 if (!word.isEmpty())
-                    b.addValue(word);
+                    builder.addValue(word);
             }
         } finally {
             reader.close();
             is.close();
         }
-        dict = b.build(0);
+        dict = builder.build(0);
         dict.dump(System.out);
     }
 
@@ -162,8 +179,8 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         }
         BytesConverter converter = new StringBytesConverter();
 
-        AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
-        AppendTrieDictionary<String> dict = null;
+        AppendTrieDictionaryBuilder b = createBuilder(RESOURCE_DIR);
+
         TreeMap<Integer, String> checkMap = new TreeMap<>();
         int firstAppend = rnd.nextInt(strList.size() / 2);
         int secondAppend = firstAppend + rnd.nextInt((strList.size() - firstAppend) / 2);
@@ -173,7 +190,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         for (; appendIndex < firstAppend; appendIndex++) {
             b.addValue(strList.get(appendIndex));
         }
-        dict = b.build(0);
+        AppendTrieDictionary<String> dict = b.build(0);
         dict.dump(System.out);
         for (; checkIndex < firstAppend; checkIndex++) {
             String str = strList.get(checkIndex);
@@ -185,13 +202,13 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         }
 
         // reopen dict and append
-//        b = AppendTrieDictionary.Builder.create(dict);
-        b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
+        b = createBuilder(RESOURCE_DIR);
+
         for (; appendIndex < secondAppend; appendIndex++) {
             b.addValue(strList.get(appendIndex));
         }
-        AppendTrieDictionary newDict = b.build(0);
-        assert newDict == dict;
+        AppendTrieDictionary<String> newDict = b.build(0);
+        assertEquals(dict, newDict);
         dict = newDict;
         dict.dump(System.out);
         checkIndex = 0;
@@ -210,12 +227,13 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         }
 
         // reopen dict and append rest str
-        b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
+        b = createBuilder(RESOURCE_DIR);
+
         for (; appendIndex < strList.size(); appendIndex++) {
             b.addValue(strList.get(appendIndex));
         }
         newDict = b.build(0);
-        assert newDict == dict;
+        assertEquals(dict, newDict);
         dict = newDict;
         dict.dump(System.out);
         checkIndex = 0;
@@ -268,7 +286,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testMaxInteger() throws IOException {
-        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
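+        // seed the id counter near Integer.MAX_VALUE to exercise overflow handling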
         builder.setMaxId(Integer.MAX_VALUE - 2);
         builder.addValue("a");
         builder.addValue("ab");
@@ -284,7 +302,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
     @Ignore("Only occurred when value is very long (>8000 bytes)")
     @Test
     public void testSuperLongValue() throws IOException {
-        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
         String value = "a";
         for (int i = 0; i < 10000; i++) {
             value += "a";
@@ -302,14 +320,12 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
     private static class SharedBuilderThread extends Thread {
         CountDownLatch startLatch;
         CountDownLatch finishLatch;
-        String resourcePath;
         String prefix;
         int count;
 
-        SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String resourcePath, String prefix, int count) {
+        SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String prefix, int count) {
             this.startLatch = startLatch;
             this.finishLatch = finishLatch;
-            this.resourcePath = resourcePath;
             this.prefix = prefix;
             this.count = count;
         }
@@ -317,27 +333,28 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         @Override
         public void run() {
             try {
-                AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
+                AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
                 startLatch.countDown();
                 for (int i = 0; i < count; i++) {
                     builder.addValue(prefix + i);
                 }
                 builder.build(0);
                 finishLatch.countDown();
-            } catch (IOException e) {}
+            } catch (IOException e) {
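+                // swallowed deliberately; a failure here shows up as finishLatch timing out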
+            }
         }
     }
 
+    @Ignore
     @Test
     public void testSharedBuilder() throws IOException, InterruptedException {
-        String resourcePath = "shared_builder";
         final CountDownLatch startLatch = new CountDownLatch(3);
         final CountDownLatch finishLatch = new CountDownLatch(3);
 
-        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
-        Thread t1 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t1_", 10000);
-        Thread t2 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t2_", 10);
-        Thread t3 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t3_", 100000);
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        Thread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 10000);
+        Thread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 10);
+        Thread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 100000);
         t1.start();
         t2.start();
         t3.start();
@@ -345,23 +362,230 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
         AppendTrieDictionary dict = builder.build(0);
         assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS));
         assertEquals(110010, dict.getMaxId());
-        try {
-            builder.addValue("fail");
-            fail("Builder should be closed");
-        } catch (Exception e) {}
 
-        builder = AppendTrieDictionary.Builder.getInstance(resourcePath, dict);
+        builder = createBuilder(RESOURCE_DIR);
         builder.addValue("success");
+        builder.addValue("s");
         dict = builder.build(0);
-        for (int i = 0; i < 10000; i ++) {
+        for (int i = 0; i < 10000; i++) {
             assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
         }
-        for (int i = 0; i < 10; i ++) {
+        for (int i = 0; i < 10; i++) {
             assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
         }
-        for (int i = 0; i < 100000; i ++) {
+        for (int i = 0; i < 100000; i++) {
             assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
         }
         assertEquals(110011, dict.getIdFromValue("success"));
+        assertEquals(110012, dict.getIdFromValue("s"));
     }
+
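+    // The next two tests feed an extremely long value through the builder: once
+    // where it can land on a slice split boundary, and once where the old store
+    // format would have derived a slice file name from it.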
+    @Test
+    public void testSplitContainSuperLongValue() throws IOException {
+        String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B";
+
+        createAppendTrieDict(Arrays.asList("a", superLongValue));
+    }
+
+    @Test
+    public void testSuperLongValueAsFileName() throws IOException {
+        String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B";
+
+        createAppendTrieDict(Arrays.asList("a", superLongValue));
+    }
+
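+    // ':' is not a legal character in HDFS path names, so raw values like these
+    // must never be used as slice file names directly.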
+    @Test
+    public void testIllegalFileNameValue() throws IOException {
+        createAppendTrieDict(Arrays.asList("::", ":"));
+    }
+
+    @Test
+    public void testSkipAddValue() throws IOException {
+        createAppendTrieDict(new ArrayList<String>());
+    }
+
+    private void createAppendTrieDict(List<String> valueList) throws IOException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "1");
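+        // entry size 1 forces every added value into its own slice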
+
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+
+        for (String value : valueList) {
+            builder.addValue(value);
+        }
+
+        builder.build(0);
+    }
+
+    private static class CachedFileFilter implements FileFilter {
+        @Override
+        public boolean accept(File pathname) {
+            return pathname.getName().startsWith("cached_");
+        }
+    }
+
+    private static class VersionFilter implements FileFilter {
+        @Override
+        public boolean accept(File pathname) {
+            return pathname.getName().startsWith(GlobalDictHDFSStore.VERSION_PREFIX);
+        }
+    }
+
+    @Test
+    public void testMultiVersions() throws IOException, InterruptedException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
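+        // slice size 4 means the six values below spill across more than one slice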
+
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("a");
+        builder.addValue("b");
+        builder.addValue("c");
+        builder.addValue("d");
+        builder.addValue("e");
+        builder.addValue("f");
+        AppendTrieDictionary dict = builder.build(0);
+
+        assertEquals(2, dict.getIdFromValue("b"));
+
+        // re-open dict, append new data
+        builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("g");
+
+        // new data is not visible
+        try {
+            dict.getIdFromValue("g");
+            fail("value 'g' should not be visible before the new version is built");
+        } catch (IllegalArgumentException e) {
+            // expected: 'g' is not part of any built dictionary version yet
+        }
+
+        // append data, and be visible for new immutable map
+        builder.addValue("h");
+
+        AppendTrieDictionary newDict = builder.build(0);
+        assertEquals(dict, newDict);
+
+        assertEquals(7, newDict.getIdFromValue("g"));
+        assertEquals(8, newDict.getIdFromValue("h"));
+
+        // Check versions retention
+        File dir = new File(LOCAL_BASE_DIR);
+        assertEquals(2, dir.listFiles(new VersionFilter()).length);
+    }
+
+    @Test
+    public void testVersionRetention() throws IOException, InterruptedException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-max-versions", "1");
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-version-ttl", "1000");
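+        // keep at most one version; an older version may be removed once it is 1000 ms old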
+
+
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("a");
+
+        //version 1
+        builder.build(0);
+
+        // Check versions retention
+        File dir = new File(LOCAL_BASE_DIR);
+        assertEquals(1, dir.listFiles(new VersionFilter()).length);
+
+        // sleep for the full TTL so version 1 is treated as expired
+        Thread.sleep(1000);
+
+        //version 2
+        builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("");
+        builder.build(0);
+
+        // Check versions retention
+        assertEquals(1, dir.listFiles(new VersionFilter()).length);
+    }
+
+    @Test
+    public void testOldDirFormat() throws IOException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("a");
+        builder.addValue("b");
+        builder.addValue("c");
+        builder.addValue("d");
+        builder.addValue("e");
+        builder.addValue("f");
+        builder.build(0);
+
+        convertDirToOldFormat(LOCAL_BASE_DIR);
+
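+        // in the old layout there are no version dirs; slices sit directly
+        // under the base dir with the cached_ prefix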
+        File dir = new File(LOCAL_BASE_DIR);
+        assertEquals(0, dir.listFiles(new VersionFilter()).length);
+        assertEquals(3, dir.listFiles(new CachedFileFilter()).length);
+
+        // the builder converts the old directory format to the new one during init
+        builder = createBuilder(RESOURCE_DIR);
+        builder.build(0);
+
+        assertEquals(1, dir.listFiles(new VersionFilter()).length);
+    }
+
+    private void convertDirToOldFormat(String baseDir) throws IOException {
+        Path basePath = new Path(baseDir);
+        FileSystem fs = HadoopUtil.getFileSystem(basePath);
+
+        // move version dir to base dir, to simulate the older format
+        GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir);
+        long[] versions = store.listAllVersions();
+        Path versionPath = store.getVersionDir(versions[versions.length - 1]);
+        Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName());
+        fs.rename(versionPath, tmpVersionPath);
+        fs.delete(new Path(baseDir), true);
+        fs.rename(tmpVersionPath, new Path(baseDir));
+    }
+
+    @Test
+    public void testOldIndexFormat() throws IOException {
+        KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+        AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("a");
+        builder.addValue("b");
+        builder.addValue("c");
+        builder.addValue("d");
+        builder.addValue("e");
+        builder.addValue("f");
+        builder.build(0);
+
+        convertIndexToOldFormat(LOCAL_BASE_DIR);
+
+        builder = createBuilder(RESOURCE_DIR);
+        builder.addValue("g");
+        builder.addValue("h");
+        builder.addValue("i");
+        AppendTrieDictionary dict = builder.build(0);
+
+        assertEquals(1, dict.getIdFromValue("a"));
+        assertEquals(7, dict.getIdFromValue("g"));
+    }
+
+    private void convertIndexToOldFormat(String baseDir) throws IOException {
+        Path basePath = new Path(baseDir);
+        FileSystem fs = HadoopUtil.getFileSystem(basePath);
+
+        GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir);
+        long[] versions = store.listAllVersions();
+        GlobalDictMetadata metadata = store.getMetadata(versions[versions.length - 1]);
+
+        //convert v2 index to v1 index
+        Path versionPath = store.getVersionDir(versions[versions.length - 1]);
+        Path v2IndexFile = new Path(versionPath, V2_INDEX_NAME);
+
+        fs.delete(v2IndexFile, true);
+        GlobalDictHDFSStore.IndexFormat indexFormatV1 = new GlobalDictHDFSStore.IndexFormatV1(fs, HadoopUtil.getCurrentConfiguration());
+        indexFormatV1.writeIndexFile(versionPath, metadata);
+
+        // rename slice files from the v2 naming scheme back to the v1 "cached_<key>" scheme
+        for (Map.Entry<DictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+            fs.rename(new Path(versionPath, entry.getValue()), new Path(versionPath, "cached_" + entry.getKey()));
+        }
+    }
+
 }