You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by su...@apache.org on 2016/06/04 06:41:38 UTC

[3/6] kylin git commit: KYLIN-1705 Global (and more scalable) dictionary

KYLIN-1705 Global (and more scalable) dictionary


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d1a9bab6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d1a9bab6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d1a9bab6

Branch: refs/heads/KYLIN-1705-B2
Commit: d1a9bab640a7ad422f3f8ef091243033161417ab
Parents: 773ca51
Author: sunyerui <su...@gmail.com>
Authored: Fri May 27 00:50:14 2016 +0800
Committer: sunyerui <su...@gmail.com>
Committed: Fri Jun 3 18:59:34 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   16 +
 .../common/persistence/ComparableWritable.java  |   25 +
 .../apache/kylin/cube/model/DictionaryDesc.java |    9 +
 .../model/validation/CubeMetadataValidator.java |    3 +-
 .../model/validation/rule/DictionaryRule.java   |   87 +-
 .../validation/rule/DictionaryRuleTest.java     |  108 ++
 core-dictionary/pom.xml                         |    5 +
 .../apache/kylin/dict/AppendTrieDictionary.java | 1125 ++++++++++++++++++
 .../org/apache/kylin/dict/CachedTreeMap.java    |  348 ++++++
 .../apache/kylin/dict/DictionaryGenerator.java  |   24 +-
 .../apache/kylin/dict/DictionaryManager.java    |    2 +-
 .../kylin/dict/GlobalDictionaryBuilder.java     |   85 ++
 .../apache/kylin/dict/IDictionaryBuilder.java   |    2 +-
 .../kylin/dict/AppendTrieDictionaryTest.java    |  229 ++++
 .../engine/mr/steps/MergeCuboidMapper.java      |   18 +-
 15 files changed, 2066 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0199378..af0c00b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -703,4 +703,20 @@ abstract public class KylinConfigBase implements Serializable {
     public Map<String, String> getUDFs() {
         return getPropertiesByPrefix("kylin.query.udf.");
     }
+
+    public int getAppendDictEntrySize() {
+        return Integer.parseInt(getOptional("kylin.dict.append.entry.size", "10000000"));
+    }
+
+    public void setAppendDictEntrySize(int entrySize) {
+        setProperty("kylin.dict.append.entry.size", String.valueOf(entrySize));
+    }
+
+    public int getAppendDictCacheSize() {
+        return Integer.parseInt(getOptional("kylin.dict.append.cache.size", "20"));
+    }
+
+    public void setAppendDictCacheSize(int cacheSize) {
+        setProperty("kylin.dict.append.cache.size", String.valueOf(cacheSize));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-common/src/main/java/org/apache/kylin/common/persistence/ComparableWritable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ComparableWritable.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ComparableWritable.java
new file mode 100644
index 0000000..5dae9cb
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ComparableWritable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.persistence;
+
+/**
+ * Created by sunyerui on 16/5/11.
+ */
+public interface ComparableWritable extends Comparable, Writable {
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java
index 47bba33..3682e41 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/DictionaryDesc.java
@@ -65,4 +65,13 @@ public class DictionaryDesc {
     public String getBuilderClass() {
         return builderClass;
     }
+
+    // for test
+    public static DictionaryDesc create(String column, String reuseColumn, String builderClass) {
+        DictionaryDesc desc = new DictionaryDesc();
+        desc.column = column;
+        desc.reuseColumn = reuseColumn;
+        desc.builderClass = builderClass;
+        return desc;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java
index 7d7710c..d2f84f3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/CubeMetadataValidator.java
@@ -20,6 +20,7 @@ package org.apache.kylin.cube.model.validation;
 
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.validation.rule.AggregationGroupRule;
+import org.apache.kylin.cube.model.validation.rule.DictionaryRule;
 import org.apache.kylin.cube.model.validation.rule.FunctionRule;
 import org.apache.kylin.cube.model.validation.rule.RowKeyAttrRule;
 
@@ -31,7 +32,7 @@ import org.apache.kylin.cube.model.validation.rule.RowKeyAttrRule;
  */
 public class CubeMetadataValidator {
     @SuppressWarnings("unchecked")
-    private IValidatorRule<CubeDesc>[] rules = new IValidatorRule[] { new FunctionRule(), new AggregationGroupRule(), new RowKeyAttrRule() };
+    private IValidatorRule<CubeDesc>[] rules = new IValidatorRule[] { new FunctionRule(), new AggregationGroupRule(), new RowKeyAttrRule(), new DictionaryRule()};
 
     public ValidateContext validate(CubeDesc cube) {
         return validate(cube, false);

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
index 7ce263d..effe30c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
@@ -1 +1,86 @@
-package org.apache.kylin.cube.model.validation.rule;

/**
 * Created by sunyerui on 16/6/1.
 */
public class DictionaryRule {
}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.model.validation.rule;
+
+import org.apache.kylin.cube.model.*;
+import org.apache.kylin.cube.model.validation.IValidatorRule;
+import org.apache.kylin.cube.model.validation.ResultLevel;
+import org.apache.kylin.cube.model.validation.ValidateContext;
+import org.apache.kylin.dict.AppendTrieDictionary;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Created by sunyerui on 16/6/1.
+ */
+public class DictionaryRule implements IValidatorRule<CubeDesc> {
+
+    @Override
+    public void validate(CubeDesc cubeDesc, ValidateContext context) {
+        List<DictionaryDesc> dictDescs = cubeDesc.getDictionaries();
+        if (dictDescs == null || dictDescs.isEmpty()) {
+            return;
+        }
+
+        HashMap<TblColRef, String> colToBuilderMap = new HashMap<>();
+        HashMap<TblColRef, TblColRef> colToReuseColMap = new HashMap<>();
+        for (DictionaryDesc dictDesc : dictDescs) {
+            // Make sure the same column is associated with the same builder class; the reuse column is checked first, falling back to the column itself
+            TblColRef dictCol = dictDesc.getResuseColumnRef();
+            if (dictCol == null) {
+                dictCol = dictDesc.getColumnRef();
+            }
+            if (dictCol == null) {
+                context.addResult(ResultLevel.ERROR, "Some column in dictionaries not found");
+                return;
+            }
+            String builder = dictDesc.getBuilderClass();
+            String oldBuilder = colToBuilderMap.put(dictCol, builder);
+            if (oldBuilder != null && !oldBuilder.equals(builder)) {
+                context.addResult(ResultLevel.ERROR, "Column " + dictCol + " has inconsistent builders " + builder + " and " + oldBuilder);
+                return;
+            }
+
+            // Make sure each column reuses at most one other column
+            if (dictDesc.getResuseColumnRef() != null) {
+                TblColRef oldReuseCol = colToReuseColMap.put(dictDesc.getColumnRef(), dictDesc.getResuseColumnRef());
+                if (oldReuseCol != null && !dictDesc.getResuseColumnRef().equals(oldReuseCol)) {
+                    context.addResult(ResultLevel.ERROR, "Column " + dictDesc.getColumnRef() + " reuse inconsistent column " + dictDesc.getResuseColumnRef() + " and " + oldReuseCol);
+                    return;
+                }
+            }
+        }
+
+        // Make sure a column does not use GlobalDictBuilder and DimensionDictEncoding together
+        RowKeyColDesc[] rowKeyColDescs = cubeDesc.getRowkey().getRowKeyColumns();
+        for (RowKeyColDesc desc : rowKeyColDescs) {
+            if (desc.isUsingDictionary()) {
+                String builder = colToBuilderMap.get(desc.getColRef());
+                if (builder != null && builder.equals(GlobalDictionaryBuilder.class.getName())) {
+                    context.addResult(ResultLevel.ERROR, "Column " + desc.getColRef() + " used as dimension and conflict with GlobalDictBuilder");
+                    return;
+                }
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
new file mode 100644
index 0000000..ba58d40
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.model.validation.rule;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DictionaryDesc;
+import org.apache.kylin.cube.model.validation.ValidateContext;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
+import org.apache.kylin.metadata.MetadataManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Created by sunyerui on 16/6/1.
+ */
+public class DictionaryRuleTest extends LocalFileMetadataTestCase {
+    private static KylinConfig config;
+    private static MetadataManager metadataManager;
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        config = KylinConfig.getInstanceFromEnv();
+        metadataManager = MetadataManager.getInstance(config);
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGoodDesc() throws IOException {
+        DictionaryRule rule = new DictionaryRule();
+
+        for (File f : new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/").listFiles()) {
+            CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class);
+            desc.init(config, metadataManager.getAllTablesMap());
+            ValidateContext vContext = new ValidateContext();
+            rule.validate(desc, vContext);
+            vContext.print(System.out);
+            assertTrue(vContext.getResults().length == 0);
+        }
+    }
+
+    @Test
+    public void testBadDesc() throws IOException {
+        testBadDictionaryDesc("Column DEFAULT.TEST_KYLIN_FACT.SELLER_ID has inconsistent builders " +
+                "FakeBuilderClass and org.apache.kylin.dict.GlobalDictionaryBuilder",
+            DictionaryDesc.create("SELLER_ID", null, "FakeBuilderClass"));
+    }
+
+    @Test
+    public void testBadDesc2() throws IOException {
+        testBadDictionaryDesc("Column DEFAULT.TEST_KYLIN_FACT.SELLER_ID has inconsistent builders " +
+                        "FakeBuilderClass and org.apache.kylin.dict.GlobalDictionaryBuilder",
+                DictionaryDesc.create("lstg_site_id", "SELLER_ID", "FakeBuilderClass"));
+    }
+
+    @Test
+    public void testBadDesc3() throws IOException {
+        testBadDictionaryDesc("Column DEFAULT.TEST_KYLIN_FACT.LSTG_SITE_ID used as dimension and conflict with GlobalDictBuilder",
+                DictionaryDesc.create("lstg_site_id", null, GlobalDictionaryBuilder.class.getName()));
+    }
+
+    private void testBadDictionaryDesc(String expectMessage, DictionaryDesc... descs) throws IOException {
+        DictionaryRule rule = new DictionaryRule();
+        File f = new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/test_kylin_cube_without_slr_left_join_desc.json");
+        CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class);
+
+        for (DictionaryDesc dictDesc: descs) {
+            desc.getDictionaries().add(dictDesc);
+        }
+
+        desc.init(config, metadataManager.getAllTablesMap());
+        ValidateContext vContext = new ValidateContext();
+        rule.validate(desc, vContext);
+        vContext.print(System.out);
+        assertTrue(vContext.getResults().length >= 1);
+        assertEquals(expectMessage, vContext.getResults()[0].getMessage());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-dictionary/pom.xml
----------------------------------------------------------------------
diff --git a/core-dictionary/pom.xml b/core-dictionary/pom.xml
index 17319e0..ddcbf53 100644
--- a/core-dictionary/pom.xml
+++ b/core-dictionary/pom.xml
@@ -41,6 +41,11 @@
             <artifactId>kylin-core-metadata</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>compile</scope>
+        </dependency>
 
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
new file mode 100644
index 0000000..a51a798
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
@@ -0,0 +1,1125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ComparableWritable;
+import org.apache.kylin.common.persistence.Writable;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.lang.ref.SoftReference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A dictionary based on Trie data structure that maps enumerations of byte[] to
+ * int IDs, used for global dictionary.
+ *
+ * Trie data is split into sub trees, called {@link DictSlice}, and stored in a {@link CachedTreeMap} with a configurable cache size.
+ * 
+ * With Trie the memory footprint of the mapping is minimized at the cost of
+ * CPU, compared to a HashMap of ID arrays. Performance tests show Trie is
+ * roughly 10 times slower, so a cache layer is overlaid on top of the Trie and
+ * gracefully falls back to the Trie; the cache is held via a soft reference.
+ * 
+ * The implementation is NOT thread-safe for now.
+ *
+ * TODO: make it thread-safe
+ * 
+ * @author sunyerui
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class AppendTrieDictionary<T> extends Dictionary<T> {
+
+    public static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // ASCII "AppecdTrieDict" -- NOTE: 0x63 ('c') where "AppendTrieDict" would need 0x6E ('n')
+    public static final int HEAD_SIZE_I = HEAD_MAGIC.length;
+
+    public static final int BIT_IS_LAST_CHILD = 0x80;
+    public static final int BIT_IS_END_OF_VALUE = 0x40;
+
+    private static final Logger logger = LoggerFactory.getLogger(AppendTrieDictionary.class);
+
+    transient private String baseDir;
+    transient private int baseId;
+    transient private int maxId;
+    transient private int maxValueLength;
+    transient private int nValues;
+    transient private BytesConverter<T> bytesConverter;
+
+    private TreeMap<DictSliceKey, DictSlice> dictSliceMap;
+
+    transient private boolean enableValueCache = true;
+    transient private SoftReference<HashMap> valueToIdCache;
+
+    // Constructor both for build and deserialize
+    public AppendTrieDictionary() {
+        if (enableValueCache) {
+            valueToIdCache = new SoftReference<>(new HashMap());
+        }
+    }
+
+    public void update(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, byte[] dictMapBytes) throws IOException {
+        ByteArrayInputStream buf = new ByteArrayInputStream(dictMapBytes);
+        DataInputStream input = new DataInputStream(buf);
+        update(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter, input);
+    }
+
+    public void update(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, DataInput input) throws IOException {
+        this.baseDir = baseDir;
+        this.baseId = baseId;
+        this.maxId = maxId;
+        this.maxValueLength = maxValueLength;
+        this.nValues = nValues;
+        this.bytesConverter = bytesConverter;
+
+        int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize();
+        if (dictSliceMap == null) {
+            dictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize)
+                    .baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
+        }
+        dictSliceMap.clear();
+        ((Writable)dictSliceMap).readFields(input);
+    }
+
+
+    public byte[] writeDictMap() throws IOException {
+        ByteArrayOutputStream buf = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(buf);
+        ((Writable)dictSliceMap).write(out);
+        byte[] dictMapBytes = buf.toByteArray();
+        buf.close();
+        out.close();
+
+        return dictMapBytes;
+    }
+
+    public static class DictSliceKey implements ComparableWritable {
+        byte[] key;
+
+        public static DictSliceKey wrap(byte[] key) {
+            DictSliceKey dictKey = new DictSliceKey();
+            dictKey.key = key;
+            return dictKey;
+        }
+
+        @Override
+        public String toString() {
+            return Bytes.toStringBinary(key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(key);
+        }
+
+        @Override
+        public int compareTo(Object o) {
+            if (!(o instanceof DictSliceKey)) {
+                return -1;
+            }
+            DictSliceKey other = (DictSliceKey)o;
+            return Bytes.compareTo(key, other.key);
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            out.writeInt(key.length);
+            out.write(key);
+        }
+
+        @Override
+        public void readFields(DataInput in) throws IOException {
+            key = new byte[in.readInt()];
+            in.readFully(key);
+        }
+    }
+
+    public static class DictSlice<T> implements Writable {
+        public DictSlice() {}
+
+        public DictSlice(byte[] trieBytes) {
+            init(trieBytes);
+        }
+
+        private byte[] trieBytes;
+
+        // non-persistent part
+        transient private int headSize;
+        @SuppressWarnings("unused")
+        transient private int bodyLen;
+        transient private int sizeChildOffset;
+
+        transient private int nValues;
+        transient private int sizeOfId;
+        transient private int childOffsetMask;
+        transient private int firstByteOffset;
+
+        private void init(byte[] trieBytes) {
+            this.trieBytes = trieBytes;
+            if (BytesUtil.compareBytes(HEAD_MAGIC, 0, trieBytes, 0, HEAD_MAGIC.length) != 0)
+                throw new IllegalArgumentException("Wrong file type (magic does not match)");
+
+            try {
+                DataInputStream headIn = new DataInputStream(new ByteArrayInputStream(trieBytes, HEAD_SIZE_I, trieBytes.length - HEAD_SIZE_I));
+                this.headSize = headIn.readShort();
+                this.bodyLen = headIn.readInt();
+                this.nValues = headIn.readInt();
+                this.sizeChildOffset = headIn.read();
+                this.sizeOfId = headIn.read();
+
+                this.childOffsetMask = ~((BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE) << ((sizeChildOffset - 1) * 8));
+                this.firstByteOffset = sizeChildOffset + 1; // the offset from begin of node to its first value byte
+            } catch (Exception e) {
+                if (e instanceof RuntimeException)
+                    throw (RuntimeException) e;
+                else
+                    throw new RuntimeException(e);
+            }
+        }
+
+        public byte[] getFirstValue() {
+            int nodeOffset = headSize;
+            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+            while (true) {
+                int valueLen = BytesUtil.readUnsigned(trieBytes, nodeOffset + firstByteOffset - 1, 1);
+                bytes.write(trieBytes, nodeOffset + firstByteOffset, valueLen);
+                if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) {
+                    break;
+                }
+                nodeOffset = headSize + (BytesUtil.readUnsigned(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask);
+                if (nodeOffset == headSize) {
+                    break;
+                }
+            }
+            return bytes.toByteArray();
+        }
+
+        /**
+         * returns a code point from [0, nValues), preserving order of value
+         *
+         * @param n
+         *            -- the offset of current node
+         * @param inp
+         *            -- input value bytes to lookup
+         * @param o
+         *            -- offset in the input value bytes matched so far
+         * @param inpEnd
+         *            -- end of input
+         * @param roundingFlag
+         *            -- =0: return -1 if not found
+         *            -- <0 / >0: intended to return the closest smaller / bigger value if not found;
+         *            --   NOTE: this method never reads roundingFlag, so every miss returns -1
+         */
+        private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) {
+            while (true) {
+                // match the current node
+                int p = n + firstByteOffset; // start of node's value
+                int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value
+                for (; p < end && o < inpEnd; p++, o++) { // note matching start from [0]
+                    if (trieBytes[p] != inp[o]) {
+                        return -1; // mismatch
+                    }
+                }
+
+                // node completely matched, is input all consumed?
+                boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+                if (o == inpEnd) {
+                    return p == end && isEndOfValue ? BytesUtil.readUnsigned(trieBytes, end, sizeOfId) : -1;
+                }
+
+                // find a child to continue
+                int c = headSize + (BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask);
+                if (c == headSize) // has no children
+                    return -1;
+                byte inpByte = inp[o];
+                int comp;
+                while (true) {
+                    p = c + firstByteOffset;
+                    comp = BytesUtil.compareByteUnsigned(trieBytes[p], inpByte);
+                    if (comp == 0) { // continue in the matching child, reset n and loop again
+                        n = c;
+                        break;
+                    } else if (comp < 0) { // try next child
+                        if (checkFlag(c, BIT_IS_LAST_CHILD))
+                            return -1;
+                        c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1) + (checkFlag(c, BIT_IS_END_OF_VALUE) ? sizeOfId : 0);
+                    } else { // children are ordered by their first value byte
+                        return -1;
+                    }
+                }
+            }
+        }
+
+        private boolean checkFlag(int offset, int bit) {
+            return (trieBytes[offset] & bit) > 0;
+        }
+
+        public int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
+            int id = lookupSeqNoFromValue(headSize, value, offset, offset + len, roundingFlag);
+            return id;
+        }
+
+        private DictNode rebuildTrieTree() {
+           return rebuildTrieTreeR(headSize, null);
+        }
+
+        private DictNode rebuildTrieTreeR(int n, DictNode parent) {
+            DictNode root = null;
+            while (true) {
+                int p = n + firstByteOffset;
+                int childOffset = BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask;
+                int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+                boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+
+                byte[] value = new byte[parLen];
+                System.arraycopy(trieBytes, p, value, 0, parLen);
+
+                DictNode node = new DictNode(value, isEndOfValue);
+                if (isEndOfValue) {
+                    int id = BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
+                    node.id = id;
+                }
+
+                if (parent == null) {
+                    root = node;
+                } else {
+                    parent.addChild(node);
+                }
+
+                if (childOffset != 0) {
+                    rebuildTrieTreeR(childOffset + headSize, node);
+                }
+
+                if (checkFlag(n, BIT_IS_LAST_CHILD)) {
+                    break;
+                } else {
+                    n += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
+                }
+            }
+            return root;
+        }
+
+        /**
+         * Writes the complete serialized slice ({@code trieBytes}) to {@code out}.
+         *
+         * @throws IOException if the underlying output fails
+         */
+        public void write(DataOutput out) throws IOException {
+            final byte[] serialized = trieBytes;
+            out.write(serialized);
+        }
+
+        /**
+         * Reads one serialized slice from {@code in} and initializes this instance with it.
+         * <p>
+         * The fixed prefix (magic, 2-byte head size, 4-byte body length) is read first so
+         * that the total size is known; the remaining bytes are then read in one go.
+         *
+         * @throws IllegalArgumentException if the magic does not match or the lengths in the head are inconsistent
+         * @throws IOException              if the underlying input fails or ends prematurely
+         */
+        public void readFields(DataInput in) throws IOException {
+            byte[] headPartial = new byte[HEAD_MAGIC.length + Short.SIZE / Byte.SIZE + Integer.SIZE / Byte.SIZE];
+            in.readFully(headPartial);
+
+            if (BytesUtil.compareBytes(HEAD_MAGIC, 0, headPartial, 0, HEAD_MAGIC.length) != 0)
+                throw new IllegalArgumentException("Wrong file type (magic does not match)");
+
+            int headSize;
+            int bodyLen;
+            try (DataInputStream headIn = new DataInputStream( //
+                    new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I))) {
+                // the head size is written as an unsigned 2-byte value, so read it back unsigned
+                headSize = headIn.readUnsignedShort();
+                bodyLen = headIn.readInt();
+            }
+
+            // fail with a clear message instead of a NegativeArraySize / IndexOutOfBounds further down
+            if (bodyLen < 0 || headSize < headPartial.length)
+                throw new IllegalArgumentException("Corrupted slice head (headSize=" + headSize + ", bodyLen=" + bodyLen + ")");
+
+            byte[] all = new byte[headSize + bodyLen];
+            System.arraycopy(headPartial, 0, all, 0, headPartial.length);
+            in.readFully(all, headPartial.length, all.length - headPartial.length);
+
+            init(all);
+        }
+
+        /**
+         * Deserializes one slice from {@code in} and converts it back into a node tree.
+         *
+         * @return the root node rebuilt from the slice just read
+         * @throws IOException if reading the slice fails
+         */
+        public static DictNode rebuildNodeByDeserialize(DataInput in) throws IOException {
+            final DictSlice loaded = new DictSlice();
+            loaded.readFields(in);
+            final DictNode rebuiltRoot = loaded.rebuildTrieTree();
+            return rebuiltRoot;
+        }
+
+        /** Summarizes the slice by its first value, its value count and its body length. */
+        @Override
+        public String toString() {
+            final String first = Bytes.toStringBinary(getFirstValue());
+            return String.format("DictSlice[firstValue=%s, values=%d, bytes=%d]", first, nValues, bodyLen);
+        }
+
+        /** Hash over the full serialized content, consistent with {@link #equals(Object)}. */
+        @Override
+        public int hashCode() {
+            final byte[] content = trieBytes;
+            return Arrays.hashCode(content);
+        }
+
+        /**
+         * Two slices are equal when their serialized bytes are identical.
+         *
+         * @param o the object to compare with
+         * @return {@code true} if {@code o} is a {@code DictSlice} with the same {@code trieBytes}
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                // same instance: skip the byte-by-byte comparison
+                return true;
+            }
+            if ((o instanceof AppendTrieDictionary.DictSlice) == false) {
+                logger.info("Equals return false because it's not DictSlice");
+                return false;
+            }
+            DictSlice that = (DictSlice) o;
+            return Arrays.equals(this.trieBytes, that.trieBytes);
+        }
+    }
+
+    public static class DictNode implements Writable {
+        // value bytes held by this node (a fragment of the values that pass through it)
+        public byte[] part;
+        // id of the value ending at this node; stays -1 until one is assigned
+        public int id = -1;
+        // true when a dictionary value ends exactly at this node
+        public boolean isEndOfValue;
+        // child nodes; addValueR binary-searches them by first byte, so they are kept ordered
+        public ArrayList<DictNode> children = new ArrayList<>();
+
+        // number of values ending at or below this node; recomputed by Stats.stats()
+        public int nValuesBeneath;
+        // owning node, or null for a root / detached node
+        public DictNode parent;
+        // number of nodes in this subtree including this node itself (hence the initial 1)
+        public int childrenCount = 1;
+
+        /** Creates an empty node whose fields keep their declared defaults until populated, e.g. by {@code readFields}. */
+        public DictNode() {
+        }
+
+        /**
+         * Turns this node into a shallow copy of {@code o}: all fields are copied and the
+         * children list of {@code o} is adopted as-is (not duplicated).
+         * <p>
+         * The adopted children are re-parented to this node. Without that they would keep
+         * pointing at {@code o}, so later {@code addChild}/{@code removeChild} calls below
+         * them would update the {@code childrenCount} of {@code o} instead of this node, and
+         * walks up the {@code parent} chain would end at {@code o}.
+         *
+         * @param o the node to copy from; it should not be used afterwards since it shares its children list
+         */
+        public void clone(DictNode o) {
+            this.part = o.part;
+            this.id = o.id;
+            this.isEndOfValue = o.isEndOfValue;
+            this.children = o.children;
+            for (DictNode child : this.children) {
+                child.parent = this;
+            }
+            this.nValuesBeneath = o.nValuesBeneath;
+            this.parent = o.parent;
+            this.childrenCount = o.childrenCount;
+        }
+
+        /**
+         * Creates a childless node.
+         *
+         * @param value        value bytes held by the node
+         * @param isEndOfValue whether a value ends at this node
+         */
+        DictNode(byte[] value, boolean isEndOfValue) {
+            this.reset(value, isEndOfValue);
+        }
+
+        /**
+         * Creates a node and attaches every element of {@code children} to it.
+         *
+         * @param value        value bytes held by the node
+         * @param isEndOfValue whether a value ends at this node
+         * @param children     nodes to attach; each one gets this node as its parent
+         */
+        DictNode(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
+            this.reset(value, isEndOfValue, children);
+        }
+
+        void reset(byte[] value, boolean isEndOfValue) {
+            reset(value, isEndOfValue, new ArrayList<DictNode>());
+        }
+
+        /**
+         * Re-initializes this node: replaces its value bytes and end-of-value flag, drops
+         * the current children, attaches the given ones and resets the id to -1.
+         *
+         * @param value        new value bytes
+         * @param isEndOfValue new end-of-value flag
+         * @param children     nodes to attach after the existing children were cleared
+         */
+        void reset(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
+            clearChild();
+            this.part = value;
+            this.isEndOfValue = isEndOfValue;
+            this.id = -1;
+            for (DictNode adopted : children) {
+                addChild(adopted);
+            }
+        }
+
+        void clearChild() {
+            this.children.clear();
+            int childrenCountDelta = this.childrenCount - 1;
+            for (DictNode p = this; p != null; p = p.parent) {
+                p.childrenCount -= childrenCountDelta;
+            }
+        }
+
+        void addChild(DictNode child) {
+            addChild(-1, child);
+        }
+
+        /**
+         * Attaches {@code child} to this node and adds its subtree size to the
+         * {@code childrenCount} of this node and of every ancestor.
+         *
+         * @param index position to insert at; a negative value appends at the end
+         * @param child node to attach; its parent is set to this node
+         */
+        void addChild(int index, DictNode child) {
+            child.parent = this;
+            if (index >= 0) {
+                this.children.add(index, child);
+            } else {
+                this.children.add(child);
+            }
+            DictNode ancestor = this;
+            while (ancestor != null) {
+                ancestor.childrenCount += child.childrenCount;
+                ancestor = ancestor.parent;
+            }
+        }
+
+        /**
+         * Detaches the child at {@code index} and subtracts its subtree size from the
+         * {@code childrenCount} of this node and of every ancestor.
+         *
+         * @param index position of the child to remove
+         * @return the removed child, with its parent cleared
+         */
+        public DictNode removeChild(int index) {
+            final DictNode detached = children.remove(index);
+            detached.parent = null;
+            DictNode ancestor = this;
+            while (ancestor != null) {
+                ancestor.childrenCount -= detached.childrenCount;
+                ancestor = ancestor.parent;
+            }
+            return detached;
+        }
+
+        /**
+         * Creates a childless copy of this node (same value bytes, not an end of value)
+         * and, when this node has a parent, inserts the copy right after this node among
+         * the parent's children.
+         *
+         * @return the newly created node
+         */
+        public DictNode duplicateNode() {
+            final DictNode twin = new DictNode(part, false);
+            twin.parent = parent;
+            if (parent == null) {
+                return twin;
+            }
+            final int position = parent.children.indexOf(this);
+            parent.addChild(position + 1, twin);
+            return twin;
+        }
+
+        /**
+         * Concatenates the value bytes along the path that follows the first child at each
+         * level, stopping at the first node that ends a value or has no children.
+         *
+         * @return the bytes collected along that path
+         */
+        public byte[] firstValue() {
+            final ByteArrayOutputStream collected = new ByteArrayOutputStream();
+            for (DictNode cur = this;; cur = cur.children.get(0)) {
+                collected.write(cur.part, 0, cur.part.length);
+                if (cur.isEndOfValue || cur.children.size() == 0) {
+                    break;
+                }
+            }
+            return collected.toByteArray();
+        }
+
+        /**
+         * Splits the tree containing {@code splitNode} in two. Walking from {@code splitNode}
+         * up to the root, every ancestor is duplicated and the children from the current
+         * boundary node onwards are moved (order preserved) to the duplicate.
+         *
+         * @param splitNode first node that should belong to the new tree; may be {@code null}
+         * @return the root of the new tree, {@code splitNode} itself if it has no parent,
+         *         or {@code null} if {@code splitNode} is {@code null}
+         */
+        public static DictNode splitNodeTree(DictNode splitNode) {
+            if (splitNode == null) {
+                return null;
+            }
+            DictNode boundary = splitNode;
+            for (DictNode ancestor = boundary.parent; ancestor != null; ancestor = ancestor.parent) {
+                final int from = ancestor.children.indexOf(boundary);
+                assert from != -1;
+                final DictNode twin = ancestor.duplicateNode();
+                // move children [from, end) to the duplicate, last one first so the order is kept
+                int i = ancestor.children.size() - 1;
+                while (i >= from) {
+                    DictNode moved = ancestor.removeChild(i);
+                    twin.addChild(0, moved);
+                    i--;
+                }
+                boundary = twin;
+            }
+            return boundary;
+        }
+
+        /**
+         * Walks down one edge of the tree and collapses every node that has exactly one
+         * child and is not an end of value into that child (their value bytes are joined).
+         *
+         * @param root        node to start from
+         * @param leftOrRight 0 to follow the first child at each level, anything else to follow the last child
+         */
+        public static void mergeSingleByteNode(DictNode root, int leftOrRight) {
+            DictNode cur = root;
+            while (!cur.children.isEmpty()) {
+                final int pick = (leftOrRight == 0) ? 0 : cur.children.size() - 1;
+                final DictNode next = cur.children.get(pick);
+                final boolean mergeable = cur.children.size() <= 1 && !cur.isEndOfValue;
+                if (!mergeable) {
+                    cur = next;
+                    continue;
+                }
+                // join the bytes of the node and its only child, then let the node take over the child's role
+                final byte[] joined = Arrays.copyOf(cur.part, cur.part.length + next.part.length);
+                System.arraycopy(next.part, 0, joined, cur.part.length, next.part.length);
+                cur.reset(joined, next.isEndOfValue, next.children);
+                cur.id = next.id;
+            }
+        }
+
+        /**
+         * Serializes the tree rooted at this node (head plus flattened trie) to {@code out}.
+         *
+         * @throws IOException if the underlying output fails
+         */
+        @Override
+        public void write(DataOutput out) throws IOException {
+            out.write(buildTrieBytes());
+        }
+
+        /**
+         * Reads a serialized slice from {@code in}, rebuilds its node tree and copies the
+         * rebuilt root into this node.
+         *
+         * @throws IOException if reading the slice fails
+         */
+        @Override
+        public void readFields(DataInput in) throws IOException {
+            final DictNode rebuilt = DictSlice.rebuildNodeByDeserialize(in);
+            clone(rebuilt);
+        }
+
+        /**
+         * Flattens the tree rooted at this node into a byte array: a head (magic, head
+         * size, body size, value count, child-offset size, id size) followed by the nodes
+         * written level by level, with the children of one parent stored contiguously.
+         *
+         * @return the serialized head and body
+         * @throws RuntimeException if the number of bytes written differs from the size computed by {@code Stats}
+         */
+        protected byte[] buildTrieBytes() {
+            Stats stats = Stats.stats(this);
+            int sizeChildOffset = stats.mbpn_sizeChildOffset;
+            int sizeId = stats.mbpn_sizeId;
+
+            // write head
+            ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+            try (DataOutputStream headOut = new DataOutputStream(byteBuf)) {
+                headOut.write(AppendTrieDictionary.HEAD_MAGIC);
+                headOut.writeShort(0); // head size, will back fill
+                headOut.writeInt(stats.mbpn_footprint); // body size
+                headOut.writeInt(stats.nValues);
+                headOut.write(sizeChildOffset);
+                headOut.write(sizeId);
+            } catch (IOException e) {
+                throw new RuntimeException(e); // shall not happen, as we are writing to an in-memory buffer
+            }
+            byte[] head = byteBuf.toByteArray();
+            BytesUtil.writeUnsigned(head.length, head, AppendTrieDictionary.HEAD_SIZE_I, 2);
+
+            byte[] trieBytes = new byte[stats.mbpn_footprint + head.length];
+            System.arraycopy(head, 0, trieBytes, 0, head.length);
+
+            // parents whose children still have to be written, in breadth-first order
+            LinkedList<DictNode> open = new LinkedList<DictNode>();
+            // where each node was written, so its child offset can be patched later
+            IdentityHashMap<DictNode, Integer> offsetMap = new IdentityHashMap<DictNode, Integer>();
+
+            // write body
+            int o = head.length;
+            offsetMap.put(this, o);
+            o = build_writeNode(this, o, true, sizeChildOffset, sizeId, trieBytes);
+            if (this.children.isEmpty() == false)
+                open.addLast(this);
+
+            while (open.isEmpty() == false) {
+                DictNode parent = open.removeFirst();
+                // child offsets are stored relative to the end of the head
+                build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
+                for (int i = 0; i < parent.children.size(); i++) {
+                    DictNode c = parent.children.get(i);
+                    boolean isLastChild = (i == parent.children.size() - 1);
+                    offsetMap.put(c, o);
+                    o = build_writeNode(c, o, isLastChild, sizeChildOffset, sizeId, trieBytes);
+                    if (c.children.isEmpty() == false)
+                        open.addLast(c);
+                }
+            }
+
+            if (o != trieBytes.length)
+                throw new RuntimeException("Trie serialization size mismatch: wrote " + o + " bytes but expected " + trieBytes.length);
+            return trieBytes;
+        }
+
+        private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
+            int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
+            BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
+            trieBytes[parentOffset] |= flags;
+        }
+
+        /**
+         * Writes one node at {@code offset}: the child-offset field (only the flag bits
+         * are set here), one length byte, the value bytes and, for an end-of-value node, the id.
+         *
+         * @param n               node to write
+         * @param offset          position in {@code trieBytes} to start writing at
+         * @param isLastChild     whether the node is the last among its siblings
+         * @param sizeChildOffset width of the child-offset field in bytes
+         * @param sizeId          width of the id field in bytes
+         * @param trieBytes       buffer being built
+         * @return the position right after the written node
+         * @throws RuntimeException if the node holds more than 255 value bytes (the length is stored in one byte)
+         */
+        private int build_writeNode(DictNode n, int offset, boolean isLastChild, int sizeChildOffset, int sizeId, byte[] trieBytes) {
+            int o = offset;
+
+            // childOffset
+            if (isLastChild)
+                trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
+            if (n.isEndOfValue)
+                trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
+            o += sizeChildOffset;
+
+            // nValueBytes
+            if (n.part.length > 255)
+                throw new RuntimeException("Node value part is " + n.part.length + " bytes long, exceeds the maximum of 255");
+            BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
+            o++;
+
+            // valueBytes
+            System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
+            o += n.part.length;
+
+            if (n.isEndOfValue) {
+                assert n.id > 0;
+                BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId);
+                o += sizeId;
+            }
+
+            return o;
+        }
+
+        /** Summarizes the node by its own value bytes, its subtree size and its first value. */
+        @Override
+        public String toString() {
+            final String own = Bytes.toStringBinary(part);
+            final String first = Bytes.toStringBinary(firstValue());
+            return String.format("DictNode[root=%s, nodes=%d, firstValue=%s]", own, childrenCount, first);
+        }
+    }
+
+    public static class Stats {
+        /** Callback invoked once per node during a trie traversal. */
+        public interface Visitor {
+            /**
+             * @param n     the node being visited
+             * @param level depth of the node relative to the level the traversal was started with
+             */
+            void visit(DictNode n, int level);
+        }
+
+        private static void traverseR(DictNode node, Visitor visitor, int level) {
+            visitor.visit(node, level);
+            for (DictNode c : node.children)
+                traverseR(c, visitor, level + 1);
+        }
+
+        private static void traversePostOrderR(DictNode node, Visitor visitor, int level) {
+            for (DictNode c : node.children)
+                traversePostOrderR(c, visitor, level + 1);
+            visitor.visit(node, level);
+        }
+
+
+        public int nValues; // number of values in total
+        public int nValueBytesPlain; // number of bytes for all values, uncompressed
+        public int nValueBytesCompressed; // number of value bytes in the trie (compressed)
+        public int maxValueLength; // size of longest value in bytes
+
+        // the trie is multi-byte-per-node (mbpn)
+        public int mbpn_nNodes; // number of nodes in trie
+        public int mbpn_trieDepth; // depth of trie
+        public int mbpn_maxFanOut; // the maximum no. children
+        public int mbpn_nChildLookups; // number of child lookups when looking up every value once
+        public int mbpn_nTotalFanOut; // the sum of fan outs when looking up every value once
+        public int mbpn_sizeValueTotal; // the sum of value space in all nodes (value bytes plus ids)
+        public int mbpn_sizeNoValueBytes; // size of field noValueBytes
+        public int mbpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array
+        public int mbpn_sizeId;          // size of id value, always 4
+        public int mbpn_footprint; // MBPN footprint in bytes
+
+        /**
+         * Computes statistics of the trie rooted at {@code root} and the sizes needed to
+         * flatten it (id size, child-offset size, total footprint). Nothing is printed.
+         * <p>
+         * Side effect: {@code nValuesBeneath} is recomputed on every node of the tree.
+         *
+         * @param root root of the trie to analyze
+         * @return a freshly populated {@code Stats} instance
+         */
+        public static Stats stats(DictNode root) {
+            // calculate nEndValueBeneath
+            traversePostOrderR(root, new Visitor() {
+                @Override
+                public void visit(DictNode n, int level) {
+                    n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
+                    for (DictNode c : n.children)
+                        n.nValuesBeneath += c.nValuesBeneath;
+                }
+            }, 0);
+
+            // run stats
+            final Stats s = new Stats();
+            // value-part length of the node currently on the path at each level
+            final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
+            traverseR(root, new Visitor() {
+                @Override
+                public void visit(DictNode n, int level) {
+                    if (n.isEndOfValue)
+                        s.nValues++;
+                    s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
+                    s.nValueBytesCompressed += n.part.length;
+                    s.mbpn_nNodes++;
+                    if (s.mbpn_trieDepth < level + 1)
+                        s.mbpn_trieDepth = level + 1;
+                    if (n.children.size() > 0) {
+                        if (s.mbpn_maxFanOut < n.children.size())
+                            s.mbpn_maxFanOut = n.children.size();
+                        int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
+                        s.mbpn_nChildLookups += childLookups;
+                        s.mbpn_nTotalFanOut += childLookups * n.children.size();
+                    }
+
+                    if (level < lenAtLvl.size())
+                        lenAtLvl.set(level, n.part.length);
+                    else
+                        lenAtLvl.add(n.part.length);
+                    // length of the path from the root down to this node
+                    int lenSoFar = 0;
+                    for (int i = 0; i <= level; i++)
+                        lenSoFar += lenAtLvl.get(i);
+                    if (lenSoFar > s.maxValueLength)
+                        s.maxValueLength = lenSoFar;
+                }
+            }, 0);
+
+            // flatten trie footprint calculation, case of Multi-Byte-Per-DictNode
+            s.mbpn_sizeId = 4;
+            s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId;
+            s.mbpn_sizeNoValueBytes = 1;
+            s.mbpn_sizeChildOffset = 4;
+            s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset);
+            while (true) { // minimize the offset size to match the footprint
+                int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1);
+                // t * 4 below would wrap around for such a footprint and make the size check
+                // meaningless; a footprint this large cannot use a narrower offset anyway
+                if (t < 0 || t > Integer.MAX_VALUE / 4)
+                    break;
+                // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
+                if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) {
+                    s.mbpn_sizeChildOffset--;
+                    s.mbpn_footprint = t;
+                } else
+                    break;
+            }
+
+            return s;
+        }
+
+        /**
+         * Prints the trie rooted at {@code root} to standard output, for debugging.
+         */
+        public void print(DictNode root) {
+            final PrintStream stdout = System.out;
+            print(root, stdout);
+        }
+
+        /**
+         * Prints the trie rooted at {@code root}, one node per line, indented by depth.
+         * Each line shows the node's value bytes decoded as UTF-8, the number of values
+         * beneath it (when positive) and, for an end-of-value node, a {@code *} with its id.
+         *
+         * @param root root of the trie to print
+         * @param out  stream to print to
+         */
+        public void print(DictNode root, final PrintStream out) {
+            traverseR(root, new Visitor() {
+                @Override
+                public void visit(DictNode n, int level) {
+                    for (int i = 0; i < level; i++)
+                        out.print("  ");
+                    // the charset constant cannot fail to resolve, unlike a lookup by name
+                    out.print(new String(n.part, java.nio.charset.StandardCharsets.UTF_8));
+                    out.print(" - ");
+                    if (n.nValuesBeneath > 0)
+                        out.print(n.nValuesBeneath);
+                    if (n.isEndOfValue)
+                        out.print("* [" + n.id + "]");
+                    out.print("\n");
+                }
+            }, 0);
+        }
+    }
+
+    public static class Builder<T> {
+        // directory handed to the cached slice map and to the dictionary being built
+        private String baseDir;
+        // highest id handed out so far; createNextId() increments it
+        private int maxId;
+        // length in bytes of the longest value added so far
+        private int maxValueLength;
+        // number of distinct values that received an id
+        private int nValues;
+        // converts values of type T to the bytes stored in the trie
+        private BytesConverter<T> bytesConverter;
+
+        // dictionary being appended to, or null until build() creates a new one
+        private AppendTrieDictionary dict;
+
+        // slice roots keyed by slice key; addValue() picks the slice via floorKey()
+        private TreeMap<DictSliceKey, DictNode> dictSliceMap;
+        // node-count threshold above which a slice is split.
+        // NOTE(review): static but overwritten from KylinConfig by every constructor call
+        private static int MAX_ENTRY_IN_SLICE = 10_000_000;
+        // multiplier applied to MAX_ENTRY_IN_SLICE when checking / computing the split size
+        private static final double MAX_ENTRY_OVERHEAD_FACTOR = 1.0;
+
+        // number of addValue(byte[]) calls, used only for progress logging
+        private int processedCount = 0;
+
+        /**
+         * Creates a builder for a brand-new dictionary stored under {@code baseDir}: no
+         * existing dictionary, all counters at zero, string values, no serialized slice map.
+         *
+         * @throws IOException if the builder cannot be set up
+         */
+        public static Builder create(String baseDir) throws IOException {
+            final StringBytesConverter converter = new StringBytesConverter();
+            return new Builder<>(null, baseDir, 0, 0, 0, converter, null);
+        }
+
+        /**
+         * Creates a builder that appends to an existing dictionary, seeded with that
+         * dictionary's directory, counters, converter and serialized slice map.
+         *
+         * @throws IOException if the builder cannot be set up
+         */
+        public static Builder create(AppendTrieDictionary dict) throws IOException {
+            return new Builder<>(dict, //
+                    dict.baseDir, dict.maxId, dict.maxValueLength, dict.nValues, //
+                    dict.bytesConverter, dict.writeDictMap());
+        }
+
+        /**
+         * Shared constructor behind both {@code create} factories.
+         *
+         * @param dict           dictionary to append to, or {@code null} for a new one
+         * @param baseDir        directory backing the cached slice map
+         * @param maxId          highest id already assigned
+         * @param maxValueLength longest value length seen so far
+         * @param nValues        number of values already in the dictionary
+         * @param bytesConverter converter between values and bytes
+         * @param dictMapBytes   serialized slice map to load, or {@code null} to start empty
+         * @throws IOException if loading the serialized slice map fails
+         */
+        private Builder(AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter<T> bytesConverter, byte[] dictMapBytes) throws IOException {
+            this.dict = dict;
+            this.bytesConverter = bytesConverter;
+            this.baseDir = baseDir;
+            this.nValues = nValues;
+            this.maxValueLength = maxValueLength;
+            this.maxId = maxId;
+
+            // slice size limit and cache size both come from the environment configuration
+            MAX_ENTRY_IN_SLICE = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
+            int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize();
+            // create a new cached map with baseDir
+            dictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize)
+                    .baseDir(baseDir).keyClazz(DictSliceKey.class).valueClazz(DictNode.class)
+                    .persistent(true).immutable(false).build();
+            if (dictMapBytes == null) {
+                return;
+            }
+            DataInputStream serializedMap = new DataInputStream(new ByteArrayInputStream(dictMapBytes));
+            ((Writable) dictSliceMap).readFields(serializedMap);
+        }
+
+        /** Converts {@code value} to bytes with the configured converter and adds it. */
+        public void addValue(T value) {
+            final byte[] encoded = bytesConverter.convertToBytes(value);
+            addValue(encoded);
+        }
+
+        /**
+         * Adds one value to the slice responsible for it, then splits that slice in two
+         * when its node count exceeds the configured limit.
+         *
+         * @param value the value bytes to add
+         */
+        public void addValue(byte[] value) {
+            processedCount++;
+            if (processedCount % 1_000_000 == 0) {
+                logger.debug("add value count " + processedCount);
+            }
+            maxValueLength = Math.max(maxValueLength, value.length);
+
+            // the very first value creates an initial slice keyed by the empty byte array
+            if (dictSliceMap.isEmpty()) {
+                DictNode initialRoot = new DictNode(new byte[0], false);
+                dictSliceMap.put(DictSliceKey.wrap(new byte[0]), initialRoot);
+            }
+
+            // pick the slice with the greatest key not above the value, else the first slice
+            DictSliceKey key = dictSliceMap.floorKey(DictSliceKey.wrap(value));
+            if (key == null) {
+                key = dictSliceMap.firstKey();
+            }
+            DictNode sliceRoot = dictSliceMap.get(key);
+            addValueR(sliceRoot, value, 0);
+
+            if (sliceRoot.childrenCount > MAX_ENTRY_IN_SLICE * MAX_ENTRY_OVERHEAD_FACTOR) {
+                // slice grew too large: split it and re-register both halves under their first values
+                dictSliceMap.remove(key);
+                DictNode splitOff = splitNodeTree(sliceRoot);
+                DictNode.mergeSingleByteNode(sliceRoot, 1);
+                DictNode.mergeSingleByteNode(splitOff, 0);
+                dictSliceMap.put(DictSliceKey.wrap(sliceRoot.firstValue()), sliceRoot);
+                dictSliceMap.put(DictSliceKey.wrap(splitOff.firstValue()), splitOff);
+            }
+        }
+
+        private DictNode splitNodeTree(DictNode root) {
+            DictNode parent = root;
+            DictNode splitNode;
+            int childCountToSplit = (int)(MAX_ENTRY_IN_SLICE * MAX_ENTRY_OVERHEAD_FACTOR / 2);
+            while (true) {
+                List<DictNode> children = parent.children;
+                if (children.size() == 0){
+                    splitNode = parent;
+                    break;
+                } else if (children.size() == 1) {
+                    parent = children.get(0);
+                    continue;
+                } else {
+                    for (int i = children.size()-1; i >= 0; i--) {
+                        parent = children.get(i);
+                        if (childCountToSplit > children.get(i).childrenCount) {
+                            childCountToSplit -= children.get(i).childrenCount;
+                        } else {
+                            childCountToSplit --;
+                            break;
+                        }
+                    }
+                }
+            }
+            return DictNode.splitNodeTree(splitNode);
+        }
+
+        /**
+         * Hands out the next id and counts one more value.
+         *
+         * @return the new id, which is also the new {@code maxId}
+         * @throws IllegalArgumentException if incrementing {@code maxId} wrapped to a negative number
+         */
+        private int createNextId() {
+            maxId++;
+            if (maxId < 0) {
+                throw new IllegalArgumentException("AppendTrieDictionary Id overflow Integer.MAX_VALUE");
+            }
+            nValues++;
+            return maxId;
+        }
+
+        /**
+         * Recursively inserts {@code value[start..]} into the subtree rooted at {@code node},
+         * assigning a new id when the value was not present yet.
+         * <p>
+         * The statement order matters: {@code reset} sets a node's id back to -1, so ids
+         * are copied to the split-off child before the reset and re-assigned afterwards.
+         *
+         * @param node  node whose value part is matched against the remaining bytes
+         * @param value the complete value being inserted
+         * @param start index in {@code value} of the first byte not yet matched by ancestors
+         */
+        private void addValueR(DictNode node, byte[] value, int start) {
+            assert value.length - start <= 255 : "value bytes overflow than 255";
+            // match the value part of current node
+            int i = 0, j = start;
+            int n = node.part.length, nn = value.length;
+            int comp = 0;
+            for (; i < n && j < nn; i++, j++) {
+                comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
+                if (comp != 0)
+                    break;
+            }
+
+            // if value fully matched within the current node
+            if (j == nn) {
+                // if equals to current node, just mark end of value
+                if (i == n) {
+                    // if this is the first match, assign an id to the node
+                    if (!node.isEndOfValue) {
+                        node.id = createNextId();
+                    }
+                    node.isEndOfValue = true;
+                }
+                // otherwise, split the current node into two
+                else {
+                    // c takes over the unmatched tail of the node together with its children and id
+                    DictNode c = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+                    c.id = node.id;
+                    node.reset(BytesUtil.subarray(node.part, 0, i), true);
+                    node.addChild(c);
+                    node.id = createNextId();
+                }
+                return;
+            }
+
+            // if partially matched the current, split the current node, add the new
+            // value, make a 3-way
+            if (i < n) {
+                // c1: the old tail of the node; c2: the rest of the new value
+                DictNode c1 = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+                c1.id = node.id;
+                DictNode c2 = new DictNode(BytesUtil.subarray(value, j, nn), true);
+                c2.id = createNextId();
+                node.reset(BytesUtil.subarray(node.part, 0, i), false);
+                // comp < 0 means the node's byte sorts before the value's byte, so c1 goes first
+                if (comp < 0) {
+                    node.addChild(c1);
+                    node.addChild(c2);
+                } else {
+                    node.addChild(c2);
+                    node.addChild(c1);
+                }
+                return;
+            }
+
+            // fully matched the current node, binary search the next byte for a child node
+            // to continue
+            byte lookfor = value[j];
+            int lo = 0;
+            int hi = node.children.size() - 1;
+            int mid = 0;
+            boolean found = false;
+            comp = 0;
+            while (!found && lo <= hi) {
+                mid = lo + (hi - lo) / 2;
+                DictNode c = node.children.get(mid);
+                comp = BytesUtil.compareByteUnsigned(lookfor, c.part[0]);
+                if (comp < 0)
+                    hi = mid - 1;
+                else if (comp > 0)
+                    lo = mid + 1;
+                else
+                    found = true;
+            }
+            // found a child node matching the first byte, continue in that child
+            if (found) {
+                addValueR(node.children.get(mid), value, j);
+            }
+            // otherwise, make the value a new child
+            else {
+                DictNode c = new DictNode(BytesUtil.subarray(value, j, nn), true);
+                c.id = createNextId();
+                // insert before or after the last probed child, depending on the last comparison
+                node.addChild(comp <= 0 ? mid : mid + 1, c);
+            }
+        }
+
+        /**
+         * Serializes the slice map, pushes the builder state into the dictionary (creating
+         * one if the builder was not started from an existing dictionary) and flushes the
+         * dictionary index.
+         *
+         * @param baseId base id passed on to the dictionary
+         * @return the updated dictionary
+         * @throws IOException if serializing the slice map or flushing the index fails
+         */
+        public AppendTrieDictionary<T> build(int baseId) throws IOException {
+            ByteArrayOutputStream sink = new ByteArrayOutputStream();
+            DataOutputStream sinkOut = new DataOutputStream(sink);
+            ((Writable) dictSliceMap).write(sinkOut);
+            byte[] serializedMap = sink.toByteArray();
+            sink.close();
+            sinkOut.close();
+
+            if (dict == null) {
+                dict = new AppendTrieDictionary<T>();
+            }
+            dict.update(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter, serializedMap);
+            dict.flushIndex();
+
+            return dict;
+        }
+    }
+
+
+    /**
+     * Finds the slice responsible for the value and asks it for the id.
+     *
+     * @return -1 when the dictionary has no slices, otherwise the slice's answer;
+     *         a negative answer is logged as an error
+     */
+    @Override
+    protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
+        if (dictSliceMap.isEmpty()) {
+            return -1;
+        }
+        // the slice key needs its own array holding exactly the value bytes
+        byte[] probe = new byte[len];
+        System.arraycopy(value, offset, probe, 0, len);
+        DictSliceKey floor = dictSliceMap.floorKey(DictSliceKey.wrap(probe));
+        // a value below every key falls back to the first slice
+        DictSliceKey chosen = (floor != null) ? floor : dictSliceMap.firstKey();
+        DictSlice slice = dictSliceMap.get(chosen);
+        int id = slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag);
+        if (id < 0)
+            logger.error("Not a valid value: " + bytesConverter.convertFromBytes(value, offset, len));
+        return id;
+    }
+
+    /** Returns the base id of this dictionary. */
+    @Override
+    public int getMinId() {
+        return this.baseId;
+    }
+
+    /** Returns the highest id recorded for this dictionary. */
+    @Override
+    public int getMaxId() {
+        return this.maxId;
+    }
+
+    /** Ids always take the size of an {@code int}, i.e. 4 bytes. */
+    @Override
+    public int getSizeOfId() {
+        return Integer.SIZE / Byte.SIZE;
+    }
+
+    /** Returns the maximum value length recorded for this dictionary. */
+    @Override
+    public int getSizeOfValue() {
+        return this.maxValueLength;
+    }
+
+    /**
+     * Resolves the id of {@code value}. When the value cache is enabled, no rounding is
+     * requested and the softly referenced cache is still available, the cache is
+     * consulted first and filled with the looked-up id.
+     *
+     * @return the id found in the cache or returned by the byte-level lookup
+     */
+    @Override
+    final protected int getIdFromValueImpl(T value, int roundingFlag) {
+        // SoftReference to skip cache gracefully when short of memory
+        HashMap cache = (enableValueCache && roundingFlag == 0) ? valueToIdCache.get() : null;
+        if (cache != null) {
+            Integer cached = (Integer) cache.get(value);
+            if (cached != null)
+                return cached.intValue();
+        }
+
+        byte[] encoded = bytesConverter.convertToBytes(value);
+        int id = getIdFromValueBytes(encoded, 0, encoded.length, roundingFlag);
+        if (cache != null) {
+            cache.put(value, Integer.valueOf(id));
+        }
+        return id;
+    }
+
+    /**
+     * Not supported by this dictionary.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    final protected T getValueFromIdImpl(int id) {
+        final String reason = "AppendTrieDictionary can't retrive value from id";
+        throw new UnsupportedOperationException(reason);
+    }
+
+    @Override
+    protected byte[] getValueBytesFromIdImpl(int id) {
+        throw new UnsupportedOperationException("AppendTrieDictionary can't retrive value from id");
+    }
+
+    @Override
+    protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset) {
+        throw new UnsupportedOperationException("AppendTrieDictionary can't retrive value from id");
+    }
+
+    /**
+     * Writes the dictionary index to {@code <baseDir>/.index}: base id, max id, max value
+     * length, value count, converter class name and the slice map, in that order.
+     *
+     * @throws IOException if the file cannot be created or written
+     */
+    public void flushIndex() throws IOException {
+        final Path indexPath = new Path(baseDir+"/.index");
+        final Configuration conf = new Configuration();
+        final FileSystem fs = FileSystem.get(indexPath.toUri(), conf);
+        final int bufferSize = 8*1024*1024;
+        final short replication = (short)2;
+        final int blockSize = 8*1024*1024*8;
+        try (FSDataOutputStream indexOut = fs.create(indexPath, true, bufferSize, replication, blockSize)) {
+            indexOut.writeInt(baseId);
+            indexOut.writeInt(maxId);
+            indexOut.writeInt(maxValueLength);
+            indexOut.writeInt(nValues);
+            indexOut.writeUTF(bytesConverter.getClass().getName());
+            ((Writable)dictSliceMap).write(indexOut);
+        }
+    }
+
+    /**
+     * Serializes only the base directory to {@code out}; everything else is written to
+     * the index file under that directory via {@link #flushIndex()}.
+     *
+     * @throws IOException if writing to {@code out} or flushing the index fails
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        final String location = baseDir;
+        out.writeUTF(location);
+        this.flushIndex();
+    }
+
+    /**
+     * Reads the base directory from {@code in}, then loads the rest of the dictionary
+     * state from {@code <baseDir>/.index} in the order {@code flushIndex} writes it.
+     *
+     * @throws IOException if the index cannot be read or the converter class cannot be instantiated
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        final String baseDir = in.readUTF();
+        final Path indexPath = new Path(baseDir+"/.index");
+        final Configuration conf = new Configuration();
+        final FileSystem fs = FileSystem.get(indexPath.toUri(), conf);
+        try (FSDataInputStream input = fs.open(indexPath, 8*1024*1024)) {
+            final int baseId = input.readInt();
+            final int maxId = input.readInt();
+            final int maxValueLength = input.readInt();
+            final int nValues = input.readInt();
+            final String converterName = input.readUTF();
+
+            // an empty name leaves the converter unset
+            BytesConverter converter = null;
+            if (!converterName.isEmpty()) {
+                try {
+                    converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+            update(baseDir, baseId, maxId, maxValueLength, nValues, converter, input);
+        }
+    }
+
+    /** Prints a one-line summary: the value count and the number of slices (0 when there is no slice map). */
+    @Override
+    public void dump(PrintStream out) {
+        final int sliceCount = (dictSliceMap != null) ? dictSliceMap.size() : 0;
+        out.println("Total " + nValues + " values, " + sliceCount + " slice");
+    }
+
+    /** Combines the hash codes of all slices; the sum does not depend on iteration order. */
+    @Override
+    public int hashCode() {
+        int result = 31;
+        for (DictSlice each : dictSliceMap.values()) {
+            final int sliceHash = each.hashCode();
+            result += 31 * sliceHash;
+        }
+        return result;
+    }
+
+    /** Identity comparison: a dictionary is only equal to itself. */
+    @Override
+    public boolean equals(Object o) {
+        return this == o;
+    }
+
+    /** Always answers {@code false}; containment of another dictionary is never reported. */
+    @Override
+    public boolean contains(Dictionary other) {
+        final boolean containsOther = false;
+        return containsOther;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
new file mode 100644
index 0000000..81ac82f
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import com.google.common.cache.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.persistence.ComparableWritable;
+import org.apache.kylin.common.persistence.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Created by sunyerui on 16/5/2.
+ * TODO Depends on HDFS for now, ideally just depends on storage interface
+ */
+public class CachedTreeMap<K extends ComparableWritable, V extends Writable> extends TreeMap<K, V> implements Writable {
+    private static final Logger logger = LoggerFactory.getLogger(CachedTreeMap.class);
+
+    private final Class<K> keyClazz; // instantiated reflectively in readFields() to rebuild keys
+    private final Class<V> valueClazz; // instantiated reflectively in readValue() to rebuild values
+    transient volatile Collection<V> values; // lazily created view returned by values(); reset by clear()
+    private final LoadingCache<K, V> valueCache; // holds the values; the TreeMap itself only stores keys (mapped to null)
+    private final TreeSet<String> fileList; // names of value files touched by writeValue(); deleted in finalize() when not persistent
+    private final Configuration conf; // Hadoop configuration used for every FileSystem lookup
+    private final String baseDir; // directory prefix under which each value file "cached_<key>" lives
+    private final boolean persistent; // false: value files are temporary (deleteOnExit + cleanup in finalize())
+    private final boolean immutable; // true: values are never written back and the cache uses soft references
+    private long writeValueTime = 0; // accumulated milliseconds spent in writeValue()
+    private long readValueTime = 0; // accumulated milliseconds spent in readValue()
+
+    private static final int BUFFER_SIZE = 8 * 1024 * 1024; // 8 MB stream buffer for value files
+
+    public static class CachedTreeMapBuilder<K, V> { // fluent builder; the only way to create a CachedTreeMap (its constructor is private)
+        private Class<K> keyClazz; // required, checked in build()
+        private Class<V> valueClazz; // required, checked in build()
+        private int maxCount = 8; // becomes the cache's maximumSize, i.e. how many values stay in memory
+        private String baseDir; // required, checked in build()
+        private boolean persistent;
+        private boolean immutable;
+
+        public static CachedTreeMapBuilder newBuilder() { // note: returns the raw builder type
+            return new CachedTreeMapBuilder();
+        }
+
+        private CachedTreeMapBuilder() {}
+
+        public CachedTreeMapBuilder keyClazz(Class<K> clazz) { // sets the key class; returns this (raw type) for chaining
+            this.keyClazz = clazz;
+            return this;
+        }
+
+        public CachedTreeMapBuilder valueClazz(Class<V> clazz) { // sets the value class; returns this (raw type) for chaining
+            this.valueClazz = clazz;
+            return this;
+        }
+
+        public CachedTreeMapBuilder<K, V> maxSize(int maxCount) { // overrides the default of 8 cached values
+            this.maxCount = maxCount;
+            return this;
+        }
+
+        public CachedTreeMapBuilder<K, V> baseDir(String baseDir) { // directory that will hold the value files
+            this.baseDir = baseDir;
+            return this;
+        }
+
+        public CachedTreeMapBuilder<K, V> persistent(boolean persistent) { // forwarded unchanged to the map's 'persistent' flag
+            this.persistent = persistent;
+            return this;
+        }
+
+        public CachedTreeMapBuilder<K, V> immutable(boolean immutable) { // forwarded unchanged to the map's 'immutable' flag
+            this.immutable = immutable;
+            return this;
+        }
+
+        public CachedTreeMap build() { // throws RuntimeException when baseDir or either class is missing
+            if (baseDir == null) {
+                throw new RuntimeException("CachedTreeMap need a baseDir to cache data");
+            }
+            if (keyClazz == null || valueClazz == null) {
+                throw new RuntimeException("CachedTreeMap need key and value clazz to serialize data");
+            }
+            CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, persistent, immutable);
+            return map;
+        }
+    }
+
+    private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String baseDir, boolean persistent, boolean immutable) {
+        super();
+        this.keyClazz = keyClazz;
+        this.valueClazz = valueClazz;
+        this.fileList = new TreeSet<>();
+        this.conf = new Configuration();
+        this.baseDir = baseDir;
+        this.persistent = persistent;
+        this.immutable = immutable;
+        CacheBuilder builder = CacheBuilder.newBuilder().removalListener(new RemovalListener<K, V>() {
+            @Override
+            public void onRemoval(RemovalNotification<K, V> notification) {
+                logger.info(String.format("Evict cache key %s(%d) with value %s caused by %s, size %d/%d ",
+                        notification.getKey(), notification.getKey().hashCode(), notification.getValue(), notification.getCause(),
+                        size(), valueCache.size()));
+                switch (notification.getCause()) {
+                    case SIZE:
+                        writeValue(notification.getKey(), notification.getValue());
+                        break;
+                    case EXPLICIT:
+                        // skip delete files to recover from error during dict appending
+                        // deleteValue(notification.getKey());
+                        break;
+                    default:
+                        throw new RuntimeException("unexpected evict reason " + notification.getCause());
+                }
+            }
+        }).maximumSize(maxCount);
+        // For immutable values, use soft reference to free memory when gc, and just load again when need it
+        if (this.immutable) {
+            builder.softValues();
+        }
+        this.valueCache = builder.build(new CacheLoader<K, V>() {
+            @Override
+            public V load(K key) throws Exception {
+                V value = readValue(key);
+                logger.info(String.format("Load cache by key %s(%d) with value %s", key, key.hashCode(), value));
+                return value;
+            }
+        });
+    }
+
+    private String generateFileName(K key) { // value file path: <baseDir>/cached_<key.toString()>
+        String file = baseDir + "/cached_" + key.toString();
+        return file;
+    }
+
+    private void writeValue(K key, V value) { // serializes one value into its file; any failure surfaces as RuntimeException
+        if (immutable) { // immutable values are never written back
+            return;
+        }
+        long t0 = System.currentTimeMillis();
+        String fileName = generateFileName(key);
+        Path filePath = new Path(fileName);
+        try (FSDataOutputStream out = (FileSystem.get(filePath.toUri(), conf)).create(filePath, true, BUFFER_SIZE, (short)2, BUFFER_SIZE*8)) {
+            value.write(out);
+            if (!persistent) { // temporary file: ask the file system to remove it at exit
+                FileSystem.get(filePath.toUri(), conf).deleteOnExit(filePath);
+            }
+        } catch (Exception e) {
+            logger.error(String.format("write value into %s exception: %s", fileName, e), e);
+            throw new RuntimeException(e); // wrap e itself: e.getCause() is often null and would discard the actual failure
+        } finally {
+            fileList.add(fileName); // recorded whether or not the write succeeded
+            writeValueTime += System.currentTimeMillis() - t0;
+        }
+    }
+
+    private V readValue(K key) throws Exception { // reads the value stored in the key's file; called from the cache loader
+        long t0 = System.currentTimeMillis();
+        String fileName = generateFileName(key);
+        Path filePath = new Path(fileName);
+        try (FSDataInputStream input = (FileSystem.get(filePath.toUri(), conf)).open(filePath, BUFFER_SIZE)) {
+            V value = valueClazz.newInstance(); // requires an accessible no-arg constructor on the value class
+            value.readFields(input);
+            return value;
+        } catch (Exception e) {
+            logger.error(String.format("read value from %s exception: %s", fileName, e), e);
+            return null; // NOTE(review): this null is returned from CacheLoader.load(); Guava rejects null loads with InvalidCacheLoadException - confirm callers expect that
+        } finally {
+            readValueTime += System.currentTimeMillis() - t0;
+        }
+    }
+
+    private void deleteValue(K key) { // removes the key's value file; has no live call site in this class
+        if (persistent && immutable) { // files of a persistent, immutable map are never deleted
+            return;
+        }
+        String fileName = generateFileName(key);
+        Path filePath = new Path(fileName);
+        try {
+            FileSystem fs = FileSystem.get(filePath.toUri(), conf);
+            if (fs.exists(filePath)) {
+                fs.delete(filePath, true);
+            }
+        } catch (Exception e) { // a failed delete is logged, not rethrown
+            logger.error(String.format("delete value file %s exception: %s", fileName, e), e);
+        } finally {
+            fileList.remove(fileName); // forgotten even when the delete failed
+        }
+    }
+
+    @Override
+    public V put(K key, V value) { // registers the key in the tree and hands the value to the cache
+        super.put(key, null); // the tree only tracks the key; the value lives in valueCache
+        valueCache.put(key, value);
+        return null; // the previous value is never reported
+    }
+
+    @Override
+    public V get(Object key) {
+        if (super.containsKey(key)) {
+            try {
+                return valueCache.get((K)key);
+            } catch (ExecutionException e) {
+                logger.error(String.format("get value with key %s exception: ", key, e), e);
+                return null;
+            }
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public V remove(Object key) { // drops the key from the tree and its value from the cache
+        super.remove(key);
+        valueCache.invalidate(key); // the removal listener leaves the value file in place for explicit invalidation
+        return null; // the removed value is never reported
+    }
+
+    @Override
+    public void clear() { // empties the key tree and the value cache
+        super.clear();
+        values = null; // forget the cached values() view
+        valueCache.invalidateAll();
+    }
+
+    public Collection<V> values() { // lazily creates, then reuses, the Values view of this map
+        Collection<V> vs = values;
+        return (vs != null) ? vs : (values = new Values());
+    }
+
+    class Values extends AbstractCollection<V> {
+        @Override
+        public Iterator<V> iterator() {
+            return new ValueIterator<>();
+        }
+
+        @Override
+        public int size() {
+            return CachedTreeMap.this.size();
+        }
+    }
+
+    class ValueIterator<V> implements Iterator<V> {
+        Iterator<K> keyIterator;
+        K currentKey;
+
+        public ValueIterator() {
+            keyIterator = CachedTreeMap.this.keySet().iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return keyIterator.hasNext();
+        }
+
+        @Override
+        public V next() {
+            currentKey = keyIterator.next();
+            try {
+                return (V)valueCache.get(currentKey);
+            } catch (ExecutionException e) {
+                logger.error(String.format("get value with key %s exception: ", currentKey, e), e);
+                return null;
+            }
+        }
+
+        @Override
+        public void remove() {
+            keyIterator.remove();
+            valueCache.invalidate(currentKey);
+        }
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException { // writes the key count and every key to 'out'; values go to their own files
+        assert persistent : "Only support serialize with persistent true";
+        out.writeInt(size());
+        for (K key : keySet()) {
+            key.write(out);
+            V value = valueCache.getIfPresent(key); // only values currently held by the cache are flushed here
+            if (null != value) {
+                writeValue(key, value);
+            }
+        }
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException { // restores the keys only; each value is loaded through the cache on first access
+        assert persistent : "Only support deserialize with persistent true";
+        int size = in.readInt(); // number of keys that follow
+        try {
+            for (int i = 0; i < size; i++) {
+                K key = keyClazz.newInstance(); // requires an accessible no-arg constructor on the key class
+                key.readFields(in);
+                super.put(key, null);
+            }
+        } catch (Exception e) { // reflection and read failures alike are reported as IOException
+            throw new IOException(e);
+        }
+    }
+
+    // clean up all tmp files
+    @Override
+    public void finalize() throws Throwable {
+        if (persistent) { // persistent maps keep their files
+            return;
+        }
+        try {
+            this.clear(); // empties the tree and invalidates every cached value
+            for (String file : fileList) {
+                try {
+                    Path filePath = new Path(file);
+                    FileSystem fs = FileSystem.get(filePath.toUri(), conf);
+                    fs.delete(filePath, true);
+                } catch (Throwable t) {} // best effort: a file that cannot be deleted is skipped silently
+            }
+        } catch (Throwable t) { // any other cleanup failure is ignored as well
+        } finally {
+          super.finalize();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 0d1fe88..95b6087 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -44,7 +44,7 @@ public class DictionaryGenerator {
 
     private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
 
-    private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
+    private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
 
     private static int getDictionaryMaxCardinality() {
         try {
@@ -70,15 +70,15 @@ public class DictionaryGenerator {
             builder = new StringDictBuilder();
         }
         
-        return buildDictionary(builder, valueEnumerator);
+        return buildDictionary(builder, null, valueEnumerator);
     }
     
-    public static Dictionary<String> buildDictionary(IDictionaryBuilder builder, IDictionaryValueEnumerator valueEnumerator) throws IOException {
+    public static Dictionary<String> buildDictionary(IDictionaryBuilder builder, DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator) throws IOException {
         int baseId = 0; // always 0 for now
         int nSamples = 5;
         ArrayList<String> samples = new ArrayList<String>(nSamples);
 
-        Dictionary<String> dict = builder.build(valueEnumerator, baseId, nSamples, samples);
+        Dictionary<String> dict = builder.build(dictInfo, valueEnumerator, baseId, nSamples, samples);
 
         // log a few samples
         StringBuilder buf = new StringBuilder();
@@ -89,20 +89,22 @@ public class DictionaryGenerator {
             buf.append(s.toString()).append("=>").append(dict.getIdFromValue(s));
         }
         logger.debug("Dictionary value samples: " + buf.toString());
-        logger.debug("Dictionary cardinality " + dict.getSize());
+        logger.debug("Dictionary cardinality: " + dict.getSize());
+        logger.debug("Dictionary builder class: " + builder.getClass().getName());
+        logger.debug("Dictionary class: " + dict.getClass().getName());
         if (dict instanceof TrieDictionary && dict.getSize() > DICT_MAX_CARDINALITY) {
             throw new IllegalArgumentException("Too high cardinality is not suitable for dictionary -- cardinality: " + dict.getSize());
         }
         return dict;
     }
 
-    public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) throws IOException {
-        return buildDictionary(dataType, new MultipleDictionaryValueEnumerator(sourceDicts));
+    public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) throws IOException {
+        return buildDictionary(dataType, new MultipleDictionaryValueEnumerator(sourceDicts));
     }
 
     private static class DateDictBuilder implements IDictionaryBuilder {
         @Override
-        public Dictionary<String> build(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
+        public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
             final int BAD_THRESHOLD = 0;
             String matchPattern = null;
             byte[] value;
@@ -141,14 +143,14 @@ public class DictionaryGenerator {
     
     private static class TimeDictBuilder implements IDictionaryBuilder {
         @Override
-        public Dictionary<String> build(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
+        public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
             return new TimeStrDictionary(); // base ID is always 0
         }
     }
 
     private static class StringDictBuilder implements IDictionaryBuilder {
         @Override
-        public Dictionary<String> build(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
+        public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
             TrieDictionaryBuilder builder = new TrieDictionaryBuilder(new StringBytesConverter());
             byte[] value;
             while (valueEnumerator.moveNext()) {
@@ -166,7 +168,7 @@ public class DictionaryGenerator {
 
     private static class NumberDictBuilder implements IDictionaryBuilder {
         @Override
-        public Dictionary<String> build(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
+        public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
             NumberDictionaryBuilder builder = new NumberDictionaryBuilder(new StringBytesConverter());
             byte[] value;
             while (valueEnumerator.moveNext()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 43f7b26..5668bd8 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -312,7 +312,7 @@ public class DictionaryManager {
             if (builderClass == null)
                 dictionary = DictionaryGenerator.buildDictionary(DataType.getType(dictInfo.getDataType()), columnValueEnumerator);
             else
-                dictionary = DictionaryGenerator.buildDictionary((IDictionaryBuilder) ClassUtil.newInstance(builderClass), columnValueEnumerator);
+                dictionary = DictionaryGenerator.buildDictionary((IDictionaryBuilder) ClassUtil.newInstance(builderClass), dictInfo, columnValueEnumerator);
         } finally {
             if (columnValueEnumerator != null)
                 columnValueEnumerator.close();

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
new file mode 100644
index 0000000..0f4d8bb
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.MetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.NavigableSet;
+
+/**
+ * GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments.
+ * GlobalDictinary mainly used for count distinct measure to support rollup among segments.
+ * Created by sunyerui on 16/5/24.
+ */
+public class GlobalDictionaryBuilder implements IDictionaryBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class);
+
+    @Override
+    public Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException {
+        if (dictInfo == null) {
+            throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo");
+        }
+        String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict/" + dictInfo.getResourceDir() + "/";
+
+        // Try to load the existing dict from cache, making sure there's only the same one object in memory
+        NavigableSet<String> dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(dictInfo.getResourceDir());
+        ArrayList<String> appendDicts = new ArrayList<>();
+        if (dicts != null && !dicts.isEmpty()) {
+            for (String dict : dicts) {
+                DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
+                if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) {
+                    appendDicts.add(dict);
+                }
+            }
+        }
+
+        AppendTrieDictionary.Builder<String> builder;
+        if (appendDicts.isEmpty()) {
+            logger.info("GlobalDict {} is empty, create new one", dictInfo.getResourceDir());
+            builder = AppendTrieDictionary.Builder.create(dictDir);
+        } else if (appendDicts.size() == 1) {
+            logger.info("GlobalDict {} exist, append value", appendDicts.get(0));
+            AppendTrieDictionary dict = (AppendTrieDictionary)DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0));
+            builder = AppendTrieDictionary.Builder.create(dict);
+        } else {
+            throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", dictInfo.getResourceDir(), appendDicts.size()));
+        }
+
+        byte[] value;
+        while (valueEnumerator.moveNext()) {
+            value = valueEnumerator.current();
+            if (value == null) {
+                continue;
+            }
+            String v = Bytes.toString(value);
+            builder.addValue(v);
+            if (returnSamples.size() < nSamples && returnSamples.contains(v) == false)
+                returnSamples.add(v);
+        }
+        return builder.build(baseId);
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
index 9ace239..8f95a2a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
@@ -25,5 +25,5 @@ import org.apache.kylin.common.util.Dictionary;
 
 public interface IDictionaryBuilder {
 
-    Dictionary<String> build(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException;
+    Dictionary<String> build(DictionaryInfo dictInfo, IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList<String> returnSamples) throws IOException;
 }