You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/31 13:19:23 UTC

[06/50] [abbrv] kylin git commit: KYLIN-3092 Synchronize w/r operations on entity-caching managers

KYLIN-3092 Synchronize w/r operations on entity-caching managers


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f2f487fe
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f2f487fe
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f2f487fe

Branch: refs/heads/master
Commit: f2f487fe2dc868119562303cb8b9a0b630f704cf
Parents: b8d7987
Author: Li Yang <li...@apache.org>
Authored: Sat Dec 9 00:23:51 2017 +0800
Committer: Dong Li <li...@apache.org>
Committed: Mon Dec 11 10:16:35 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/CubeDescManager.java  |  311 +++--
 .../org/apache/kylin/cube/CubeInstance.java     |   31 +-
 .../java/org/apache/kylin/cube/CubeManager.java | 1079 +++++++++---------
 .../kylin/cube/cli/CubeSignatureRefresher.java  |    2 +-
 .../org/apache/kylin/cube/model/CubeDesc.java   |   29 +-
 .../kylin/metadata/TableMetadataManager.java    |  583 +++++-----
 .../kylin/metadata/TempStatementManager.java    |  157 ++-
 .../org/apache/kylin/metadata/acl/TableACL.java |   11 +
 .../kylin/metadata/acl/TableACLManager.java     |  111 +-
 .../metadata/cachesync/CachedCrudAssist.java    |   72 +-
 .../kylin/metadata/model/DataModelDesc.java     |    7 +-
 .../kylin/metadata/model/DataModelManager.java  |  269 +++--
 .../metadata/model/ExternalFilterDesc.java      |    5 +
 .../apache/kylin/metadata/model/TableDesc.java  |   79 +-
 .../kylin/metadata/model/TableExtDesc.java      |   38 +-
 .../kylin/metadata/project/ProjectInstance.java |    2 +-
 .../kylin/metadata/project/ProjectManager.java  |   12 +-
 .../metadata/streaming/StreamingConfig.java     |    6 +
 .../metadata/streaming/StreamingManager.java    |  183 +--
 .../metadata/TempStatementManagerTest.java      |    6 +-
 .../streaming/StreamingManagerTest.java         |   68 ++
 .../kylin/storage/hybrid/HybridInstance.java    |   37 +-
 .../kylin/storage/hybrid/HybridManager.java     |  133 ++-
 .../apache/kylin/engine/spark/SparkCubing.java  |    4 +-
 .../DEFAULT_SESSION/temp_table1.json            |    1 +
 .../DEFAULT_SESSION/temp_table2.json            |    1 +
 .../kylin/provision/BuildCubeWithStream.java    |    2 +-
 .../kylin/rest/controller/ModelController.java  |    2 +-
 .../kylin/rest/service/KafkaConfigService.java  |    2 +-
 .../kylin/rest/service/StreamingService.java    |    4 +-
 .../kylin/source/kafka/KafkaConfigManager.java  |  166 +--
 .../kylin/source/kafka/config/KafkaConfig.java  |    8 +-
 32 files changed, 1746 insertions(+), 1675 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index f724549..a58ba40 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -25,9 +25,9 @@ import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.AutoReadWriteLock;
+import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
 import org.apache.kylin.cube.cuboid.CuboidManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.validation.CubeMetadataValidator;
@@ -36,9 +36,9 @@ import org.apache.kylin.dimension.DictionaryDimEnc;
 import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.dimension.DimensionEncodingFactory;
 import org.apache.kylin.measure.topn.TopNMeasureType;
-import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -59,8 +59,6 @@ public class CubeDescManager {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeDescManager.class);
 
-    public static final Serializer<CubeDesc> CUBE_DESC_SERIALIZER = new JsonSerializer<CubeDesc>(CubeDesc.class);
-
     public static CubeDescManager getInstance(KylinConfig config) {
         return config.getManager(CubeDescManager.class);
     }
@@ -69,20 +67,42 @@ public class CubeDescManager {
     static CubeDescManager newInstance(KylinConfig config) throws IOException {
         return new CubeDescManager(config);
     }
-
+    
     // ============================================================================
 
     private KylinConfig config;
+    
     // name ==> CubeDesc
     private CaseInsensitiveStringCache<CubeDesc> cubeDescMap;
+    private CachedCrudAssist<CubeDesc> crud;
+
+    // protects concurrent operations around the cached map, to avoid for example
+    // writing an entity in the middle of reloading it (dirty read)
+    private AutoReadWriteLock descMapLock = new AutoReadWriteLock();
 
-    private CubeDescManager(KylinConfig config) throws IOException {
-        logger.info("Initializing CubeDescManager with config " + config);
-        this.config = config;
+    private CubeDescManager(KylinConfig cfg) throws IOException {
+        logger.info("Initializing CubeDescManager with config " + cfg);
+        this.config = cfg;
         this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, "cube_desc");
+        this.crud = new CachedCrudAssist<CubeDesc>(getStore(), ResourceStore.CUBE_DESC_RESOURCE_ROOT, CubeDesc.class,
+                cubeDescMap) {
+            @Override
+            protected CubeDesc initEntityAfterReload(CubeDesc cubeDesc, String resourceName) {
+                if (cubeDesc.isDraft())
+                    throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' must not be a draft");
+
+                try {
+                    cubeDesc.init(config);
+                } catch (Exception e) {
+                    logger.warn("Broken cube desc " + cubeDesc.resourceName(), e);
+                    cubeDesc.addError(e.getMessage());
+                }
+                return cubeDesc;
+            }
+        };
 
         // touch lower level metadata before registering my listener
-        reloadAllCubeDesc();
+        crud.reloadAll();
         Broadcaster.getInstance(config).registerListener(new CubeDescSyncListener(), "cube_desc");
     }
 
@@ -93,7 +113,7 @@ public class CubeDescManager {
             for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) {
                 if (real instanceof CubeInstance) {
                     String descName = ((CubeInstance) real).getDescName();
-                    reloadCubeDescLocal(descName);
+                    reloadCubeDescQuietly(descName);
                 }
             }
         }
@@ -108,7 +128,7 @@ public class CubeDescManager {
             if (event == Event.DROP)
                 removeLocalCubeDesc(cubeDescName);
             else
-                reloadCubeDescLocal(cubeDescName);
+                reloadCubeDescQuietly(cubeDescName);
 
             for (ProjectInstance prj : ProjectManager.getInstance(config).findProjectsByModel(modelName)) {
                 broadcaster.notifyProjectSchemaUpdate(prj.getName());
@@ -117,58 +137,45 @@ public class CubeDescManager {
     }
 
     public CubeDesc getCubeDesc(String name) {
-        return cubeDescMap.get(name);
+        try (AutoLock lock = descMapLock.lockForRead()) {
+            return cubeDescMap.get(name);
+        }
     }
 
     public List<CubeDesc> listAllDesc() {
-        return new ArrayList<CubeDesc>(cubeDescMap.values());
-    }
-
-    /**
-     * Reload CubeDesc from resource store It will be triggered by an desc
-     * update event.
-     * 
-     * @param name
-     * @throws IOException
-     */
-    public CubeDesc reloadCubeDescLocal(String name) throws IOException {
-        // Broken CubeDesc is not allowed to be saved and broadcast.
-        CubeDesc ndesc = loadCubeDesc(CubeDesc.concatResourcePath(name), false);
-
-        cubeDescMap.putLocal(ndesc.getName(), ndesc);
-        clearCuboidCache(ndesc.getName());
-
-        // if related cube is in DESCBROKEN state before, change it back to DISABLED
-        CubeManager cubeManager = CubeManager.getInstance(config);
-        for (CubeInstance cube : cubeManager.getCubesByDesc(name)) {
-            if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
-                cubeManager.reloadCubeLocal(cube.getName());
-            }
+        try (AutoLock lock = descMapLock.lockForRead()) {
+            return new ArrayList<CubeDesc>(cubeDescMap.values());
         }
-
-        return ndesc;
     }
-
-    private CubeDesc loadCubeDesc(String path, boolean allowBroken) throws IOException {
-        ResourceStore store = getStore();
-        CubeDesc ndesc = store.getResource(path, CubeDesc.class, CUBE_DESC_SERIALIZER);
-        if (ndesc == null)
-            throw new IllegalArgumentException("No cube desc found at " + path);
-        if (ndesc.isDraft())
-            throw new IllegalArgumentException("CubeDesc '" + ndesc.getName() + "' must not be a draft");
-
-        try {
-            ndesc.init(config);
+    
+    public CubeDesc reloadCubeDescQuietly(String name) {
+        try (AutoLock lock = descMapLock.lockForWrite()) {
+            return reloadCubeDescLocal(name);
         } catch (Exception e) {
-            logger.warn("Broken cube desc " + path, e);
-            ndesc.addError(e.getMessage());
+            logger.error("Failed to reload CubeDesc " + name, e);
+            return null;
         }
+    }
 
-        if (!allowBroken && !ndesc.getError().isEmpty()) {
-            throw new IllegalStateException("Cube desc at " + path + " has issues: " + ndesc.getError());
+    public CubeDesc reloadCubeDescLocal(String name) throws IOException {
+        try (AutoLock lock = descMapLock.lockForWrite()) {
+            CubeDesc ndesc = crud.reload(name);
+            clearCuboidCache(name);
+            
+            // Broken CubeDesc is not allowed to be saved and broadcast.
+            if (ndesc.isBroken())
+                throw new IllegalStateException("CubeDesc " + name + " is broken");
+    
+            // if related cube is in DESCBROKEN state before, change it back to DISABLED
+            CubeManager cubeManager = CubeManager.getInstance(config);
+            for (CubeInstance cube : cubeManager.getCubesByDesc(name)) {
+                if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
+                    cube.init(config);
+                }
+            }
+    
+            return ndesc;
         }
-
-        return ndesc;
     }
 
     /**
@@ -179,38 +186,83 @@ public class CubeDescManager {
      * @throws IOException
      */
     public CubeDesc createCubeDesc(CubeDesc cubeDesc) throws IOException {
-        if (cubeDesc.getUuid() == null || cubeDesc.getName() == null)
-            throw new IllegalArgumentException();
-        if (cubeDescMap.containsKey(cubeDesc.getName()))
-            throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' already exists");
-        if (cubeDesc.isDraft())
-            throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' must not be a draft");
-
-        try {
-            cubeDesc.init(config);
-        } catch (Exception e) {
-            logger.warn("Broken cube desc " + cubeDesc, e);
-            cubeDesc.addError(e.getMessage());
-        }
-        postProcessCubeDesc(cubeDesc);
-        // Check base validation
-        if (!cubeDesc.getError().isEmpty()) {
-            return cubeDesc;
-        }
-        // Semantic validation
-        CubeMetadataValidator validator = new CubeMetadataValidator();
-        ValidateContext context = validator.validate(cubeDesc);
-        if (!context.ifPass()) {
+        try (AutoLock lock = descMapLock.lockForWrite()) {
+            if (cubeDesc.getUuid() == null || cubeDesc.getName() == null)
+                throw new IllegalArgumentException();
+            if (cubeDescMap.containsKey(cubeDesc.getName()))
+                throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' already exists");
+            if (cubeDesc.isDraft())
+                throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' must not be a draft");
+
+            try {
+                cubeDesc.init(config);
+            } catch (Exception e) {
+                logger.warn("Broken cube desc " + cubeDesc, e);
+                cubeDesc.addError(e.getMessage());
+            }
+            
+            postProcessCubeDesc(cubeDesc);
+            // Check base validation
+            if (!cubeDesc.getError().isEmpty()) {
+                return cubeDesc;
+            }
+            // Semantic validation
+            CubeMetadataValidator validator = new CubeMetadataValidator();
+            ValidateContext context = validator.validate(cubeDesc);
+            if (!context.ifPass()) {
+                return cubeDesc;
+            }
+
+            cubeDesc.setSignature(cubeDesc.calculateSignature());
+
+            // save resource
+            crud.save(cubeDesc);
+            
             return cubeDesc;
         }
+    }
+
+    /**
+     * Update CubeDesc with the input. Broadcast the event into cluster
+     * 
+     * @param desc
+     * @return
+     * @throws IOException
+     */
+    public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
+        try (AutoLock lock = descMapLock.lockForWrite()) {
+            // Validate CubeDesc
+            if (desc.getUuid() == null || desc.getName() == null)
+                throw new IllegalArgumentException();
+            String name = desc.getName();
+            if (!cubeDescMap.containsKey(name))
+                throw new IllegalArgumentException("CubeDesc '" + name + "' does not exist.");
+            if (desc.isDraft())
+                throw new IllegalArgumentException("CubeDesc '" + desc.getName() + "' must not be a draft");
+
+            try {
+                desc.init(config);
+            } catch (Exception e) {
+                logger.warn("Broken cube desc " + desc, e);
+                desc.addError(e.getMessage());
+                return desc;
+            }
+
+            postProcessCubeDesc(desc);
+            // Semantic validation
+            CubeMetadataValidator validator = new CubeMetadataValidator();
+            ValidateContext context = validator.validate(desc);
+            if (!context.ifPass()) {
+                return desc;
+            }
 
-        cubeDesc.setSignature(cubeDesc.calculateSignature());
+            desc.setSignature(desc.calculateSignature());
 
-        String path = cubeDesc.getResourcePath();
-        getStore().putResource(path, cubeDesc, CUBE_DESC_SERIALIZER);
-        cubeDescMap.put(cubeDesc.getName(), cubeDesc);
+            // save resource
+            crud.save(desc);
 
-        return cubeDesc;
+            return desc;
+        }
     }
 
     /**
@@ -259,16 +311,18 @@ public class CubeDescManager {
 
     // remove cubeDesc
     public void removeCubeDesc(CubeDesc cubeDesc) throws IOException {
-        String path = cubeDesc.getResourcePath();
-        getStore().deleteResource(path);
-        cubeDescMap.remove(cubeDesc.getName());
-        clearCuboidCache(cubeDesc.getName());
+        try (AutoLock lock = descMapLock.lockForWrite()) {
+            crud.delete(cubeDesc);
+            clearCuboidCache(cubeDesc.getName());
+        }
     }
 
     // remove cubeDesc
     public void removeLocalCubeDesc(String name) throws IOException {
-        cubeDescMap.removeLocal(name);
-        clearCuboidCache(name);
+        try (AutoLock lock = descMapLock.lockForWrite()) {
+            cubeDescMap.removeLocal(name);
+            clearCuboidCache(name);
+        }
     }
     
     private void clearCuboidCache(String descName) {
@@ -276,87 +330,6 @@ public class CubeDescManager {
         CuboidManager.getInstance(config).clearCache(descName);
     }
 
-    private void reloadAllCubeDesc() throws IOException {
-        ResourceStore store = getStore();
-        logger.info("Reloading Cube Metadata from folder "
-                + store.getReadableResourcePath(ResourceStore.CUBE_DESC_RESOURCE_ROOT));
-
-        cubeDescMap.clear();
-
-        List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_DESC_RESOURCE_ROOT,
-                MetadataConstants.FILE_SURFIX);
-        for (String path : paths) {
-            CubeDesc desc = null;
-            try {
-                desc = loadCubeDesc(path, true);
-            } catch (Exception e) {
-                logger.error("Error during load cube desc, skipping " + path, e);
-                continue;
-            }
-
-            if (!path.equals(desc.getResourcePath())) {
-                logger.error(
-                        "Skip suspicious desc at " + path + ", " + desc + " should be at " + desc.getResourcePath());
-                continue;
-            }
-            if (cubeDescMap.containsKey(desc.getName())) {
-                logger.error("Dup CubeDesc name '" + desc.getName() + "' on path " + path);
-                continue;
-            }
-
-            cubeDescMap.putLocal(desc.getName(), desc);
-        }
-
-        logger.info("Loaded " + cubeDescMap.size() + " Cube Desc(s)");
-    }
-
-    /**
-     * Update CubeDesc with the input. Broadcast the event into cluster
-     * 
-     * @param desc
-     * @return
-     * @throws IOException
-     */
-    public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
-        // Validate CubeDesc
-        if (desc.getUuid() == null || desc.getName() == null)
-            throw new IllegalArgumentException();
-        String name = desc.getName();
-        if (!cubeDescMap.containsKey(name))
-            throw new IllegalArgumentException("CubeDesc '" + name + "' does not exist.");
-        if (desc.isDraft())
-            throw new IllegalArgumentException("CubeDesc '" + desc.getName() + "' must not be a draft");
-
-        try {
-            desc.init(config);
-        } catch (Exception e) {
-            logger.warn("Broken cube desc " + desc, e);
-            desc.addError(e.getMessage());
-            return desc;
-        }
-
-        postProcessCubeDesc(desc);
-        // Semantic validation
-        CubeMetadataValidator validator = new CubeMetadataValidator();
-        ValidateContext context = validator.validate(desc);
-        if (!context.ifPass()) {
-            return desc;
-        }
-
-        desc.setSignature(desc.calculateSignature());
-
-        // Save Source
-        String path = desc.getResourcePath();
-        getStore().putResource(path, desc, CUBE_DESC_SERIALIZER);
-
-        // Reload the CubeDesc
-        CubeDesc ndesc = loadCubeDesc(path, false);
-        // Here replace the old one
-        cubeDescMap.put(ndesc.getName(), desc);
-
-        return ndesc;
-    }
-
     private ResourceStore getStore() {
         return ResourceStore.getStore(this.config);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index d1c5166..1be7923 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.cube;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +52,8 @@ import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.metadata.realization.SQLDigest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -63,6 +67,8 @@ import com.google.common.collect.Lists;
 @SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class CubeInstance extends RootPersistentEntity implements IRealization, IBuildable {
+    private static final Logger logger = LoggerFactory.getLogger(CubeInstance.class);
+
     public static final int COST_WEIGHT_MEASURE = 1;
     public static final int COST_WEIGHT_DIMENSION = 10;
     public static final int COST_WEIGHT_INNER_JOIN = 100;
@@ -121,6 +127,24 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
     // default constructor for jackson
     public CubeInstance() {
     }
+    
+    void init(KylinConfig config) {
+        CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(descName);
+        checkNotNull(cubeDesc, "cube descriptor '%s' (for cube '%s') not found", descName, name);
+
+        if (cubeDesc.isBroken()) {
+            setStatus(RealizationStatusEnum.DESCBROKEN);
+            logger.error("cube descriptor {} (for cube '{}') is broken", cubeDesc.getResourcePath(), name);
+            for (String error : cubeDesc.getError()) {
+                logger.error("Error: {}", error);
+            }
+        } else if (getStatus() == RealizationStatusEnum.DESCBROKEN) {
+            setStatus(RealizationStatusEnum.DISABLED);
+            logger.info("cube {} changed from DESCBROKEN to DISABLED", name);
+        }
+
+        setConfig((KylinConfigExt) cubeDesc.getConfig());
+    }
 
     public CuboidScheduler getCuboidScheduler() {
         if (cuboidScheduler != null)
@@ -174,9 +198,14 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return (getStatus() == RealizationStatusEnum.DISABLED || getStatus() == RealizationStatusEnum.DESCBROKEN)
                 && segments.isEmpty();
     }
+    
+    @Override
+    public String resourceName() {
+        return name;
+    }
 
     public String getResourcePath() {
-        return concatResourcePath(name);
+        return concatResourcePath(resourceName());
     }
 
     public static String concatResourcePath(String cubeName) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index e00735c..3220a0f 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -18,13 +18,9 @@
 
 package org.apache.kylin.cube;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -37,10 +33,11 @@ import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.AutoReadWriteLock;
+import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -54,6 +51,7 @@ import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
@@ -87,7 +85,7 @@ public class CubeManager implements IRealizationProvider {
     private static String ALPHA_NUM = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
 
     private static int HBASE_TABLE_LENGTH = 10;
-    public static final Serializer<CubeInstance> CUBE_SERIALIZER = new JsonSerializer<CubeInstance>(CubeInstance.class);
+    public static final Serializer<CubeInstance> CUBE_SERIALIZER = new JsonSerializer<>(CubeInstance.class);
 
     private static final Logger logger = LoggerFactory.getLogger(CubeManager.class);
 
@@ -103,21 +101,41 @@ public class CubeManager implements IRealizationProvider {
     // ============================================================================
 
     private KylinConfig config;
+
     // cube name ==> CubeInstance
     private CaseInsensitiveStringCache<CubeInstance> cubeMap;
-    // "table/column" ==> lookup table
-    //    private SingleValueCache<String, LookupStringTable> lookupTables = new SingleValueCache<String, LookupStringTable>(Broadcaster.TYPE.METADATA);
+    private CachedCrudAssist<CubeInstance> crud;
+
+    // protects concurrent operations around the cached map, to avoid for example
+    // writing an entity in the middle of reloading it (dirty read)
+    private AutoReadWriteLock cubeMapLock = new AutoReadWriteLock();
 
     // for generation hbase table name of a new segment
     private ConcurrentMap<String, String> usedStorageLocation = new ConcurrentHashMap<>();
 
-    private CubeManager(KylinConfig config) throws IOException {
+    // a few inner classes to group related methods
+    private SegmentAssist segAssist = new SegmentAssist();
+    private DictionaryAssist dictAssist = new DictionaryAssist();
+
+    private CubeManager(KylinConfig cfg) throws IOException {
         logger.info("Initializing CubeManager with config " + config);
-        this.config = config;
+        this.config = cfg;
         this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube");
+        this.crud = new CachedCrudAssist<CubeInstance>(getStore(), ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class,
+                cubeMap) {
+            @Override
+            protected CubeInstance initEntityAfterReload(CubeInstance cube, String resourceName) {
+                cube.init(config);
+
+                for (CubeSegment segment : cube.getSegments()) {
+                    usedStorageLocation.put(segment.getUuid(), segment.getStorageLocationIdentifier());
+                }
+                return cube;
+            }
+        };
 
         // touch lower level metadata before registering my listener
-        loadAllCubeInstance();
+        crud.reloadAll();
         Broadcaster.getInstance(config).registerListener(new CubeSyncListener(), "cube");
     }
 
@@ -127,7 +145,7 @@ public class CubeManager implements IRealizationProvider {
         public void onProjectSchemaChange(Broadcaster broadcaster, String project) throws IOException {
             for (IRealization real : ProjectManager.getInstance(config).listAllRealizations(project)) {
                 if (real instanceof CubeInstance) {
-                    reloadCubeLocal(real.getName());
+                    reloadCubeQuietly(real.getName());
                 }
             }
         }
@@ -140,7 +158,7 @@ public class CubeManager implements IRealizationProvider {
             if (event == Event.DROP)
                 removeCubeLocal(cubeName);
             else
-                reloadCubeLocal(cubeName);
+                reloadCubeQuietly(cubeName);
 
             for (ProjectInstance prj : ProjectManager.getInstance(config).findProjects(RealizationType.CUBE,
                     cubeName)) {
@@ -150,20 +168,25 @@ public class CubeManager implements IRealizationProvider {
     }
 
     public List<CubeInstance> listAllCubes() {
-        return new ArrayList<CubeInstance>(cubeMap.values());
+        try (AutoLock lock = cubeMapLock.lockForRead()) {
+            return new ArrayList<CubeInstance>(cubeMap.values());
+        }
     }
 
     public CubeInstance getCube(String cubeName) {
-        return cubeMap.get(cubeName);
+        try (AutoLock lock = cubeMapLock.lockForRead()) {
+            return cubeMap.get(cubeName);
+        }
     }
 
     public CubeInstance getCubeByUuid(String uuid) {
-        Collection<CubeInstance> copy = new ArrayList<CubeInstance>(cubeMap.values());
-        for (CubeInstance cube : copy) {
-            if (uuid.equals(cube.getUuid()))
-                return cube;
+        try (AutoLock lock = cubeMapLock.lockForRead()) {
+            for (CubeInstance cube : cubeMap.values()) {
+                if (uuid.equals(cube.getUuid()))
+                    return cube;
+            }
+            return null;
         }
-        return null;
     }
 
     /**
@@ -174,148 +197,57 @@ public class CubeManager implements IRealizationProvider {
      * @return
      */
     public List<CubeInstance> getCubesByDesc(String descName) {
-
-        List<CubeInstance> list = listAllCubes();
-        List<CubeInstance> result = new ArrayList<CubeInstance>();
-        Iterator<CubeInstance> it = list.iterator();
-        while (it.hasNext()) {
-            CubeInstance ci = it.next();
-            if (descName.equalsIgnoreCase(ci.getDescName())) {
-                result.add(ci);
+        try (AutoLock lock = cubeMapLock.lockForRead()) {
+            List<CubeInstance> list = listAllCubes();
+            List<CubeInstance> result = new ArrayList<CubeInstance>();
+            Iterator<CubeInstance> it = list.iterator();
+            while (it.hasNext()) {
+                CubeInstance ci = it.next();
+                if (descName.equalsIgnoreCase(ci.getDescName())) {
+                    result.add(ci);
+                }
             }
+            return result;
         }
-        return result;
-    }
-
-    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
-            throws IOException {
-        CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-        if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
-            return null;
-
-        String builderClass = cubeDesc.getDictionaryBuilderClass(col);
-        DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
-
-        saveDictionaryInfo(cubeSeg, col, dictInfo);
-        return dictInfo;
     }
 
-    public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
-            Dictionary<String> dict) throws IOException {
-        CubeDesc cubeDesc = cubeSeg.getCubeDesc();
-        if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
-            return null;
-
-        DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
-
-        saveDictionaryInfo(cubeSeg, col, dictInfo);
-        return dictInfo;
-    }
-
-    private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo) throws IOException {
-        if (dictInfo != null) {
-            Dictionary<?> dict = dictInfo.getDictionaryObject();
-            cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
-            cubeSeg.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
-
-            CubeUpdate update = new CubeUpdate(cubeSeg.getCubeInstance());
-            update.setToUpdateSegs(cubeSeg);
-            updateCube(update);
-        }
-    }
-
-    /**
-     * return null if no dictionary for given column
-     */
-    @SuppressWarnings("unchecked")
-    public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
-        DictionaryInfo info = null;
-        try {
-            DictionaryManager dictMgr = getDictionaryManager();
-            String dictResPath = cubeSeg.getDictResPath(col);
-            if (dictResPath == null)
-                return null;
-
-            info = dictMgr.getDictionaryInfo(dictResPath);
-            if (info == null)
-                throw new IllegalStateException("No dictionary found by " + dictResPath
-                        + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, e);
-        }
-        return (Dictionary<String>) info.getDictionaryObject();
-    }
-
-    public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {
-        TableMetadataManager metaMgr = getTableManager();
-        SnapshotManager snapshotMgr = getSnapshotManager();
-
-        TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, cubeSeg.getProject()));
-        IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
-        SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
-
-        cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-        CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance());
-        cubeBuilder.setToUpdateSegs(cubeSeg);
-        updateCube(cubeBuilder);
-
-        return snapshot;
-    }
-
-    // sync on update
-    public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOException {
-        logger.info("Dropping cube '" + cubeName + "'");
-        // load projects before remove cube from project
-
-        // delete cube instance and cube desc
-        CubeInstance cube = getCube(cubeName);
-
-        // remove cube and update cache
-        getStore().deleteResource(cube.getResourcePath());
-        cubeMap.remove(cube.getName());
-        Cuboid.clearCache(cube);
-
-        if (deleteDesc && cube.getDescriptor() != null) {
-            CubeDescManager.getInstance(config).removeCubeDesc(cube.getDescriptor());
-        }
-
-        // delete cube from project
-        ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName);
-
-        return cube;
-    }
-
-    // sync on update
     public CubeInstance createCube(String cubeName, String projectName, CubeDesc desc, String owner)
             throws IOException {
-        logger.info("Creating cube '" + projectName + "-->" + cubeName + "' from desc '" + desc.getName() + "'");
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            logger.info("Creating cube '" + projectName + "-->" + cubeName + "' from desc '" + desc.getName() + "'");
 
-        // save cube resource
-        CubeInstance cube = CubeInstance.create(cubeName, desc);
-        cube.setOwner(owner);
-        updateCubeWithRetry(new CubeUpdate(cube), 0);
+            // save cube resource
+            CubeInstance cube = CubeInstance.create(cubeName, desc);
+            cube.setOwner(owner);
+            updateCubeWithRetry(new CubeUpdate(cube), 0);
 
-        ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner);
+            ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName,
+                    owner);
 
-        return cube;
+            return cube;
+        }
     }
 
     public CubeInstance createCube(CubeInstance cube, String projectName, String owner) throws IOException {
-        logger.info("Creating cube '" + projectName + "-->" + cube.getName() + "' from instance object. '");
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            logger.info("Creating cube '" + projectName + "-->" + cube.getName() + "' from instance object. '");
 
-        // save cube resource
-        cube.setOwner(owner);
-        updateCubeWithRetry(new CubeUpdate(cube), 0);
+            // save cube resource
+            cube.setOwner(owner);
+            updateCubeWithRetry(new CubeUpdate(cube), 0);
 
-        ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cube.getName(), projectName,
-                owner);
+            ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cube.getName(),
+                    projectName, owner);
 
-        return cube;
+            return cube;
+        }
     }
 
     public CubeInstance updateCube(CubeUpdate update) throws IOException {
-        CubeInstance cube = updateCubeWithRetry(update, 0);
-        return cube;
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            CubeInstance cube = updateCubeWithRetry(update, 0);
+            return cube;
+        }
     }
 
     private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
@@ -378,7 +310,7 @@ public class CubeManager implements IRealizationProvider {
         }
 
         try {
-            getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER);
+            crud.save(cube);
         } catch (IllegalStateException ise) {
             logger.warn("Write conflict to update cube " + cube.getName() + " at try " + retry + ", will retry...");
             if (retry >= 7) {
@@ -386,7 +318,7 @@ public class CubeManager implements IRealizationProvider {
                 throw ise;
             }
 
-            cube = reloadCubeLocal(cube.getName());
+            cube = crud.reload(cube.getName());
             update.setCubeInstance(cube);
             retry++;
             cube = updateCubeWithRetry(update, retry);
@@ -402,486 +334,607 @@ public class CubeManager implements IRealizationProvider {
             }
         }
 
-        cubeMap.put(cube.getName(), cube);
-
         //this is a duplicate call to take care of scenarios where REST cache service unavailable
         ProjectManager.getInstance(cube.getConfig()).clearL2Cache();
 
         return cube;
     }
 
-    // append a full build segment
-    public CubeSegment appendSegment(CubeInstance cube) throws IOException {
-        return appendSegment(cube, null, null, null, null);
+    public CubeInstance reloadCubeQuietly(String cubeName) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            CubeInstance cube = crud.reloadQuietly(cubeName);
+            if (cube != null)
+                Cuboid.clearCache(cube);
+            return cube;
+        }
     }
 
-    public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException {
-        return appendSegment(cube, tsRange, null, null, null);
+    public void removeCubeLocal(String cubeName) {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            CubeInstance cube = cubeMap.get(cubeName);
+            if (cube != null) {
+                cubeMap.removeLocal(cubeName);
+                for (CubeSegment segment : cube.getSegments()) {
+                    usedStorageLocation.remove(segment.getUuid());
+                }
+                Cuboid.clearCache(cube);
+            }
+        }
     }
 
-    public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException {
-        return appendSegment(cube, src.getTSRange(), src.getSegRange(), src.getSourcePartitionOffsetStart(),
-                src.getSourcePartitionOffsetEnd());
-    }
+    public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            logger.info("Dropping cube '" + cubeName + "'");
+            // load projects before remove cube from project
 
-    CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange,
-            Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd)
-            throws IOException {
-        checkInputRanges(tsRange, segRange);
-        checkBuildingSegment(cube);
+            // delete cube instance and cube desc
+            CubeInstance cube = getCube(cubeName);
+
+            // remove cube and update cache
+            crud.delete(cube);
+            Cuboid.clearCache(cube);
 
-        // fix start/end a bit
-        if (cube.getModel().getPartitionDesc().isPartitioned()) {
-            // if missing start, set it to where last time ends
-            CubeSegment last = cube.getLastSegment();
-            if (last != null && !last.isOffsetCube() && tsRange.start.v == 0) {
-                tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v);
+            if (deleteDesc && cube.getDescriptor() != null) {
+                CubeDescManager.getInstance(config).removeCubeDesc(cube.getDescriptor());
             }
-        } else {
-            // full build
-            tsRange = null;
-            segRange = null;
-        }
 
-        CubeSegment newSegment = newSegment(cube, tsRange, segRange);
-        newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
-        newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
-        validateNewSegments(cube, newSegment);
+            // delete cube from project
+            ProjectManager.getInstance(config).removeRealizationsFromProjects(RealizationType.CUBE, cubeName);
 
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToAddSegs(newSegment);
-        updateCube(cubeBuilder);
-        return newSegment;
+            return cube;
+        }
     }
 
-    public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException {
-        checkInputRanges(tsRange, segRange);
-        checkBuildingSegment(cube);
-
-        if (cube.getModel().getPartitionDesc().isPartitioned() == false) {
-            // full build
-            tsRange = null;
-            segRange = null;
-        }
+    @VisibleForTesting
+    /*private*/ String generateStorageLocation() {
+        String namePrefix = config.getHBaseTableNamePrefix();
+        String tableName = "";
+        Random ran = new Random();
+        do {
+            StringBuffer sb = new StringBuffer();
+            sb.append(namePrefix);
+            for (int i = 0; i < HBASE_TABLE_LENGTH; i++) {
+                sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
+            }
+            tableName = sb.toString();
+        } while (this.usedStorageLocation.containsValue(tableName));
+        return tableName;
+    }
 
-        CubeSegment newSegment = newSegment(cube, tsRange, segRange);
+    private boolean isReady(CubeSegment seg) {
+        return seg.getStatus() == SegmentStatusEnum.READY;
+    }
 
-        Pair<Boolean, Boolean> pair = cube.getSegments().fitInSegments(newSegment);
-        if (pair.getFirst() == false || pair.getSecond() == false)
-            throw new IllegalArgumentException("The new refreshing segment " + newSegment
-                    + " does not match any existing segment in cube " + cube);
+    private TableMetadataManager getTableManager() {
+        return TableMetadataManager.getInstance(config);
+    }
 
-        if (segRange != null) {
-            CubeSegment toRefreshSeg = null;
-            for (CubeSegment cubeSegment : cube.getSegments()) {
-                if (cubeSegment.getSegRange().equals(segRange)) {
-                    toRefreshSeg = cubeSegment;
-                    break;
-                }
-            }
+    private DictionaryManager getDictionaryManager() {
+        return DictionaryManager.getInstance(config);
+    }
 
-            if (toRefreshSeg == null) {
-                throw new IllegalArgumentException("For streaming cube, only one segment can be refreshed at one time");
-            }
+    private SnapshotManager getSnapshotManager() {
+        return SnapshotManager.getInstance(config);
+    }
 
-            newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart());
-            newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd());
-        }
+    private ResourceStore getStore() {
+        return ResourceStore.getStore(this.config);
+    }
 
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToAddSegs(newSegment);
-        updateCube(cubeBuilder);
+    @Override
+    public RealizationType getRealizationType() {
+        return RealizationType.CUBE;
+    }
 
-        return newSegment;
+    @Override
+    public IRealization getRealization(String name) {
+        return getCube(name);
     }
 
-    public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
-            throws IOException {
-        if (cube.getSegments().isEmpty())
-            throw new IllegalArgumentException("Cube " + cube + " has no segments");
-
-        checkInputRanges(tsRange, segRange);
-        checkBuildingSegment(cube);
-        checkCubeIsPartitioned(cube);
-
-        if (cube.getSegments().getFirstSegment().isOffsetCube()) {
-            // offset cube, merge by date range?
-            if (segRange == null && tsRange != null) {
-                Pair<CubeSegment, CubeSegment> pair = cube.getSegments(SegmentStatusEnum.READY)
-                        .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE);
-                if (pair == null)
-                    throw new IllegalArgumentException("Find no segments to merge by " + tsRange + " for cube " + cube);
-                segRange = new SegmentRange(pair.getFirst().getSegRange().start, pair.getSecond().getSegRange().end);
-            }
-            tsRange = null;
-            Preconditions.checkArgument(segRange != null);
-        } else {
-            segRange = null;
-            Preconditions.checkArgument(tsRange != null);
-        }
-
-        CubeSegment newSegment = newSegment(cube, tsRange, segRange);
-
-        Segments<CubeSegment> mergingSegments = cube.getMergingSegments(newSegment);
-        if (mergingSegments.size() <= 1)
-            throw new IllegalArgumentException("Range " + newSegment.getSegRange()
-                    + " must contain at least 2 segments, but there is " + mergingSegments.size());
-
-        CubeSegment first = mergingSegments.get(0);
-        CubeSegment last = mergingSegments.get(mergingSegments.size() - 1);
-        if (force == false) {
-            for (int i = 0; i < mergingSegments.size() - 1; i++) {
-                if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange()))
-                    throw new IllegalStateException("Merging segments must not have gaps between "
-                            + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1));
-            }
-        }
-        if (first.isOffsetCube()) {
-            newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
-            newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
-            newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
-            newSegment.setTSRange(null);
-        } else {
-            newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd()));
-            newSegment.setSegRange(null);
-        }
-
-        if (force == false) {
-            List<String> emptySegment = Lists.newArrayList();
-            for (CubeSegment seg : mergingSegments) {
-                if (seg.getSizeKB() == 0) {
-                    emptySegment.add(seg.getName());
-                }
-            }
+    // ============================================================================
+    // Segment related methods
+    // ============================================================================
 
-            if (emptySegment.size() > 0) {
-                throw new IllegalArgumentException(
-                        "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: "
-                                + emptySegment);
-            }
-        }
+    // append a full build segment
+    public CubeSegment appendSegment(CubeInstance cube) throws IOException {
+        return appendSegment(cube, null, null, null, null);
+    }
 
-        validateNewSegments(cube, newSegment);
+    public CubeSegment appendSegment(CubeInstance cube, TSRange tsRange) throws IOException {
+        return appendSegment(cube, tsRange, null, null, null);
+    }
 
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToAddSegs(newSegment);
-        updateCube(cubeBuilder);
+    public CubeSegment appendSegment(CubeInstance cube, SourcePartition src) throws IOException {
+        return appendSegment(cube, src.getTSRange(), src.getSegRange(), src.getSourcePartitionOffsetStart(),
+                src.getSourcePartitionOffsetEnd());
+    }
 
-        return newSegment;
+    CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange,
+            Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd)
+            throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            return segAssist.appendSegment(cube, tsRange, segRange, sourcePartitionOffsetStart,
+                    sourcePartitionOffsetEnd);
+        }
     }
 
-    private void checkInputRanges(TSRange tsRange, SegmentRange segRange) {
-        if (tsRange != null && segRange != null) {
-            throw new IllegalArgumentException(
-                    "Build or refresh cube segment either by TSRange or by SegmentRange, not both.");
+    public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            return segAssist.refreshSegment(cube, tsRange, segRange);
         }
     }
 
-    private void checkBuildingSegment(CubeInstance cube) {
-        int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
-        if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
-            throw new IllegalStateException(
-                    "There is already " + cube.getBuildingSegments().size() + " building segment; ");
+    public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
+            throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            return segAssist.mergeSegments(cube, tsRange, segRange, force);
         }
     }
 
-    private void checkCubeIsPartitioned(CubeInstance cube) {
-        if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) {
-            throw new IllegalStateException(
-                    "there is no partition date column specified, only full build is supported");
+    public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            segAssist.promoteNewlyBuiltSegments(cube, newSegment);
         }
     }
 
-    /**
-     * After cube update, reload cube related cache
-     *
-     * @param cubeName
-     */
-    public CubeInstance reloadCubeLocal(String cubeName) {
-        CubeInstance cubeInstance = reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName));
-        if (cubeInstance != null)
-            Cuboid.clearCache(cubeInstance);
-        return cubeInstance;
+    public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
+        segAssist.validateNewSegments(cube, newSegments);
     }
 
-    public void removeCubeLocal(String cubeName) {
-        CubeInstance cube = cubeMap.get(cubeName);
-        if (cube != null) {
-            cubeMap.removeLocal(cubeName);
-            for (CubeSegment segment : cube.getSegments()) {
-                usedStorageLocation.remove(segment.getUuid());
-            }
-            Cuboid.clearCache(cube);
-        }
+    public List<CubeSegment> calculateHoles(String cubeName) {
+        return segAssist.calculateHoles(cubeName);
     }
 
-    public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+    private class SegmentAssist {
+
+        CubeSegment appendSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange,
+                Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd)
+                throws IOException {
+            checkInputRanges(tsRange, segRange);
+            checkBuildingSegment(cube);
+
+            // fix start/end a bit
+            if (cube.getModel().getPartitionDesc().isPartitioned()) {
+                // if missing start, set it to where last time ends
+                CubeSegment last = cube.getLastSegment();
+                if (last != null && !last.isOffsetCube() && tsRange.start.v == 0) {
+                    tsRange = new TSRange(last.getTSRange().end.v, tsRange.end.v);
+                }
+            } else {
+                // full build
+                tsRange = null;
+                segRange = null;
+            }
 
-        String tableName = join.getPKSide().getTableIdentity();
-        String[] pkCols = join.getPrimaryKey();
-        String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
-        if (snapshotResPath == null)
-            throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
-                    + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
+            CubeSegment newSegment = newSegment(cube, tsRange, segRange);
+            newSegment.setSourcePartitionOffsetStart(sourcePartitionOffsetStart);
+            newSegment.setSourcePartitionOffsetEnd(sourcePartitionOffsetEnd);
+            validateNewSegments(cube, newSegment);
 
-        try {
-            SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
-            TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject());
-            return new LookupStringTable(tableDesc, pkCols, snapshot);
-        } catch (IOException e) {
-            throw new IllegalStateException(
-                    "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToAddSegs(newSegment);
+            updateCube(cubeBuilder);
+            return newSegment;
         }
-    }
 
-    private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) {
-        DataModelDesc modelDesc = cube.getModel();
+        public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange)
+                throws IOException {
+            checkInputRanges(tsRange, segRange);
+            checkBuildingSegment(cube);
 
-        CubeSegment segment = new CubeSegment();
-        segment.setUuid(UUID.randomUUID().toString());
-        segment.setName(CubeSegment.makeSegmentName(tsRange, segRange, modelDesc));
-        segment.setCreateTimeUTC(System.currentTimeMillis());
-        segment.setCubeInstance(cube);
+            if (cube.getModel().getPartitionDesc().isPartitioned() == false) {
+                // full build
+                tsRange = null;
+                segRange = null;
+            }
 
-        // let full build range be backward compatible
-        if (tsRange == null && segRange == null)
-            tsRange = new TSRange(0L, Long.MAX_VALUE);
+            CubeSegment newSegment = newSegment(cube, tsRange, segRange);
 
-        segment.setTSRange(tsRange);
-        segment.setSegRange(segRange);
-        segment.setStatus(SegmentStatusEnum.NEW);
-        segment.setStorageLocationIdentifier(generateStorageLocation());
+            Pair<Boolean, Boolean> pair = cube.getSegments().fitInSegments(newSegment);
+            if (pair.getFirst() == false || pair.getSecond() == false)
+                throw new IllegalArgumentException("The new refreshing segment " + newSegment
+                        + " does not match any existing segment in cube " + cube);
 
-        segment.setCubeInstance(cube);
+            if (segRange != null) {
+                CubeSegment toRefreshSeg = null;
+                for (CubeSegment cubeSegment : cube.getSegments()) {
+                    if (cubeSegment.getSegRange().equals(segRange)) {
+                        toRefreshSeg = cubeSegment;
+                        break;
+                    }
+                }
 
-        segment.validate();
-        return segment;
-    }
+                if (toRefreshSeg == null) {
+                    throw new IllegalArgumentException(
+                            "For streaming cube, only one segment can be refreshed at one time");
+                }
 
-    @VisibleForTesting
-    /*private*/ String generateStorageLocation() {
-        String namePrefix = config.getHBaseTableNamePrefix();
-        String tableName = "";
-        Random ran = new Random();
-        do {
-            StringBuffer sb = new StringBuffer();
-            sb.append(namePrefix);
-            for (int i = 0; i < HBASE_TABLE_LENGTH; i++) {
-                sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
+                newSegment.setSourcePartitionOffsetStart(toRefreshSeg.getSourcePartitionOffsetStart());
+                newSegment.setSourcePartitionOffsetEnd(toRefreshSeg.getSourcePartitionOffsetEnd());
             }
-            tableName = sb.toString();
-        } while (this.usedStorageLocation.containsValue(tableName));
-        return tableName;
-    }
 
-    public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
-        if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier()))
-            throw new IllegalStateException(
-                    "For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier");
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToAddSegs(newSegment);
+            updateCube(cubeBuilder);
 
-        if (StringUtils.isBlank(newSegment.getLastBuildJobID()))
-            throw new IllegalStateException("For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID");
-
-        if (isReady(newSegment) == true) {
-            logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY");
+            return newSegment;
         }
 
-        List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment);
+        public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
+                throws IOException {
+            if (cube.getSegments().isEmpty())
+                throw new IllegalArgumentException("Cube " + cube + " has no segments");
+
+            checkInputRanges(tsRange, segRange);
+            checkBuildingSegment(cube);
+            checkCubeIsPartitioned(cube);
+
+            if (cube.getSegments().getFirstSegment().isOffsetCube()) {
+                // offset cube, merge by date range?
+                if (segRange == null && tsRange != null) {
+                    Pair<CubeSegment, CubeSegment> pair = cube.getSegments(SegmentStatusEnum.READY)
+                            .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE);
+                    if (pair == null)
+                        throw new IllegalArgumentException(
+                                "Find no segments to merge by " + tsRange + " for cube " + cube);
+                    segRange = new SegmentRange(pair.getFirst().getSegRange().start,
+                            pair.getSecond().getSegRange().end);
+                }
+                tsRange = null;
+                Preconditions.checkArgument(segRange != null);
+            } else {
+                segRange = null;
+                Preconditions.checkArgument(tsRange != null);
+            }
+
+            CubeSegment newSegment = newSegment(cube, tsRange, segRange);
+
+            Segments<CubeSegment> mergingSegments = cube.getMergingSegments(newSegment);
+            if (mergingSegments.size() <= 1)
+                throw new IllegalArgumentException("Range " + newSegment.getSegRange()
+                        + " must contain at least 2 segments, but there is " + mergingSegments.size());
+
+            CubeSegment first = mergingSegments.get(0);
+            CubeSegment last = mergingSegments.get(mergingSegments.size() - 1);
+            if (force == false) {
+                for (int i = 0; i < mergingSegments.size() - 1; i++) {
+                    if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange()))
+                        throw new IllegalStateException("Merging segments must not have gaps between "
+                                + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1));
+                }
+            }
+            if (first.isOffsetCube()) {
+                newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
+                newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+                newSegment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+                newSegment.setTSRange(null);
+            } else {
+                newSegment.setTSRange(new TSRange(mergingSegments.getTSStart(), mergingSegments.getTSEnd()));
+                newSegment.setSegRange(null);
+            }
+
+            if (force == false) {
+                List<String> emptySegment = Lists.newArrayList();
+                for (CubeSegment seg : mergingSegments) {
+                    if (seg.getSizeKB() == 0) {
+                        emptySegment.add(seg.getName());
+                    }
+                }
+
+                if (emptySegment.size() > 0) {
+                    throw new IllegalArgumentException(
+                            "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: "
+                                    + emptySegment);
+                }
+            }
 
-        if (tobe.contains(newSegment) == false)
-            throw new IllegalStateException(
-                    "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe);
+            validateNewSegments(cube, newSegment);
 
-        newSegment.setStatus(SegmentStatusEnum.READY);
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToAddSegs(newSegment);
+            updateCube(cubeBuilder);
 
-        List<CubeSegment> toRemoveSegs = Lists.newArrayList();
-        for (CubeSegment segment : cube.getSegments()) {
-            if (!tobe.contains(segment))
-                toRemoveSegs.add(segment);
+            return newSegment;
         }
 
-        logger.info("Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs);
+        private void checkInputRanges(TSRange tsRange, SegmentRange segRange) {
+            if (tsRange != null && segRange != null) {
+                throw new IllegalArgumentException(
+                        "Build or refresh cube segment either by TSRange or by SegmentRange, not both.");
+            }
+        }
 
-        CubeUpdate cubeBuilder = new CubeUpdate(cube);
-        cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()]))
-                .setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY);
-        updateCube(cubeBuilder);
-    }
+        private void checkBuildingSegment(CubeInstance cube) {
+            int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
+            if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
+                throw new IllegalStateException(
+                        "There is already " + cube.getBuildingSegments().size() + " building segment; ");
+            }
+        }
 
-    public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
-        List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments);
-        List<CubeSegment> newList = Arrays.asList(newSegments);
-        if (tobe.containsAll(newList) == false) {
-            throw new IllegalStateException("For cube " + cube + ", the new segments " + newList
-                    + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
+        private void checkCubeIsPartitioned(CubeInstance cube) {
+            if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned() == false) {
+                throw new IllegalStateException(
+                        "there is no partition date column specified, only full build is supported");
+            }
         }
-    }
 
-    private boolean isReady(CubeSegment seg) {
-        return seg.getStatus() == SegmentStatusEnum.READY;
-    }
+        private CubeSegment newSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) {
+            DataModelDesc modelDesc = cube.getModel();
 
-    private void loadAllCubeInstance() throws IOException {
-        ResourceStore store = getStore();
-        List<String> paths = store.collectResourceRecursively(ResourceStore.CUBE_RESOURCE_ROOT, ".json");
+            CubeSegment segment = new CubeSegment();
+            segment.setUuid(UUID.randomUUID().toString());
+            segment.setName(CubeSegment.makeSegmentName(tsRange, segRange, modelDesc));
+            segment.setCreateTimeUTC(System.currentTimeMillis());
+            segment.setCubeInstance(cube);
 
-        logger.info("Loading Cube from folder " + store.getReadableResourcePath(ResourceStore.CUBE_RESOURCE_ROOT));
+            // let full build range be backward compatible
+            if (tsRange == null && segRange == null)
+                tsRange = new TSRange(0L, Long.MAX_VALUE);
 
-        int succeed = 0;
-        int fail = 0;
-        for (String path : paths) {
-            CubeInstance cube = reloadCubeLocalAt(path);
-            if (cube == null) {
-                fail++;
-            } else {
-                succeed++;
-            }
+            segment.setTSRange(tsRange);
+            segment.setSegRange(segRange);
+            segment.setStatus(SegmentStatusEnum.NEW);
+            segment.setStorageLocationIdentifier(generateStorageLocation());
+
+            segment.setCubeInstance(cube);
+
+            segment.validate();
+            return segment;
         }
 
-        logger.info("Loaded " + succeed + " cubes, fail on " + fail + " cubes");
-    }
+        public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment newSegment) throws IOException {
+            if (StringUtils.isBlank(newSegment.getStorageLocationIdentifier()))
+                throw new IllegalStateException(
+                        "For cube " + cube + ", segment " + newSegment + " missing StorageLocationIdentifier");
 
-    private CubeInstance reloadCubeLocalAt(String path) {
-        ResourceStore store = getStore();
-        CubeInstance cube;
+            if (StringUtils.isBlank(newSegment.getLastBuildJobID()))
+                throw new IllegalStateException(
+                        "For cube " + cube + ", segment " + newSegment + " missing LastBuildJobID");
 
-        try {
-            cube = store.getResource(path, CubeInstance.class, CUBE_SERIALIZER);
-            if (cube == null) {
-                return cube;
+            if (isReady(newSegment) == true) {
+                logger.warn("For cube " + cube + ", segment " + newSegment + " state should be NEW but is READY");
             }
 
-            String cubeName = cube.getName();
-            checkState(StringUtils.isNotBlank(cubeName), "cube (at %s) name must not be blank", path);
+            List<CubeSegment> tobe = cube.calculateToBeSegments(newSegment);
 
-            CubeDesc cubeDesc = CubeDescManager.getInstance(config).getCubeDesc(cube.getDescName());
-            checkNotNull(cubeDesc, "cube descriptor '%s' (for cube '%s') not found", cube.getDescName(), cubeName);
+            if (tobe.contains(newSegment) == false)
+                throw new IllegalStateException(
+                        "For cube " + cube + ", segment " + newSegment + " is expected but not in the tobe " + tobe);
 
-            if (!cubeDesc.getError().isEmpty()) {
-                cube.setStatus(RealizationStatusEnum.DESCBROKEN);
-                logger.error("cube descriptor {} (for cube '{}') is broken", cubeDesc.getResourcePath(), cubeName);
-                for (String error : cubeDesc.getError()) {
-                    logger.error("Error: {}", error);
-                }
-            } else if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
-                cube.setStatus(RealizationStatusEnum.DISABLED);
-                logger.info("cube {} changed from DESCBROKEN to DISABLED", cubeName);
+            newSegment.setStatus(SegmentStatusEnum.READY);
+
+            List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+            for (CubeSegment segment : cube.getSegments()) {
+                if (!tobe.contains(segment))
+                    toRemoveSegs.add(segment);
             }
 
-            cube.setConfig((KylinConfigExt) cubeDesc.getConfig());
-            cubeMap.putLocal(cubeName, cube);
+            logger.info(
+                    "Promoting cube " + cube + ", new segment " + newSegment + ", to remove segments " + toRemoveSegs);
 
-            for (CubeSegment segment : cube.getSegments()) {
-                usedStorageLocation.put(segment.getUuid(), segment.getStorageLocationIdentifier());
+            CubeUpdate cubeBuilder = new CubeUpdate(cube);
+            cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()]))
+                    .setToUpdateSegs(newSegment).setStatus(RealizationStatusEnum.READY);
+            updateCube(cubeBuilder);
+        }
+
+        public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
+            List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments);
+            List<CubeSegment> newList = Arrays.asList(newSegments);
+            if (tobe.containsAll(newList) == false) {
+                throw new IllegalStateException("For cube " + cube + ", the new segments " + newList
+                        + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
             }
+        }
 
-            logger.info("Reloaded cube {} being {} having {} segments", cubeName, cube, cube.getSegments().size());
-            return cube;
+        /**
+         * Calculate the holes (gaps) in segments.
+         * @param cubeName
+         * @return
+         */
+        public List<CubeSegment> calculateHoles(String cubeName) {
+            List<CubeSegment> holes = Lists.newArrayList();
+            final CubeInstance cube = getCube(cubeName);
+            DataModelDesc modelDesc = cube.getModel();
+            Preconditions.checkNotNull(cube);
+            final List<CubeSegment> segments = cube.getSegments();
+            logger.info("totally " + segments.size() + " cubeSegments");
+            if (segments.size() == 0) {
+                return holes;
+            }
 
-        } catch (Exception e) {
-            logger.error("Error during load cube instance, skipping : " + path, e);
-            return null;
+            Collections.sort(segments);
+            for (int i = 0; i < segments.size() - 1; ++i) {
+                CubeSegment first = segments.get(i);
+                CubeSegment second = segments.get(i + 1);
+                if (first.getSegRange().connects(second.getSegRange()))
+                    continue;
+
+                if (first.getSegRange().apartBefore(second.getSegRange())) {
+                    CubeSegment hole = new CubeSegment();
+                    hole.setCubeInstance(cube);
+                    if (first.isOffsetCube()) {
+                        hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start));
+                        hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd());
+                        hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart());
+                        hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange(), modelDesc));
+                    } else {
+                        hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v));
+                        hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null, modelDesc));
+                    }
+                    holes.add(hole);
+                }
+            }
+            return holes;
         }
+
     }
 
-    private TableMetadataManager getTableManager() {
-        return TableMetadataManager.getInstance(config);
+    // ============================================================================
+    // Dictionary/Snapshot related methods
+    // ============================================================================
+
+    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+            throws IOException {
+        return dictAssist.buildDictionary(cubeSeg, col, inpTable);
     }
 
-    private DictionaryManager getDictionaryManager() {
-        return DictionaryManager.getInstance(config);
+    public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+            Dictionary<String> dict) throws IOException {
+        return dictAssist.saveDictionary(cubeSeg, col, inpTable, dict);
     }
 
-    private SnapshotManager getSnapshotManager() {
-        return SnapshotManager.getInstance(config);
+    /**
+     * return null if no dictionary for given column
+     */
+    public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+        return dictAssist.getDictionary(cubeSeg, col);
     }
 
-    private ResourceStore getStore() {
-        return ResourceStore.getStore(this.config);
+    public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {
+        return dictAssist.buildSnapshotTable(cubeSeg, lookupTable);
     }
 
-    @Override
-    public RealizationType getRealizationType() {
-        return RealizationType.CUBE;
+    public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+        return dictAssist.getLookupTable(cubeSegment, join);
     }
 
-    @Override
-    public IRealization getRealization(String name) {
-        return getCube(name);
+    //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
+    public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+        return dictAssist.getUHCIndex(cubeDesc);
     }
 
-    /**
-     * Calculate the holes (gaps) in segments.
-     * @param cubeName
-     * @return
-     */
-    public List<CubeSegment> calculateHoles(String cubeName) {
-        List<CubeSegment> holes = Lists.newArrayList();
-        final CubeInstance cube = getCube(cubeName);
-        DataModelDesc modelDesc = cube.getModel();
-        Preconditions.checkNotNull(cube);
-        final List<CubeSegment> segments = cube.getSegments();
-        logger.info("totally " + segments.size() + " cubeSegments");
-        if (segments.size() == 0) {
-            return holes;
+    private class DictionaryAssist {
+        public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable)
+                throws IOException {
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+                return null;
+
+            String builderClass = cubeDesc.getDictionaryBuilderClass(col);
+            DictionaryInfo dictInfo = getDictionaryManager().buildDictionary(col, inpTable, builderClass);
+
+            saveDictionaryInfo(cubeSeg, col, dictInfo);
+            return dictInfo;
         }
 
-        Collections.sort(segments);
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getSegRange().connects(second.getSegRange()))
-                continue;
-
-            if (first.getSegRange().apartBefore(second.getSegRange())) {
-                CubeSegment hole = new CubeSegment();
-                hole.setCubeInstance(cube);
-                if (first.isOffsetCube()) {
-                    hole.setSegRange(new SegmentRange(first.getSegRange().end, second.getSegRange().start));
-                    hole.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetEnd());
-                    hole.setSourcePartitionOffsetEnd(second.getSourcePartitionOffsetStart());
-                    hole.setName(CubeSegment.makeSegmentName(null, hole.getSegRange(), modelDesc));
-                } else {
-                    hole.setTSRange(new TSRange(first.getTSRange().end.v, second.getTSRange().start.v));
-                    hole.setName(CubeSegment.makeSegmentName(hole.getTSRange(), null, modelDesc));
-                }
-                holes.add(hole);
+        public DictionaryInfo saveDictionary(CubeSegment cubeSeg, TblColRef col, IReadableTable inpTable,
+                Dictionary<String> dict) throws IOException {
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (!cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col))
+                return null;
+
+            DictionaryInfo dictInfo = getDictionaryManager().saveDictionary(col, inpTable, dict);
+
+            saveDictionaryInfo(cubeSeg, col, dictInfo);
+            return dictInfo;
+        }
+
+        private void saveDictionaryInfo(CubeSegment cubeSeg, TblColRef col, DictionaryInfo dictInfo)
+                throws IOException {
+            if (dictInfo != null) {
+                Dictionary<?> dict = dictInfo.getDictionaryObject();
+                cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
+                cubeSeg.getRowkeyStats().add(new Object[] { col.getIdentity(), dict.getSize(), dict.getSizeOfId() });
+
+                CubeUpdate update = new CubeUpdate(cubeSeg.getCubeInstance());
+                update.setToUpdateSegs(cubeSeg);
+                updateCube(update);
             }
         }
-        return holes;
-    }
 
-    private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+        /**
+         * return null if no dictionary for given column
+         */
+        @SuppressWarnings("unchecked")
+        public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+            DictionaryInfo info = null;
+            try {
+                DictionaryManager dictMgr = getDictionaryManager();
+                String dictResPath = cubeSeg.getDictResPath(col);
+                if (dictResPath == null)
+                    return null;
+
+                info = dictMgr.getDictionaryInfo(dictResPath);
+                if (info == null)
+                    throw new IllegalStateException("No dictionary found by " + dictResPath
+                            + ", invalid cube state; cube segment" + cubeSeg + ", col " + col);
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col,
+                        e);
+            }
+            return (Dictionary<String>) info.getDictionaryObject();
+        }
 
-    //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
-    public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
-        List<TblColRef> dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
-        int[] uhcIndex = new int[dictCols.size()];
-
-        //add GlobalDictionaryColumns
-        List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
-        if (dictionaryDescList != null) {
-            for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
-                if (dictionaryDesc.getBuilderClass() != null
-                        && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
-                    for (int i = 0; i < dictCols.size(); i++) {
-                        if (dictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
-                            uhcIndex[i] = 1;
-                            break;
+        public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {
+            TableMetadataManager metaMgr = getTableManager();
+            SnapshotManager snapshotMgr = getSnapshotManager();
+
+            TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable, cubeSeg.getProject()));
+            IReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc);
+            SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
+
+            cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+            CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance());
+            cubeBuilder.setToUpdateSegs(cubeSeg);
+            updateCube(cubeBuilder);
+
+            return snapshot;
+        }
+
+        public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+
+            String tableName = join.getPKSide().getTableIdentity();
+            String[] pkCols = join.getPrimaryKey();
+            String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
+            if (snapshotResPath == null)
+                throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
+                        + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
+
+            try {
+                SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
+                TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject());
+                return new LookupStringTable(tableDesc, pkCols, snapshot);
+            } catch (IOException e) {
+                throw new IllegalStateException(
+                        "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+            }
+        }
+
+        private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+
+        //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
+        public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+            List<TblColRef> dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
+            int[] uhcIndex = new int[dictCols.size()];
+
+            //add GlobalDictionaryColumns
+            List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
+            if (dictionaryDescList != null) {
+                for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+                    if (dictionaryDesc.getBuilderClass() != null
+                            && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
+                        for (int i = 0; i < dictCols.size(); i++) {
+                            if (dictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
+                                uhcIndex[i] = 1;
+                                break;
+                            }
                         }
                     }
                 }
             }
-        }
 
-        //add ShardByColumns
-        Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
-        for (int i = 0; i < dictCols.size(); i++) {
-            if (shardByColumns.contains(dictCols.get(i))) {
-                uhcIndex[i] = 1;
+            //add ShardByColumns
+            Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
+            for (int i = 0; i < dictCols.size(); i++) {
+                if (shardByColumns.contains(dictCols.get(i))) {
+                    uhcIndex[i] = 1;
+                }
             }
-        }
 
-        return uhcIndex;
+            return uhcIndex;
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f2f487fe/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java
index d07c93b..2eaebb1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/CubeSignatureRefresher.java
@@ -97,7 +97,7 @@ public class CubeSignatureRefresher {
             String calculatedSign = cubeDesc.calculateSignature();
             if (cubeDesc.getSignature() == null || (!cubeDesc.getSignature().equals(calculatedSign))) {
                 cubeDesc.setSignature(calculatedSign);
-                store.putResource(cubeDesc.getResourcePath(), cubeDesc, CubeDescManager.CUBE_DESC_SERIALIZER);
+                store.putResource(cubeDesc.getResourcePath(), cubeDesc, CubeDesc.newSerializerForLowLevelAccess());
                 updatedResources.add(cubeDesc.getResourcePath());
             }
         } catch (Exception e) {