You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/02 13:19:10 UTC

[17/27] kylin git commit: KYLIN-2192 More Robust Global Dictionary

KYLIN-2192 More Robust Global Dictionary


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4a0ee798
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4a0ee798
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4a0ee798

Branch: refs/heads/KYLIN-1875
Commit: 4a0ee7989d5f8272592b980fce3f5716ca40d4c1
Parents: e562aaf
Author: sunyerui <su...@gmail.com>
Authored: Mon Nov 21 21:26:34 2016 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Fri Dec 2 13:33:59 2016 +0800

----------------------------------------------------------------------
 .../kylin/job/dataGen/FactTableGenerator.java   |  12 +-
 .../apache/kylin/common/KylinConfigBase.java    |   8 +
 .../apache/kylin/common/util/Dictionary.java    |   2 +-
 .../model/validation/rule/DictionaryRule.java   |  78 +-
 .../validation/rule/DictionaryRuleTest.java     |  28 +-
 .../apache/kylin/dict/AppendTrieDictionary.java | 285 +++++--
 .../kylin/dict/AppendTrieDictionaryChecker.java | 102 +++
 .../org/apache/kylin/dict/CachedTreeMap.java    | 260 +++---
 .../kylin/dict/GlobalDictionaryBuilder.java     |  36 +-
 .../kylin/dict/AppendTrieDictionaryTest.java    | 150 +++-
 .../apache/kylin/dict/CachedTreeMapTest.java    | 320 +++++---
 .../kylin/measure/bitmap/BitmapCounterTest.java |   6 +-
 ...t_kylin_cube_without_slr_left_join_desc.json |  16 +-
 .../localmeta/data/DEFAULT.TEST_KYLIN_FACT.csv  | 804 +++++++++----------
 .../flatten_data_for_without_slr_left_join.csv  | 804 +++++++++----------
 .../test_kylin_inner_join_model_desc.json       |   3 +-
 .../test_kylin_inner_join_view_model_desc.json  |   3 +-
 .../test_kylin_left_join_model_desc.json        |   3 +-
 .../test_kylin_left_join_view_model_desc.json   |   3 +-
 .../table/DEFAULT.TEST_KYLIN_FACT.json          |   8 +-
 .../source/hive/ITHiveTableReaderTest.java      |   2 +-
 .../query/sql_distinct_precisely/query00.sql    |   2 +-
 .../query/sql_distinct_precisely/query01.sql    |   2 +-
 .../query/sql_distinct_precisely/query02.sql    |   2 +-
 .../query/sql_distinct_precisely/query03.sql    |   3 +-
 .../query/sql_distinct_precisely/query04.sql    |   3 +-
 .../query/sql_distinct_precisely/query05.sql    |  25 -
 .../query/sql_distinct_precisely/query06.sql    |  26 -
 .../query/sql_distinct_precisely/query07.sql    |  24 -
 29 files changed, 1737 insertions(+), 1283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index 8068fd1..677b713 100644
--- a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -403,13 +403,13 @@ public class FactTableGenerator {
     }
 
     private String createRandomCell(ColumnDesc cDesc) {
-        String type = cDesc.getTypeName();
-        String s = type.toLowerCase();
-        if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
+        DataType type =cDesc.getType();
+        String s = type.getName();
+        if (s.equals("char") || s.equals("varchar")) {
             StringBuilder sb = new StringBuilder();
-            for (int i = 0; i < 2; i++) {
-                sb.append((char) ('a' + r.nextInt(10)));// there are 10*10
-                // possible strings
+            int len = Math.min(type.getPrecision(), 3);
+            for (int i = 0; i < len; i++) {
+                sb.append((char) ('a' + r.nextInt(10)));  // cardinality at most 10x10x10
             }
             return sb.toString();
         } else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3c10826..f46c185 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -232,6 +232,14 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.dictionary.append-entry-size", "10000000"));
     }
 
+    public int getAppendDictMaxVersions() {
+        return Integer.parseInt(getOptional("kylin.dictionary.append-max-versions", "3"));
+    }
+
+    public int getAppendDictVersionTTL() {
+        return Integer.parseInt(getOptional("kylin.dictionary.append-version-ttl", "259200000"));
+    }
+
     // for test
     public void setAppendDictEntrySize(int entrySize) {
         setProperty("kylin.dictionary.append-entry-size", String.valueOf(entrySize));

http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
index 0fb299c..1e172bc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
@@ -158,7 +158,7 @@ abstract public class Dictionary<T> implements Serializable {
             return nullId();
         else {
             int id = getIdFromValueBytesImpl(value, offset, len, roundingFlag);
-            if (id < 0)
+            if (id == -1)
                 throw new IllegalArgumentException("Value '" + Bytes.toString(value, offset, len) + "' (" + Bytes.toStringBinary(value, offset, len) + ") not exists!");
             return id;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
index d06c816..37889c2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
@@ -18,9 +18,12 @@
 
 package org.apache.kylin.cube.model.validation.rule;
 
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DictionaryDesc;
 import org.apache.kylin.cube.model.validation.IValidatorRule;
@@ -29,9 +32,19 @@ import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.metadata.model.TblColRef;
 
 /**
- * Created by sunyerui on 16/6/1.
+ * Validate Dictionary Settings:
+ *
+ * <ul>
+ *     <li> no duplicated dictionary for one column
+ *     <li> dictionary can't set both `reuse` and `builder`
+ *     <li> transitive `reuse` like "a <- b <- c" is not allowed, force "a <- b, a <- c"
+ * </ul>
  */
 public class DictionaryRule implements IValidatorRule<CubeDesc> {
+    static final String ERROR_DUPLICATE_DICTIONARY_COLUMN = "Duplicated dictionary specification for column: ";
+    static final String ERROR_REUSE_BUILDER_BOTH_SET = "REUSE and BUILDER both set on dictionary for column: ";
+    static final String ERROR_REUSE_BUILDER_BOTH_EMPTY = "REUSE and BUILDER both empty on dictionary for column: ";
+    static final String ERROR_TRANSITIVE_REUSE = "Transitive REUSE is not allowed for dictionary: ";
 
     @Override
     public void validate(CubeDesc cubeDesc, ValidateContext context) {
@@ -40,40 +53,43 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> {
             return;
         }
 
-        HashMap<TblColRef, String> colToBuilderMap = new HashMap<>();
-        HashMap<TblColRef, TblColRef> colToReuseColMap = new HashMap<>();
+        Set<TblColRef> allDictCols = new HashSet<>();
+        Set<TblColRef> baseCols = new HashSet<>(); // col with builder
+        List<DictionaryDesc> reuseDictionaries = new ArrayList<>();
+
+        // first pass
         for (DictionaryDesc dictDesc : dictDescs) {
             TblColRef dictCol = dictDesc.getColumnRef();
-            if (dictCol == null) {
-                context.addResult(ResultLevel.ERROR, "Some column in dictionaries not found");
+            TblColRef reuseCol = dictDesc.getResuseColumnRef();
+            String builderClass = dictDesc.getBuilderClass();
+
+            if (!allDictCols.add(dictCol)) {
+                context.addResult(ResultLevel.ERROR, ERROR_DUPLICATE_DICTIONARY_COLUMN + dictCol);
                 return;
             }
-            String builder = dictDesc.getBuilderClass();
-            TblColRef reuseCol = dictDesc.getResuseColumnRef();
-            if (reuseCol == null) {
-                if (builder == null || builder.isEmpty()) {
-                    context.addResult(ResultLevel.ERROR, "Column " + dictCol + " cannot have builder and reuse column both empty");
-                    return;
-                }
-                
-                // Make sure the same column associate with same builder class
-                String oldBuilder = colToBuilderMap.put(dictCol, builder);
-                if (oldBuilder != null && !oldBuilder.equals(builder)) {
-                    context.addResult(ResultLevel.ERROR, "Column " + dictCol + " has inconsistent builders " + builder + " and " + oldBuilder);
-                    return;
-                }
+
+            if (reuseCol != null && StringUtils.isNotEmpty(builderClass)) {
+                context.addResult(ResultLevel.ERROR, ERROR_REUSE_BUILDER_BOTH_SET + dictCol);
+                return;
+            }
+
+            if (reuseCol == null && StringUtils.isEmpty(builderClass)) {
+                context.addResult(ResultLevel.ERROR, ERROR_REUSE_BUILDER_BOTH_EMPTY + dictCol);
+                return;
+            }
+
+            if (reuseCol != null) {
+                reuseDictionaries.add(dictDesc);
             } else {
-                if (builder != null && !builder.isEmpty()) {
-                    context.addResult(ResultLevel.ERROR, "Column " + dictCol + " cannot have builder and reuse column both");
-                    return;
-                }
-                
-                // Make sure one column only reuse another one column
-                TblColRef oldReuseCol = colToReuseColMap.put(dictCol, reuseCol);
-                if (oldReuseCol != null && !reuseCol.equals(oldReuseCol)) {
-                    context.addResult(ResultLevel.ERROR, "Column " + dictCol + " reuse inconsistent column " + reuseCol + " and " + oldReuseCol);
-                    return;
-                }
+                baseCols.add(dictCol);
+            }
+        }
+
+        // second pass: check no transitive reuse
+        for (DictionaryDesc dictDesc : reuseDictionaries) {
+            if (!baseCols.contains(dictDesc.getResuseColumnRef())) {
+                context.addResult(ResultLevel.ERROR, ERROR_TRANSITIVE_REUSE + dictDesc.getColumnRef());
+                return;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
index 9b37507..b6e0bcb 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
@@ -18,6 +18,10 @@
 
 package org.apache.kylin.cube.model.validation.rule;
 
+import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_DUPLICATE_DICTIONARY_COLUMN;
+import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_EMPTY;
+import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_SET;
+import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_TRANSITIVE_REUSE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -36,9 +40,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-/**
- * Created by sunyerui on 16/6/1.
- */
 public class DictionaryRuleTest extends LocalFileMetadataTestCase {
     private static KylinConfig config;
 
@@ -65,24 +66,31 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
             desc.init(config);
             ValidateContext vContext = new ValidateContext();
             rule.validate(desc, vContext);
-            vContext.print(System.out);
             assertTrue(vContext.getResults().length == 0);
         }
     }
 
     @Test
     public void testBadDesc() throws IOException {
-        testDictionaryDesc("Column EDW.TEST_SITES.SITE_NAME has inconsistent builders " + "FakeBuilderClass and org.apache.kylin.dict.GlobalDictionaryBuilder", DictionaryDesc.create("SITE_NAME", null, "FakeBuilderClass"));
+        testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("USER_ID", null, "FakeBuilderClass"));
+        testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("USER_ID", null, GlobalDictionaryBuilder.class.getName()));
     }
 
     @Test
     public void testBadDesc2() throws IOException {
-        testDictionaryDesc("Column DEFAULT.TEST_KYLIN_FACT.LSTG_SITE_ID cannot have builder and reuse column both", DictionaryDesc.create("lstg_site_id", "SITE_NAME", "FakeBuilderClass"));
+        testDictionaryDesc(ERROR_REUSE_BUILDER_BOTH_SET, DictionaryDesc.create("lstg_site_id", "SITE_NAME", "FakeBuilderClass"));
     }
 
     @Test
     public void testBadDesc3() throws IOException {
-        testDictionaryDesc("Column DEFAULT.TEST_KYLIN_FACT.LSTG_SITE_ID cannot have builder and reuse column both empty", DictionaryDesc.create("lstg_site_id", null, null));
+        testDictionaryDesc(ERROR_REUSE_BUILDER_BOTH_EMPTY, DictionaryDesc.create("lstg_site_id", null, null));
+    }
+
+    @Test
+    public void testBadDesc4() throws IOException {
+        testDictionaryDesc(ERROR_TRANSITIVE_REUSE,
+                DictionaryDesc.create("lstg_site_id", "USER_ID", null),
+                DictionaryDesc.create("price", "lstg_site_id", null));
     }
     
     @Test
@@ -102,13 +110,13 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
         desc.init(config);
         ValidateContext vContext = new ValidateContext();
         rule.validate(desc, vContext);
-        vContext.print(System.out);
 
         if (expectMessage == null) {
             assertTrue(vContext.getResults().length == 0);
         } else {
-            assertTrue(vContext.getResults().length >= 1);
-            assertEquals(expectMessage, vContext.getResults()[0].getMessage());
+            assertTrue(vContext.getResults().length == 1);
+            String actualMessage = vContext.getResults()[0].getMessage();
+            assertTrue(actualMessage.startsWith(expectMessage));
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
index 14980bf..84060a7 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
@@ -31,10 +31,13 @@ import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.NavigableSet;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -49,6 +52,8 @@ import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.MetadataManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,16 +62,16 @@ import org.slf4j.LoggerFactory;
  * int IDs, used for global dictionary.
  *
  * Trie data is split into sub trees, called {@link DictSlice}, and stored in a {@link CachedTreeMap} with a configurable cache size.
- * 
+ *
  * With Trie the memory footprint of the mapping is kinda minimized at the cost
  * CPU, if compared to HashMap of ID Arrays. Performance test shows Trie is
  * roughly 10 times slower, so there's a cache layer overlays on top of Trie and
  * gracefully fall back to Trie using a weak reference.
- * 
+ *
  * The implementation is NOT thread-safe for now.
  *
  * TODO making it thread-safe
- * 
+ *
  * @author sunyerui
  */
 @SuppressWarnings({ "rawtypes", "unchecked", "serial" })
@@ -87,7 +92,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
     transient private int nValues;
     transient private BytesConverter<T> bytesConverter;
 
-    private TreeMap<DictSliceKey, DictSlice> dictSliceMap;
+    volatile private TreeMap<DictSliceKey, DictSlice> dictSliceMap;
 
     transient private boolean enableValueCache = true;
     transient private SoftReference<HashMap> valueToIdCache;
@@ -99,17 +104,23 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
         }
     }
 
-    public void update(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, CachedTreeMap dictMap) throws IOException {
+    public void initParams(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter) throws IOException {
         this.baseDir = baseDir;
         this.baseId = baseId;
         this.maxId = maxId;
         this.maxValueLength = maxValueLength;
         this.nValues = nValues;
         this.bytesConverter = bytesConverter;
+    }
 
+    public void initDictSliceMap(CachedTreeMap dictMap) throws IOException {
         int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize();
-        dictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
-        ((CachedTreeMap)dictSliceMap).loadEntry(dictMap);
+        int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
+        long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
+        CachedTreeMap newDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir)
+            .immutable(true).maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
+        newDictSliceMap.loadEntry(dictMap);
+        this.dictSliceMap = newDictSliceMap;
     }
 
     public byte[] writeDictMap() throws IOException {
@@ -123,6 +134,13 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
         return dictMapBytes;
     }
 
+    // Dict ids run from 1 to 2147483647, then wrap (as a signed int) from -2147483648 to -2; 0 and -1 are reserved for the uninitialized state
+    public static void checkValidId(int id) {
+        if (id == 0 || id == -1) {
+            throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
+        }
+    }
+
     public static class DictSliceKey implements WritableComparable {
         byte[] key;
 
@@ -181,7 +199,8 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
 
         transient private int nValues;
         transient private int sizeOfId;
-        transient private int childOffsetMask;
+        // mask MUST be a long, since childOffset may take up to 5 bytes
+        transient private long childOffsetMask;
         transient private int firstByteOffset;
 
         private void init(byte[] trieBytes) {
@@ -197,7 +216,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
                 this.sizeChildOffset = headIn.read();
                 this.sizeOfId = headIn.read();
 
-                this.childOffsetMask = ~((BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE) << ((sizeChildOffset - 1) * 8));
+                this.childOffsetMask = ~(((long)(BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE)) << ((sizeChildOffset - 1) * 8));
                 this.firstByteOffset = sizeChildOffset + 1; // the offset from begin of node to its first value byte
             } catch (Exception e) {
                 if (e instanceof RuntimeException)
@@ -216,7 +235,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
                 if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) {
                     break;
                 }
-                nodeOffset = headSize + (BytesUtil.readUnsigned(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask);
+                nodeOffset = headSize + (int)(BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask);
                 if (nodeOffset == headSize) {
                     break;
                 }
@@ -258,7 +277,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
                 }
 
                 // find a child to continue
-                int c = headSize + (BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask);
+                int c = headSize + (int)(BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
                 if (c == headSize) // has no children
                     return -1;
                 byte inpByte = inp[o];
@@ -297,7 +316,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
             DictNode root = null;
             while (true) {
                 int p = n + firstByteOffset;
-                int childOffset = BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask;
+                int childOffset = (int)(BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
                 int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
                 boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
 
@@ -329,6 +348,53 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
             return root;
         }
 
+        public boolean doCheck() {
+            int offset = headSize;
+            HashSet<Integer> parentSet = new HashSet<>();
+            boolean lastChild = false;
+
+            while (offset < trieBytes.length) {
+                if (lastChild) {
+                    boolean contained = parentSet.remove(offset - headSize);
+                    // Can't find parent, the data is corrupted
+                    if (!contained) {
+                        return false;
+                    }
+                    lastChild = false;
+                }
+                int p = offset + firstByteOffset;
+                int childOffset = (int)(BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask);
+                int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+                boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE);
+
+                // Value bytes would run past the end of the trie, the data is corrupted
+                if (trieBytes.length < p + parLen) {
+                   return false;
+                }
+
+                // Check id is fine
+                if (isEndOfValue) {
+                    BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
+                }
+
+                // Record it if has children
+                if (childOffset != 0) {
+                    parentSet.add(childOffset);
+                }
+
+                // brothers done, move to next parent
+                if (checkFlag(offset, BIT_IS_LAST_CHILD)) {
+                    lastChild = true;
+                }
+
+                // move to next node
+                offset += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
+            }
+
+            // parentSet is empty, meaning all nodes have a parent, so the data is correct
+            return parentSet.isEmpty();
+        }
+
         public void write(DataOutput out) throws IOException {
             out.write(trieBytes);
         }
@@ -341,7 +407,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
                 throw new IllegalArgumentException("Wrong file type (magic does not match)");
 
             DataInputStream headIn = new DataInputStream(//
-                    new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I));
+                new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I));
             int headSize = headIn.readShort();
             int bodyLen = headIn.readInt();
             headIn.close();
@@ -398,6 +464,9 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
             this.id = o.id;
             this.isEndOfValue = o.isEndOfValue;
             this.children = o.children;
+            for (DictNode child : o.children) {
+                child.parent = this;
+            }
             this.nValuesBeneath = o.nValuesBeneath;
             this.parent = o.parent;
             this.childrenCount = o.childrenCount;
@@ -602,7 +671,8 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
 
             // nValueBytes
             if (n.part.length > 255)
-                throw new RuntimeException();
+                throw new RuntimeException("Value length is " + n.part.length
+                    + " and larger than 255: " + Bytes.toStringBinary(n.part));
             BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
             o++;
 
@@ -611,7 +681,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
             o += n.part.length;
 
             if (n.isEndOfValue) {
-                assert n.id > 0;
+                checkValidId(n.id);
                 BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId);
                 o += sizeId;
             }
@@ -715,12 +785,13 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
             s.mbpn_sizeId = 4;
             s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId;
             s.mbpn_sizeNoValueBytes = 1;
-            s.mbpn_sizeChildOffset = 4;
+            s.mbpn_sizeChildOffset = 5;
             s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset);
             while (true) { // minimize the offset size to match the footprint
                 int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1);
                 // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
-                if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) {
+                // widen t to long before multiplying by 4, to avoid overflowing Integer.MAX_VALUE
+                if (BytesUtil.sizeForValue((long)t * 4) <= s.mbpn_sizeChildOffset - 1) {
                     s.mbpn_sizeChildOffset--;
                     s.mbpn_footprint = t;
                 } else
@@ -760,31 +831,97 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
     }
 
     public static class Builder<T> {
-        private String baseDir;
+        private static ConcurrentHashMap<String, Pair<Integer, Builder>> builderInstanceAndCountMap = new ConcurrentHashMap();
+
+        public static Builder getInstance(String resourcePath) throws IOException {
+            return getInstance(resourcePath, null);
+        }
+
+        public synchronized static Builder getInstance(String resourcePath, AppendTrieDictionary dict) throws IOException {
+            Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath);
+            if (entry == null) {
+                entry = new Pair<>(0, createNewBuilder(resourcePath, dict));
+                builderInstanceAndCountMap.put(resourcePath, entry);
+            }
+            entry.setFirst(entry.getFirst() + 1);
+            return entry.getSecond();
+        }
+
+        // return true if entry still in map
+        private synchronized static boolean releaseInstance(String resourcePath) {
+            Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath);
+            if (entry != null) {
+                entry.setFirst(entry.getFirst() - 1);
+                if (entry.getFirst() <= 0) {
+                    builderInstanceAndCountMap.remove(resourcePath);
+                    return false;
+                }
+                return true;
+            }
+            return false;
+        }
+
+        public static Builder createNewBuilder(String resourcePath, AppendTrieDictionary existDict) throws IOException {
+            String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourcePath + "/";
+
+            AppendTrieDictionary dictToUse = existDict;
+            if (dictToUse == null) {
+                // Try to load the existing dict from cache, making sure there is only one instance of it in memory
+                NavigableSet<String> dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(resourcePath);
+                ArrayList<String> appendDicts = new ArrayList<>();
+                if (dicts != null && !dicts.isEmpty()) {
+                    for (String dict : dicts) {
+                        DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
+                        if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) {
+                            appendDicts.add(dict);
+                        }
+                    }
+                }
+                if (appendDicts.isEmpty()) {
+                    dictToUse = null;
+                } else if (appendDicts.size() == 1) {
+                    dictToUse = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0));
+                } else {
+                    throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", resourcePath, appendDicts.size()));
+                }
+            }
+
+            AppendTrieDictionary.Builder<String> builder;
+            if (dictToUse == null) {
+                logger.info("GlobalDict {} is empty, create new one", resourcePath);
+                builder = new Builder<>(resourcePath, null, dictDir, 0, 0, 0, new StringBytesConverter(), null);
+            } else {
+                logger.info("GlobalDict {} exist, append value", dictToUse);
+                builder = new Builder<>(resourcePath, dictToUse, dictToUse.baseDir, dictToUse.maxId, dictToUse.maxValueLength,
+                    dictToUse.nValues, dictToUse.bytesConverter, dictToUse.writeDictMap());
+            }
+
+            return builder;
+        }
+
+        private final String resourcePath;
+        private final String baseDir;
         private int maxId;
         private int maxValueLength;
         private int nValues;
-        private BytesConverter<T> bytesConverter;
+        private final BytesConverter<T> bytesConverter;
 
-        private AppendTrieDictionary dict;
+        private final AppendTrieDictionary dict;
 
-        private TreeMap<DictSliceKey, DictNode> mutableDictSliceMap;
-        private static int MAX_ENTRY_IN_SLICE = 10_000_000;
+        private final TreeMap<DictSliceKey, DictNode> mutableDictSliceMap;
+        private int MAX_ENTRY_IN_SLICE = 10_000_000;
         private static final double MAX_ENTRY_OVERHEAD_FACTOR = 1.0;
 
         private int processedCount = 0;
 
-        public static Builder create(String baseDir) throws IOException {
-            return new Builder<>(null, baseDir, 0, 0, 0, new StringBytesConverter(), null);
-        }
-
-        public static Builder create(AppendTrieDictionary dict) throws IOException {
-            return new Builder<>(dict, dict.baseDir, dict.maxId, dict.maxValueLength, dict.nValues, dict.bytesConverter, dict.writeDictMap());
-        }
-
         // Constructor for a new Dict
-        private Builder(AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter<T> bytesConverter, byte[] dictMapBytes) throws IOException {
-            this.dict = dict;
+        private Builder(String resourcePath, AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter<T> bytesConverter, byte[] dictMapBytes) throws IOException {
+            this.resourcePath = resourcePath;
+            if (dict == null) {
+                this.dict = new AppendTrieDictionary<T>();
+            } else {
+                this.dict = dict;
+            }
             this.baseDir = baseDir;
             this.maxId = maxId;
             this.maxValueLength = maxValueLength;
@@ -793,8 +930,11 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
 
             MAX_ENTRY_IN_SLICE = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
             int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize();
+            int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
+            long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
             // create a new cached map with baseDir
-            mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).persistent(true).immutable(false).build();
+            mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir)
+                .maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).immutable(false).build();
             if (dictMapBytes != null) {
                 ((Writable) mutableDictSliceMap).readFields(new DataInputStream(new ByteArrayInputStream(dictMapBytes)));
             }
@@ -804,7 +944,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
             addValue(bytesConverter.convertToBytes(value));
         }
 
-        public void addValue(byte[] value) {
+        private synchronized void addValue(byte[] value) {
             if (++processedCount % 1_000_000 == 0) {
                 logger.debug("add value count " + processedCount);
             }
@@ -859,15 +999,41 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
 
         private int createNextId() {
             int id = ++maxId;
-            if (maxId < 0) {
-                throw new IllegalArgumentException("AppendTrieDictionary Id overflow Integer.MAX_VALUE");
-            }
+            checkValidId(id);
             nValues++;
             return id;
         }
 
+        // Only used for tests
+        public void setMaxId(int id) {
+            this.maxId = id;
+        }
+
+        // When adding a new node, the value part may exceed 255 bytes; if so, split it into a chain of nodes holding at most 255 bytes each
+        private DictNode addNodeMaybeOverflow(byte[] value, int start, int end) {
+            DictNode head = null;
+            DictNode current = null;
+            for (; start + 255 < end; start += 255) {
+                DictNode c = new DictNode(BytesUtil.subarray(value, start, start + 255), false);
+                if (head == null) {
+                    head = c;
+                    current = c;
+                } else {
+                    current.addChild(c);
+                    current = c;
+                }
+            }
+            DictNode last = new DictNode(BytesUtil.subarray(value, start, end), true);
+            last.id = createNextId();
+            if (head == null) {
+                head = last;
+            } else {
+                current.addChild(last);
+            }
+            return head;
+        }
+
         private void addValueR(DictNode node, byte[] value, int start) {
-            assert value.length - start <= 255 : "value bytes overflow than 255";
             // match the value part of current node
             int i = 0, j = start;
             int n = node.part.length, nn = value.length;
@@ -903,8 +1069,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
             if (i < n) {
                 DictNode c1 = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
                 c1.id = node.id;
-                DictNode c2 = new DictNode(BytesUtil.subarray(value, j, nn), true);
-                c2.id = createNextId();
+                DictNode c2 = addNodeMaybeOverflow(value, j, nn);
                 node.reset(BytesUtil.subarray(node.part, 0, i), false);
                 if (comp < 0) {
                     node.addChild(c1);
@@ -940,18 +1105,17 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
                 addValueR(node.children.get(mid), value, j);
             } else {
                 // otherwise, make the value a new child
-                DictNode c = new DictNode(BytesUtil.subarray(value, j, nn), true);
-                c.id = createNextId();
+                DictNode c = addNodeMaybeOverflow(value, j, nn);
                 node.addChild(comp <= 0 ? mid : mid + 1, c);
             }
         }
 
-        public AppendTrieDictionary<T> build(int baseId) throws IOException {
-            if (dict == null) {
-                dict = new AppendTrieDictionary<T>();
-            }
-            dict.update(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter, (CachedTreeMap)mutableDictSliceMap);
-            dict.flushIndex((CachedTreeMap) mutableDictSliceMap);
+        public synchronized AppendTrieDictionary<T> build(int baseId) throws IOException {
+            boolean keepAppend = releaseInstance(resourcePath);
+            CachedTreeMap dictSliceMap = (CachedTreeMap)mutableDictSliceMap;
+            dict.initParams(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter);
+            dict.flushIndex(dictSliceMap, keepAppend);
+            dict.initDictSliceMap(dictSliceMap);
 
             return dict;
         }
@@ -970,8 +1134,6 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
         }
         DictSlice slice = dictSliceMap.get(sliceKey);
         int id = slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag);
-        if (id < 0)
-            logger.error("Not a valid value: " + bytesConverter.convertFromBytes(value, offset, len));
         return id;
     }
 
@@ -1031,25 +1193,24 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
         throw new UnsupportedOperationException("AppendTrieDictionary can't retrive value from id");
     }
 
-    public void flushIndex(CachedTreeMap dictSliceMap) throws IOException {
-        Path filePath = new Path(dictSliceMap.getCurrentDir() + "/.index");
-        Configuration conf = new Configuration();
-        try (FSDataOutputStream indexOut = (FileSystem.get(filePath.toUri(), conf)).create(filePath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8)) {
+    public void flushIndex(CachedTreeMap dictSliceMap, boolean keepAppend) throws IOException {
+        try (FSDataOutputStream indexOut = dictSliceMap.openIndexOutput()) {
             indexOut.writeInt(baseId);
             indexOut.writeInt(maxId);
             indexOut.writeInt(maxValueLength);
             indexOut.writeInt(nValues);
             indexOut.writeUTF(bytesConverter.getClass().getName());
             dictSliceMap.write(indexOut);
+            dictSliceMap.commit(keepAppend);
         }
-        dictSliceMap.commit(false);
     }
 
     @Override
     public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
         Configuration conf = new Configuration();
         AppendTrieDictionary newDict = new AppendTrieDictionary();
-        newDict.update(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConverter, (CachedTreeMap)dictSliceMap);
+        newDict.initParams(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConverter);
+        newDict.initDictSliceMap((CachedTreeMap)dictSliceMap);
         logger.info("Copy AppendDict from {} to {}", this.baseDir, newDict.baseDir);
         Path srcPath = new Path(this.baseDir);
         Path dstPath = new Path(newDict.baseDir);
@@ -1071,9 +1232,8 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
     @Override
     public void readFields(DataInput in) throws IOException {
         String baseDir = in.readUTF();
-        Path filePath = new Path(baseDir + "/.index");
         Configuration conf = new Configuration();
-        try (FSDataInputStream input = (FileSystem.get(filePath.toUri(), conf)).open(filePath, 8 * 1024 * 1024)) {
+        try (FSDataInputStream input = CachedTreeMap.openLatestIndexInput(conf, baseDir)) {
             int baseId = input.readInt();
             int maxId = input.readInt();
             int maxValueLength = input.readInt();
@@ -1087,10 +1247,13 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
                     throw new IOException(e);
                 }
             }
+            initParams(baseDir, baseId, maxId, maxValueLength, nValues, converter);
+
+            // Create a map instance to deserialize the data into, then pass it to the dict via initDictSliceMap
             CachedTreeMap dictMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder()
-                    .baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
+                .baseDir(baseDir).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
             dictMap.readFields(input);
-            update(baseDir, baseId, maxId, maxValueLength, nValues, converter, dictMap);
+            initDictSliceMap(dictMap);
         }
     }
 
@@ -1120,4 +1283,6 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
     public boolean contains(Dictionary other) {
         return false;
     }
+
 }
+

http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
new file mode 100644
index 0000000..f231275
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by sunyerui on 16/11/15.
+ */
+public class AppendTrieDictionaryChecker {
+
+    public boolean runChecker(String baseDir) throws IOException {
+        Configuration conf = new Configuration();
+        Path basePath = new Path(baseDir);
+        FileSystem fs = FileSystem.get(basePath.toUri(), conf);
+        List<Path> sliceList = new ArrayList<>();
+        List<Path> corruptedSliceList = new ArrayList<>();
+        listDictSlicePath(fs, fs.getFileStatus(basePath), sliceList);
+
+        for (Path path : sliceList) {
+            if (!doCheck(fs, path)) {
+                System.out.println("AppendDict Slice " + path + " corrupted");
+                corruptedSliceList.add(path);
+            } else {
+                System.out.println("AppendDict Slice " + path + " is right");
+            }
+        }
+
+        if (corruptedSliceList.isEmpty()) {
+            System.out.println("ALL AppendDict Slices is right");
+            return true;
+        } else {
+            System.out.println("Some AppendDict Slice(s) corrupted: ");
+            for (Path path : corruptedSliceList) {
+                System.out.println(path.toString());
+            }
+            return false;
+        }
+    }
+
+    public void listDictSlicePath(FileSystem fs, FileStatus path, List<Path> list) throws IOException {
+        if (path.isDirectory()) {
+            for (FileStatus status : fs.listStatus(path.getPath())) {
+                listDictSlicePath(fs, status, list);
+            }
+        } else {
+            if (path.getPath().getName().startsWith(CachedTreeMap.CACHED_PREFIX)) {
+                list.add(path.getPath());
+            }
+        }
+    }
+
+    public boolean doCheck(FileSystem fs, Path filePath) {
+        try (FSDataInputStream input = fs.open(filePath, CachedTreeMap.BUFFER_SIZE)) {
+            AppendTrieDictionary.DictSlice slice = new AppendTrieDictionary.DictSlice();
+            slice.readFields(input);
+            return slice.doCheck();
+        } catch (Exception e) {
+            return false;
+        } catch (Error e) {
+            return false;
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        String path = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/";
+        if (args.length > 0) {
+            path = args[0];
+        }
+        System.out.println("Recursive Check AppendTrieDictionary Slices in path " + path);
+        AppendTrieDictionaryChecker checker = new AppendTrieDictionaryChecker();
+        if (checker.runChecker(path)) {
+            System.exit(0);
+        } else {
+            System.exit(-1);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
index 1ea3c1c..6acf764 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
@@ -31,9 +31,11 @@ import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.slf4j.Logger;
@@ -56,25 +58,29 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
     private final Class<V> valueClazz;
     transient volatile Collection<V> values;
     private final LoadingCache<K, V> valueCache;
-    private final TreeSet<String> fileList;
     private final Configuration conf;
-    private final String baseDir;
-    private final String tmpDir;
+    private final Path baseDir;
+    private final Path versionDir;
+    private final Path workingDir;
     private final FileSystem fs;
-    private final boolean persistent;
     private final boolean immutable;
-    private long writeValueTime = 0;
-    private long readValueTime = 0;
+    private final int maxVersions;
+    private final long versionTTL;
+    private boolean keepAppend;
 
-    private static final int BUFFER_SIZE = 8 * 1024 * 1024;
+    public static final int BUFFER_SIZE = 8 * 1024 * 1024;
+
+    public static final String CACHED_PREFIX = "cached_";
+    public static final String VERSION_PREFIX = "version_";
 
     public static class CachedTreeMapBuilder<K, V> {
         private Class<K> keyClazz;
         private Class<V> valueClazz;
         private int maxCount = 8;
         private String baseDir;
-        private boolean persistent;
         private boolean immutable;
+        private int maxVersions;
+        private long versionTTL;
 
         public static CachedTreeMapBuilder newBuilder() {
             return new CachedTreeMapBuilder();
@@ -103,13 +109,18 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
             return this;
         }
 
-        public CachedTreeMapBuilder<K, V> persistent(boolean persistent) {
-            this.persistent = persistent;
+        public CachedTreeMapBuilder<K, V> immutable(boolean immutable) {
+            this.immutable = immutable;
             return this;
         }
 
-        public CachedTreeMapBuilder<K, V> immutable(boolean immutable) {
-            this.immutable = immutable;
+        public CachedTreeMapBuilder<K, V> maxVersions(int maxVersions) {
+            this.maxVersions = maxVersions;
+            return this;
+        }
+
+        public CachedTreeMapBuilder<K, V> versionTTL(long versionTTL) {
+            this.versionTTL = versionTTL;
             return this;
         }
 
@@ -120,26 +131,38 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
             if (keyClazz == null || valueClazz == null) {
                 throw new RuntimeException("CachedTreeMap need key and value clazz to serialize data");
             }
-            CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, persistent, immutable);
+            CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, immutable, maxVersions, versionTTL);
             return map;
         }
     }
 
-    private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String baseDir, boolean persistent, boolean immutable) throws IOException {
+    private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String basePath,
+                          boolean immutable, int maxVersions, long versionTTL) throws IOException {
         super();
         this.keyClazz = keyClazz;
         this.valueClazz = valueClazz;
-        this.fileList = new TreeSet<>();
+        this.immutable = immutable;
+        this.keepAppend = true;
+        this.maxVersions = maxVersions;
+        this.versionTTL = versionTTL;
         this.conf = new Configuration();
-        if (baseDir.endsWith("/")) {
-            this.baseDir = baseDir.substring(0, baseDir.length()-1);
-        } else {
-            this.baseDir = baseDir;
+        if (basePath.endsWith("/")) {
+            basePath = basePath.substring(0, basePath.length()-1);
+        }
+        this.baseDir = new Path(basePath);
+        this.fs = FileSystem.get(baseDir.toUri(), conf);
+        if (!fs.exists(baseDir)) {
+            fs.mkdirs(baseDir);
+        }
+        this.versionDir = getLatestVersion(conf, fs, baseDir);
+        this.workingDir = new Path(baseDir, "working");
+        if (!this.immutable) {
+            // For a mutable map, copy all data into the working dir and modify it there, so that a sudden server crash cannot corrupt the committed data
+            if (fs.exists(workingDir)) {
+                fs.delete(workingDir, true);
+            }
+            FileUtil.copy(fs, versionDir, fs, workingDir, false, true, conf);
         }
-        this.tmpDir = this.baseDir + ".tmp";
-        this.fs = FileSystem.get(new Path(baseDir).toUri(), conf);
-        this.persistent = persistent;
-        this.immutable = immutable;
         CacheBuilder builder = CacheBuilder.newBuilder().removalListener(new RemovalListener<K, V>() {
             @Override
             public void onRemoval(RemovalNotification<K, V> notification) {
@@ -152,24 +175,14 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
                     deleteValue(notification.getKey());
                     break;
                 default:
-                    throw new RuntimeException("unexpected evict reason " + notification.getCause());
                 }
             }
         });
-        // For immutable values, load all values as much as possible, and evict by soft reference to free memory when gc
         if (this.immutable) {
+            // For immutable values, keep as many values loaded as possible and let soft references evict them to free memory on GC
             builder.softValues();
         } else {
             builder.maximumSize(maxCount);
-            // For mutable map, copy all data into tmp and modify on tmp data, avoiding suddenly server crash made data corrupt
-            if (fs.exists(new Path(tmpDir))) {
-                fs.delete(new Path(tmpDir), true);
-            }
-            if (fs.exists(new Path(this.baseDir))) {
-                FileUtil.copy(fs, new Path(this.baseDir), fs, new Path(tmpDir), false, true, conf);
-            } else {
-                fs.mkdirs(new Path(this.baseDir));
-            }
         }
         this.valueCache = builder.build(new CacheLoader<K, V>() {
             @Override
@@ -182,38 +195,108 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
     }
 
     private String generateFileName(K key) {
-        String file = (immutable ? baseDir : tmpDir) + "/cached_" + key.toString();
+        String file = getCurrentDir() + "/" + CACHED_PREFIX + key.toString();
         return file;
     }
 
-    public String getCurrentDir() {
-        return immutable ? baseDir : tmpDir;
+    private String getCurrentDir() {
+        return immutable ? versionDir.toString() : workingDir.toString();
     }
 
-    public void commit(boolean stillMutable) throws IOException {
-        assert !immutable : "Only support commit method with immutable false";
+    private static String[] listAllVersions(FileSystem fs, Path baseDir) throws IOException {
+        FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                if (path.getName().startsWith(VERSION_PREFIX)) {
+                    return true;
+                }
+                return false;
+            }
+        });
+        TreeSet<String> versions = new TreeSet<>();
+        for (FileStatus status : fileStatus) {
+            versions.add(status.getPath().toString());
+        }
+        return versions.toArray(new String[versions.size()]);
+    }
 
-        Path basePath = new Path(baseDir);
-        Path backupPath = new Path(baseDir+".bak");
-        Path tmpPath = new Path(tmpDir);
-        try {
-            fs.rename(basePath, backupPath);
-        } catch (IOException e) {
-            logger.info("CachedTreeMap commit backup basedir failed, " + e, e);
-            throw e;
+    // only for test
+    public String getLatestVersion() throws IOException {
+        return getLatestVersion(conf, fs, baseDir).toUri().getPath();
+    }
+
+    private static Path getLatestVersion(Configuration conf, FileSystem fs, Path baseDir) throws IOException {
+        String[] versions = listAllVersions(fs, baseDir);
+        if (versions.length > 0) {
+            return new Path(versions[versions.length - 1]);
+        } else {
+            // Old format: the data sits directly in the base dir; convert it to the new versioned format
+            Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis());
+            Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis());
+            Path indexFile = new Path(baseDir, ".index");
+            FileStatus[] cachedFiles;
+            try {
+                cachedFiles = fs.listStatus(baseDir, new PathFilter() {
+                    @Override
+                    public boolean accept(Path path) {
+                        if (path.getName().startsWith(CACHED_PREFIX)) {
+                            return true;
+                        }
+                        return false;
+                    }
+                });
+                fs.mkdirs(tmpNewVersionDir);
+                if (fs.exists(indexFile) && cachedFiles.length > 0) {
+                    FileUtil.copy(fs, indexFile, fs, tmpNewVersionDir, false, true, conf);
+                    for (FileStatus file : cachedFiles) {
+                        FileUtil.copy(fs, file.getPath(), fs, tmpNewVersionDir, false, true, conf);
+                    }
+                }
+                fs.rename(tmpNewVersionDir, newVersionDir);
+                if (fs.exists(indexFile) && cachedFiles.length > 0) {
+                    fs.delete(indexFile, true);
+                    for (FileStatus file : cachedFiles) {
+                        fs.delete(file.getPath(), true);
+                    }
+                }
+            } finally {
+                if (fs.exists(tmpNewVersionDir)) {
+                    fs.delete(tmpNewVersionDir, true);
+                }
+            }
+            return newVersionDir;
         }
+    }
 
-        try {
-            if (stillMutable) {
-                FileUtil.copy(fs, tmpPath, fs, basePath, false, true, conf);
-            } else {
-                fs.rename(tmpPath, basePath);
+    public void commit(boolean keepAppend) throws IOException {
+        assert this.keepAppend & !immutable : "Only support commit method with immutable false and keepAppend true";
+
+        Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis());
+        if (keepAppend) {
+            // Copy to a tmp dir and then rename it to the new version, so the version is complete by the time it becomes visible
+            Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis());
+            try {
+                FileUtil.copy(fs, workingDir, fs, tmpNewVersionDir, false, true, conf);
+                fs.rename(tmpNewVersionDir, newVersionDir);
+            } finally {
+                if (fs.exists(tmpNewVersionDir)) {
+                    fs.delete(tmpNewVersionDir, true);
+                }
+            }
+        } else {
+            fs.rename(workingDir, newVersionDir);
+        }
+        this.keepAppend = keepAppend;
+
+        // Keep the newest maxVersions versions; among the older ones, delete those whose TTL has expired
+        String[] versions = listAllVersions(fs, baseDir);
+        long timestamp = System.currentTimeMillis();
+        for (int i = 0; i < versions.length - maxVersions; i++) {
+            String versionString = versions[i].substring(versions[i].lastIndexOf(VERSION_PREFIX) + VERSION_PREFIX.length());
+            long version = Long.parseLong(versionString);
+            if (version + versionTTL < timestamp) {
+                fs.delete(new Path(versions[i]), true);
             }
-            fs.delete(backupPath, true);
-        } catch (IOException e) {
-            fs.rename(backupPath, basePath);
-            logger.info("CachedTreeMap commit move/copy tmpdir failed, " + e, e);
-            throw e;
         }
     }
 
@@ -227,25 +310,17 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
         if (immutable) {
             return;
         }
-        long t0 = System.currentTimeMillis();
         String fileName = generateFileName(key);
         Path filePath = new Path(fileName);
         try (FSDataOutputStream out = fs.create(filePath, true, BUFFER_SIZE, (short) 5, BUFFER_SIZE * 8)) {
             value.write(out);
-            if (!persistent) {
-                fs.deleteOnExit(filePath);
-            }
         } catch (Exception e) {
             logger.error(String.format("write value into %s exception: %s", fileName, e), e);
             throw new RuntimeException(e.getCause());
-        } finally {
-            fileList.add(fileName);
-            writeValueTime += System.currentTimeMillis() - t0;
         }
     }
 
     private V readValue(K key) throws Exception {
-        long t0 = System.currentTimeMillis();
         String fileName = generateFileName(key);
         Path filePath = new Path(fileName);
         try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) {
@@ -255,13 +330,11 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
         } catch (Exception e) {
             logger.error(String.format("read value from %s exception: %s", fileName, e), e);
             return null;
-        } finally {
-            readValueTime += System.currentTimeMillis() - t0;
         }
     }
 
     private void deleteValue(K key) {
-        if (persistent && immutable) {
+        if (immutable) {
             return;
         }
         String fileName = generateFileName(key);
@@ -272,14 +345,12 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
             }
         } catch (Exception e) {
             logger.error(String.format("delete value file %s exception: %s", fileName, e), e);
-        } finally {
-            fileList.remove(fileName);
         }
     }
 
     @Override
     public V put(K key, V value) {
-        assert !immutable : "Only support put method with immutable false";
+        assert keepAppend & !immutable : "Only support put method with immutable false and keepAppend true";
         super.put(key, null);
         valueCache.put(key, value);
         return null;
@@ -301,7 +372,7 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
 
     @Override
     public V remove(Object key) {
-        assert !immutable : "Only support remove method with immutable false";
+        assert keepAppend & !immutable : "Only support remove method with immutable false keepAppend true";
         super.remove(key);
         valueCache.invalidate(key);
         return null;
@@ -357,15 +428,32 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
 
         @Override
         public void remove() {
-            assert !immutable : "Only support remove method with immutable false";
+            assert keepAppend & !immutable : "Only support remove method with immutable false and keepAppend true";
             keyIterator.remove();
             valueCache.invalidate(currentKey);
         }
     }
 
+    public FSDataOutputStream openIndexOutput() throws IOException {
+        assert keepAppend & !immutable : "Only support write method with immutable false and keepAppend true";
+        Path indexPath = new Path(getCurrentDir(), ".index");
+        return fs.create(indexPath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8);
+    }
+
+    public FSDataInputStream openIndexInput() throws IOException {
+        Path indexPath = new Path(getCurrentDir(), ".index");
+        return fs.open(indexPath, 8 * 1024 * 1024);
+    }
+
+    public static FSDataInputStream openLatestIndexInput(Configuration conf, String baseDir) throws IOException {
+        Path basePath = new Path(baseDir);
+        FileSystem fs = FileSystem.get(basePath.toUri(), conf);
+        Path indexPath = new Path(getLatestVersion(conf, fs, basePath), ".index");
+        return fs.open(indexPath, 8 * 1024 * 1024);
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
-        assert persistent : "Only support serialize with persistent true";
         out.writeInt(size());
         for (K key : keySet()) {
             key.write(out);
@@ -378,7 +466,6 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
 
     @Override
     public void readFields(DataInput in) throws IOException {
-        assert persistent : "Only support deserialize with persistent true";
         int size = in.readInt();
         try {
             for (int i = 0; i < size; i++) {
@@ -390,27 +477,4 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
             throw new IOException(e);
         }
     }
-
-    // clean up all tmp files
-    @Override
-    public void finalize() throws Throwable {
-        if (persistent) {
-            return;
-        }
-        try {
-            this.clear();
-            for (String file : fileList) {
-                try {
-                    Path filePath = new Path(file);
-                    fs.delete(filePath, true);
-                } catch (Throwable t) {
-                    //do nothing?
-                }
-            }
-        } catch (Throwable t) {
-            //do nothing
-        } finally {
-            super.finalize();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index b2a3664..cda3c2b 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -19,14 +19,8 @@
 package org.apache.kylin.dict;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.NavigableSet;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.metadata.MetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments.
@@ -34,41 +28,15 @@ import org.slf4j.LoggerFactory;
  * Created by sunyerui on 16/5/24.
  */
 public class GlobalDictionaryBuilder implements IDictionaryBuilder {
-    private static final Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class);
-
     AppendTrieDictionary.Builder<String> builder;
     int baseId;
-    
+
     @Override
     public void init(DictionaryInfo dictInfo, int baseId) throws IOException {
         if (dictInfo == null) {
             throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo");
         }
-        String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + dictInfo.getResourceDir() + "/";
-
-        // Try to load the existing dict from cache, making sure there's only the same one object in memory
-        NavigableSet<String> dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(dictInfo.getResourceDir());
-        ArrayList<String> appendDicts = new ArrayList<>();
-        if (dicts != null && !dicts.isEmpty()) {
-            for (String dict : dicts) {
-                DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
-                if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) {
-                    appendDicts.add(dict);
-                }
-            }
-        }
-
-        if (appendDicts.isEmpty()) {
-            logger.info("GlobalDict {} is empty, create new one", dictInfo.getResourceDir());
-            this.builder = AppendTrieDictionary.Builder.create(dictDir);
-        } else if (appendDicts.size() == 1) {
-            logger.info("GlobalDict {} exist, append value", appendDicts.get(0));
-            AppendTrieDictionary dict = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0));
-            this.builder = AppendTrieDictionary.Builder.create(dict);
-        } else {
-            throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", dictInfo.getResourceDir(), appendDicts.size()));
-        }
-        
+        this.builder = AppendTrieDictionary.Builder.getInstance(dictInfo.getResourceDir());
         this.baseId = baseId;
     }
     

http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index 5e1705a..a7e8152 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -20,13 +20,15 @@ package org.apache.kylin.dict;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -35,6 +37,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Random;
 import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -51,6 +55,7 @@ import org.junit.Test;
 public class AppendTrieDictionaryTest {
 
     public static final String BASE_DIR = "/tmp/kylin_append_dict";
+    public static final String RESOURCE_DIR = "/dict/append_dict_test";
 
     @BeforeClass
     public static void setUp() {
@@ -64,22 +69,28 @@ public class AppendTrieDictionaryTest {
 
     @AfterClass
     public static void tearDown() {
-        String workingDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
+        cleanup();
+    }
+
+//    @After
+    public void afterTest() { // NOTE(review): the @After annotation above is commented out, so this is a plain method here; in this hunk only tearDown() also calls cleanup()
+        cleanup();
+    }
+
+    public static void cleanup() {
+        Configuration conf = new Configuration();
+        Path basePath = new Path(BASE_DIR);
         try {
-            FileSystem.get(new Path(workingDir).toUri(), new Configuration()).delete(new Path(workingDir), true);
-        } catch (IOException e) {
-        }
-        File tmpLocalDir = new File(BASE_DIR);
-        if (tmpLocalDir.exists()) {
-            for (File f : tmpLocalDir.listFiles()) {
-                f.delete();
-            }
-            tmpLocalDir.delete();
-        }
+            FileSystem.get(basePath.toUri(), conf).delete(basePath, true);
+        } catch (IOException e) {}
     }
 
     public static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "\u5b57\u5178", "\u5b57\u5178\u6811", "\u5b57\u6bcd", // non-ascii characters
             "", // empty
+            "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
+            "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
+            "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+              + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk",
             "paint", "tar", "try", // some dup
     };
 
@@ -131,8 +142,7 @@ public class AppendTrieDictionaryTest {
     @Ignore("need huge key set")
     @Test
     public void testHugeKeySet() throws IOException {
-        BytesConverter converter = new StringBytesConverter();
-        AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create(BASE_DIR);
+        AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
         AppendTrieDictionary<String> dict = null;
 
         InputStream is = new FileInputStream("src/test/resources/dict/huge_key");
@@ -162,7 +172,7 @@ public class AppendTrieDictionaryTest {
         }
         BytesConverter converter = new StringBytesConverter();
 
-        AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create(BASE_DIR);
+        AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
         AppendTrieDictionary<String> dict = null;
         TreeMap<Integer, String> checkMap = new TreeMap<>();
         int firstAppend = rnd.nextInt(strList.size() / 2);
@@ -179,12 +189,14 @@ public class AppendTrieDictionaryTest {
             String str = strList.get(checkIndex);
             byte[] bytes = converter.convertToBytes(str);
             int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+            assertNotEquals(String.format("Value %s not exist", str), -1, id);
             assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
             checkMap.put(id, str);
         }
 
         // reopen dict and append
-        b = AppendTrieDictionary.Builder.create(dict);
+//        b = AppendTrieDictionary.Builder.create(dict);
+        b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
         for (; appendIndex < secondAppend; appendIndex++) {
             b.addValue(strList.get(appendIndex));
         }
@@ -197,6 +209,7 @@ public class AppendTrieDictionaryTest {
             String str = strList.get(checkIndex);
             byte[] bytes = converter.convertToBytes(str);
             int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+            assertNotEquals(String.format("Value %s not exist", str), -1, id);
             if (checkIndex < firstAppend) {
                 assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
             } else {
@@ -207,7 +220,7 @@ public class AppendTrieDictionaryTest {
         }
 
         // reopen dict and append rest str
-        b = AppendTrieDictionary.Builder.create(dict);
+        b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
         for (; appendIndex < strList.size(); appendIndex++) {
             b.addValue(strList.get(appendIndex));
         }
@@ -220,6 +233,7 @@ public class AppendTrieDictionaryTest {
             String str = strList.get(checkIndex);
             byte[] bytes = converter.convertToBytes(str);
             int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+            assertNotEquals(String.format("Value %s not exist", str), -1, id);
             if (checkIndex < secondAppend) {
                 assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
             } else {
@@ -240,6 +254,7 @@ public class AppendTrieDictionaryTest {
         for (String str : strList) {
             byte[] bytes = converter.convertToBytes(str);
             int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+            assertNotEquals(String.format("Value %s not exist", str), -1, id);
             assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
         }
     }
@@ -260,4 +275,103 @@ public class AppendTrieDictionaryTest {
             throw new RuntimeException(e);
         }
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testMaxInteger() throws IOException { // pins the ids expected when the id sequence runs past Integer.MAX_VALUE
+        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+        builder.setMaxId(Integer.MAX_VALUE - 2);
+        builder.addValue("a");
+        builder.addValue("ab");
+        builder.addValue("acd"); // note: "acd" is added before "ac"
+        builder.addValue("ac");
+        AppendTrieDictionary dict = builder.build(0);
+        assertEquals(2147483646, dict.getIdFromValueImpl("a", 0)); // Integer.MAX_VALUE - 1
+        assertEquals(2147483647, dict.getIdFromValueImpl("ab", 0)); // Integer.MAX_VALUE
+        assertEquals(-2147483647, dict.getIdFromValueImpl("ac", 0)); // Integer.MIN_VALUE + 1 (fourth value added)
+        assertEquals(-2147483648, dict.getIdFromValueImpl("acd", 0)); // Integer.MIN_VALUE (third value added): the expected id wraps to negative
+    }
+
+    @Ignore("Only occurred when value is very long (>8000 bytes)")
+    @Test
+    public void testSuperLongValue() throws IOException { // adds 10000 values "aa", "aaa", ... each one character longer than the previous
+        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+        String value = "a";
+        for (int i = 0; i < 10000; i++) {
+            value += "a";
+            try {
+                builder.addValue(value);
+            } catch (StackOverflowError e) {
+                System.out.println("\nstack overflow " + i); // report the iteration at which the overflow happened, then rethrow so the test still fails
+                throw e;
+            }
+        }
+        AppendTrieDictionary dictionary = builder.build(0);
+        dictionary.getMaxId(); // result is discarded; the test makes no assertion beyond completing without an error
+    }
+
+    private static class SharedBuilderThread extends Thread { // worker that adds the values prefix+0 .. prefix+(count-1) through the builder obtained for resourcePath
+        CountDownLatch startLatch; // counted down once this worker has obtained its builder
+        CountDownLatch finishLatch; // counted down only after build(0) has returned normally
+        String resourcePath;
+        String prefix;
+        int count;
+
+        SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String resourcePath, String prefix, int count) {
+            this.startLatch = startLatch;
+            this.finishLatch = finishLatch;
+            this.resourcePath = resourcePath;
+            this.prefix = prefix;
+            this.count = count;
+        }
+
+        @Override
+        public void run() {
+            try {
+                AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
+                startLatch.countDown();
+                for (int i = 0; i < count; i++) {
+                    builder.addValue(prefix + i);
+                }
+                builder.build(0);
+                finishLatch.countDown();
+            } catch (IOException e) { throw new RuntimeException("SharedBuilderThread " + prefix + " failed", e); } // do not swallow: otherwise a failed worker only surfaces as a latch timeout with no cause
+        }
+    }
+
+    @Test
+    public void testSharedBuilder() throws IOException, InterruptedException { // the main thread and three workers all request a builder for the same resource path
+        String resourcePath = "shared_builder";
+        final CountDownLatch startLatch = new CountDownLatch(3); // one count per worker thread
+        final CountDownLatch finishLatch = new CountDownLatch(3);
+
+        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
+        Thread t1 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t1_", 10000);
+        Thread t2 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t2_", 10);
+        Thread t3 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t3_", 100000);
+        t1.start();
+        t2.start();
+        t3.start();
+        startLatch.await(); // wait until every worker has called getInstance(resourcePath)
+        AppendTrieDictionary dict = builder.build(0);
+        assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS));
+        assertEquals(110010, dict.getMaxId()); // 10000 + 10 + 100000 distinct values added by the workers
+        try {
+            builder.addValue("fail");
+            fail("Builder should be closed"); // fail() throws AssertionError, which the catch (Exception) below does not intercept
+        } catch (Exception e) {} // expected path: addValue on the already-built builder is expected to throw
+
+        builder = AppendTrieDictionary.Builder.getInstance(resourcePath, dict);
+        builder.addValue("success");
+        dict = builder.build(0);
+        for (int i = 0; i < 10000; i ++) {
+            assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
+        }
+        for (int i = 0; i < 10; i ++) {
+            assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
+        }
+        for (int i = 0; i < 100000; i ++) {
+            assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
+        }
+        assertEquals(110011, dict.getIdFromValue("success")); // the one value appended after reopening is expected to get the next id after 110010
+    }
+}