Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/21 00:27:32 UTC

[kylin] branch KYLIN-3221 created (now 3826a3d)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch KYLIN-3221
in repository https://gitbox.apache.org/repos/asf/kylin.git.


      at 3826a3d  KYLIN-3377 Some improvements for lookup table - snapshot management

This branch includes the following new commits:

     new fd6dd9c  KYLIN-3373 Some improvements for lookup table - UI part change
     new 7f8d018  KYLIN-3374 Some improvements for lookup table - metadata change
     new 45f998b  KYLIN-3375 Some improvements for lookup table - build change
     new 1d7ec09  KYLIN-3376 Some improvements for lookup table - query change
     new 3826a3d  KYLIN-3377 Some improvements for lookup table - snapshot management

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.

[kylin] 04/05: KYLIN-3376 Some improvements for lookup table - query change

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3221
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1d7ec09cb0e0e2c126a5cbe3b83491ce04278a69
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu May 10 13:09:11 2018 +0800

    KYLIN-3376 Some improvements for lookup table - query change
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../apache/kylin/cube/CubeCapabilityChecker.java   |   2 +-
 core-dictionary/pom.xml                            |   5 +
 .../kylin/dict/lookup/LookupStringTable.java       |  12 +-
 .../org/apache/kylin/dict/lookup/LookupTable.java  |   4 +-
 .../dict/lookup/cache/RocksDBLookupBuilder.java    |  83 ++++
 .../dict/lookup/cache/RocksDBLookupRowEncoder.java |  70 ++++
 .../dict/lookup/cache/RocksDBLookupTable.java      | 113 ++++++
 .../dict/lookup/cache/RocksDBLookupTableCache.java | 420 +++++++++++++++++++++
 .../lookup/cache/RocksDBLookupRowEncoderTest.java  |  80 ++++
 .../lookup/cache/RocksDBLookupTableCacheTest.java  | 220 +++++++++++
 .../dict/lookup/cache/RocksDBLookupTableTest.java  | 161 ++++++++
 .../org/apache/kylin/job/SelfStopExecutable.java   |   2 +-
 .../kylin/storage/gtrecord/CubeTupleConverter.java |  63 ++--
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |  14 +-
 .../kylin/storage/gtrecord/ITupleConverter.java    |   3 +-
 .../storage/gtrecord/SegmentCubeTupleIterator.java |   1 +
 .../storage/translate/DerivedFilterTranslator.java |   6 +-
 pom.xml                                            |   6 +
 .../query/enumerator/LookupTableEnumerator.java    |  19 +-
 19 files changed, 1224 insertions(+), 60 deletions(-)
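
For readers skimming the patch: the query-side change amounts to accessing lookup tables through the new ILookupTable interface (point-get by key, iterate, then close) instead of LookupStringTable directly, with the RocksDB-backed cache added in core-dictionary as one implementation. The following sketch is not part of the patch; it only illustrates that flow under the assumption that a TableDesc, a source ILookupTable and a local dbPath are already available, and it borrows the "COUNTRY" key column from the tests further down.

    import java.util.Iterator;

    import org.apache.kylin.common.util.Array;
    import org.apache.kylin.dict.lookup.ILookupTable;
    import org.apache.kylin.dict.lookup.cache.RocksDBLookupBuilder;
    import org.apache.kylin.dict.lookup.cache.RocksDBLookupTable;
    import org.apache.kylin.metadata.model.TableDesc;

    public class LookupCacheSketch {

        // tableDesc, source and dbPath are assumed to be supplied by Kylin's metadata
        // and snapshot layers; the "COUNTRY" key column mirrors the tests in this patch.
        static void buildAndRead(TableDesc tableDesc, ILookupTable source, String dbPath) throws Exception {
            // 1. Write the rows of the source lookup table into a local RocksDB directory.
            new RocksDBLookupBuilder(tableDesc, new String[] { "COUNTRY" }, dbPath).build(source);

            // 2. Open that directory read-only and use it like any other ILookupTable.
            RocksDBLookupTable cached = new RocksDBLookupTable(tableDesc, new String[] { "COUNTRY" }, dbPath);
            try {
                String[] row = cached.getRow(new Array<>(new String[] { "AD" })); // point lookup by key
                System.out.println(row == null ? "key not found" : String.join(",", row));

                Iterator<String[]> it = cached.iterator(); // full scan
                int rows = 0;
                while (it.hasNext()) {
                    it.next();
                    rows++;
                }
                System.out.println("scanned " + rows + " rows");
            } finally {
                cached.close(); // callers own the table; CubeTupleConverter.close() does the same at query time
            }
        }
    }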

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 5dffd96..e12b689 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -75,7 +75,7 @@ public class CubeCapabilityChecker {
             }
         } else {
             //for non query-on-facttable 
-            if (cube.getSegments().get(0).getSnapshots().containsKey(digest.factTable)) {
+            if (cube.getSegments().get(0).getSnapshots().containsKey(digest.factTable) || cube.getSnapshots().containsKey(digest.factTable)) {
 
                 Set<TblColRef> dimCols = Sets.newHashSet(cube.getModel().findFirstTable(digest.factTable).getColumns());
 
diff --git a/core-dictionary/pom.xml b/core-dictionary/pom.xml
index 7e8bee1..d82d204 100644
--- a/core-dictionary/pom.xml
+++ b/core-dictionary/pom.xml
@@ -38,6 +38,11 @@
             <artifactId>kylin-core-metadata</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+        </dependency>
+
         <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
index 886de22..1d0348a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict.lookup;
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.Iterator;
 
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.datatype.DataType;
@@ -31,7 +32,7 @@ import org.apache.kylin.source.IReadableTable;
  * @author yangli9
  * 
  */
-public class LookupStringTable extends LookupTable<String> {
+public class LookupStringTable extends LookupTable<String> implements ILookupTable {
 
     private static final Comparator<String> dateStrComparator = new Comparator<String>() {
         @Override
@@ -109,4 +110,13 @@ public class LookupStringTable extends LookupTable<String> {
         return String.class;
     }
 
+    @Override
+    public Iterator<String[]> iterator() {
+        return data.values().iterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+    }
 }
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index a99ef29..e1123fa 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -22,10 +22,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.util.Array;
@@ -53,7 +53,7 @@ abstract public class LookupTable<T> {
         this.tableDesc = tableDesc;
         this.keyColumns = keyColumns;
         this.table = table;
-        this.data = new ConcurrentHashMap<Array<T>, T[]>();
+        this.data = new HashMap<Array<T>, T[]>();
         init();
     }
 
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java
new file mode 100644
index 0000000..269684e
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.util.SizeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocksDBLookupBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupBuilder.class);
+
+    static {
+        RocksDB.loadLibrary();
+    }
+    private Options options;
+    private String dbPath;
+    private TableDesc tableDesc;
+    private RocksDBLookupRowEncoder encoder;
+    private int writeBatchSize;
+
+    public RocksDBLookupBuilder(TableDesc tableDesc, String[] keyColumns, String dbPath) {
+        this.tableDesc = tableDesc;
+        this.encoder = new RocksDBLookupRowEncoder(tableDesc, keyColumns);
+        this.dbPath = dbPath;
+        this.writeBatchSize = 500;
+        this.options = new Options();
+        options.setCreateIfMissing(true).setWriteBufferSize(8 * SizeUnit.KB).setMaxWriteBufferNumber(3)
+                .setMaxBackgroundCompactions(5).setCompressionType(CompressionType.SNAPPY_COMPRESSION)
+                .setCompactionStyle(CompactionStyle.UNIVERSAL);
+
+    }
+
+    public void build(ILookupTable srcLookupTable) {
+        File dbFolder = new File(dbPath);
+        if (dbFolder.exists()) {
+            logger.info("remove rocksdb folder:{} to rebuild table cache:{}", dbPath, tableDesc.getIdentity());
+            FileUtils.deleteQuietly(dbFolder);
+        } else {
+            logger.info("create new rocksdb folder:{} for table cache:{}", dbPath, tableDesc.getIdentity());
+            dbFolder.mkdirs();
+        }
+        logger.info("start to build lookup table:{} to rocks db:{}", tableDesc.getIdentity(), dbPath);
+        try (RocksDB rocksDB = RocksDB.open(options, dbPath)) {
+            // TODO: use a write batch, which may improve write performance
+            for (String[] row : srcLookupTable) {
+                KV kv = encoder.encode(row);
+
+                rocksDB.put(kv.getKey(), kv.getValue());
+            }
+        } catch (RocksDBException e) {
+            throw new RuntimeException("error when write data to rocks db", e);
+        }
+
+        logger.info("source table:{} has been written to rocks db:{}", tableDesc.getIdentity(), dbPath);
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java
new file mode 100644
index 0000000..7455a7b
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import org.apache.kylin.dict.lookup.AbstractLookupRowEncoder;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * encode/decode original table row to rocksDB KV
+ * 
+ */
+public class RocksDBLookupRowEncoder extends AbstractLookupRowEncoder<KV> {
+
+    public RocksDBLookupRowEncoder(TableDesc tableDesc, String[] keyColumns) {
+        super(tableDesc, keyColumns);
+    }
+
+    public KV encode(String[] row) {
+        String[] keys = getKeyData(row);
+        String[] values = getValueData(row);
+        byte[] encodeKey = encodeStringsWithLenPfx(keys, false);
+        byte[] encodeValue = encodeStringsWithLenPfx(values, true);
+
+        return new KV(encodeKey, encodeValue);
+    }
+
+    public String[] decode(KV kv) {
+        String[] result = new String[columnsNum];
+
+        decodeFromLenPfxBytes(kv.key, keyIndexes, result);
+        decodeFromLenPfxBytes(kv.value, valueIndexes, result);
+
+        return result;
+    }
+
+    public static class KV {
+        private byte[] key;
+        private byte[] value;
+
+        public KV(byte[] key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public byte[] getKey() {
+            return key;
+        }
+
+        public byte[] getValue() {
+            return value;
+        }
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java
new file mode 100644
index 0000000..4f0c816
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocksDBLookupTable implements ILookupTable {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupTable.class);
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    private TableDesc tableDesc;
+    private RocksDB rocksDB;
+    private Options options;
+
+    private RocksDBLookupRowEncoder rowEncoder;
+
+    public RocksDBLookupTable(TableDesc tableDesc, String[] keyColumns, String dbPath) {
+        this.tableDesc = tableDesc;
+        this.options = new Options();
+        this.rowEncoder = new RocksDBLookupRowEncoder(tableDesc, keyColumns);
+        try {
+            this.rocksDB = RocksDB.openReadOnly(options, dbPath);
+        } catch (RocksDBException e) {
+            throw new RuntimeException("cannot open rocks db in path:" + dbPath, e);
+        }
+    }
+
+    @Override
+    public String[] getRow(Array<String> key) {
+        byte[] encodeKey = rowEncoder.encodeStringsWithLenPfx(key.data, false);
+        try {
+            byte[] value = rocksDB.get(encodeKey);
+            if (value == null) {
+                return null;
+            }
+            return rowEncoder.decode(new KV(encodeKey, value));
+        } catch (RocksDBException e) {
+            throw new RuntimeException("error when get key from rocksdb", e);
+        }
+    }
+
+    @Override
+    public Iterator<String[]> iterator() {
+        final RocksIterator rocksIterator = rocksDB.newIterator();
+        rocksIterator.seekToFirst();
+
+        return new Iterator<String[]>() {
+            int counter;
+            @Override
+            public boolean hasNext() {
+                boolean valid = rocksIterator.isValid();
+                if (!valid) {
+                    rocksIterator.close();
+                }
+                return valid;
+            }
+
+            @Override
+            public String[] next() {
+                counter ++;
+                if (counter % 100000 == 0) {
+                    logger.info("scanned {} rows from rocksDB", counter);
+                }
+                String[] result = rowEncoder.decode(new KV(rocksIterator.key(), rocksIterator.value()));
+                rocksIterator.next();
+                return result;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("not support operation");
+            }
+        };
+    }
+
+    @Override
+    public void close() throws IOException {
+        options.close();
+        if (rocksDB != null) {
+            rocksDB.close();
+        }
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java
new file mode 100644
index 0000000..460c448
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Predicate;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+
+public class RocksDBLookupTableCache implements IExtLookupTableCache {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupTableCache.class);
+
+    private static final String CACHE_TYPE_ROCKSDB = "rocksdb";
+    private static final String STATE_FILE = "STATE";
+    private static final String DB_FILE = "db";
+
+    private String basePath;
+    private long maxCacheSizeInKB;
+    private Cache<String, CachedTableInfo> tablesCache;
+
+    private ConcurrentMap<String, Boolean> inBuildingTables = Maps.newConcurrentMap();
+
+    private ExecutorService cacheBuildExecutor;
+    private ScheduledExecutorService cacheStateCheckExecutor;
+    private CacheStateChecker cacheStateChecker;
+
+    // static cached instances
+    private static final ConcurrentMap<KylinConfig, RocksDBLookupTableCache> SERVICE_CACHE = new ConcurrentHashMap<>();
+
+    public static RocksDBLookupTableCache getInstance(KylinConfig config) {
+        RocksDBLookupTableCache r = SERVICE_CACHE.get(config);
+        if (r == null) {
+            synchronized (RocksDBLookupTableCache.class) {
+                r = SERVICE_CACHE.get(config);
+                if (r == null) {
+                    r = new RocksDBLookupTableCache(config);
+                    SERVICE_CACHE.put(config, r);
+                    if (SERVICE_CACHE.size() > 1) {
+                        logger.warn("More than one singleton exist");
+                    }
+                }
+            }
+        }
+        return r;
+    }
+
+    public static void clearCache() {
+        synchronized (SERVICE_CACHE) {
+            SERVICE_CACHE.clear();
+        }
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
+
+    private RocksDBLookupTableCache(KylinConfig config) {
+        this.config = config;
+        init();
+    }
+
+    private void init() {
+        this.basePath = getCacheBasePath(config);
+
+        this.maxCacheSizeInKB = (long) (config.getExtTableSnapshotLocalCacheMaxSizeGB() * 1024 * 1024);
+        this.tablesCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, CachedTableInfo>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, CachedTableInfo> notification) {
+                logger.warn(notification.getValue() + " is removed " + "because of " + notification.getCause());
+                notification.getValue().cleanStorage();
+            }
+        }).maximumWeight(maxCacheSizeInKB).weigher(new Weigher<String, CachedTableInfo>() {
+            @Override
+            public int weigh(String key, CachedTableInfo value) {
+                return value.getSizeInKB();
+            }
+        }).build();
+        restoreCacheState();
+        cacheStateChecker = new CacheStateChecker();
+        initExecutors();
+    }
+
+    protected static String getCacheBasePath(KylinConfig config) {
+        String basePath = config.getExtTableSnapshotLocalCachePath();
+        if (!basePath.startsWith("/")) {
+            basePath = KylinConfig.getKylinHome() + File.separator + basePath;
+        }
+        return basePath + File.separator + CACHE_TYPE_ROCKSDB;
+    }
+
+    private void restoreCacheState() {
+        File dbBaseFolder = new File(basePath);
+        if (!dbBaseFolder.exists()) {
+            dbBaseFolder.mkdirs();
+        }
+        Map<String, File[]> tableSnapshotsFileMap = getCachedTableSnapshotsFolders(dbBaseFolder);
+        for (Entry<String, File[]> tableSnapshotsEntry : tableSnapshotsFileMap.entrySet()) {
+            for (File snapshotFolder : tableSnapshotsEntry.getValue()) {
+                initSnapshotState(tableSnapshotsEntry.getKey(), snapshotFolder);
+            }
+        }
+    }
+
+    private Map<String, File[]> getCachedTableSnapshotsFolders(File dbBaseFolder) {
+        Map<String, File[]> result = Maps.newHashMap();
+        File[] tableFolders = dbBaseFolder.listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File file) {
+                return file.isDirectory();
+            }
+        });
+        if (tableFolders == null) {
+            return result;
+        }
+
+        for (File tableFolder : tableFolders) {
+            String tableName = tableFolder.getName();
+            File[] snapshotFolders = tableFolder.listFiles(new FileFilter() {
+                @Override
+                public boolean accept(File snapshotFile) {
+                    return snapshotFile.isDirectory();
+                }
+            });
+            result.put(tableName, snapshotFolders);
+        }
+        return result;
+    }
+
+    private void initSnapshotState(String tableName, File snapshotCacheFolder) {
+        String snapshotID = snapshotCacheFolder.getName();
+        File stateFile = getCacheStateFile(snapshotCacheFolder.getAbsolutePath());
+        if (stateFile.exists()) {
+            try {
+                String stateStr = Files.toString(stateFile, Charsets.UTF_8);
+                String resourcePath = ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID);
+                if (CacheState.AVAILABLE.name().equals(stateStr)) {
+                    tablesCache.put(resourcePath, new CachedTableInfo(snapshotCacheFolder.getAbsolutePath()));
+                }
+            } catch (IOException e) {
+                logger.error("error to read state file:" + stateFile.getAbsolutePath());
+            }
+        }
+    }
+
+    private void initExecutors() {
+        this.cacheBuildExecutor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("lookup-cache-build-thread"));
+        this.cacheStateCheckExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(
+                "lookup-cache-state-checker"));
+        cacheStateCheckExecutor.scheduleAtFixedRate(cacheStateChecker, 10, 10 * 60, TimeUnit.SECONDS); // check every 10 minutes
+    }
+
+    @Override
+    public ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, boolean buildIfNotExist) {
+        String resourcePath = extTableSnapshotInfo.getResourcePath();
+        if (inBuildingTables.containsKey(resourcePath)) {
+            logger.info("cache is in building for snapshot:" + resourcePath);
+            return null;
+        }
+        CachedTableInfo cachedTableInfo = tablesCache.getIfPresent(resourcePath);
+        if (cachedTableInfo == null) {
+            if (buildIfNotExist) {
+                buildSnapshotCache(tableDesc, extTableSnapshotInfo, getSourceLookupTable(tableDesc, extTableSnapshotInfo));
+            }
+            logger.info("no available cache ready for the table snapshot:" + extTableSnapshotInfo.getResourcePath());
+            return null;
+        }
+
+        String[] keyColumns = extTableSnapshotInfo.getKeyColumns();
+        String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        return new RocksDBLookupTable(tableDesc, keyColumns, dbPath);
+    }
+
+    private ILookupTable getSourceLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo) {
+        return LookupProviderFactory.getExtLookupTableWithoutCache(tableDesc, extTableSnapshotInfo);
+    }
+
+    @Override
+    public void buildSnapshotCache(final TableDesc tableDesc, final ExtTableSnapshotInfo extTableSnapshotInfo, final ILookupTable sourceTable) {
+        if (extTableSnapshotInfo.getSignature().getSize() / 1024 > maxCacheSizeInKB * 2 / 3) {
+            logger.warn("the size is to large to build to cache for snapshot:{}, size:{}, skip cache building",
+                    extTableSnapshotInfo.getResourcePath(), extTableSnapshotInfo.getSignature().getSize());
+            return;
+        }
+        final String[] keyColumns = extTableSnapshotInfo.getKeyColumns();
+        final String cachePath = getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        final String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        final String snapshotResPath = extTableSnapshotInfo.getResourcePath();
+
+        if (inBuildingTables.containsKey(snapshotResPath)) {
+            logger.info("there is already snapshot cache in building for snapshot:{}, skip it", snapshotResPath);
+            return;
+        }
+        if (inBuildingTables.putIfAbsent(snapshotResPath, true) == null) {
+            cacheBuildExecutor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, keyColumns, dbPath);
+                        builder.build(sourceTable);
+                    } finally {
+                        inBuildingTables.remove(snapshotResPath);
+                    }
+                    saveSnapshotCacheState(extTableSnapshotInfo, cachePath);
+                }
+            });
+
+        } else {
+            logger.info("there is already snapshot cache in building for snapshot:{}, skip it", snapshotResPath);
+        }
+    }
+
+    @Override
+    public void removeSnapshotCache(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        tablesCache.invalidate(extTableSnapshotInfo.getResourcePath());
+    }
+
+    @Override
+    public CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        String resourcePath = extTableSnapshotInfo.getResourcePath();
+        if (inBuildingTables.containsKey(resourcePath)) {
+            return CacheState.IN_BUILDING;
+        }
+        File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(),
+                extTableSnapshotInfo.getId()));
+        if (!stateFile.exists()) {
+            return CacheState.NONE;
+        }
+        try {
+            String stateString = Files.toString(stateFile, Charsets.UTF_8);
+            return CacheState.valueOf(stateString);
+        } catch (IOException e) {
+            logger.error("error when read state file", e);
+        }
+        return CacheState.NONE;
+    }
+
+    public long getTotalCacheSize() {
+        return FileUtils.sizeOfDirectory(new File(getCacheBasePath(config)));
+    }
+
+    public void checkCacheState() {
+        cacheStateChecker.run();
+    }
+
+    private void saveSnapshotCacheState(ExtTableSnapshotInfo extTableSnapshotInfo, String cachePath) {
+        File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(),
+                extTableSnapshotInfo.getId()));
+        try {
+            Files.write(CacheState.AVAILABLE.name(), stateFile, Charsets.UTF_8);
+            tablesCache.put(extTableSnapshotInfo.getResourcePath(), new CachedTableInfo(cachePath));
+        } catch (IOException e) {
+            throw new RuntimeException("error when write cache state for snapshot:"
+                    + extTableSnapshotInfo.getResourcePath());
+        }
+    }
+
+    private File getCacheStateFile(String snapshotCacheFolder) {
+        String stateFilePath = snapshotCacheFolder + File.separator + STATE_FILE;
+        return new File(stateFilePath);
+    }
+
+    protected String getSnapshotStorePath(String tableName, String snapshotID) {
+        return getSnapshotCachePath(tableName, snapshotID) + File.separator + DB_FILE;
+    }
+
+    protected String getSnapshotCachePath(String tableName, String snapshotID) {
+        return basePath + File.separator + tableName + File.separator + snapshotID;
+    }
+
+    private class CacheStateChecker implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                String cacheBasePath = getCacheBasePath(config);
+                logger.info("check snapshot local cache state, local path:{}", cacheBasePath);
+                File baseFolder = new File(cacheBasePath);
+                if (!baseFolder.exists()) {
+                    return;
+                }
+                Map<String, File[]> tableSnapshotsFileMap = getCachedTableSnapshotsFolders(baseFolder);
+                List<Pair<String, File>> allCachedSnapshots = Lists.newArrayList();
+                for (Entry<String, File[]> tableSnapshotsEntry : tableSnapshotsFileMap.entrySet()) {
+                    String tableName = tableSnapshotsEntry.getKey();
+                    for (File file : tableSnapshotsEntry.getValue()) {
+                        String snapshotID = file.getName();
+                        allCachedSnapshots
+                                .add(new Pair<>(ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID), file));
+                    }
+                }
+
+                final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config).getAllExtSnapshotResPaths();
+
+                List<Pair<String, File>> toRemovedCachedSnapshots = Lists.newArrayList(FluentIterable.from(
+                        allCachedSnapshots).filter(new Predicate<Pair<String, File>>() {
+                    @Override
+                    public boolean apply(@Nullable Pair<String, File> input) {
+                        return !activeSnapshotSet.contains(input.getFirst());
+                    }
+                }));
+                for (Pair<String, File> toRemovedCachedSnapshot : toRemovedCachedSnapshots) {
+                    File snapshotCacheFolder = toRemovedCachedSnapshot.getSecond();
+                    logger.info("removed cache file:{}, it is not referred by any cube",
+                            snapshotCacheFolder.getAbsolutePath());
+                    try {
+                        FileUtils.deleteDirectory(snapshotCacheFolder);
+                    } catch (IOException e) {
+                        logger.error("fail to remove folder:" + snapshotCacheFolder.getAbsolutePath(), e);
+                    }
+                    tablesCache.invalidate(toRemovedCachedSnapshot.getFirst());
+                }
+            } catch (Exception e) {
+                logger.error("error happens when check cache state", e);
+            }
+
+        }
+    }
+
+    private static class CachedTableInfo {
+        private String cachePath;
+
+        private long dbSize;
+
+        public CachedTableInfo(String cachePath) {
+            this.cachePath = cachePath;
+            this.dbSize = FileUtils.sizeOfDirectory(new File(cachePath));
+        }
+
+        public int getSizeInKB() {
+            return (int) (dbSize / 1024);
+        }
+
+        public void cleanStorage() {
+            logger.info("clean cache storage for path:" + cachePath);
+            try {
+                FileUtils.deleteDirectory(new File(cachePath));
+            } catch (IOException e) {
+                logger.error("file delete fail:" + cachePath, e);
+            }
+        }
+    }
+
+    /**
+     * A simple named thread factory.
+     */
+    private static class NamedThreadFactory implements ThreadFactory {
+        private final ThreadGroup group;
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        public NamedThreadFactory(String threadPrefix) {
+            final SecurityManager s = System.getSecurityManager();
+            this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+            this.namePrefix = threadPrefix + "-";
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+            t.setDaemon(true);
+            return t;
+        }
+    }
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java
new file mode 100644
index 0000000..ff5fdff
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RocksDBLookupRowEncoderTest extends LocalFileMetadataTestCase {
+    private TableDesc tableDesc;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default");
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testEnDeCode() {
+        RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY" });
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        KV kv = lookupRowEncoder.encode(row);
+
+        String[] decodeRow = lookupRowEncoder.decode(kv);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithNullValue() {
+        RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY" });
+        String[] row = new String[] { "AD", "42.546245", "1.601554", null };
+        KV kv = lookupRowEncoder.encode(row);
+
+        String[] decodeRow = lookupRowEncoder.decode(kv);
+        assertNull(decodeRow[3]);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithMultiKeys() {
+        RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY",
+                "NAME" });
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        KV kv = lookupRowEncoder.encode(row);
+
+        String[] decodeRow = lookupRowEncoder.decode(kv);
+        assertArrayEquals(row, decodeRow);
+    }
+
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java
new file mode 100644
index 0000000..55b25b2
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.IExtLookupProvider;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+/**
+ */
+public class RocksDBLookupTableCacheTest extends LocalFileMetadataTestCase {
+    private static final String TABLE_COUNTRY = "DEFAULT.TEST_COUNTRY";
+    private static final String MOCK_EXT_LOOKUP = "mock";
+    private TableDesc tableDesc;
+    private KylinConfig kylinConfig;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        System.setProperty("KYLIN_HOME", ".");
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        kylinConfig = getTestConfig();
+        tableDesc = metadataManager.getTableDesc(TABLE_COUNTRY, "default");
+        cleanCache();
+        LookupProviderFactory.registerLookupProvider(MOCK_EXT_LOOKUP, MockedLookupProvider.class.getName());
+    }
+
+    private void cleanCache() {
+        FileUtils.deleteQuietly(new File(kylinConfig.getExtTableSnapshotLocalCachePath()));
+    }
+
+    @After
+    public void tearDown() {
+        cleanupTestMetadata();
+        cleanCache();
+    }
+
+    @Test
+    public void testBuildTableCache() throws Exception {
+        String snapshotID = UUID.randomUUID().toString();
+        ExtTableSnapshotInfo snapshotInfo = buildSnapshotCache(snapshotID, 100000);
+        assertEquals(CacheState.AVAILABLE, RocksDBLookupTableCache.getInstance(kylinConfig).getCacheState(snapshotInfo));
+    }
+
+    private ExtTableSnapshotInfo buildSnapshotCache(String snapshotID, int rowCnt) throws Exception {
+        ExtTableSnapshotInfo snapshotInfo = new ExtTableSnapshotInfo();
+        snapshotInfo.setTableName(TABLE_COUNTRY);
+        snapshotInfo.setUuid(snapshotID);
+        snapshotInfo.setStorageType(MOCK_EXT_LOOKUP);
+        snapshotInfo.setKeyColumns(new String[] { "COUNTRY" });
+        snapshotInfo.setRowCnt(rowCnt);
+        snapshotInfo.setSignature(new TableSignature("/test", rowCnt, System.currentTimeMillis()));
+
+        RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+        cache.buildSnapshotCache(tableDesc, snapshotInfo, getLookupTableWithRandomData(rowCnt));
+
+        while (cache.getCacheState(snapshotInfo) == CacheState.IN_BUILDING) {
+            Thread.sleep(500);
+        }
+        return snapshotInfo;
+    }
+
+    @Test
+    public void testRestoreCacheFromFiles() throws Exception {
+        String snapshotID = UUID.randomUUID().toString();
+        String snapshotCacheBasePath = RocksDBLookupTableCache.getCacheBasePath(kylinConfig) + File.separator
+                + TABLE_COUNTRY + File.separator + snapshotID;
+        String dbPath = snapshotCacheBasePath + File.separator + "db";
+        RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, new String[] { "COUNTRY" }, dbPath);
+        builder.build(getLookupTableWithRandomData(10000));
+        String stateFilePath = snapshotCacheBasePath + File.separator + "STATE";
+        Files.write(CacheState.AVAILABLE.name(), new File(stateFilePath), Charsets.UTF_8);
+
+        RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+        ExtTableSnapshotInfo snapshotInfo = new ExtTableSnapshotInfo();
+        snapshotInfo.setTableName(TABLE_COUNTRY);
+        snapshotInfo.setUuid(snapshotID);
+        snapshotInfo.setStorageType(MOCK_EXT_LOOKUP);
+        snapshotInfo.setKeyColumns(new String[] { "COUNTRY" });
+        ILookupTable lookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+        int rowCnt = 0;
+        for (String[] strings : lookupTable) {
+            rowCnt++;
+        }
+        lookupTable.close();
+        assertEquals(10000, rowCnt);
+    }
+
+    @Test
+    public void testEvict() throws Exception {
+        kylinConfig.setProperty("kylin.snapshot.ext.local.cache.max-size-gb", "0.005");
+        int snapshotNum = 10;
+        int snapshotRowCnt = 100000;
+        for (int i = 0; i < snapshotNum; i++) {
+            buildSnapshotCache(UUID.randomUUID().toString(), snapshotRowCnt);
+        }
+        assertTrue(RocksDBLookupTableCache.getInstance(kylinConfig).getTotalCacheSize() < 0.006 * 1024 * 1024 * 1024);
+    }
+
+    @Test
+    public void testCheckCacheState() throws Exception {
+        ExtTableSnapshotInfo snapshotInfo = buildSnapshotCache(UUID.randomUUID().toString(), 1000);
+        RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+        ILookupTable cachedLookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+        assertNotNull(cachedLookupTable);
+        cachedLookupTable.close();
+        cache.checkCacheState();
+        String cacheLocalPath = cache.getSnapshotCachePath(snapshotInfo.getTableName(), snapshotInfo.getId());
+        assertFalse(new File(cacheLocalPath).exists());
+        cachedLookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+        assertNull(cachedLookupTable);
+    }
+
+    public static class MockedLookupProvider implements IExtLookupProvider {
+
+        @Override
+        public ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+            return getLookupTableWithRandomData(extTableSnapshot.getRowCnt());
+        }
+
+        @Override
+        public IExtLookupTableCache getLocalCache() {
+            return null;
+        }
+
+        @Override
+        public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+            return null;
+        }
+    }
+
+    private static ILookupTable getLookupTableWithRandomData(final long rowNum) {
+        return new ILookupTable() {
+            Random random = new Random();
+
+            @Override
+            public String[] getRow(Array<String> key) {
+                return new String[0];
+            }
+
+            @Override
+            public void close() throws IOException {
+
+            }
+
+            @Override
+            public Iterator<String[]> iterator() {
+                return new Iterator<String[]>() {
+                    private int iterCnt = 0;
+
+                    @Override
+                    public boolean hasNext() {
+                        return iterCnt < rowNum;
+                    }
+
+                    @Override
+                    public String[] next() {
+                        iterCnt++;
+                        return genRandomRow(iterCnt);
+                    }
+
+                    @Override
+                    public void remove() {
+
+                    }
+                };
+            }
+
+            private String[] genRandomRow(int id) {
+                return new String[] { "keyyyyy" + id, String.valueOf(random.nextDouble()),
+                        String.valueOf(random.nextDouble()), "Andorra" + id };
+            }
+        };
+    }
+
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java
new file mode 100644
index 0000000..d5bbd40
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class RocksDBLookupTableTest extends LocalFileMetadataTestCase {
+
+    private TableDesc tableDesc;
+    private RocksDBLookupTable lookupTable;
+    private Random random;
+    private int sourceRowNum;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        this.random = new Random();
+        tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default");
+        sourceRowNum = 10000;
+        genTestData();
+        lookupTable = new RocksDBLookupTable(tableDesc, new String[] { "COUNTRY" }, "lookup_cache/TEST_COUNTRY");
+    }
+
+    private void genTestData() {
+        removeTestDataIfExists();
+        File file = new File("lookup_cache/TEST_COUNTRY");
+        file.mkdirs();
+        RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, new String[] { "COUNTRY" },
+                "lookup_cache/TEST_COUNTRY");
+        long start = System.currentTimeMillis();
+        builder.build(getLookupTableWithRandomData(sourceRowNum));
+        long take = System.currentTimeMillis() - start;
+        System.out.println("take:" + take + " ms to complete build");
+    }
+
+    private void removeTestDataIfExists() {
+        FileUtils.deleteQuietly(new File("lookup_cache"));
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        cleanupTestMetadata();
+        removeTestDataIfExists();
+        lookupTable.close();
+    }
+
+    @Test
+    public void testIterator() throws Exception {
+        System.out.println("start iterator table");
+        long start = System.currentTimeMillis();
+        Iterator<String[]> iter = lookupTable.iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        long take = System.currentTimeMillis() - start;
+        System.out.println("scan " + count + " rows, take " + take + " ms");
+
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        int getNum = 3000;
+        List<String[]> keys = Lists.newArrayList();
+        for (int i = 0; i < getNum; i++) {
+            String[] keyi = new String[] { "keyyyyy" + random.nextInt(sourceRowNum) };
+            keys.add(keyi);
+        }
+
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < getNum; i++) {
+            String[] row = lookupTable.getRow(new Array<>(keys.get(i)));
+            if (row == null) {
+                System.out.println("null value for key:" + Arrays.toString(keys.get(i)));
+            }
+        }
+        long take = System.currentTimeMillis() - start;
+        System.out.println("muliti get " + getNum + " rows, take " + take + " ms");
+    }
+
+    private ILookupTable getLookupTableWithRandomData(final int rowNum) {
+        return new ILookupTable() {
+            @Override
+            public String[] getRow(Array<String> key) {
+                return new String[0];
+            }
+
+            @Override
+            public void close() throws IOException {
+
+            }
+
+            @Override
+            public Iterator<String[]> iterator() {
+                return new Iterator<String[]>() {
+                    private int iterCnt = 0;
+
+                    @Override
+                    public boolean hasNext() {
+                        return iterCnt < rowNum;
+                    }
+
+                    @Override
+                    public String[] next() {
+                        iterCnt++;
+                        return genRandomRow(iterCnt);
+                    }
+
+                    @Override
+                    public void remove() {
+
+                    }
+                };
+            }
+        };
+    }
+
+    private String[] genRandomRow(int id) {
+        return new String[] { "keyyyyy" + id, String.valueOf(random.nextDouble()), String.valueOf(random.nextDouble()),
+                "Andorra" + id };
+    }
+
+}
diff --git a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
index 09f5a7a..ddeeafa 100644
--- a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
@@ -36,7 +36,7 @@ public class SelfStopExecutable extends BaseTestExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         doingWork = true;
         try {
-            for (int i = 0; i < 20; i++) {
+            for (int i = 0; i < 60; i++) {
                 sleepOneSecond();
                 
                 if (isDiscarded())
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 8af9e3f..05fc90f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -19,30 +19,26 @@
 package org.apache.kylin.storage.gtrecord;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
-import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +63,7 @@ public class CubeTupleConverter implements ITupleConverter {
 
     public final List<IAdvMeasureFiller> advMeasureFillers;
     public final List<Integer> advMeasureIndexInGTValues;
+    private List<ILookupTable> usedLookupTables;
 
     public final int nSelectedDims;
 
@@ -86,6 +83,7 @@ public class CubeTupleConverter implements ITupleConverter {
 
         advMeasureFillers = Lists.newArrayListWithCapacity(1);
         advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
+        usedLookupTables = Lists.newArrayList();
 
         ////////////
 
@@ -178,6 +176,17 @@ public class CubeTupleConverter implements ITupleConverter {
         }
     }
 
+    @Override
+    public void close() throws IOException {
+        for (ILookupTable usedLookupTable : usedLookupTables) {
+            try {
+                usedLookupTable.close();
+            } catch (Exception e) {
+                logger.error("error when close lookup table:" + usedLookupTable);
+            }
+        }
+    }
+
     protected interface IDerivedColumnFiller {
         public void fillDerivedColumns(Object[] gtValues, Tuple tuple);
     }
@@ -204,7 +213,7 @@ public class CubeTupleConverter implements ITupleConverter {
         switch (deriveInfo.type) {
         case LOOKUP:
             return new IDerivedColumnFiller() {
-                LookupStringTable lookupTable = getLookupTable(cubeSeg, deriveInfo.join);
+                ILookupTable lookupTable = getAndAddLookupTable(cubeSeg, deriveInfo.join);
                 int[] derivedColIdx = initDerivedColIdx();
                 Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
 
@@ -263,40 +272,10 @@ public class CubeTupleConverter implements ITupleConverter {
         return -1;
     }
 
-    public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-        long ts = System.currentTimeMillis();
-
-        TableMetadataManager metaMgr = TableMetadataManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-        SnapshotManager snapshotMgr = SnapshotManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-
-        String tableName = join.getPKSide().getTableIdentity();
-        String[] pkCols = join.getPrimaryKey();
-        String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
-        if (snapshotResPath == null)
-            throw new IllegalStateException("No snaphot for table '" + tableName + "' found on cube segment" + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
-
-        try {
-            SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotResPath);
-            TableDesc tableDesc = metaMgr.getTableDesc(tableName, cubeSegment.getProject());
-            EnhancedStringLookupTable enhancedStringLookupTable = new EnhancedStringLookupTable(tableDesc, pkCols, snapshot);
-            logger.info("Time to get lookup up table for {} is {} ", join.getPKSide().getTableName(), (System.currentTimeMillis() - ts));
-            return enhancedStringLookupTable;
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
-        }
-    }
-
-    public static class EnhancedStringLookupTable extends LookupStringTable {
-
-        public EnhancedStringLookupTable(TableDesc tableDesc, String[] keyColumns, IReadableTable table) throws IOException {
-            super(tableDesc, keyColumns, table);
-        }
-
-        @Override
-        protected void init() throws IOException {
-            this.data = new HashMap<>();
-            super.init();
-        }
+    public ILookupTable getAndAddLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+        ILookupTable lookupTable = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getLookupTable(cubeSegment, join);
+        usedLookupTables.add(lookupTable);
+        return lookupTable;
     }
 
     private static String toString(Object o) {
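
For reference, the ILookupTable that getAndAddLookupTable() now returns is both iterable and closeable; callers are expected to release it once the query finishes, since implementations such as the RocksDB-backed table hold native resources. A minimal consumer sketch, assuming ILookupTable extends Iterable<String[]> (as the for-each loop in DerivedFilterTranslator below implies) and using placeholder segment/join objects:

    ILookupTable lookup = CubeManager.getInstance(KylinConfig.getInstanceFromEnv())
            .getLookupTable(cubeSegment, joinDesc);   // cubeSegment/joinDesc are placeholders
    try {
        // point lookup by primary key
        String[] row = lookup.getRow(new Array<>(new String[] { "pk-value" }));
        // full scan of the snapshot
        for (String[] r : lookup) {
            // ... consume r ...
        }
    } finally {
        try {
            lookup.close();
        } catch (IOException e) {
            // log and continue, as CubeTupleConverter.close() does
        }
    }
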
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 2f69b76..5e11e91 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.storage.gtrecord;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -36,7 +37,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.RowKeyColDesc;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.gridtable.StorageLimitLevel;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.bitmap.BitmapMeasureType;
@@ -363,9 +364,16 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
             return compf;
 
         DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
-        LookupStringTable lookup = cubeDesc.getHostInfo(derived).type == CubeDesc.DeriveType.PK_FK ? null
+        ILookupTable lookup = cubeDesc.getHostInfo(derived).type == CubeDesc.DeriveType.PK_FK ? null
                 : getLookupStringTableForDerived(derived, hostInfo);
         Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+        try {
+            if (lookup != null) {
+                lookup.close();
+            }
+        } catch (IOException e) {
+            logger.error("error when close lookup table.", e);
+        }
         TupleFilter translatedFilter = translated.getFirst();
         boolean loosened = translated.getSecond();
         if (loosened) {
@@ -375,7 +383,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
     }
 
     @SuppressWarnings("unchecked")
-    protected LookupStringTable getLookupStringTableForDerived(TblColRef derived, DeriveInfo hostInfo) {
+    protected ILookupTable getLookupStringTableForDerived(TblColRef derived, DeriveInfo hostInfo) {
         CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
         CubeSegment seg = cubeInstance.getLatestReadySegment();
         return cubeMgr.getLookupTable(seg, hostInfo.join);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
index dd48e4d..72e6848 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
@@ -18,12 +18,13 @@
 
 package org.apache.kylin.storage.gtrecord;
 
+import java.io.Closeable;
 import java.util.List;
 
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.tuple.Tuple;
 
-public interface ITupleConverter {
+public interface ITupleConverter extends Closeable {
 
     public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple);
 }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 3bac5ec..a6e0bc1 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -202,6 +202,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
     protected void close(CubeSegmentScanner scanner) {
         try {
             scanner.close();
+            cubeTupleConverter.close();
         } catch (IOException e) {
             logger.error("Exception when close CubeScanner", e);
         }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index f4150fe..7fa426f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -27,7 +27,7 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.kv.RowKeyColumnOrder;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.CubeDesc.DeriveType;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -50,7 +50,7 @@ public class DerivedFilterTranslator {
 
     private static final Logger logger = LoggerFactory.getLogger(DerivedFilterTranslator.class);
 
-    public static Pair<TupleFilter, Boolean> translate(LookupStringTable lookup, DeriveInfo hostInfo, CompareTupleFilter compf) {
+    public static Pair<TupleFilter, Boolean> translate(ILookupTable lookup, DeriveInfo hostInfo, CompareTupleFilter compf) {
 
         TblColRef derivedCol = compf.getColumn();
         TblColRef[] hostCols = hostInfo.columns;
@@ -76,7 +76,7 @@ public class DerivedFilterTranslator {
 
         Set<Array<String>> satisfyingHostRecords = Sets.newHashSet();
         SingleColumnTuple tuple = new SingleColumnTuple(derivedCol);
-        for (String[] row : lookup.getAllRows()) {
+        for (String[] row : lookup) {
             tuple.value = row[di];
             if (compf.evaluate(tuple, FilterCodeSystemFactory.getFilterCodeSystem(derivedCol.getColumnDesc().getType()))) {
                 collect(row, pi, satisfyingHostRecords);
diff --git a/pom.xml b/pom.xml
index a8e8312..a5cc1fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
         <tomcat.version>7.0.85</tomcat.version>
         <t-digest.version>3.1</t-digest.version>
         <freemarker.version>2.3.23</freemarker.version>
+        <rocksdb.version>5.9.2</rocksdb.version>
         <!--metric-->
         <dropwizard.version>3.1.2</dropwizard.version>
         <!-- REST Service, ref https://github.com/spring-projects/spring-boot/blob/v1.3.8.RELEASE/spring-boot-dependencies/pom.xml -->
@@ -655,6 +656,11 @@
                 <artifactId>freemarker</artifactId>
                 <version>${freemarker.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.rocksdb</groupId>
+                <artifactId>rocksdbjni</artifactId>
+                <version>${rocksdb.version}</version>
+            </dependency>
 
             <!-- Logging -->
             <dependency>
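
The rocksdbjni dependency added above backs the new RocksDBLookupTable and RocksDBLookupTableCache classes. As a rough illustration of the library pattern those classes build on (generic rocksdbjni usage, not Kylin code; the database path is a placeholder):

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class RocksDbUsageSketch {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();                                      // load the native library once per JVM
            try (Options options = new Options().setCreateIfMissing(true);
                    RocksDB db = RocksDB.open(options, "/tmp/lookup-cache-demo")) {
                db.put("key1".getBytes(), "encoded-row-1".getBytes());  // store an encoded lookup row
                byte[] value = db.get("key1".getBytes());               // point lookup by key
            }
        }
    }
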
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
index 1500def..c3407bc 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
@@ -18,8 +18,8 @@
 
 package org.apache.kylin.query.enumerator;
 
+import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -27,19 +27,22 @@ import org.apache.calcite.linq4j.Enumerator;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.schema.OLAPTable;
 import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
 public class LookupTableEnumerator implements Enumerator<Object[]> {
+    private final static Logger logger = LoggerFactory.getLogger(LookupTableEnumerator.class);
 
-    private final Collection<String[]> allRows;
+    private final ILookupTable lookupTable;
     private final List<ColumnDesc> colDescs;
     private final Object[] current;
     private Iterator<String[]> iterator;
@@ -67,8 +70,7 @@ public class LookupTableEnumerator implements Enumerator<Object[]> {
             throw new IllegalStateException("No dimension with derived columns found for lookup table " + lookupTableName + ", cube desc " + cube.getDescriptor());
 
         CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
-        LookupStringTable table = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
-        this.allRows = table.getAllRows();
+        this.lookupTable = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
 
         OLAPTable olapTable = (OLAPTable) olapContext.firstTableScan.getOlapTable();
         this.colDescs = olapTable.getSourceColumns();
@@ -103,11 +105,16 @@ public class LookupTableEnumerator implements Enumerator<Object[]> {
 
     @Override
     public void reset() {
-        this.iterator = allRows.iterator();
+        this.iterator = lookupTable.iterator();
     }
 
     @Override
     public void close() {
+        try {
+            lookupTable.close();
+        } catch (IOException e) {
+            logger.error("error when close lookup table", e);
+        }
     }
 
 }


[kylin] 05/05: KYLIN-3377 Some improvements for lookup table - snapshot management

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3221
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 3826a3de898af0c7f5bdb20052e0c7d3bed8193c
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu May 10 13:13:15 2018 +0800

    KYLIN-3377 Some improvements for lookup table - snapshot management
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../kylin/engine/mr/LookupSnapshotBuildJob.java    |  97 +++++++++++++++
 .../kylin/engine/mr/LookupSnapshotJobBuilder.java  |  86 +++++++++++++
 .../mr/steps/lookup/LookupExecutableUtil.java      | 134 +++++++++++++++++++++
 .../lookup/LookupSnapshotToMetaStoreStep.java      |  83 +++++++++++++
 .../steps/lookup/UpdateCubeAfterSnapshotStep.java  |  78 ++++++++++++
 .../kylin/rest/controller/CubeController.java      |  20 +++
 .../kylin/rest/controller/TableController.java     |  28 +++++
 .../rest/request/LookupSnapshotBuildRequest.java   |  51 ++++++++
 .../kylin/rest/response/TableSnapshotResponse.java |  98 +++++++++++++++
 .../org/apache/kylin/rest/service/JobService.java  |  32 +++++
 .../apache/kylin/rest/service/TableService.java    | 116 ++++++++++++++++++
 .../kylin/rest/service/TableServiceTest.java       |  53 ++++++++
 12 files changed, 876 insertions(+)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
new file mode 100644
index 0000000..6865ce3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+
+public class LookupSnapshotBuildJob extends DefaultChainedExecutable {
+
+    public static final Integer DEFAULT_PRIORITY = 30;
+
+    private static final String DEPLOY_ENV_NAME = "envName";
+    private static final String PROJECT_INSTANCE_NAME = "projectName";
+    private static final String CUBE_NAME = "cubeName";
+
+    private static final String JOB_TYPE = "Lookup ";
+
+    public static LookupSnapshotBuildJob createJob(CubeInstance cube, String tableName, String submitter,
+            KylinConfig kylinConfig) {
+        return initJob(cube, tableName, submitter, kylinConfig);
+    }
+
+    private static LookupSnapshotBuildJob initJob(CubeInstance cube, String tableName, String submitter,
+            KylinConfig kylinConfig) {
+        List<ProjectInstance> projList = ProjectManager.getInstance(kylinConfig).findProjects(cube.getType(),
+                cube.getName());
+        if (projList == null || projList.size() == 0) {
+            throw new RuntimeException("Cannot find the project containing the cube " + cube.getName() + "!!!");
+        } else if (projList.size() >= 2) {
+            String msg = "Find more than one project containing the cube " + cube.getName()
+                    + ". It does't meet the uniqueness requirement!!! ";
+            throw new RuntimeException(msg);
+        }
+
+        LookupSnapshotBuildJob result = new LookupSnapshotBuildJob();
+        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+        format.setTimeZone(TimeZone.getTimeZone(kylinConfig.getTimeZone()));
+        result.setDeployEnvName(kylinConfig.getDeployEnv());
+        result.setProjectName(projList.get(0).getName());
+        CubingExecutableUtil.setCubeName(cube.getName(), result.getParams());
+        result.setName(JOB_TYPE + " CUBE - " + cube.getName() + " - " + " TABLE - " + tableName + " - "
+                + format.format(new Date(System.currentTimeMillis())));
+        result.setSubmitter(submitter);
+        result.setNotifyList(cube.getDescriptor().getNotifyList());
+        return result;
+    }
+
+    protected void setDeployEnvName(String name) {
+        setParam(DEPLOY_ENV_NAME, name);
+    }
+
+    public String getDeployEnvName() {
+        return getParam(DEPLOY_ENV_NAME);
+    }
+
+    public String getProjectName() {
+        return getParam(PROJECT_INSTANCE_NAME);
+    }
+
+    public void setProjectName(String name) {
+        setParam(PROJECT_INSTANCE_NAME, name);
+    }
+
+    public String getCubeName() {
+        return getParam(CUBE_NAME);
+    }
+
+    @Override
+    public int getDefaultPriority() {
+        return DEFAULT_PRIORITY;
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java
new file mode 100644
index 0000000..e7888a5
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil;
+import org.apache.kylin.engine.mr.steps.lookup.LookupSnapshotToMetaStoreStep;
+import org.apache.kylin.engine.mr.steps.lookup.UpdateCubeAfterSnapshotStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LookupSnapshotJobBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(LookupSnapshotJobBuilder.class);
+    private CubeInstance cube;
+    private String lookupTable;
+    private List<String> segments;
+    private String submitter;
+    private KylinConfig kylinConfig;
+
+    public LookupSnapshotJobBuilder(CubeInstance cube, String lookupTable, List<String> segments, String submitter) {
+        this.cube = cube;
+        this.lookupTable = lookupTable;
+        this.segments = segments;
+        this.submitter = submitter;
+        this.kylinConfig = cube.getConfig();
+    }
+
+    public LookupSnapshotBuildJob build() {
+        logger.info("new job to build lookup snapshot:{} for cube:{}", lookupTable, cube.getName());
+        LookupSnapshotBuildJob result = LookupSnapshotBuildJob.createJob(cube, lookupTable, submitter, kylinConfig);
+        CubeDesc cubeDesc = cube.getDescriptor();
+        SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(lookupTable);
+        if (snapshotTableDesc != null && snapshotTableDesc.isExtSnapshotTable()) {
+            addExtMaterializeLookupTableSteps(result, snapshotTableDesc);
+        } else {
+            addInMetaStoreMaterializeLookupTableSteps(result);
+        }
+        return result;
+    }
+
+    private void addExtMaterializeLookupTableSteps(final LookupSnapshotBuildJob result,
+            SnapshotTableDesc snapshotTableDesc) {
+        ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotTableDesc.getStorageType());
+        materializer.materializeLookupTable(result, cube, lookupTable);
+
+        UpdateCubeAfterSnapshotStep afterSnapshotStep = new UpdateCubeAfterSnapshotStep();
+        afterSnapshotStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE);
+        LookupExecutableUtil.setCubeName(cube.getName(), afterSnapshotStep.getParams());
+        LookupExecutableUtil.setLookupTableName(lookupTable, afterSnapshotStep.getParams());
+        LookupExecutableUtil.setSegments(segments, afterSnapshotStep.getParams());
+        LookupExecutableUtil.setJobID(result.getId(), afterSnapshotStep.getParams());
+        result.addTask(afterSnapshotStep);
+    }
+
+    private void addInMetaStoreMaterializeLookupTableSteps(final LookupSnapshotBuildJob result) {
+        LookupSnapshotToMetaStoreStep lookupSnapshotToMetaStoreStep = new LookupSnapshotToMetaStoreStep();
+        lookupSnapshotToMetaStoreStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_META_STORE);
+        LookupExecutableUtil.setCubeName(cube.getName(), lookupSnapshotToMetaStoreStep.getParams());
+        LookupExecutableUtil.setLookupTableName(lookupTable, lookupSnapshotToMetaStoreStep.getParams());
+        LookupExecutableUtil.setSegments(segments, lookupSnapshotToMetaStoreStep.getParams());
+        result.addTask(lookupSnapshotToMetaStoreStep);
+    }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupExecutableUtil.java
new file mode 100644
index 0000000..bc2aa1d
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupExecutableUtil.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps.lookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+
+import com.google.common.collect.Lists;
+
+public class LookupExecutableUtil {
+
+    public static final String CUBE_NAME = "cubeName";
+    public static final String LOOKUP_TABLE_NAME = "lookupTableName";
+    public static final String PROJECT_NAME = "projectName";
+    public static final String LOOKUP_SNAPSHOT_ID = "snapshotID";
+    public static final String SEGMENT_IDS = "segments";
+    public static final String JOB_ID = "jobID";
+
+
+    public static void setCubeName(String cubeName, Map<String, String> params) {
+        params.put(CUBE_NAME, cubeName);
+    }
+
+    public static String getCubeName(Map<String, String> params) {
+        return params.get(CUBE_NAME);
+    }
+
+    public static void setLookupTableName(String lookupTableName, Map<String, String> params) {
+        params.put(LOOKUP_TABLE_NAME, lookupTableName);
+    }
+
+    public static String getLookupTableName(Map<String, String> params) {
+        return params.get(LOOKUP_TABLE_NAME);
+    }
+
+    public static void setProjectName(String projectName, Map<String, String> params) {
+        params.put(PROJECT_NAME, projectName);
+    }
+
+    public static String getProjectName(Map<String, String> params) {
+        return params.get(PROJECT_NAME);
+    }
+
+    public static void setLookupSnapshotID(String snapshotID, Map<String, String> params) {
+        params.put(LOOKUP_SNAPSHOT_ID, snapshotID);
+    }
+
+    public static String getLookupSnapshotID(Map<String, String> params) {
+        return params.get(LOOKUP_SNAPSHOT_ID);
+    }
+
+    public static List<String> getSegments(Map<String, String> params) {
+        final String ids = params.get(SEGMENT_IDS);
+        if (ids != null) {
+            final String[] splitted = StringUtils.split(ids, ",");
+            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+            for (String id : splitted) {
+                result.add(id);
+            }
+            return result;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    public static void setSegments(List<String> segments, Map<String, String> params) {
+        params.put(SEGMENT_IDS, StringUtils.join(segments, ","));
+    }
+
+
+    public static String getJobID(Map<String, String> params) {
+        return params.get(JOB_ID);
+    }
+
+    public static void setJobID(String jobID, Map<String, String> params) {
+        params.put(JOB_ID, jobID);
+    }
+    
+    public static void updateSnapshotPathToCube(CubeManager cubeManager, CubeInstance cube, String lookupTableName,
+            String snapshotPath) throws IOException {
+        cubeManager.updateCubeLookupSnapshot(cube, lookupTableName, snapshotPath);
+        cube.putSnapshotResPath(lookupTableName, snapshotPath);
+    }
+
+    public static void updateSnapshotPathToSegments(CubeManager cubeManager, CubeInstance cube, List<String> segmentIDs, String lookupTableName, String snapshotPath) throws IOException {
+        CubeInstance cubeCopy = cube.latestCopyForWrite();
+        if (segmentIDs.size() > 0) {
+            CubeSegment[] segments = new CubeSegment[segmentIDs.size()];
+            for (int i = 0; i < segments.length; i++) {
+                CubeSegment segment = cubeCopy.getSegmentById(segmentIDs.get(i));
+                if (segment == null) {
+                    throw new IllegalStateException("the segment not exist in cube:" + segmentIDs.get(i));
+                }
+                segment.putSnapshotResPath(lookupTableName, snapshotPath);
+                segments[i] = segment;
+            }
+            CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
+            cubeUpdate.setToUpdateSegs(segments);
+            cubeManager.updateCube(cubeUpdate);
+
+            // Update the input cube segments after the resource store has been updated
+            for (int i = 0; i < segments.length; i++) {
+                CubeSegment segment = cube.getSegmentById(segmentIDs.get(i));
+                segment.putSnapshotResPath(lookupTableName, snapshotPath);
+            }
+        }
+    }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
new file mode 100644
index 0000000..783ded0
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.lookup;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.SourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Save lookup snapshot information to cube metadata
+ */
+public class LookupSnapshotToMetaStoreStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(LookupSnapshotToMetaStoreStep.class);
+
+    public LookupSnapshotToMetaStoreStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig kylinConfig = context.getConfig();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+        TableMetadataManager metaMgr = TableMetadataManager.getInstance(kylinConfig);
+        SnapshotManager snapshotMgr = SnapshotManager.getInstance(kylinConfig);
+        CubeInstance cube = cubeManager.getCube(LookupExecutableUtil.getCubeName(this.getParams()));
+        List<String> segmentIDs = LookupExecutableUtil.getSegments(this.getParams());
+        String lookupTableName = LookupExecutableUtil.getLookupTableName(this.getParams());
+        CubeDesc cubeDesc = cube.getDescriptor();
+        try {
+            TableDesc tableDesc = metaMgr.getTableDesc(lookupTableName, cube.getProject());
+            IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
+            logger.info("take snapshot for table:" + lookupTableName);
+            SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
+
+            logger.info("update snapshot path to cube metadata");
+            if (cubeDesc.isGlobalSnapshotTable(lookupTableName)) {
+                LookupExecutableUtil.updateSnapshotPathToCube(cubeManager, cube, lookupTableName,
+                        snapshot.getResourcePath());
+            } else {
+                LookupExecutableUtil.updateSnapshotPathToSegments(cubeManager, cube, segmentIDs, lookupTableName,
+                        snapshot.getResourcePath());
+            }
+            return new ExecuteResult();
+        } catch (IOException e) {
+            logger.error("fail to build snapshot for:" + lookupTableName, e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
new file mode 100644
index 0000000..463e3b9
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.lookup;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Update cube/segment metadata with the materialized lookup snapshot path after the external snapshot is built
+ */
+public class UpdateCubeAfterSnapshotStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateCubeAfterSnapshotStep.class);
+
+    public UpdateCubeAfterSnapshotStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        KylinConfig kylinConfig = context.getConfig();
+        CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+        CubeInstance cube = cubeManager.getCube(LookupExecutableUtil.getCubeName(this.getParams()));
+        List<String> segmentIDs = LookupExecutableUtil.getSegments(this.getParams());
+        String lookupTableName = LookupExecutableUtil.getLookupTableName(this.getParams());
+        DefaultChainedExecutable job = (DefaultChainedExecutable) getManager().getJob(LookupExecutableUtil.getJobID(this.getParams()));
+
+        String contextKey = BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + lookupTableName;
+        String snapshotResPath = job.getExtraInfo(contextKey);
+
+        CubeDesc cubeDesc = cube.getDescriptor();
+        try {
+            logger.info("update snapshot path to cube metadata");
+            if (cubeDesc.isGlobalSnapshotTable(lookupTableName)) {
+                LookupExecutableUtil.updateSnapshotPathToCube(cubeManager, cube, lookupTableName,
+                        snapshotResPath);
+            } else {
+                LookupExecutableUtil.updateSnapshotPathToSegments(cubeManager, cube, segmentIDs, lookupTableName,
+                        snapshotResPath);
+            }
+            return new ExecuteResult();
+        } catch (IOException e) {
+            logger.error("fail to save cuboid statistics", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index b4ebcba..c6219a4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -64,6 +64,7 @@ import org.apache.kylin.rest.request.JobBuildRequest2;
 import org.apache.kylin.rest.request.JobOptimizeRequest;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.CubeInstanceResponse;
+import org.apache.kylin.rest.request.LookupSnapshotBuildRequest;
 import org.apache.kylin.rest.response.CuboidTreeResponse;
 import org.apache.kylin.rest.response.EnvelopeResponse;
 import org.apache.kylin.rest.response.GeneralResponse;
@@ -299,6 +300,25 @@ public class CubeController extends BasicController {
     }
 
     /**
+     * Force rebuild a cube's lookup table snapshot
+     *
+     * @throws IOException
+     */
+    @RequestMapping(value = "/{cubeName}/refresh_lookup", method = { RequestMethod.PUT }, produces = { "application/json" })
+    @ResponseBody
+    public JobInstance reBuildLookupSnapshot(@PathVariable String cubeName, @RequestBody LookupSnapshotBuildRequest request) {
+        try {
+            final CubeManager cubeMgr = cubeService.getCubeManager();
+            final CubeInstance cube = cubeMgr.getCube(cubeName);
+            String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
+            return jobService.submitLookupSnapshotJob(cube, request.getLookupTableName(), request.getSegmentIDs(), submitter);
+        } catch (IOException e) {
+            logger.error(e.getLocalizedMessage(), e);
+            throw new InternalErrorException(e.getLocalizedMessage());
+        }
+    }
+
+    /**
      * Delete a cube segment
      *
      * @throws IOException
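
Assuming the controller keeps Kylin's usual /kylin/api/cubes base mapping (the class-level mapping is not part of this hunk), the new endpoint could be exercised with a request along these lines; the cube and table names are placeholders, and the body fields map to LookupSnapshotBuildRequest further below:

    PUT /kylin/api/cubes/my_cube/refresh_lookup
    Content-Type: application/json

    {
      "lookupTableName": "DEFAULT.MY_LOOKUP_TABLE",
      "segmentIDs": ["<segment-uuid-1>", "<segment-uuid-2>"]
    }
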
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 7ada8cc..66621c7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -32,6 +32,7 @@ import org.apache.kylin.rest.exception.NotFoundException;
 import org.apache.kylin.rest.request.CardinalityRequest;
 import org.apache.kylin.rest.request.HiveTableRequest;
 import org.apache.kylin.rest.service.TableACLService;
+import org.apache.kylin.rest.response.TableSnapshotResponse;
 import org.apache.kylin.rest.service.TableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -208,4 +209,31 @@ public class TableController extends BasicController {
         }
     }
 
+    @RequestMapping(value = "/{project}/{tableName}/{snapshotID}/snapshotLocalCache", method = { RequestMethod.PUT })
+    @ResponseBody
+    public void updateSnapshotLocalCache(@PathVariable final String project, @PathVariable final String tableName, @PathVariable final String snapshotID) {
+        tableService.updateSnapshotLocalCache(project, tableName, snapshotID);
+    }
+
+    @RequestMapping(value = "/{tableName}/{snapshotID}/snapshotLocalCache/state", method = { RequestMethod.GET })
+    @ResponseBody
+    public String getSnapshotLocalCacheState(@PathVariable final String tableName, @PathVariable final String snapshotID) {
+        return tableService.getSnapshotLocalCacheState(tableName, snapshotID);
+    }
+
+    @RequestMapping(value = "/{tableName}/{snapshotID}/snapshotLocalCache", method = { RequestMethod.DELETE })
+    @ResponseBody
+    public void removeSnapshotLocalCache(@PathVariable final String tableName, @PathVariable final String snapshotID) {
+        tableService.removeSnapshotLocalCache(tableName, snapshotID);
+    }
+
+    @RequestMapping(value = "/{project}/{tableName}/snapshots", method = { RequestMethod.GET })
+    @ResponseBody
+    public List<TableSnapshotResponse> getTableSnapshots(@PathVariable final String project, @PathVariable final String tableName) throws IOException {
+        return tableService.getLookupTableSnapshots(project, tableName);
+    }
+
+    public void setTableService(TableService tableService) {
+        this.tableService = tableService;
+    }
 }
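
Taken together, and again assuming the controller's usual /kylin/api/tables base mapping, the snapshot management endpoints added here look roughly like this (path variables in braces):

    PUT    /kylin/api/tables/{project}/{tableName}/{snapshotID}/snapshotLocalCache          rebuild the local cache for an ext snapshot
    GET    /kylin/api/tables/{tableName}/{snapshotID}/snapshotLocalCache/state              returns the CacheState name
    DELETE /kylin/api/tables/{tableName}/{snapshotID}/snapshotLocalCache                    remove the local cache
    GET    /kylin/api/tables/{project}/{tableName}/snapshots                                 list TableSnapshotResponse entries
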
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/LookupSnapshotBuildRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/LookupSnapshotBuildRequest.java
new file mode 100644
index 0000000..06adf8a
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/LookupSnapshotBuildRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.request;
+
+import java.util.List;
+
+public class LookupSnapshotBuildRequest {
+    private String cubeName;
+    private String lookupTableName;
+    private List<String> segmentIDs;
+
+    public String getCubeName() {
+        return cubeName;
+    }
+
+    public void setCubeName(String cubeName) {
+        this.cubeName = cubeName;
+    }
+
+    public String getLookupTableName() {
+        return lookupTableName;
+    }
+
+    public void setLookupTableName(String lookupTableName) {
+        this.lookupTableName = lookupTableName;
+    }
+
+    public List<String> getSegmentIDs() {
+        return segmentIDs;
+    }
+
+    public void setSegmentIDs(List<String> segmentIDs) {
+        this.segmentIDs = segmentIDs;
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/TableSnapshotResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/TableSnapshotResponse.java
new file mode 100644
index 0000000..0574764
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/TableSnapshotResponse.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.response;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class TableSnapshotResponse implements Serializable {
+    private static final long serialVersionUID = -8707176301793624704L;
+    public static final String TYPE_EXT = "ext";
+    public static final String TYPE_INNER = "inner";
+
+    private String snapshotID;
+
+    private String snapshotType; // can be ext or inner
+
+    private String storageType;
+
+    private long lastBuildTime;
+
+    private long sourceTableSize;
+
+    private long sourceTableLastModifyTime;
+
+    private List<String> cubesAndSegmentsUsage;
+
+    public String getSnapshotID() {
+        return snapshotID;
+    }
+
+    public void setSnapshotID(String snapshotID) {
+        this.snapshotID = snapshotID;
+    }
+
+    public String getSnapshotType() {
+        return snapshotType;
+    }
+
+    public void setSnapshotType(String snapshotType) {
+        this.snapshotType = snapshotType;
+    }
+
+    public String getStorageType() {
+        return storageType;
+    }
+
+    public void setStorageType(String storageType) {
+        this.storageType = storageType;
+    }
+
+    public long getLastBuildTime() {
+        return lastBuildTime;
+    }
+
+    public void setLastBuildTime(long lastBuildTime) {
+        this.lastBuildTime = lastBuildTime;
+    }
+
+    public long getSourceTableSize() {
+        return sourceTableSize;
+    }
+
+    public void setSourceTableSize(long sourceTableSize) {
+        this.sourceTableSize = sourceTableSize;
+    }
+
+    public long getSourceTableLastModifyTime() {
+        return sourceTableLastModifyTime;
+    }
+
+    public void setSourceTableLastModifyTime(long sourceTableLastModifyTime) {
+        this.sourceTableLastModifyTime = sourceTableLastModifyTime;
+    }
+
+    public List<String> getCubesAndSegmentsUsage() {
+        return cubesAndSegmentsUsage;
+    }
+
+    public void setCubesAndSegmentsUsage(List<String> cubesAndSegmentsUsage) {
+        this.cubesAndSegmentsUsage = cubesAndSegmentsUsage;
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4317ed5..c0d9e56 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -36,6 +36,8 @@ import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.LookupSnapshotBuildJob;
+import org.apache.kylin.engine.mr.LookupSnapshotJobBuilder;
 import org.apache.kylin.engine.mr.common.JobInfoConverter;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JobInstance;
@@ -385,6 +387,14 @@ public class JobService extends BasicService implements InitializingBean {
         return optimizeJobInstance;
     }
 
+    public JobInstance submitLookupSnapshotJob(CubeInstance cube, String lookupTable, List<String> segmentIDs, String submitter) throws IOException {
+        LookupSnapshotBuildJob job = new LookupSnapshotJobBuilder(cube, lookupTable, segmentIDs, submitter).build();
+        getExecutableManager().addJob(job);
+
+        JobInstance jobInstance = getLookupSnapshotBuildJobInstance(job);
+        return jobInstance;
+    }
+
     private void checkCubeDescSignature(CubeInstance cube) {
         Message msg = MsgPicker.getMsg();
 
@@ -480,6 +490,28 @@ public class JobService extends BasicService implements InitializingBean {
         return result;
     }
 
+    protected JobInstance getLookupSnapshotBuildJobInstance(LookupSnapshotBuildJob job) {
+        if (job == null) {
+            return null;
+        }
+
+        final JobInstance result = new JobInstance();
+        result.setName(job.getName());
+        result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+        result.setRelatedSegment(CubingExecutableUtil.getSegmentId(job.getParams()));
+        result.setLastModified(job.getLastModified());
+        result.setSubmitter(job.getSubmitter());
+        result.setUuid(job.getId());
+        result.setType(CubeBuildTypeEnum.BUILD);
+        result.setStatus(JobInfoConverter.parseToJobStatus(job.getStatus()));
+        result.setDuration(job.getDuration() / 1000);
+        for (int i = 0; i < job.getTasks().size(); ++i) {
+            AbstractExecutable task = job.getTasks().get(i);
+            result.addStep(JobInfoConverter.parseToJobStep(task, i, getExecutableManager().getOutput(task.getId())));
+        }
+        return result;
+    }
+
     protected JobInstance getCheckpointJobInstance(AbstractExecutable job) {
         Message msg = MsgPicker.getMsg();
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index ace1686..786daa6 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -20,6 +20,7 @@ package org.apache.kylin.rest.service;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,10 +30,19 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -49,6 +59,9 @@ import org.apache.kylin.rest.msg.Message;
 import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.response.TableDescResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
+import org.apache.kylin.rest.response.TableSnapshotResponse;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableSignature;
 import org.apache.kylin.source.ISourceMetadataExplorer;
 import org.apache.kylin.source.SourceManager;
 import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
@@ -387,6 +400,109 @@ public class TableService extends BasicService {
         }
     }
 
+    public void updateSnapshotLocalCache(String project, String tableName, String snapshotID) {
+        ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+        ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
+        TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
+        if (extTableSnapshotInfo == null) {
+            throw new IllegalArgumentException("cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
+        }
+        LookupProviderFactory.rebuildLocalCache(tableDesc, extTableSnapshotInfo);
+    }
+
+    public void removeSnapshotLocalCache(String tableName, String snapshotID) {
+        ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+        ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
+        if (extTableSnapshotInfo == null) {
+            throw new IllegalArgumentException("cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
+        }
+        LookupProviderFactory.removeLocalCache(extTableSnapshotInfo);
+    }
+
+    public String getSnapshotLocalCacheState(String tableName, String snapshotID) {
+        ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+        ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
+        if (extTableSnapshotInfo == null) {
+            throw new IllegalArgumentException("cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
+        }
+        CacheState cacheState = LookupProviderFactory.getCacheState(extTableSnapshotInfo);
+        return cacheState.name();
+    }
+
+    public List<TableSnapshotResponse> getLookupTableSnapshots(String project, String tableName) throws IOException {
+        TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
+        IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
+        TableSignature signature = hiveTable.getSignature();
+        return internalGetLookupTableSnapshots(tableName, signature);
+    }
+
+    List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature) throws IOException {
+        ExtTableSnapshotInfoManager extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+        SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
+        List<ExtTableSnapshotInfo> extTableSnapshots = extSnapshotInfoManager.getSnapshots(tableName);
+        List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
+
+        Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
+
+        List<TableSnapshotResponse> result = Lists.newArrayList();
+        for (ExtTableSnapshotInfo extTableSnapshot : extTableSnapshots) {
+            TableSnapshotResponse response = new TableSnapshotResponse();
+            response.setSnapshotID(extTableSnapshot.getId());
+            response.setSnapshotType(TableSnapshotResponse.TYPE_EXT);
+            response.setLastBuildTime(extTableSnapshot.getLastBuildTime());
+            response.setStorageType(extTableSnapshot.getStorageType());
+            response.setSourceTableSize(extTableSnapshot.getSignature().getSize());
+            response.setSourceTableLastModifyTime(extTableSnapshot.getSignature().getLastModifiedTime());
+            response.setCubesAndSegmentsUsage(snapshotUsageMap.get(extTableSnapshot.getResourcePath()));
+            result.add(response);
+        }
+
+        for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
+            TableSnapshotResponse response = new TableSnapshotResponse();
+            response.setSnapshotID(metaStoreTableSnapshot.getId());
+            response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
+            response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
+            response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
+            response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
+            response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
+            response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
+            result.add(response);
+        }
+
+        return result;
+    }
+
+    /**
+     * @return map from snapshot resource path to the list of cube names or "cube:segment" names using it
+     */
+    private Map<String, List<String>> getSnapshotUsages() {
+        CubeManager cubeManager = CubeManager.getInstance(getConfig());
+        Map<String, List<String>> snapshotCubeSegmentMap = Maps.newHashMap();
+        for (CubeInstance cube : cubeManager.listAllCubes()) {
+            Collection<String> cubeSnapshots = cube.getSnapshots().values();
+            for (String cubeSnapshot : cubeSnapshots) {
+                List<String> usages = snapshotCubeSegmentMap.get(cubeSnapshot);
+                if (usages == null) {
+                    usages = Lists.newArrayList();
+                    snapshotCubeSegmentMap.put(cubeSnapshot, usages);
+                }
+                usages.add(cube.getName());
+            }
+            for (CubeSegment segment : cube.getSegments()) {
+                Collection<String> segmentSnapshots = segment.getSnapshotPaths();
+                for (String segmentSnapshot : segmentSnapshots) {
+                    List<String> usages = snapshotCubeSegmentMap.get(segmentSnapshot);
+                    if (usages == null) {
+                        usages = Lists.newArrayList();
+                        snapshotCubeSegmentMap.put(segmentSnapshot, usages);
+                    }
+                    usages.add(cube.getName() + ":" + segment.getName());
+                }
+            }
+        }
+        return snapshotCubeSegmentMap;
+    }
+
     /**
      * Generate cardinality for table This will trigger a hadoop job
      * The result will be merged into table exd info
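
For illustration only (not part of this patch), a minimal sketch of how a caller might exercise the new snapshot operations added above; the project, table name and snapshot id below are placeholders:

    import java.util.List;

    import org.apache.kylin.rest.response.TableSnapshotResponse;
    import org.apache.kylin.rest.service.TableService;

    public class SnapshotCacheSketch {
        public static void main(String[] args) throws Exception {
            TableService tableService = new TableService();

            // list every snapshot (ext + metastore) recorded for a lookup table
            List<TableSnapshotResponse> snapshots =
                    tableService.getLookupTableSnapshots("default", "EDW.TEST_CAL_DT");
            System.out.println(snapshots.size() + " snapshots found");

            // push one ext snapshot into the local cache, inspect its state, then evict it
            tableService.updateSnapshotLocalCache("default", "EDW.TEST_CAL_DT", "uuid-1234");
            System.out.println(tableService.getSnapshotLocalCacheState("EDW.TEST_CAL_DT", "uuid-1234"));
            tableService.removeSnapshotLocalCache("EDW.TEST_CAL_DT", "uuid-1234");
        }
    }
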
diff --git a/server-base/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java b/server-base/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
new file mode 100644
index 0000000..86d34ac
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.rest.response.TableSnapshotResponse;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TableServiceTest extends LocalFileMetadataTestCase {
+    private TableService tableService;
+
+    @Before
+    public void setUp() {
+        this.createTestMetadata();
+        tableService = new TableService();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGetTableSnapshots() throws IOException {
+        TableSignature tableSignature = new TableSignature("TEST_CAL_DT.csv", 100, System.currentTimeMillis());
+        List<TableSnapshotResponse> snapshotResponseList = tableService.internalGetLookupTableSnapshots("EDW.TEST_CAL_DT", tableSignature);
+        Assert.assertEquals(8, snapshotResponseList.size());
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.

[kylin] 02/05: KYLIN-3374 Some improvements for lookup table - metadata change

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3221
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7f8d0181cfeccd7dce2ada70f1906a586aba7b80
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu May 10 12:38:46 2018 +0800

    KYLIN-3374 Some improvements for lookup table - metadata change
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../kylin/common/persistence/ResourceStore.java    |   1 +
 .../java/org/apache/kylin/cube/CubeInstance.java   |  18 ++
 .../java/org/apache/kylin/cube/model/CubeDesc.java |  47 +++++
 .../apache/kylin/cube/model/SnapshotTableDesc.java |  75 ++++++++
 .../kylin/dict/lookup/ExtTableSnapshotInfo.java    | 149 +++++++++++++++
 .../dict/lookup/ExtTableSnapshotInfoManager.java   | 209 +++++++++++++++++++++
 .../apache/kylin/dict/lookup/SnapshotManager.java  |  15 ++
 .../apache/kylin/dict/lookup/SnapshotTable.java    |  25 ++-
 8 files changed, 536 insertions(+), 3 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index bda6cd0..a71db45 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -77,6 +77,7 @@ abstract public class ResourceStore {
     public static final String BAD_QUERY_RESOURCE_ROOT = "/bad_query";
     public static final String DRAFT_RESOURCE_ROOT = "/draft";
     public static final String USER_ROOT = "/user";
+    public static final String EXT_SNAPSHOT_RESOURCE_ROOT = "/ext_table_snapshot";
 
     public static final String METASTORE_UUID_TAG = "/UUID";
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 035cf7b..bc6083e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -124,6 +125,9 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
     @JsonProperty("cuboid_last_optimized")
     private long cuboidLastOptimized;
 
+    @JsonProperty("snapshots")
+    private Map<String, String> snapshots = Maps.newHashMap();
+
     // cuboid scheduler lazy built
     transient private CuboidScheduler cuboidScheduler;
 
@@ -652,6 +656,20 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         return getDescriptor().getEngineType();
     }
 
+    public Map<String, String> getSnapshots() {
+        if (snapshots == null)
+            snapshots = Maps.newHashMap();
+        return snapshots;
+    }
+
+    public String getSnapshotResPath(String tableName) {
+        return getSnapshots().get(tableName);
+    }
+
+    public void putSnapshotResPath(String table, String snapshotResPath) {
+        getSnapshots().put(table, snapshotResPath);
+    }
+
     public static CubeInstance getCopyOf(CubeInstance cubeInstance) {
         CubeInstance newCube = new CubeInstance();
         newCube.setName(cubeInstance.getName());
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 77b808b..5b4a134 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -203,6 +203,9 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
     // Error messages during resolving json metadata
     private List<String> errors = new ArrayList<String>();
 
+    @JsonProperty("snapshot_table_desc_list")
+    private List<SnapshotTableDesc> snapshotTableDescList = Collections.emptyList();
+
     private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<>();
     private LinkedHashSet<ColumnDesc> allColumnDescs = new LinkedHashSet<>();
     private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<>();
@@ -1371,6 +1374,49 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
         return null;
     }
 
+    public List<SnapshotTableDesc> getSnapshotTableDescList() {
+        return snapshotTableDescList;
+    }
+
+    public void setSnapshotTableDescList(List<SnapshotTableDesc> snapshotTableDescList) {
+        this.snapshotTableDescList = snapshotTableDescList;
+    }
+
+    public SnapshotTableDesc getSnapshotTableDesc(String tableName) {
+        for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescList) {
+            if (snapshotTableDesc.getTableName().equalsIgnoreCase(tableName)) {
+                return snapshotTableDesc;
+            }
+        }
+        return null;
+    }
+
+    public boolean isGlobalSnapshotTable(String tableName) {
+        SnapshotTableDesc desc = getSnapshotTableDesc(tableName);
+        if (desc == null) {
+            return false;
+        }
+        return desc.isGlobal();
+    }
+
+    public boolean isExtSnapshotTable(String tableName) {
+        SnapshotTableDesc desc = getSnapshotTableDesc(tableName);
+        if (desc == null) {
+            return false;
+        }
+        return desc.isExtSnapshotTable();
+    }
+    
+    public List<String> getAllExtLookupSnapshotTypes() {
+        List<String> result = Lists.newArrayList();
+        for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescList) {
+            if (snapshotTableDesc.isExtSnapshotTable()) {
+                result.add(snapshotTableDesc.getStorageType());
+            }
+        }
+        return result;
+    }
+
     /** Get a column which can be used to cluster the source table.
      * To reduce memory footprint in base cuboid for global dict */
     // TODO handle more than one ultra high cardinality columns use global dict in one cube
@@ -1465,6 +1511,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
         newCubeDesc.setPartitionOffsetStart(cubeDesc.getPartitionOffsetStart());
         newCubeDesc.setVersion(cubeDesc.getVersion());
         newCubeDesc.setParentForward(cubeDesc.getParentForward());
+        newCubeDesc.setSnapshotTableDescList(cubeDesc.getSnapshotTableDescList());
         newCubeDesc.updateRandomUuid();
         return newCubeDesc;
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
new file mode 100644
index 0000000..e61240b
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/SnapshotTableDesc.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.model;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class SnapshotTableDesc implements java.io.Serializable{
+    @JsonProperty("table_name")
+    private String tableName;
+
+    @JsonProperty("storage_type")
+    private String storageType = SnapshotTable.STORAGE_TYPE_METASTORE;
+
+    @JsonProperty("local_cache_enable")
+    private boolean enableLocalCache = true;
+
+    @JsonProperty("global")
+    private boolean global = false;
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public String getStorageType() {
+        return storageType;
+    }
+
+    public void setStorageType(String storageType) {
+        this.storageType = storageType;
+    }
+
+    public boolean isGlobal() {
+        return global;
+    }
+
+    public void setGlobal(boolean global) {
+        this.global = global;
+    }
+
+    public boolean isExtSnapshotTable() {
+        return !SnapshotTable.STORAGE_TYPE_METASTORE.equals(storageType);
+    }
+
+    public boolean isEnableLocalCache() {
+        return enableLocalCache;
+    }
+
+    public void setEnableLocalCache(boolean enableLocalCache) {
+        this.enableLocalCache = enableLocalCache;
+    }
+}
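
For illustration only (not part of this patch), a small sketch of how a SnapshotTableDesc entry behaves; the table name is a placeholder:

    import org.apache.kylin.cube.model.SnapshotTableDesc;
    import org.apache.kylin.dict.lookup.SnapshotTable;

    public class SnapshotTableDescSketch {
        public static void main(String[] args) {
            SnapshotTableDesc desc = new SnapshotTableDesc();
            desc.setTableName("EDW.TEST_CAL_DT");

            // the default storage type is the metastore, i.e. not an ext snapshot
            System.out.println(SnapshotTable.STORAGE_TYPE_METASTORE.equals(desc.getStorageType())); // true
            System.out.println(desc.isExtSnapshotTable()); // false

            // switch to an ext (HBase) snapshot that is shared across the whole cube
            desc.setStorageType("hbase");
            desc.setGlobal(true);
            System.out.println(desc.isExtSnapshotTable()); // true
        }
    }
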
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfo.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfo.java
new file mode 100644
index 0000000..80a2b77
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfo.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@SuppressWarnings("serial")
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class ExtTableSnapshotInfo extends RootPersistentEntity {
+    public static final String STORAGE_TYPE_HBASE = "hbase";
+
+    @JsonProperty("tableName")
+    private String tableName;
+
+    @JsonProperty("signature")
+    private TableSignature signature;
+
+    @JsonProperty("key_columns")
+    private String[] keyColumns;
+
+    @JsonProperty("storage_type")
+    private String storageType;
+
+    @JsonProperty("storage_location_identifier")
+    private String storageLocationIdentifier;
+
+    @JsonProperty("shard_num")
+    private int shardNum;
+
+    @JsonProperty("row_cnt")
+    private long rowCnt;
+
+    @JsonProperty("last_build_time")
+    private long lastBuildTime;
+
+    // default constructor for JSON serialization
+    public ExtTableSnapshotInfo() {
+    }
+
+    public ExtTableSnapshotInfo(TableSignature signature, String tableName) throws IOException {
+        this.signature = signature;
+        this.tableName = tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public String getResourcePath() {
+        return getResourcePath(tableName, uuid);
+    }
+
+    public String getResourceDir() {
+        return getResourceDir(tableName);
+    }
+
+    public static String getResourcePath(String tableName, String uuid) {
+        return getResourceDir(tableName) + "/" + uuid + ".snapshot";
+    }
+
+    public static String getResourceDir(String tableName) {
+        return ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT + "/" + tableName;
+    }
+
+    public TableSignature getSignature() {
+        return signature;
+    }
+
+    public String getStorageType() {
+        return storageType;
+    }
+
+    public void setStorageType(String storageType) {
+        this.storageType = storageType;
+    }
+
+    public String getStorageLocationIdentifier() {
+        return storageLocationIdentifier;
+    }
+
+    public void setStorageLocationIdentifier(String storageLocationIdentifier) {
+        this.storageLocationIdentifier = storageLocationIdentifier;
+    }
+
+    public String[] getKeyColumns() {
+        return keyColumns;
+    }
+
+    public void setKeyColumns(String[] keyColumns) {
+        this.keyColumns = keyColumns;
+    }
+
+    public int getShardNum() {
+        return shardNum;
+    }
+
+    public void setShardNum(int shardNum) {
+        this.shardNum = shardNum;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setSignature(TableSignature signature) {
+        this.signature = signature;
+    }
+
+    public long getRowCnt() {
+        return rowCnt;
+    }
+
+    public void setRowCnt(long rowCnt) {
+        this.rowCnt = rowCnt;
+    }
+
+    public long getLastBuildTime() {
+        return lastBuildTime;
+    }
+
+    public void setLastBuildTime(long lastBuildTime) {
+        this.lastBuildTime = lastBuildTime;
+    }
+
+}
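
For illustration only (not part of this patch), the resource path layout produced by the static helpers above; the table name and uuid are placeholders:

    import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;

    public class ExtSnapshotPathSketch {
        public static void main(String[] args) {
            // prints /ext_table_snapshot/EDW.TEST_CAL_DT
            System.out.println(ExtTableSnapshotInfo.getResourceDir("EDW.TEST_CAL_DT"));
            // prints /ext_table_snapshot/EDW.TEST_CAL_DT/uuid-1234.snapshot
            System.out.println(ExtTableSnapshotInfo.getResourcePath("EDW.TEST_CAL_DT", "uuid-1234"));
        }
    }
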
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfoManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfoManager.java
new file mode 100644
index 0000000..1892e57
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ExtTableSnapshotInfoManager.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
+public class ExtTableSnapshotInfoManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExtTableSnapshotInfoManager.class);
+    public static Serializer<ExtTableSnapshotInfo> SNAPSHOT_SERIALIZER = new JsonSerializer<>(ExtTableSnapshotInfo.class);
+
+    // static cached instances
+    private static final ConcurrentMap<KylinConfig, ExtTableSnapshotInfoManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, ExtTableSnapshotInfoManager>();
+
+    public static ExtTableSnapshotInfoManager getInstance(KylinConfig config) {
+        ExtTableSnapshotInfoManager r = SERVICE_CACHE.get(config);
+        if (r == null) {
+            synchronized (ExtTableSnapshotInfoManager.class) {
+                r = SERVICE_CACHE.get(config);
+                if (r == null) {
+                    r = new ExtTableSnapshotInfoManager(config);
+                    SERVICE_CACHE.put(config, r);
+                    if (SERVICE_CACHE.size() > 1) {
+                        logger.warn("More than one singleton exist");
+                    }
+                }
+            }
+        }
+        return r;
+    }
+
+    public static void clearCache() {
+        synchronized (SERVICE_CACHE) {
+            SERVICE_CACHE.clear();
+        }
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
+    private LoadingCache<String, ExtTableSnapshotInfo> snapshotCache; // resource
+
+    private ExtTableSnapshotInfoManager(KylinConfig config) {
+        this.config = config;
+        this.snapshotCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, ExtTableSnapshotInfo>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, ExtTableSnapshotInfo> notification) {
+                ExtTableSnapshotInfoManager.logger.info("Snapshot with resource path " + notification.getKey()
+                        + " is removed due to " + notification.getCause());
+            }
+        }).maximumSize(1000)//
+                .expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, ExtTableSnapshotInfo>() {
+                    @Override
+                    public ExtTableSnapshotInfo load(String key) throws Exception {
+                        ExtTableSnapshotInfo snapshot = ExtTableSnapshotInfoManager.this.load(key);
+                        return snapshot;
+                    }
+                });
+    }
+
+    public boolean hasLatestSnapshot(TableSignature signature, String tableName) throws IOException {
+        ExtTableSnapshotInfo snapshot = new ExtTableSnapshotInfo(signature, tableName);
+        snapshot.updateRandomUuid();
+        ExtTableSnapshotInfo dupSnapshot = checkDupByInfo(snapshot);
+        if (dupSnapshot != null) {
+            return true;
+        }
+        return false;
+    }
+
+    public ExtTableSnapshotInfo getSnapshot(String snapshotResPath) {
+        try {
+            return snapshotCache.get(snapshotResPath);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public ExtTableSnapshotInfo getSnapshot(String tableName, String snapshotID) {
+         return getSnapshot(ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID));
+    }
+
+    public List<ExtTableSnapshotInfo> getSnapshots(String tableName) throws IOException {
+        String tableSnapshotsPath = ExtTableSnapshotInfo.getResourceDir(tableName);
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        return store.getAllResources(tableSnapshotsPath, ExtTableSnapshotInfo.class, SNAPSHOT_SERIALIZER);
+    }
+
+    public Set<String> getAllExtSnapshotResPaths() throws IOException {
+        Set<String> result = Sets.newHashSet();
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        Set<String> snapshotTablePaths = store.listResources(ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT);
+        if (snapshotTablePaths == null) {
+            return result;
+        }
+        for (String snapshotTablePath : snapshotTablePaths) {
+            Set<String> snapshotPaths = store.listResources(snapshotTablePath);
+            if (snapshotPaths != null) {
+                result.addAll(snapshotPaths);
+            }
+        }
+        return result;
+    }
+
+    public void removeSnapshot(String tableName, String snapshotID) throws IOException {
+        String snapshotResPath = ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID);
+        snapshotCache.invalidate(snapshotResPath);
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        store.deleteResource(snapshotResPath);
+    }
+
+    /**
+     * Create an ext table snapshot.
+     * @param signature source table signature
+     * @param tableName
+     * @param snapshotID
+     * @param keyColumns
+     * @param shardNum
+     * @param storageType
+     * @param storageLocation
+     * @return the created snapshot
+     * @throws IOException
+     */
+    public ExtTableSnapshotInfo createSnapshot(TableSignature signature, String tableName, String snapshotID, String[] keyColumns,
+                                               int shardNum, String storageType, String storageLocation) throws IOException {
+        ExtTableSnapshotInfo snapshot = new ExtTableSnapshotInfo();
+        snapshot.setUuid(snapshotID);
+        snapshot.setSignature(signature);
+        snapshot.setTableName(tableName);
+        snapshot.setKeyColumns(keyColumns);
+        snapshot.setStorageType(storageType);
+        snapshot.setStorageLocationIdentifier(storageLocation);
+        snapshot.setShardNum(shardNum);
+        save(snapshot);
+        return snapshot;
+    }
+
+    public void updateSnapshot(ExtTableSnapshotInfo extTableSnapshot) throws IOException {
+        save(extTableSnapshot);
+        snapshotCache.invalidate(extTableSnapshot.getResourcePath());
+    }
+
+    private ExtTableSnapshotInfo checkDupByInfo(ExtTableSnapshotInfo snapshot) throws IOException {
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        String resourceDir = snapshot.getResourceDir();
+        NavigableSet<String> existings = store.listResources(resourceDir);
+        if (existings == null)
+            return null;
+
+        TableSignature sig = snapshot.getSignature();
+        for (String existing : existings) {
+            ExtTableSnapshotInfo existingSnapshot = load(existing);
+            // direct load from store
+            if (existingSnapshot != null && sig.equals(existingSnapshot.getSignature()))
+                return existingSnapshot;
+        }
+        return null;
+    }
+
+    private ExtTableSnapshotInfo load(String resourcePath) throws IOException {
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        ExtTableSnapshotInfo snapshot = store.getResource(resourcePath, ExtTableSnapshotInfo.class, SNAPSHOT_SERIALIZER);
+
+        return snapshot;
+    }
+
+    private void save(ExtTableSnapshotInfo snapshot) throws IOException {
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        String path = snapshot.getResourcePath();
+        store.putResource(path, snapshot, SNAPSHOT_SERIALIZER);
+    }
+
+}
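
For illustration only (not part of this patch), a sketch of registering and reading back an ext snapshot through the manager; the signature, uuid, key column and storage location are placeholders:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
    import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
    import org.apache.kylin.source.IReadableTable.TableSignature;

    public class ExtSnapshotManagerSketch {
        public static void main(String[] args) throws Exception {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            ExtTableSnapshotInfoManager mgr = ExtTableSnapshotInfoManager.getInstance(config);

            TableSignature signature = new TableSignature("TEST_CAL_DT.csv", 100, System.currentTimeMillis());
            mgr.createSnapshot(signature, "EDW.TEST_CAL_DT", "uuid-1234",
                    new String[] { "CAL_DT" }, 1, ExtTableSnapshotInfo.STORAGE_TYPE_HBASE, "KYLIN_LOOKUP_1");

            // reads go through the loading cache, keyed by resource path
            ExtTableSnapshotInfo snapshot = mgr.getSnapshot("EDW.TEST_CAL_DT", "uuid-1234");
            System.out.println(snapshot.getStorageLocationIdentifier()); // KYLIN_LOOKUP_1
        }
    }
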
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 5192805..dd90b33 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -19,12 +19,15 @@
 package org.apache.kylin.dict.lookup;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.NavigableSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.IReadableTable;
 import org.apache.kylin.source.IReadableTable.TableSignature;
@@ -94,6 +97,18 @@ public class SnapshotManager {
         }
     }
 
+    public List<SnapshotTable> getSnapshots(String tableName, TableSignature sourceTableSignature) throws IOException {
+        List<SnapshotTable> result = Lists.newArrayList();
+        String tableSnapshotsPath = SnapshotTable.getResourceDir(tableName);
+        ResourceStore store = TableMetadataManager.getInstance(this.config).getStore();
+        result.addAll(store.getAllResources(tableSnapshotsPath, SnapshotTable.class, SnapshotTableSerializer.INFO_SERIALIZER));
+        if (sourceTableSignature != null) {
+            String oldTableSnapshotsPath = SnapshotTable.getOldResourceDir(sourceTableSignature);
+            result.addAll(store.getAllResources(oldTableSnapshotsPath, SnapshotTable.class, SnapshotTableSerializer.INFO_SERIALIZER));
+        }
+        return result;
+    }
+
     public void removeSnapshot(String resourcePath) throws IOException {
         ResourceStore store = getStore();
         store.deleteResource(resourcePath);
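
For illustration only (not part of this patch), how the new listing API might be called; the table name and signature values are placeholders:

    import java.util.List;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.dict.lookup.SnapshotManager;
    import org.apache.kylin.dict.lookup.SnapshotTable;
    import org.apache.kylin.source.IReadableTable.TableSignature;

    public class SnapshotListSketch {
        public static void main(String[] args) throws Exception {
            SnapshotManager snapshotManager = SnapshotManager.getInstance(KylinConfig.getInstanceFromEnv());
            TableSignature signature = new TableSignature("TEST_CAL_DT.csv", 100, System.currentTimeMillis());

            // collects snapshots under the new per-table dir plus the legacy signature-based dir
            List<SnapshotTable> snapshots = snapshotManager.getSnapshots("EDW.TEST_CAL_DT", signature);
            System.out.println(snapshots.size());
        }
    }
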
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index 7bf32b1..6cae22b 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -50,6 +50,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 @SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class SnapshotTable extends RootPersistentEntity implements IReadableTable {
+    public static final String STORAGE_TYPE_METASTORE = "metaStore";
 
     @JsonProperty("tableName")
     private String tableName;
@@ -57,6 +58,8 @@ public class SnapshotTable extends RootPersistentEntity implements IReadableTabl
     private TableSignature signature;
     @JsonProperty("useDictionary")
     private boolean useDictionary;
+    @JsonProperty("last_build_time")
+    private long lastBuildTime;
 
     private ArrayList<int[]> rowIndices;
     private Dictionary<String> dict;
@@ -71,6 +74,18 @@ public class SnapshotTable extends RootPersistentEntity implements IReadableTabl
         this.useDictionary = true;
     }
 
+    public long getLastBuildTime() {
+        return lastBuildTime;
+    }
+
+    public void setLastBuildTime(long lastBuildTime) {
+        this.lastBuildTime = lastBuildTime;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
     public void takeSnapshot(IReadableTable table, TableDesc tableDesc) throws IOException {
         this.signature = table.getSignature();
 
@@ -121,13 +136,17 @@ public class SnapshotTable extends RootPersistentEntity implements IReadableTabl
 
     public String getResourceDir() {
         if (Strings.isNullOrEmpty(tableName)) {
-            return getOldResourceDir();
+            return getOldResourceDir(signature);
         } else {
-            return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + tableName;
+            return getResourceDir(tableName);
         }
     }
 
-    private String getOldResourceDir() {
+    public static String getResourceDir(String tableName) {
+        return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + tableName;
+    }
+
+    public static String getOldResourceDir(TableSignature signature) {
         return ResourceStore.SNAPSHOT_RESOURCE_ROOT + "/" + new File(signature.getPath()).getName();
     }
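
For illustration only (not part of this patch), the two directory layouts handled above; the table name and signature path are placeholders:

    import org.apache.kylin.dict.lookup.SnapshotTable;
    import org.apache.kylin.source.IReadableTable.TableSignature;

    public class SnapshotDirSketch {
        public static void main(String[] args) {
            // new layout, keyed by table name
            System.out.println(SnapshotTable.getResourceDir("EDW.TEST_CAL_DT"));

            // legacy layout, keyed by the file name taken from the source table signature
            TableSignature signature = new TableSignature("/path/TEST_CAL_DT.csv", 100, System.currentTimeMillis());
            System.out.println(SnapshotTable.getOldResourceDir(signature));
        }
    }
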
 

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.

[kylin] 01/05: KYLIN-3373 Some improvements for lookup table - UI part change

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3221
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit fd6dd9cf279df1928f636aa8114654d8c69fde7c
Author: liapan <li...@ebay.com>
AuthorDate: Thu May 10 11:54:27 2018 +0800

    KYLIN-3373 Some improvements for lookup table - UI part change
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 webapp/app/js/controllers/cubeAdvanceSetting.js    |  56 +++++++++
 webapp/app/js/controllers/cubes.js                 | 127 +++++++++++++++++++++
 webapp/app/js/controllers/sourceMeta.js            |  28 +++++
 webapp/app/js/directives/select.js                 |   2 +-
 webapp/app/js/model/cubeConfig.js                  |   4 +
 webapp/app/js/model/tableConfig.js                 |  10 +-
 webapp/app/js/services/cubes.js                    |   3 +-
 webapp/app/js/services/tables.js                   |   3 +-
 webapp/app/less/app.less                           |   7 ++
 .../partials/cubeDesigner/advanced_settings.html   |  99 ++++++++++++++++
 webapp/app/partials/cubes/cubes.html               |   2 +
 webapp/app/partials/jobs/lookup_refresh.html       |  71 ++++++++++++
 webapp/app/partials/tables/table_detail.html       |  50 +++++++-
 13 files changed, 457 insertions(+), 5 deletions(-)

diff --git a/webapp/app/js/controllers/cubeAdvanceSetting.js b/webapp/app/js/controllers/cubeAdvanceSetting.js
index 5687e12..211b4c8 100755
--- a/webapp/app/js/controllers/cubeAdvanceSetting.js
+++ b/webapp/app/js/controllers/cubeAdvanceSetting.js
@@ -445,4 +445,60 @@ KylinApp.controller('CubeAdvanceSettingCtrl', function ($scope, $modal,cubeConfi
       $scope.$emit('AdvancedSettingEdited');
     });
   }
+
+  $scope.newSnapshot = {
+    select: {}
+  };
+
+  $scope.removeSnapshotTable = function(index) {
+    $scope.cubeMetaFrame.snapshot_table_desc_list.splice(index, 1);
+  };
+
+  $scope.addSnapshot = function(newSnapshot) {
+    if (!newSnapshot.table_name || !newSnapshot.storage_type) {
+      swal('Oops...', 'Snapshot table name or storage type should not be empty', 'warning');
+      return;
+    } else if ($scope.cubeMetaFrame.snapshot_table_desc_list.length){
+      var existSnapshot = _.find($scope.cubeMetaFrame.snapshot_table_desc_list, function(snapshot){ return snapshot.table_name === newSnapshot.table_name;});
+      if (!!existSnapshot) {
+        swal('Oops...', 'Snapshot table already exists', 'warning');
+        return;
+      }
+    }
+    $scope.cubeMetaFrame.snapshot_table_desc_list.push(angular.copy(newSnapshot));
+    $scope.newSnapshot.select = {};
+  };
+
+  $scope.changeSnapshotStorage = function(snapshot) {
+    if (snapshot.storage_type == 'hbase') {
+      snapshot.global = true;
+    }
+  };
+
+  $scope.changeSnapshotTable = function(changeSnapshot, beforeTableName, snapshotTableDescList) {
+    var existSnapshot = _.find(snapshotTableDescList, function(snapshot) {
+      return snapshot.table_name === changeSnapshot.table_name;
+    });
+    if (!!existSnapshot) {
+      changeSnapshot.table_name = beforeTableName;
+      swal('Oops...', 'Snapshot table already exists', 'warning');
+    }
+  };
+
+  $scope.getCubeLookups = function() {
+    var modelDesc = modelsManager.getModel($scope.cubeMetaFrame.model_name);
+    var modelLookups = modelDesc ? modelDesc.lookups : [];
+    var cubeLookups = [];
+    angular.forEach(modelLookups, function(modelLookup, index) {
+      var dimensionLookup = _.find($scope.cubeMetaFrame.dimensions, function(dimension){ return dimension.table === modelLookup.alias;});
+      if (!!dimensionLookup) {
+        if (cubeLookups.indexOf(modelLookup.table) === -1) {
+          cubeLookups.push(modelLookup.table);
+        }
+      }
+    });
+    return cubeLookups;
+  };
+
+  $scope.cubeLookups = $scope.getCubeLookups();
 });
diff --git a/webapp/app/js/controllers/cubes.js b/webapp/app/js/controllers/cubes.js
index cbd6fad..136d86e 100644
--- a/webapp/app/js/controllers/cubes.js
+++ b/webapp/app/js/controllers/cubes.js
@@ -542,6 +542,27 @@ KylinApp.controller('CubesCtrl', function ($scope, $q, $routeParams, $location,
        });
      };
 
+    $scope.startLookupRefresh = function(cube) {
+      $scope.loadDetail(cube).then(function () {
+        $scope.metaModel={
+          model:cube.model
+        };
+        $modal.open({
+          templateUrl: 'lookupRefresh.html',
+          controller: lookupRefreshCtrl,
+          resolve: {
+            cube: function () {
+              return cube;
+            },
+            scope:function(){
+              return $scope;
+            }
+          }
+        });
+        }
+      );
+    };
+
   });
 
 
@@ -780,3 +801,109 @@ var deleteSegmentCtrl = function($scope, $modalInstance, CubeService, SweetAlert
     });
   };
 };
+
+var lookupRefreshCtrl = function($scope, scope, CubeList, $modalInstance, CubeService, cube, SweetAlert, loadingRequest) {
+  $scope.cubeList = CubeList;
+  $scope.cube = cube;
+  $scope.dispalySegment = false;
+
+  $scope.getLookups = function() {
+    var modelLookups = cube.model ? cube.model.lookups : [];
+    var cubeLookups = [];
+    angular.forEach(modelLookups, function(modelLookup, index) {
+      var dimensionTables = _.find(cube.detail.dimensions, function(dimension){ return dimension.table === modelLookup.alias;});
+      if (!!dimensionTables) {
+        if (cubeLookups.indexOf(modelLookup.table) === -1) {
+          cubeLookups.push(modelLookup.table);
+        }
+      }
+    });
+    return cubeLookups;
+  };
+
+  $scope.cubeLookups = $scope.getLookups();
+
+  $scope.lookup = {
+    select: {}
+  };
+
+  $scope.getReadySegment = function(segment) {
+    return segment.status === 'READY';
+  };
+
+  $scope.cancel = function () {
+    $modalInstance.dismiss('cancel');
+  };
+
+  $scope.updateLookupTable = function(tableName) {
+    var lookupTable = _.find(cube.detail.snapshot_table_desc_list, function(table){ return table.table_name == tableName});
+    if (!!lookupTable && lookupTable.global) {
+      $scope.dispalySegment = false;
+      $scope.lookup.select.segments = [];
+    } else {
+      $scope.dispalySegment = true;
+    }
+  };
+
+  $scope.selectAllSegments = function(allSegments) {
+    if (allSegments) {
+      $scope.lookup.select.segments = $scope.cube.segments;
+    } else {
+      $scope.lookup.select.segments = [];
+    }
+  };
+
+  $scope.refresh = function() {
+    if (!$scope.lookup.select.table_name) {
+      SweetAlert.swal('Warning', 'Lookup table should not be empty', 'warning');
+      return;
+    }
+
+    // cube advance lookup table
+    var lookupTable = _.find(cube.detail.snapshot_table_desc_list, function(table){ return table.table_name == $scope.lookup.select.table_name});
+    if (!!lookupTable) {
+      if (!lookupTable.global && $scope.lookup.select.segments.length == 0) {
+        SweetAlert.swal('Warning', 'Segment should not be empty', 'warning');
+        return;
+      }
+    } else {
+      // cube lookup table
+      lookupTable = _.find($scope.cubeLookups, function(table){ return table == $scope.lookup.select.table_name});
+      if (!lookupTable) {
+        SweetAlert.swal('Warning', 'Lookup table does not exist in cube', 'warning');
+        return;
+      } else {
+        if ($scope.lookup.select.segments.length == 0) {
+          SweetAlert.swal('Warning', 'Segment should not be empty', 'warning');
+          return;
+        }
+      }
+    }
+
+    var lookupSnapshotBuildRequest = {
+      lookupTableName: $scope.lookup.select.table_name,
+      segmentIDs: _.map($scope.lookup.select.segments, function(segment){ return segment.uuid})
+    };
+
+    loadingRequest.show();
+    CubeService.lookupRefresh({cubeId: cube.name}, lookupSnapshotBuildRequest, function (job) {
+      loadingRequest.hide();
+      $modalInstance.dismiss('cancel');
+      SweetAlert.swal('Success!', 'Lookup refresh job was submitted successfully', 'success');
+      scope.refreshCube(cube).then(function(_cube){
+          $scope.cubeList.cubes[$scope.cubeList.cubes.indexOf(cube)] = _cube;
+        });
+    }, function (e) {
+       loadingRequest.hide();
+      if (e.data && e.data.exception) {
+        var message = e.data.exception;
+
+        var msg = !!(message) ? message : 'Failed to take action.';
+        SweetAlert.swal('Oops...', msg, 'error');
+      } else {
+        SweetAlert.swal('Oops...', "Failed to take action.", 'error');
+      }
+    });
+  };
+
+};
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index 8a795d5..49d8998 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -925,3 +925,31 @@ KylinApp
 
   });
 
+/*snapshot controller*/
+KylinApp
+  .controller('TableSnapshotCtrl', function ($scope, TableService, CubeService, uiGridConstants) {
+    $scope.initSnapshots = function() {
+      var tableFullName = $scope.tableModel.selectedSrcTable.database + '.' + $scope.tableModel.selectedSrcTable.name
+      TableService.getSnapshots({tableName: tableFullName, pro: $scope.projectModel.selectedProject}, {}, function (data) {
+        var orgData = JSON.parse(angular.toJson(data));
+        angular.forEach(orgData, function(snapshot) {
+          if(!!snapshot.cubesAndSegmentsUsage && snapshot.cubesAndSegmentsUsage.length > 0) {
+            snapshot.usageInfo = '';
+            angular.forEach(snapshot.cubesAndSegmentsUsage, function(info) {
+              snapshot.usageInfo += info;
+              snapshot.usageInfo += '</br>';
+            });
+          } else {
+            snapshot.usageInfo = 'No Usage Info';
+          }
+        });
+        $scope.tableSnapshots = orgData;
+      });
+    };
+    $scope.$watch('tableModel.selectedSrcTable', function (newValue, oldValue) {
+      if (!newValue || !newValue.name) {
+        return;
+      }
+      $scope.initSnapshots();
+    });
+  }); 
\ No newline at end of file
diff --git a/webapp/app/js/directives/select.js b/webapp/app/js/directives/select.js
index 7327af9..038cf57 100644
--- a/webapp/app/js/directives/select.js
+++ b/webapp/app/js/directives/select.js
@@ -1627,7 +1627,7 @@ uis.directive('uiSelectMultiple', ['uiSelectMinErr','$timeout', function(uiSelec
                   }
                 }
             }
-            if (angular.equals(result.toUpperCase(),value.toUpperCase())){
+            if (angular.equals((typeof result =='string') ? result.toUpperCase() : result, (typeof value == 'string') ? value.toUpperCase() : value)) {
               resultMultiple.unshift(list[p]);
               return true;
             }
diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js
index e163d75..eacb915 100644
--- a/webapp/app/js/model/cubeConfig.js
+++ b/webapp/app/js/model/cubeConfig.js
@@ -114,6 +114,10 @@ KylinApp.constant('cubeConfig', {
     {name:"Segment Dictionary", value:"org.apache.kylin.dict.global.SegmentAppendTrieDictBuilder"}
   ],
   needSetLengthEncodingList:['fixed_length','fixed_length_hex','int','integer'],
+  snapshotStorageTypes: [
+    {name: 'Meta Store', value: 'metaStore'},
+    {name: 'HBase', value: 'hbase'}
+  ],
   baseChartOptions: {
     chart: {
       type: 'sunburstChart',
diff --git a/webapp/app/js/model/tableConfig.js b/webapp/app/js/model/tableConfig.js
index 3989531..0dc4a52 100644
--- a/webapp/app/js/model/tableConfig.js
+++ b/webapp/app/js/model/tableConfig.js
@@ -121,7 +121,15 @@ KylinApp.constant('tableConfig', {
       "fixed_length_hex",
       "integer"
     ]
-  }
+  },
+  snapshotTheaditems: [
+    {attr: 'snapshotID', name: 'ID'},
+    {attr: 'storageType', name: 'Storage Type'},
+    {attr: 'lastBuildTime', name: 'Last Build Time'},
+    {attr: 'sourceTableLastModifyTime', name: 'Source Table Last Modify Time'},
+    {attr: 'sourceTableSize', name: 'Source Table Size'},
+    {attr: 'usageInfo', name: 'Usage Info'}
+  ]
 
 
 });
diff --git a/webapp/app/js/services/cubes.js b/webapp/app/js/services/cubes.js
index 6140521..537e5c1 100644
--- a/webapp/app/js/services/cubes.js
+++ b/webapp/app/js/services/cubes.js
@@ -79,6 +79,7 @@ KylinApp.factory('CubeService', ['$resource', function ($resource, config) {
       }
     },
     optimize: {method: 'PUT', params: {action: 'optimize'}, isArray: false},
-    autoMigrate: {method: 'POST', params: {action: 'migrate'}, isArray: false}
+    autoMigrate: {method: 'POST', params: {action: 'migrate'}, isArray: false},
+    lookupRefresh: {method: 'PUT', params: {action: 'refresh_lookup'}, isArray: false}
   });
 }]);
diff --git a/webapp/app/js/services/tables.js b/webapp/app/js/services/tables.js
index 7d8bc1a..9a62cf8 100755
--- a/webapp/app/js/services/tables.js
+++ b/webapp/app/js/services/tables.js
@@ -24,6 +24,7 @@ KylinApp.factory('TableService', ['$resource', function ($resource, config) {
     unLoadHiveTable: {method: 'DELETE', params: {}, isArray: false},
     genCardinality: {method: 'PUT', params: {action: 'cardinality'}, isArray: false},
     showHiveDatabases: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true},
-    showHiveTables: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true}
+    showHiveTables: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true},
+    getSnapshots: {method: 'GET', params: {action: 'snapshots'}, isArray: true}
   });
 }]);
diff --git a/webapp/app/less/app.less b/webapp/app/less/app.less
index eca76ff..7aa7283 100644
--- a/webapp/app/less/app.less
+++ b/webapp/app/less/app.less
@@ -929,4 +929,11 @@ pre {
 div[title="Cube Info Detail"].popover {
   max-width: 1024px;
   with: min-content;
+}
+/*snapshot usage info tooltip*/
+td.snapshot-usage .tooltip {
+  font-size: 16px;
+}
+td.snapshot-usage .tooltip-inner {
+  max-width: 1024px;
 }
\ No newline at end of file
diff --git a/webapp/app/partials/cubeDesigner/advanced_settings.html b/webapp/app/partials/cubeDesigner/advanced_settings.html
index 7b9193b..fb7a1b1 100755
--- a/webapp/app/partials/cubeDesigner/advanced_settings.html
+++ b/webapp/app/partials/cubeDesigner/advanced_settings.html
@@ -503,6 +503,99 @@
              <button class="btn btn-link" ng-click="clearNewDictionaries()">Cancel</button>
            </div>
          </div>
+         <!-- Advanced Lookup Table-->
+         <div class="form-group large-popover" style="margin-bottom:30px;">
+           <h3 style="margin-left:42px;margin-bottom:30px;">Advanced Snapshot Table  <i kylinpopover placement="right" title="Advanced Snapshot Table" template="AdvanceSnapshotTableTip.html" class="fa fa-info-circle"></i></h3>
+           <div style="margin-left:42px">
+             <!-- edit mode-->
+             <div ng-if="state.mode=='edit'" class="box-body">
+               <table class="table table-hover table-bordered list" style="table-layout: fixed;margin-left:42px;width:92%;">
+                 <thead>
+                   <tr>
+                     <th style="width:60%">Snapshot Table</th>
+                     <th style="width:25%">Type</th>
+                     <th style="width:10%">Global</th>
+                     <th style="width:5%"></th>
+                   </tr>
+                 </thead>
+                 <tbody>
+                   <tr ng-repeat="snapshot in cubeMetaFrame.snapshot_table_desc_list track by $index">
+                     <td>
+                       <select style="width:95%" chosen ng-model="snapshot.table_name"
+                               ng-change="changeSnapshotTable(snapshot, '{{snapshot.table_name}}', {{cubeMetaFrame.snapshot_table_desc_list}})"
+                               ng-options="tableName as tableName for tableName in cubeLookups">
+                         <option value=""></option>
+                       </select>
+                     </td>
+                     <td>
+                       <select style="width:95%" chosen ng-model="snapshot.storage_type"
+                               ng-change="changeSnapshotStorage(snapshot)"
+                               ng-options="storageType.value as storageType.name for storageType in cubeConfig.snapshotStorageTypes">
+                         <option value=""></option>
+                       </select>
+                     </td>
+                     <td>
+                       <input type="checkbox" ng-model="snapshot.global" ng-disabled="(snapshot.storage_type == 'hbase')">
+                     </td>
+                     <td>
+                       <button class="btn btn-xs btn-info" ng-click="removeSnapshotTable($index)">
+                         <i class="fa fa-minus"></i>
+                       </button>
+                     </td>
+                   </tr>
+                   <tr>
+                     <td>
+                       <select style="width:95%" chosen ng-model="newSnapshot.select.table_name"
+                               ng-options="tableName as tableName for tableName in cubeLookups">
+                         <option value=""></option>
+                       </select>
+                     </td>
+                     <td>
+                       <select style="width:95%" chosen ng-model="newSnapshot.select.storage_type"
+                               ng-change="changeSnapshotStorage(newSnapshot.select)"
+                               ng-options="storageType.value as storageType.name for storageType in cubeConfig.snapshotStorageTypes">
+                         <option value=""></option>
+                       </select>
+                     </td>
+                     <td>
+                       <input type="checkbox" ng-model="newSnapshot.select.global" ng-disabled="(newSnapshot.select.storage_type == 'hbase')">
+                     </td>
+                     <td>
+                       <button class="btn btn-xs btn-info" ng-click="addSnapshot(newSnapshot.select)">
+                         <i class="fa fa-plus"></i>
+                       </button>
+                     </td>
+                   </tr>
+                 </tbody>
+               </table>
+             </div>
+             <!-- view-->
+             <div ng-if="state.mode=='view'" class="box-body">
+               <table class="table table-hover table-bordered list" style="table-layout: fixed;margin-left:42px;width:92%;">
+                  <thead>
+                   <tr>
+                     <th style="width:60%">Snapshot Table</th>
+                     <th style="width:25%">Type</th>
+                     <th style="width:10%">Global</th>
+                   </tr>
+                 </thead>
+                 <tbody>
+                   <tr ng-repeat="snapshot in cubeMetaFrame.snapshot_table_desc_list track by $index">
+                     <td>
+                         <p>{{snapshot.table_name}}</p>
+                     </td>
+                     <td>
+                         <p>{{snapshot.storage_type}}</p>
+                     </td>
+                     <td>
+                         <input type="checkbox" ng-model="snapshot.global" disabled="true">
+                     </td>
+                   </tr>
+                 </tbody>
+               </table>
+             </div>
+           </div>
+         </div>
          <!--Edit ColumnFamily-->
          <div class="form-group large-popover" >
            <h3 style="margin-left:42px">Advanced ColumnFamily  <i kylinpopover placement="right" title="Advanced ColumnFamily" template="AdvancedColumnFamilyTip.html" class="fa fa-info-circle"></i></h3>
@@ -647,3 +740,9 @@
     </h4>
   </div>
 </script>
+
+<script type="text/ng-template" id="AdvanceSnapshotTableTip.html">
+  <div>
+    <h4>Advanced snapshot settings let a lookup table snapshot be shared globally across the cube and stored in a different storage type.</h4>
+  </div>
+</script>
diff --git a/webapp/app/partials/cubes/cubes.html b/webapp/app/partials/cubes/cubes.html
index 3fd5e61..21b7a34 100644
--- a/webapp/app/partials/cubes/cubes.html
+++ b/webapp/app/partials/cubes/cubes.html
@@ -95,6 +95,7 @@
                         <li ng-if="cube.status!='DESCBROKEN'"><a ng-click="startJobSubmit(cube);">Build</a></li>
                         <li ng-if="cube.status!='DESCBROKEN'"><a ng-click="startRefresh(cube)">Refresh</a></li>
                         <li ng-if="cube.status!='DESCBROKEN'"><a ng-click="startMerge(cube)">Merge</a></li>
+                        <li ng-if="cube.status!='DESCBROKEN'"><a ng-click="startLookupRefresh(cube);">Lookup Refresh</a></li>
                         <li ng-if="cube.status=='READY' && (userService.hasRole('ROLE_ADMIN') || hasPermission('cube',cube, permissions.ADMINISTRATION.mask, permissions.MANAGEMENT.mask))"><a ng-click="disable(cube)">Disable</a></li>
                         <li ng-if="cube.status=='DISABLED' && (userService.hasRole('ROLE_ADMIN') || hasPermission('cube',cube, permissions.ADMINISTRATION.mask, permissions.MANAGEMENT.mask))"><a ng-click="enable(cube)">Enable</a></li>
                         <li ng-if="cube.status=='DISABLED' && (userService.hasRole('ROLE_ADMIN') || hasPermission('cube',cube, permissions.ADMINISTRATION.mask, permissions.MANAGEMENT.mask))"><a ng-click="startDeleteSegment(cube)">Delete Segment</a></li>
@@ -146,4 +147,5 @@
 <div ng-include="'partials/models/model_detail.html'"></div>
 <div ng-include="'partials/cubes/cube_clone.html'"></div>
 <div ng-include="'partials/cubes/cube_delete_segment.html'"></div>
+<div ng-include="'partials/jobs/lookup_refresh.html'"></div>
 </div>
diff --git a/webapp/app/partials/jobs/lookup_refresh.html b/webapp/app/partials/jobs/lookup_refresh.html
new file mode 100644
index 0000000..ce8bbf9
--- /dev/null
+++ b/webapp/app/partials/jobs/lookup_refresh.html
@@ -0,0 +1,71 @@
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+-->
+
+<script type="text/ng-template" id="lookupRefresh.html">
+  <div class="modal-header">
+    <h4 tooltip="refresh">LOOKUP REFRESH CONFIRM</h4>
+  </div>
+  <div class="modal-body" style="background-color: white">
+    <div ng-if="!!cube.detail.snapshot_table_desc_list" class="row">
+      <div class="col-md-2"></div>
+      <div class="col-md-8">
+        <table class="table table-striped list">
+          <tbody>
+            <tr>
+              <td style="width:30%">Lookup Table</td>
+              <td style="width:70%" colspan="2">
+                <select style="width:95%" chosen ng-model="lookup.select.table_name"
+                  ng-change="updateLookupTable(lookup.select.table_name)"
+                  ng-options="lookup as lookup for lookup in cubeLookups">
+                  <option value=""></option>
+                </select>
+              </td>
+            </tr>
+            <tr ng-if="dispalySegment">
+              <td style="width:30%">Segments</td>
+              <td style="width:60%">
+                <ui-select multiple ng-model="lookup.select.segments" theme="bootstrap" sortable="true" close-on-select="false" class="form-control">
+                  <ui-select-match placeholder="Select Segments...">{{$item.name}}</ui-select-match>
+                  <ui-select-choices repeat="segment in cube.segments | filter:getReadySegment">
+                    <div ng-bind-html="segment.name | highlight: $select.search"></div>
+                  </ui-select-choices>
+                </ui-select>
+              </td>
+              <td style="width:10%">
+                <input type="checkbox" ng-change="selectAllSegments(allSegments)" ng-model="allSegments">&nbsp;All
+              </td>
+            </tr>
+          </tbody>
+        </table>
+      </div>
+      <div class="col-md-2"></div>
+    </div>
+    <div ng-if="!cube.detail.snapshot_table_desc_list" class="row">
+      <div class="col-md-2"></div>
+      <div class="col-md-8">
+        <span>No lookup table defined. Please configure a lookup table in the model.</span>
+      </div>
+      <div class="col-md-2"></div>
+    </div>
+  </div>
+
+  <div class="modal-footer">
+    <button class="btn btn-primary" ng-click="cancel()">Close</button>
+    <button ng-if="!!cube.detail.snapshot_table_desc_list" class="btn btn-success" ng-click="refresh()">Submit</button>
+  </div>
+</script>
\ No newline at end of file
diff --git a/webapp/app/partials/tables/table_detail.html b/webapp/app/partials/tables/table_detail.html
index d8209a8..441e093 100644
--- a/webapp/app/partials/tables/table_detail.html
+++ b/webapp/app/partials/tables/table_detail.html
@@ -35,6 +35,9 @@
         <li>
           <a data-toggle="tab"  href="#access">Access</a>
         </li>
+        <li>
+          <a data-toggle="tab" href="#snapshot">Snapshot</a>
+        </li>
       </ul>
       <div class="tab-content">
         <!--Schema-->
@@ -184,8 +187,53 @@
               </div>
             </div>
           </div>
+        </div>
 
-
+        <!--snapshot-->
+        <div id="snapshot" class="tab-pane" ng-controller="TableSnapshotCtrl">
+          <div ng-if="tableSnapshots.length > 0">
+            <table class="table table-hover table-striped list">
+              <thead>
+                <tr style="cursor: pointer">
+                  <th ng-repeat="theaditem in tableConfig.snapshotTheaditems"
+                    ng-click="state.filterAttr= theaditem.attr;state.reverseColumn=theaditem.attr;state.filterReverse=!state.filterReverse;">
+                  {{theaditem.name}}
+                    <i ng-if="state.reverseColumn!= theaditem.attr"
+                       class="fa fa-unsorted"></i>
+                    <i ng-if="state.reverseColumn== theaditem.attr && !state.filterReverse"
+                       class="fa fa-sort-asc"></i>
+                    <i ng-if="state.reverseColumn== theaditem.attr && state.filterReverse"
+                       class="fa fa-sort-desc"></i>
+                  </th>
+                </tr>
+              </thead>
+              <tbody>
+                <tr ng-repeat="snapshot in tableSnapshots | filter: columnName | orderObjectBy:state.filterAttr:state.filterReverse">
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}">
+                    {{snapshot.snapshotID}}
+                  </td>
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}">
+                    {{snapshot.storageType}}
+                  </td>
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}">
+                    {{snapshot.lastBuildTime | utcToConfigTimeZone}}
+                  </td>
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}">
+                    {{snapshot.sourceTableLastModifyTime | utcToConfigTimeZone}}
+                  </td>
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}">
+                    {{snapshot.sourceTableSize | bytes}}
+                  </td>
+                  <td style="{{(snapshot.snapshotID == snapshot.snapshotID)? 'background-color:#EBF9FE':''}}" class="snapshot-usage">
+                    <i class="fa fa-list text-aqua" style="cursor: pointer;"  aria-hidden="true" tooltip-placement="left" tooltip-html-unsafe="<div style='text-align:left'>{{snapshot.usageInfo}}</div>"></i>
+                  </td>
+                </tr>
+              </tbody>
+            </table>
+          </div>
+          <div ng-if="tableSnapshots.length == 0">
+            <div no-result text="No Snapshot Info."></div>
+          </div>
         </div>
       </div>
 

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.

[kylin] 03/05: KYLIN-3375 Some improvements for lookup table - build change

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3221
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 45f998b368ab46f8fda05875b5945beccfd57869
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu May 10 13:00:10 2018 +0800

    KYLIN-3375 Some improvements for lookup table - build change
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  13 +
 .../apache/kylin/common/restclient/RestClient.java |  21 ++
 .../java/org/apache/kylin/cube/CubeManager.java    | 118 ++++++--
 .../java/org/apache/kylin/cube/CubeUpdate.java     |   9 +
 .../kylin/cube/cli/DictionaryGeneratorCLI.java     |   7 +-
 .../dict/lookup/AbstractLookupRowEncoder.java      | 122 +++++++++
 .../kylin/dict/lookup/IExtLookupProvider.java      |  29 +-
 .../kylin/dict/lookup/IExtLookupTableCache.java    |  28 +-
 .../org/apache/kylin/dict/lookup/ILookupTable.java |  27 +-
 .../kylin/dict/lookup/LookupProviderFactory.java   | 112 ++++++++
 .../kylin/job/constant/ExecutableConstants.java    |   5 +
 .../kylin/job/execution/AbstractExecutable.java    |   6 +-
 .../job/execution/DefaultChainedExecutable.java    |  26 ++
 .../realization/IRealizationConstants.java         |   2 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  17 ++
 .../java/org/apache/kylin/engine/mr/CubingJob.java |  26 --
 .../kylin/engine/mr/ILookupMaterializer.java       |  25 +-
 .../java/org/apache/kylin/engine/mr/MRUtil.java    |   5 +
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |   4 +
 .../kylin/engine/mr/common/BatchConstants.java     |  11 +
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java     |  23 ++
 .../localmeta/cube_desc/ci_left_join_cube.json     |   8 +
 .../apache/kylin/rest/job/MetadataCleanupJob.java  |   3 +-
 .../kylin/rest/job/StorageCleanJobHbaseUtil.java   |  37 +++
 server/src/main/resources/kylinSecurity.xml        |   2 +
 storage-hbase/pom.xml                              |   6 +
 .../storage/hbase/lookup/HBaseLookupMRSteps.java   | 175 ++++++++++++
 .../hbase/lookup/HBaseLookupMaterializer.java      |  40 +++
 .../storage/hbase/lookup/HBaseLookupProvider.java  |  58 ++++
 .../hbase/lookup/HBaseLookupRowEncoder.java        | 134 +++++++++
 .../storage/hbase/lookup/HBaseLookupTable.java     | 130 +++++++++
 .../hbase/lookup/KVSortReducerWithDupKeyCheck.java |  62 +++++
 .../hbase/lookup/LookupTableHFilesBulkLoadJob.java | 106 ++++++++
 .../hbase/lookup/LookupTableToHFileJob.java        | 302 +++++++++++++++++++++
 .../hbase/lookup/LookupTableToHFileMapper.java     | 109 ++++++++
 .../UpdateSnapshotCacheForQueryServersStep.java    | 106 ++++++++
 .../hbase/lookup/HBaseLookupRowEncoderTest.java    |  98 +++++++
 .../hbase/lookup/LookupTableToHFileJobTest.java    | 110 ++++++++
 ...UpdateSnapshotCacheForQueryServersStepTest.java |  55 ++++
 39 files changed, 2052 insertions(+), 125 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7b24864..5f8172b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -414,6 +414,19 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.snapshot.max-mb", "300"));
     }
 
+    public int getExtTableSnapshotShardingMB() {
+        return Integer.parseInt(getOptional("kylin.snapshot.ext.shard-mb", "500"));
+    }
+
+    public String getExtTableSnapshotLocalCachePath() {
+        return getOptional("kylin.snapshot.ext.local.cache.path", "lookup_cache");
+    }
+
+    public double getExtTableSnapshotLocalCacheMaxSizeGB() {
+        return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
+    }
+
+
     // ============================================================================
     // CUBE
     // ============================================================================
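
A minimal sketch of how these new ext-snapshot settings could be read at runtime; the fragment assumes it runs inside Kylin code where KylinConfig is on the classpath, and the defaults shown come from the getters added above:

    // Sketch: read the ext lookup snapshot settings through KylinConfig
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    int shardMB = config.getExtTableSnapshotShardingMB();                // kylin.snapshot.ext.shard-mb, default 500
    String cacheDir = config.getExtTableSnapshotLocalCachePath();        // kylin.snapshot.ext.local.cache.path, default "lookup_cache"
    double cacheCapGB = config.getExtTableSnapshotLocalCacheMaxSizeGB(); // kylin.snapshot.ext.local.cache.max-size-gb, default 200
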
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 93f5e19..11284f6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -303,6 +303,27 @@ public class RestClient {
         }
     }
 
+    public void buildLookupSnapshotCache(String project, String lookupTableName, String snapshotID) throws IOException {
+        String url = baseUrl + "/tables/" + project + "/" + lookupTableName + "/" + snapshotID + "/snapshotLocalCache";
+        HttpPut put = new HttpPut(url);
+        HttpResponse response = client.execute(put);
+        getContent(response);
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+        }
+    }
+
+    public String getLookupSnapshotCacheState(String lookupTableName, String snapshotID) throws IOException {
+        String url = baseUrl + "/tables/" + lookupTableName + "/" + snapshotID + "/snapshotLocalCache/state";
+        HttpGet get = new HttpGet(url);
+        HttpResponse response = client.execute(get);
+        String content = getContent(response);
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+        }
+        return content;
+    }
+
     private HashMap dealResponse(HttpResponse response) throws IOException {
         if (response.getStatusLine().getStatusCode() != 200) {
             throw new IOException("Invalid response " + response.getStatusLine().getStatusCode());
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index fc2ad3d..a6022aa 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -32,6 +32,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -44,9 +45,13 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.metadata.TableMetadataManager;
@@ -290,6 +295,18 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
+    public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String lookupTableName, String newSnapshotResPath) throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            cube = cube.latestCopyForWrite();
+
+            CubeUpdate update = new CubeUpdate(cube);
+            Map<String, String> map = Maps.newHashMap();
+            map.put(lookupTableName, newSnapshotResPath);
+            update.setUpdateTableSnapshotPath(map);
+            return updateCube(update);
+        }
+    }
+
     private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
         if (update == null || update.getCubeInstance() == null)
             throw new IllegalStateException();
@@ -353,6 +370,12 @@ public class CubeManager implements IRealizationProvider {
             cube.setCuboidsRecommend(update.getCuboidsRecommend());
         }
 
+        if (update.getUpdateTableSnapshotPath() != null) {
+            for(Map.Entry<String, String> lookupSnapshotPathEntry : update.getUpdateTableSnapshotPath().entrySet()) {
+                cube.putSnapshotResPath(lookupSnapshotPathEntry.getKey(), lookupSnapshotPathEntry.getValue());
+            }
+        }
+
         try {
             cube = crud.save(cube);
         } catch (WriteConflictException ise) {
@@ -435,6 +458,55 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
+    public ILookupTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+        String tableName = join.getPKSide().getTableIdentity();
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(tableName);
+        if (snapshotTableDesc == null || !snapshotTableDesc.isExtSnapshotTable()) {
+            return getInMemLookupTable(cubeSegment, join, snapshotTableDesc);
+        } else {
+            return getExtLookupTable(cubeSegment, tableName, snapshotTableDesc);
+        }
+    }
+
+    private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join, SnapshotTableDesc snapshotTableDesc) {
+        String tableName = join.getPKSide().getTableIdentity();
+        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
+        String[] pkCols = join.getPrimaryKey();
+
+        try {
+            SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
+            TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
+            return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot);
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                    "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+        }
+    }
+
+    private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
+        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
+
+        ExtTableSnapshotInfo extTableSnapshot = ExtTableSnapshotInfoManager.getInstance(config).getSnapshot(
+                snapshotResPath);
+        TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
+        return LookupProviderFactory.getExtLookupTable(tableDesc, extTableSnapshot);
+    }
+
+    private String getSnapshotResPath(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
+        String snapshotResPath;
+        if (snapshotTableDesc == null || !snapshotTableDesc.isGlobal()) {
+            snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
+        } else {
+            snapshotResPath = cubeSegment.getCubeInstance().getSnapshotResPath(tableName);
+        }
+        if (snapshotResPath == null) {
+            throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
+                    + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
+        }
+        return snapshotResPath;
+    }
+
     @VisibleForTesting
     /*private*/ String generateStorageLocation() {
         String namePrefix = config.getHBaseTableNamePrefix();
@@ -972,8 +1044,8 @@ public class CubeManager implements IRealizationProvider {
         return dictAssist.buildSnapshotTable(cubeSeg, lookupTable);
     }
 
-    public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-        return dictAssist.getLookupTable(cubeSegment, join);
+    private TableMetadataManager getMetadataManager() {
+        return TableMetadataManager.getInstance(config);
     }
 
     private class DictionaryAssist {
@@ -1055,31 +1127,25 @@ public class CubeManager implements IRealizationProvider {
             IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
             SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
 
-            segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-            CubeUpdate update = new CubeUpdate(cubeCopy);
-            update.setToUpdateSegs(segCopy);
-            updateCube(update);
-
-            return snapshot;
-        }
-
-        public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-
-            String tableName = join.getPKSide().getTableIdentity();
-            String[] pkCols = join.getPrimaryKey();
-            String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
-            if (snapshotResPath == null)
-                throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
-                        + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
+                segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+                CubeUpdate update = new CubeUpdate(cubeCopy);
+                update.setToUpdateSegs(segCopy);
+                updateCube(update);
+
+                // Update the input cubeSeg after the resource store updated
+                cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
+            } else {
+                CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
+                Map<String, String> map = Maps.newHashMap();
+                map.put(lookupTable, snapshot.getResourcePath());
+                cubeUpdate.setUpdateTableSnapshotPath(map);
+                updateCube(cubeUpdate);
 
-            try {
-                SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
-                TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject());
-                return new LookupStringTable(tableDesc, pkCols, snapshot);
-            } catch (IOException e) {
-                throw new IllegalStateException(
-                        "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+                cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
             }
+            return snapshot;
         }
     }
 
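
A minimal sketch, assuming a CubeSegment and JoinDesc are already in scope, of how query-side code obtains a lookup table through the new entry point; try-with-resources works because ILookupTable extends Closeable, and the enclosing method is assumed to declare IOException:

    // Sketch: resolve either the in-memory snapshot or the ext (e.g. HBase-backed) lookup table
    CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
    try (ILookupTable lookupTable = cubeManager.getLookupTable(cubeSegment, joinDesc)) {
        String[] pkValue = new String[] { "Computers" };                 // hypothetical primary key value
        String[] row = lookupTable.getRow(new Array<String>(pkValue));   // may be null when the key is absent
    }
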
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
index 378d082..62b46a9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@ -36,6 +36,7 @@ public class CubeUpdate {
     private int cost = -1;
     private Map<Long, Long> cuboids = null;
     private Set<Long> cuboidsRecommend = null;
+    private Map<String, String> updateTableSnapshotPath = null;
 
     public CubeUpdate(CubeInstance cubeInstance) {
         setCubeInstance(cubeInstance);
@@ -124,4 +125,12 @@ public class CubeUpdate {
         this.cuboidsRecommend = cuboidsRecommend;
         return this;
     }
+
+    public Map<String, String> getUpdateTableSnapshotPath() {
+        return updateTableSnapshotPath;
+    }
+
+    public void setUpdateTableSnapshotPath(Map<String, String> updateTableSnapshotPath) {
+        this.updateTableSnapshotPath = updateTableSnapshotPath;
+    }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 36c06b7..2a24370 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -77,8 +77,11 @@ public class DictionaryGeneratorCLI {
         for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
             TableRef table = dim.getTableRef();
             if (cubeSeg.getModel().isLookupTable(table)) {
-                toSnapshot.add(table.getTableIdentity());
-                toCheckLookup.add(table);
+                // only take the snapshot when the snapshot desc is not ext type
+                if (!cubeSeg.getCubeDesc().isExtSnapshotTable(table.getTableIdentity())) {
+                    toSnapshot.add(table.getTableIdentity());
+                    toCheckLookup.add(table);
+                }
             }
         }
 
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java
new file mode 100644
index 0000000..5efe129
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * Abstract encoder/decoder
+ * 
+ */
+abstract public class AbstractLookupRowEncoder<R> {
+    protected ByteBuffer keyByteBuffer = ByteBuffer.allocate(1024 * 1024);
+
+    protected int columnsNum;
+    protected int[] keyIndexes;
+    protected int[] valueIndexes;
+
+    public AbstractLookupRowEncoder(TableDesc tableDesc, String[] keyColumns) {
+        this.columnsNum = tableDesc.getColumns().length;
+        this.keyIndexes = new int[keyColumns.length];
+        this.valueIndexes = new int[columnsNum - keyColumns.length];
+        int keyIdx = 0;
+        int valIdx = 0;
+        for (int i = 0; i < columnsNum; i++) {
+            boolean isKeyColumn = false;
+            for (String keyColumn : keyColumns) {
+                if (keyColumn.equals(tableDesc.getColumns()[i].getName())) {
+                    isKeyColumn = true;
+                    break;
+                }
+            }
+            if (isKeyColumn) {
+                keyIndexes[keyIdx] = i;
+                keyIdx++;
+            } else {
+                valueIndexes[valIdx] = i;
+                valIdx++;
+            }
+        }
+    }
+
+    abstract public R encode(String[] row);
+
+    abstract public String[] decode(R result);
+
+    public String[] getKeyData(String[] row) {
+        return extractColValues(row, keyIndexes);
+    }
+
+    public String[] getValueData(String[] row) {
+        return extractColValues(row, valueIndexes);
+    }
+
+    public byte[] encodeStringsWithLenPfx(String[] keys, boolean allowNull) {
+        keyByteBuffer.clear();
+        for (String key : keys) {
+            if (key == null && !allowNull) {
+                throw new IllegalArgumentException("key cannot be null:" + Arrays.toString(keys));
+            }
+            byte[] byteKey = toBytes(key);
+            keyByteBuffer.putShort((short) byteKey.length);
+            keyByteBuffer.put(byteKey);
+        }
+        byte[] result = new byte[keyByteBuffer.position()];
+        System.arraycopy(keyByteBuffer.array(), 0, result, 0, keyByteBuffer.position());
+        return result;
+    }
+
+    protected void decodeFromLenPfxBytes(byte[] rowKey, int[] valueIdx, String[] result) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(rowKey);
+        for (int i = 0; i < valueIdx.length; i++) {
+            short keyLen = byteBuffer.getShort();
+            byte[] keyBytes = new byte[keyLen];
+            byteBuffer.get(keyBytes);
+            result[valueIdx[i]] = fromBytes(keyBytes);
+        }
+    }
+
+    protected String[] extractColValues(String[] row, int[] indexes) {
+        String[] result = new String[indexes.length];
+        int i = 0;
+        for (int idx : indexes) {
+            result[i++] = row[idx];
+        }
+        return result;
+    }
+
+    protected byte[] toBytes(String str) {
+        if (str == null) {
+            return new byte[] { DimensionEncoding.NULL };
+        }
+        return Bytes.toBytes(str);
+    }
+
+    protected String fromBytes(byte[] bytes) {
+        if (DimensionEncoding.isNull(bytes, 0, bytes.length)) {
+            return null;
+        }
+        return Bytes.toString(bytes);
+    }
+}
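
A minimal sketch of the length-prefixed layout produced by encodeStringsWithLenPfx: every column value is written as a 2-byte length followed by its bytes, and a null value is represented by a single NULL marker byte. The buffer size and sample values below are illustrative only:

    // Sketch: mirrors the encoding loop above for a two-column key
    ByteBuffer buf = ByteBuffer.allocate(64);
    for (String key : new String[] { "US", "2018-05-10" }) {
        byte[] keyBytes = Bytes.toBytes(key);   // org.apache.kylin.common.util.Bytes
        buf.putShort((short) keyBytes.length);  // 2-byte length prefix
        buf.put(keyBytes);                      // value bytes
    }
    byte[] encodedKey = Arrays.copyOf(buf.array(), buf.position());
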
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java
similarity index 57%
copy from core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
copy to core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java
index 2e4f400..a09a439 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java
@@ -16,26 +16,21 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.metadata.realization;
+package org.apache.kylin.dict.lookup;
 
-/**
- */
-public class IRealizationConstants {
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface IExtLookupProvider {
+    ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot);
 
     /**
-     * For each cube htable, we leverage htable's metadata to keep track of
-     * which kylin server(represented by its kylin_metadata prefix) owns this htable
+     * @return the local cache if the provider has one; return null if no local cache exists
      */
-    public final static String HTableTag = "KYLIN_HOST";
-
-    public final static String HTableOwner = "OWNER";
-
-    public final static String HTableUser = "USER";
-
-    public final static String HTableCreationTime = "CREATION_TIME";
-
-    public final static String HTableSegmentTag = "SEGMENT";
-
-    public final static String HTableGitTag = "GIT_COMMIT";
+    IExtLookupTableCache getLocalCache();
 
+    /**
+     * Return an adaptor that implements specified interface as requested by the build engine.
+     * The ILookupMaterializer in particular, is required by the MR build engine.
+     */
+    <I> I adaptToBuildEngine(Class<I> engineInterface);
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java
similarity index 51%
copy from core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
copy to core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java
index 2e4f400..f473059 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java
@@ -16,26 +16,24 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.metadata.realization;
+package org.apache.kylin.dict.lookup;
 
-/**
- */
-public class IRealizationConstants {
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface IExtLookupTableCache {
+    enum CacheState {NONE, IN_BUILDING, AVAILABLE}
 
     /**
-     * For each cube htable, we leverage htable's metadata to keep track of
-     * which kylin server(represented by its kylin_metadata prefix) owns this htable
+     * @param tableDesc
+     * @param extTableSnapshotInfo
+     * @param buildIfNotExist if true, build the cached lookup table when it does not exist.
+     * @return null if no cached lookup table exist
      */
-    public final static String HTableTag = "KYLIN_HOST";
-
-    public final static String HTableOwner = "OWNER";
-
-    public final static String HTableUser = "USER";
-
-    public final static String HTableCreationTime = "CREATION_TIME";
+    ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, boolean buildIfNotExist);
 
-    public final static String HTableSegmentTag = "SEGMENT";
+    void buildSnapshotCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, ILookupTable sourceTable);
 
-    public final static String HTableGitTag = "GIT_COMMIT";
+    void removeSnapshotCache(ExtTableSnapshotInfo extTableSnapshotInfo);
 
+    CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo);
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
similarity index 57%
copy from core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
copy to core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
index 2e4f400..dccb7c4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
@@ -16,26 +16,17 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.metadata.realization;
+package org.apache.kylin.dict.lookup;
 
-/**
- */
-public class IRealizationConstants {
+import org.apache.kylin.common.util.Array;
 
+import java.io.Closeable;
+
+public interface ILookupTable extends Iterable<String[]>, Closeable {
     /**
-     * For each cube htable, we leverage htable's metadata to keep track of
-     * which kylin server(represented by its kylin_metadata prefix) owns this htable
+     * get the row according to the key
+     * @param key
+     * @return
      */
-    public final static String HTableTag = "KYLIN_HOST";
-
-    public final static String HTableOwner = "OWNER";
-
-    public final static String HTableUser = "USER";
-
-    public final static String HTableCreationTime = "CREATION_TIME";
-
-    public final static String HTableSegmentTag = "SEGMENT";
-
-    public final static String HTableGitTag = "GIT_COMMIT";
-
+    String[] getRow(Array<String> key);
 }
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java
new file mode 100644
index 0000000..64ccef5
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class LookupProviderFactory {
+    private static final Logger logger = LoggerFactory.getLogger(LookupProviderFactory.class);
+    private static Map<String, String> lookupProviderImplClassMap = Maps.newConcurrentMap();
+
+    static {
+        registerLookupProvider(ExtTableSnapshotInfo.STORAGE_TYPE_HBASE,
+                "org.apache.kylin.storage.hbase.lookup.HBaseLookupProvider");
+    }
+
+    public static void registerLookupProvider(String storageType, String implClassName) {
+        lookupProviderImplClassMap.put(storageType, implClassName);
+    }
+
+    public static IExtLookupProvider getExtLookupProvider(String storageType) {
+        String className = lookupProviderImplClassMap.get(storageType);
+        if (className == null) {
+            throw new IllegalStateException("no implementation class found for storage type:" + storageType);
+        }
+        try {
+            Class clazz = Class.forName(className);
+            Constructor constructor = clazz.getConstructor();
+            return (IExtLookupProvider) constructor.newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException("the lookup implementation class is invalid for storage type:"
+                    + storageType, e);
+        }
+    }
+
+    public static ILookupTable getInMemLookupTable(TableDesc tableDesc, String[] pkCols, IReadableTable readableTable)
+            throws IOException {
+        return new LookupStringTable(tableDesc, pkCols, readableTable);
+    }
+
+    public static ILookupTable getExtLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        IExtLookupTableCache extLookupTableCache = getExtLookupProvider(extTableSnapshot.getStorageType()).getLocalCache();
+        if (extLookupTableCache == null) {
+            return getExtLookupTableWithoutCache(tableDesc, extTableSnapshot);
+        }
+        ILookupTable cachedLookupTable = extLookupTableCache.getCachedLookupTable(tableDesc, extTableSnapshot, true);
+        if (cachedLookupTable != null) {
+            logger.info("try to use cached lookup table:{}", extTableSnapshot.getResourcePath());
+            return cachedLookupTable;
+        }
+        logger.info("use ext lookup table:{}", extTableSnapshot.getResourcePath());
+        return getExtLookupTableWithoutCache(tableDesc, extTableSnapshot);
+    }
+
+    public static ILookupTable getExtLookupTableWithoutCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        IExtLookupProvider provider = getExtLookupProvider(extTableSnapshot.getStorageType());
+        return provider.getLookupTable(tableDesc, extTableSnapshot);
+    }
+
+    public static <T> T createEngineAdapter(String lookupStorageType, Class<T> engineInterface) {
+        IExtLookupProvider provider = getExtLookupProvider(lookupStorageType);
+        return provider.adaptToBuildEngine(engineInterface);
+    }
+
+    public static void rebuildLocalCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo) {
+        IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache();
+        if (tablesCache != null) {
+            tablesCache.buildSnapshotCache(tableDesc, extTableSnapshotInfo, getExtLookupTableWithoutCache(tableDesc, extTableSnapshotInfo));
+        }
+    }
+
+    public static void removeLocalCache(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache();
+        if (tablesCache != null) {
+            tablesCache.removeSnapshotCache(extTableSnapshotInfo);
+        }
+    }
+
+    public static CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache();
+        if (tablesCache != null) {
+            return tablesCache.getCacheState(extTableSnapshotInfo);
+        }
+        return CacheState.NONE;
+    }
+
+}
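
A minimal sketch of the factory in use; the custom storage type and provider class below are hypothetical (the HBase provider is already registered in the static block above), and tableDesc / extSnapshotInfo are assumed to be in scope:

    // Sketch: register an additional provider, then resolve a lookup table for a snapshot
    LookupProviderFactory.registerLookupProvider("my-kv-store", "com.example.MyLookupProvider"); // hypothetical class
    ILookupTable lookupTable = LookupProviderFactory.getExtLookupTable(tableDesc, extSnapshotInfo);
    // getExtLookupTable prefers the provider's local cache when one is available
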
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index b9a3651..7b3c5a3 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -62,4 +62,9 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_GARBAGE_COLLECTION_HBASE = "Garbage Collection on HBase";
     public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
     public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = "Redistribute Flat Hive Table";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_CONVERT_HFILE = "Convert Lookup Table to HFile";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_BULK_LOAD = "Load HFile to HBase Table";
+    public static final String STEP_NAME_LOOKUP_SNAPSHOT_CACHE_UPDATE = "Update Lookup Snapshot Cache to Query Engine";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_META_STORE = "Take Snapshot to Metadata Store";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE = "Update Cube Info";
 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index dbe11c2..1a534e1 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -399,10 +399,14 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         }
     }
 
-    protected final void addExtraInfo(String key, String value) {
+    public final void addExtraInfo(String key, String value) {
         getManager().addJobInfo(getId(), key, value);
     }
 
+    public final String getExtraInfo(String key) {
+        return getExtraInfo().get(key);
+    }
+
     protected final Map<String, String> getExtraInfo() {
         return getOutput().getExtra();
     }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 404db54..2297be7 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.job.execution;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -203,4 +205,28 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
     public int getDefaultPriority() {
         return DEFAULT_PRIORITY;
     }
+
+    public String findExtraInfo(String key, String dft) {
+        return findExtraInfo(key, dft, false);
+    }
+
+    public String findExtraInfoBackward(String key, String dft) {
+        return findExtraInfo(key, dft, true);
+    }
+
+    private String findExtraInfo(String key, String dft, boolean backward) {
+        ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks());
+
+        if (backward) {
+            Collections.reverse(tasks);
+        }
+
+        for (AbstractExecutable child : tasks) {
+            Output output = getManager().getOutput(child.getId());
+            String value = output.getExtra().get(key);
+            if (value != null)
+                return value;
+        }
+        return dft;
+    }
 }
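
A minimal sketch of the extra-info handshake this refactoring enables: an earlier step in the chain publishes a value and a later consumer reads it back. The key follows the LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX convention used elsewhere in this change; chainedJob and newSnapshotResPath are assumed to be in scope:

    // Sketch: inside an earlier AbstractExecutable task
    String contextKey = "lookup.ext.snapshot.res.path." + "DEFAULT.TEST_CATEGORY_GROUPINGS";
    // addExtraInfo(contextKey, newSnapshotResPath);

    // Sketch: later, from the chained job (UpdateCubeInfoAfterBuildStep reads it via cubingJob.getExtraInfo)
    String snapshotResPath = chainedJob.findExtraInfo(contextKey, null); // returns null if no step wrote the key
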
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
index 2e4f400..ad311d9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
@@ -22,6 +22,8 @@ package org.apache.kylin.metadata.realization;
  */
 public class IRealizationConstants {
 
+    public final static String LookupHbaseStorageLocationPrefix = "LOOKUP_";
+
     /**
      * For each cube htable, we leverage htable's metadata to keep track of
      * which kylin server(represented by its kylin_metadata prefix) owns this htable
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index faac724..a840bf7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -69,6 +70,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
 
         result.addTask(createBuildDictionaryStep(jobId));
         result.addTask(createSaveStatisticsStep(jobId));
+
+        // add materialize lookup tables if needed
+        addMaterializeLookupTableSteps(result);
+
         outputSide.addStepPhase2_BuildDictionary(result);
 
         // Phase 3: Build Cube
@@ -97,6 +102,18 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return true;
     }
 
+    private void addMaterializeLookupTableSteps(final CubingJob result) {
+        CubeDesc cubeDesc = seg.getCubeDesc();
+        List<String> allSnapshotTypes = cubeDesc.getAllExtLookupSnapshotTypes();
+        if (allSnapshotTypes.isEmpty()) {
+            return;
+        }
+        for (String snapshotType : allSnapshotTypes) {
+            ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotType);
+            materializer.materializeLookupTablesForCube(result, seg.getCubeInstance());
+        }
+    }
+
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime
         final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index c9ed359..7f7191d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -19,8 +19,6 @@
 package org.apache.kylin.engine.mr;
 
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -350,28 +348,4 @@ public class CubingJob extends DefaultChainedExecutable {
         return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0"));
     }
 
-    public String findExtraInfo(String key, String dft) {
-        return findExtraInfo(key, dft, false);
-    }
-
-    public String findExtraInfoBackward(String key, String dft) {
-        return findExtraInfo(key, dft, true);
-    }
-
-    private String findExtraInfo(String key, String dft, boolean backward) {
-        ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks());
-
-        if (backward) {
-            Collections.reverse(tasks);
-        }
-
-        for (AbstractExecutable child : tasks) {
-            Output output = getManager().getOutput(child.getId());
-            String value = output.getExtra().get(key);
-            if (value != null)
-                return value;
-        }
-        return dft;
-    }
-
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java
similarity index 56%
copy from core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java
index 2e4f400..f103da2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java
@@ -16,26 +16,13 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.metadata.realization;
+package org.apache.kylin.engine.mr;
 
-/**
- */
-public class IRealizationConstants {
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
-    /**
-     * For each cube htable, we leverage htable's metadata to keep track of
-     * which kylin server(represented by its kylin_metadata prefix) owns this htable
-     */
-    public final static String HTableTag = "KYLIN_HOST";
-
-    public final static String HTableOwner = "OWNER";
-
-    public final static String HTableUser = "USER";
-
-    public final static String HTableCreationTime = "CREATION_TIME";
-
-    public final static String HTableSegmentTag = "SEGMENT";
-
-    public final static String HTableGitTag = "GIT_COMMIT";
+public interface ILookupMaterializer {
+    void materializeLookupTable(DefaultChainedExecutable jobFlow, CubeInstance cube, String lookupTableName);
 
+    void materializeLookupTablesForCube(DefaultChainedExecutable jobFlow, CubeInstance cube);
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index b98608f..85a425c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
@@ -69,6 +70,10 @@ public class MRUtil {
     public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
     }
+
+    public static ILookupMaterializer getExtLookupMaterializer(String lookupStorageType) {
+        return LookupProviderFactory.createEngineAdapter(lookupStorageType, ILookupMaterializer.class);
+    }
     
     // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-safe
     // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 7b25354..fa3c22e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -120,6 +120,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Option OPTION_NEED_UPDATE_BASE_CUBOID_SHARD = OptionBuilder
             .withArgName(BatchConstants.ARG_UPDATE_SHARD).hasArg().isRequired(false)
             .withDescription("If need to update base cuboid shard").create(BatchConstants.ARG_UPDATE_SHARD);
+    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_TABLE_NAME).hasArg().isRequired(true).withDescription("Table name. For example, default.table1").create(BatchConstants.ARG_TABLE_NAME);
+    protected static final Option OPTION_LOOKUP_SNAPSHOT_ID = OptionBuilder.withArgName(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID).hasArg()
+            .isRequired(true).withDescription("Lookup table snapshotID")
+            .create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
 
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 64163ad..36f2566 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -72,6 +72,8 @@ public interface BatchConstants {
 
     String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum";
 
+    String CFG_SHARD_NUM = "shard.num";
+
     /**
      * command line ARGuments
      */
@@ -95,6 +97,8 @@ public interface BatchConstants {
     String ARG_LEVEL = "level";
     String ARG_CONF = "conf";
     String ARG_DICT_PATH = "dictPath";
+    String ARG_TABLE_NAME = "tableName";
+    String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
 
     /**
      * logger and counter
@@ -106,4 +110,11 @@ public interface BatchConstants {
      * dictionaries builder class
      */
     String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+
+    /**
+     * the prefix of ext lookup table snapshot resource path that stored in the build context
+     */
+    String LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX = "lookup.ext.snapshot.res.path.";
+
+    String LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX = "lookup.ext.snapshot.src.record.cnt.";
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index d085a77..ad1245c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -21,6 +21,7 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.List;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,6 +31,7 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -71,6 +73,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         segment.setInputRecordsSize(sourceSizeBytes);
 
         try {
+            saveExtSnapshotIfNeeded(cubeManager, cubingJob, cube, segment);
             if (segment.isOffsetCube()) {
                 updateTimeRange(segment);
             }
@@ -83,6 +86,26 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         }
     }
 
+    private void saveExtSnapshotIfNeeded(CubeManager cubeManager, CubingJob cubingJob, CubeInstance cube, CubeSegment segment) throws IOException {
+        List<SnapshotTableDesc> snapshotTableDescList = cube.getDescriptor().getSnapshotTableDescList();
+        for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescList) {
+            String tableName = snapshotTableDesc.getTableName();
+            if (snapshotTableDesc.isExtSnapshotTable()) {
+                String contextKey = BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + tableName;
+                String newSnapshotResPath = cubingJob.getExtraInfo(contextKey);
+                if (newSnapshotResPath == null) {
+                    continue;
+                }
+
+                if (snapshotTableDesc.isGlobal()) {
+                    cubeManager.updateCubeLookupSnapshot(cube, tableName, newSnapshotResPath);
+                } else {
+                    segment.putSnapshotResPath(tableName, newSnapshotResPath);
+                }
+            }
+        }
+    }
+
     private void updateTimeRange(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
 
diff --git a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
index f1a42b1..e42c522 100644
--- a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
+++ b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
@@ -582,5 +582,13 @@
   "override_kylin_properties": {
     "kylin.cube.algorithm": "INMEM"
   },
+  "snapshot_table_desc_list": [
+    {
+      "table_name": "DEFAULT.TEST_CATEGORY_GROUPINGS",
+      "storage_type": "hbase",
+      "local_cache_enable": true,
+      "global": true
+    }
+  ],
   "partition_date_start": 0
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
index 1aec429..5ee5c7a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
@@ -73,7 +73,7 @@ public class MetadataCleanupJob {
 
         // two level resources, snapshot tables and cube statistics
         for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT,
-                ResourceStore.CUBE_STATISTICS_ROOT }) {
+                ResourceStore.CUBE_STATISTICS_ROOT, ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT}) {
             for (String dir : noNull(store.listResources(resourceRoot))) {
                 for (String res : noNull(store.listResources(dir))) {
                     if (store.getResourceTimestamp(res) < newResourceTimeCut)
@@ -97,6 +97,7 @@ public class MetadataCleanupJob {
         // exclude resources in use
         Set<String> activeResources = Sets.newHashSet();
         for (CubeInstance cube : cubeManager.listAllCubes()) {
+            activeResources.addAll(cube.getSnapshots().values());
             for (CubeSegment segment : cube.getSegments()) {
                 activeResources.addAll(segment.getSnapshotPaths());
                 activeResources.addAll(segment.getDictionaryPaths());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
index 126c598..4c8c426 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
@@ -21,6 +21,7 @@ package org.apache.kylin.rest.job;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -28,6 +29,7 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -35,6 +37,8 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,10 +64,17 @@ public class StorageCleanJobHbaseUtil {
                 ? config.getHBaseTableNamePrefix()
                 : (namespace + ":" + config.getHBaseTableNamePrefix());
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
+
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
+
+        boolean hasExtLookupTable = false;
         for (HTableDescriptor desc : tableDescriptors) {
             String host = desc.getValue(IRealizationConstants.HTableTag);
             if (config.getMetadataUrlPrefix().equalsIgnoreCase(host)) {
+                // check if there are hbase lookup table
+                if (desc.getTableName().getNameAsString().contains(IRealizationConstants.LookupHbaseStorageLocationPrefix)) {
+                    hasExtLookupTable = true;
+                }
                 //only take care htables that belongs to self, and created more than 2 days
                 allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
             }
@@ -88,6 +99,11 @@ public class StorageCleanJobHbaseUtil {
         
         logger.info(allTablesNeedToBeDropped.size() + " HTable(s) to clean up");
 
+        if (hasExtLookupTable) {
+            List<String> useExtLookupTables = getAllUsedExtLookupTables();
+            logger.info("Exclude tables:{}, as they are referred by snapshots.", useExtLookupTables);
+            allTablesNeedToBeDropped.removeAll(useExtLookupTables);
+        }
         if (delete) {
             // drop tables
             ExecutorService executorService = Executors.newSingleThreadExecutor();
@@ -115,6 +131,27 @@ public class StorageCleanJobHbaseUtil {
         return allTablesNeedToBeDropped;
     }
 
+    private static List<String> getAllUsedExtLookupTables() throws IOException {
+        List<String> result = Lists.newArrayList();
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config).getAllExtSnapshotResPaths();
+
+        for (String extSnapshotResource : activeSnapshotSet) {
+            try {
+                ExtTableSnapshotInfo extTableSnapshot = ExtTableSnapshotInfoManager.getInstance(config).getSnapshot(
+                        extSnapshotResource);
+                if (extTableSnapshot != null) {
+                    if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(extTableSnapshot.getStorageType())) {
+                        result.add(extTableSnapshot.getStorageLocationIdentifier());
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("error fetch ext table snapshot:" + extSnapshotResource, e);
+            }
+        }
+        return result;
+    }
+
     static class DeleteHTableRunnable implements Callable {
         HBaseAdmin hbaseAdmin;
         String htableName;
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index f9c0d71..c08ae70 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -250,6 +250,7 @@
             <scr:intercept-url pattern="/api/admin/public_config" access="permitAll"/>
             <scr:intercept-url pattern="/api/projects" access="permitAll"/>
             <scr:intercept-url pattern="/api/admin*/**" access="hasRole('ROLE_ADMIN')"/>
+            <scr:intercept-url pattern="/api/tables/**/snapshotLocalCache/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/**" access="isAuthenticated()"/>
 
             <scr:form-login login-page="/login" />
@@ -295,6 +296,7 @@
             <scr:intercept-url pattern="/api/admin/config" access="permitAll"/>
             <scr:intercept-url pattern="/api/projects*/*" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/admin*/**" access="hasRole('ROLE_ADMIN')"/>
+            <scr:intercept-url pattern="/api/tables/**/snapshotLocalCache/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/**" access="isAuthenticated()"/>
 
             <scr:form-login login-page="/login" />
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 9bf62f0..4709c08 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -96,6 +96,12 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule-agent</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
new file mode 100644
index 0000000..757f6d0
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class HBaseLookupMRSteps {
+    protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupMRSteps.class);
+    private CubeInstance cube;
+    private JobEngineConfig config;
+
+    public HBaseLookupMRSteps(CubeInstance cube) {
+        this.cube = cube;
+        this.config = new JobEngineConfig(cube.getConfig());
+    }
+
+    public void addMaterializeLookupTablesSteps(DefaultChainedExecutable jobFlow) {
+        CubeDesc cubeDesc = cube.getDescriptor();
+        Set<String> allLookupTables = Sets.newHashSet();
+        for (DimensionDesc dim : cubeDesc.getDimensions()) {
+            TableRef table = dim.getTableRef();
+            if (cubeDesc.getModel().isLookupTable(table)) {
+                allLookupTables.add(table.getTableIdentity());
+            }
+        }
+        List<SnapshotTableDesc> snapshotTableDescs = cubeDesc.getSnapshotTableDescList();
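+        // only materialize lookup tables that use HBase ext snapshot storage and are referenced by the cube's dimensions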
+        for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescs) {
+            if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(snapshotTableDesc.getStorageType())
+                    && allLookupTables.contains(snapshotTableDesc.getTableName())) {
+                addMaterializeLookupTableSteps(jobFlow, snapshotTableDesc.getTableName(), snapshotTableDesc);
+            }
+        }
+    }
+
+    public void addMaterializeLookupTableSteps(DefaultChainedExecutable jobFlow, String tableName, SnapshotTableDesc snapshotTableDesc) {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+        TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName, cube.getProject());
+        IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc);
+        try {
+            if (extTableSnapshotInfoManager.hasLatestSnapshot(sourceTable.getSignature(), tableName)) {
+                logger.info("there is latest snapshot exist for table:{}, skip build snapshot step.", tableName);
+                return;
+            }
+        } catch (IOException ioException) {
+            throw new RuntimeException(ioException);
+        }
+        logger.info("add build snapshot steps for table:{}", tableName);
+        String snapshotID = genLookupSnapshotID();
+        addLookupTableConvertToHFilesStep(jobFlow, tableName, snapshotID);
+        addLookupTableHFilesBulkLoadStep(jobFlow, tableName, snapshotID);
+        if (snapshotTableDesc != null && snapshotTableDesc.isEnableLocalCache()) {
+            addUpdateSnapshotQueryCacheStep(jobFlow, tableName, snapshotID);
+        }
+    }
+
+    private String genLookupSnapshotID() {
+        return UUID.randomUUID().toString();
+    }
+
+    private void addLookupTableConvertToHFilesStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+        createHFilesStep
+                .setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_CONVERT_HFILE + ":" + tableName);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, cube.getName());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT,
+                getLookupTableHFilePath(tableName, jobFlow.getId()));
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, tableName);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobFlow.getId());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_LOOKUP_SNAPSHOT_ID, snapshotID);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_LookupTable_HFile_Generator_" + tableName + "_Step");
+
+        createHFilesStep.setMapReduceParams(cmd.toString());
+        createHFilesStep.setMapReduceJobClass(LookupTableToHFileJob.class);
+        createHFilesStep.setCounterSaveAs(BatchConstants.LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX + tableName);
+
+        jobFlow.addTask(createHFilesStep);
+    }
+
+    private void addLookupTableHFilesBulkLoadStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_BULK_LOAD + ":" + tableName);
+
+        StringBuilder cmd = new StringBuilder();
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT,
+                getLookupTableHFilePath(tableName, jobFlow.getId()));
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, tableName);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobFlow.getId());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_LOOKUP_SNAPSHOT_ID, snapshotID);
+
+        bulkLoadStep.setJobParams(cmd.toString());
+        bulkLoadStep.setJobClass(LookupTableHFilesBulkLoadJob.class);
+        jobFlow.addTask(bulkLoadStep);
+    }
+
+    private void addUpdateSnapshotQueryCacheStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        UpdateSnapshotCacheForQueryServersStep updateSnapshotCacheStep = new UpdateSnapshotCacheForQueryServersStep();
+        updateSnapshotCacheStep.setName(ExecutableConstants.STEP_NAME_LOOKUP_SNAPSHOT_CACHE_UPDATE + ":" + tableName);
+
+        LookupExecutableUtil.setProjectName(cube.getProject(), updateSnapshotCacheStep.getParams());
+        LookupExecutableUtil.setLookupTableName(tableName, updateSnapshotCacheStep.getParams());
+        LookupExecutableUtil.setLookupSnapshotID(snapshotID, updateSnapshotCacheStep.getParams());
+        jobFlow.addTask(updateSnapshotCacheStep);
+    }
+
+    private String getLookupTableHFilePath(String tableName, String jobId) {
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(JobBuilderSupport.getJobWorkingDir(config, jobId) + "/"
+                + tableName + "/hfile/");
+    }
+
+    public void appendMapReduceParameters(StringBuilder buf) {
+        appendMapReduceParameters(buf, JobEngineConfig.DEFAUL_JOB_CONF_SUFFIX);
+    }
+
+    public void appendMapReduceParameters(StringBuilder buf, String jobType) {
+        try {
+            String jobConf = config.getHadoopJobConfFilePath(jobType);
+            if (jobConf != null && jobConf.length() > 0) {
+                buf.append(" -conf ").append(jobConf);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java
new file mode 100644
index 0000000..cf28ed6
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.engine.mr.ILookupMaterializer;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class HBaseLookupMaterializer implements ILookupMaterializer {
+
+    @Override
+    public void materializeLookupTable(DefaultChainedExecutable jobFlow, CubeInstance cube, String lookupTableName) {
+        HBaseLookupMRSteps lookupMRSteps = new HBaseLookupMRSteps(cube);
+        SnapshotTableDesc snapshotTableDesc = cube.getDescriptor().getSnapshotTableDesc(lookupTableName);
+        lookupMRSteps.addMaterializeLookupTableSteps(jobFlow, lookupTableName, snapshotTableDesc);
+    }
+
+    @Override
+    public void materializeLookupTablesForCube(DefaultChainedExecutable jobFlow, CubeInstance cube) {
+        HBaseLookupMRSteps lookupMRSteps = new HBaseLookupMRSteps(cube);
+        lookupMRSteps.addMaterializeLookupTablesSteps(jobFlow);
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java
new file mode 100644
index 0000000..3e8c2c5
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.IExtLookupProvider;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupTableCache;
+import org.apache.kylin.engine.mr.ILookupMaterializer;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Use HBase as lookup table storage
+ */
+public class HBaseLookupProvider implements IExtLookupProvider {
+    protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupProvider.class);
+
+
+    @Override
+    public ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        return new HBaseLookupTable(tableDesc, extTableSnapshot);
+    }
+
+    @Override
+    public IExtLookupTableCache getLocalCache() {
+        return RocksDBLookupTableCache.getInstance(KylinConfig.getInstanceFromEnv());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface == ILookupMaterializer.class) {
+            return (I) new HBaseLookupMaterializer();
+        } else {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java
new file mode 100644
index 0000000..9ceabd2
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.dict.lookup.AbstractLookupRowEncoder;
+import org.apache.kylin.metadata.model.TableDesc;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+
+/**
+ * Encodes/decodes an original table row to/from an HBase row.
+ */
+public class HBaseLookupRowEncoder extends AbstractLookupRowEncoder<HBaseRow> {
+    public static final String CF_STRING = "F";
+    public static final byte[] CF = Bytes.toBytes(CF_STRING);
+
+    private int shardNum;
+
+    public HBaseLookupRowEncoder(TableDesc tableDesc, String[] keyColumns, int shardNum) {
+        super(tableDesc, keyColumns);
+        this.shardNum = shardNum;
+    }
+
+    @Override
+    public HBaseRow encode(String[] row) {
+        String[] keys = getKeyData(row);
+        String[] values = getValueData(row);
+        byte[] rowKey = encodeRowKey(keys);
+        NavigableMap<byte[], byte[]> qualifierValMap = Maps
+                .newTreeMap(org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR);
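+        // each value column is stored under a qualifier named after its column index in the source row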
+        for (int i = 0; i < values.length; i++) {
+            byte[] qualifier = Bytes.toBytes(String.valueOf(valueIndexes[i]));
+            byte[] byteValue = toBytes(values[i]);
+            qualifierValMap.put(qualifier, byteValue);
+        }
+        return new HBaseRow(rowKey, qualifierValMap);
+    }
+
+    @Override
+    public String[] decode(HBaseRow hBaseRow) {
+        if (hBaseRow == null) {
+            return null;
+        }
+        String[] result = new String[columnsNum];
+        fillKeys(hBaseRow.rowKey, result);
+        fillValues(hBaseRow.qualifierValMap, result);
+
+        return result;
+    }
+
+    public byte[] encodeRowKey(String[] keys) {
+        keyByteBuffer.clear();
+        for (String key : keys) {
+            if (key == null) {
+                throw new IllegalArgumentException("key cannot be null:" + Arrays.toString(keys));
+            }
+            byte[] byteKey = Bytes.toBytes(key);
+            keyByteBuffer.putShort((short) byteKey.length);
+            keyByteBuffer.put(byteKey);
+        }
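+        // rowkey layout: [shard id][key1 length][key1 bytes][key2 length][key2 bytes]...; the shard id is hashed from the key bytes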
+        byte[] result = new byte[RowConstants.ROWKEY_SHARDID_LEN + keyByteBuffer.position()];
+        System.arraycopy(keyByteBuffer.array(), 0, result, RowConstants.ROWKEY_SHARDID_LEN, keyByteBuffer.position());
+        short shard = ShardingHash.getShard(result, RowConstants.ROWKEY_SHARDID_LEN, result.length, shardNum);
+        BytesUtil.writeShort(shard, result, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        return result;
+    }
+
+    private void fillKeys(byte[] rowKey, String[] result) {
+        int keyNum = keyIndexes.length;
+        ByteBuffer byteBuffer = ByteBuffer.wrap(rowKey);
+        byteBuffer.getShort(); // read shard
+        for (int i = 0; i < keyNum; i++) {
+            short keyLen = byteBuffer.getShort();
+            byte[] keyBytes = new byte[keyLen];
+            byteBuffer.get(keyBytes);
+            result[keyIndexes[i]] = Bytes.toString(keyBytes);
+        }
+    }
+
+    private void fillValues(Map<byte[], byte[]> qualifierValMap, String[] result) {
+        for (Entry<byte[], byte[]> qualifierValEntry : qualifierValMap.entrySet()) {
+            byte[] qualifier = qualifierValEntry.getKey();
+            byte[] value = qualifierValEntry.getValue();
+            int valIdx = Integer.valueOf(Bytes.toString(qualifier));
+            result[valIdx] = fromBytes(value);
+        }
+    }
+
+    public static class HBaseRow {
+        private byte[] rowKey;
+        private NavigableMap<byte[], byte[]> qualifierValMap;
+
+        public HBaseRow(byte[] rowKey, NavigableMap<byte[], byte[]> qualifierValMap) {
+            this.rowKey = rowKey;
+            this.qualifierValMap = qualifierValMap;
+        }
+
+        public byte[] getRowKey() {
+            return rowKey;
+        }
+
+        public NavigableMap<byte[], byte[]> getQualifierValMap() {
+            return qualifierValMap;
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java
new file mode 100644
index 0000000..78877f7
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Use HBase as lookup table storage
+ */
+public class HBaseLookupTable implements ILookupTable {
+    protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupTable.class);
+
+    private TableName lookupTableName;
+    private Table table;
+
+    private HBaseLookupRowEncoder encoder;
+
+    public HBaseLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        String tableName = extTableSnapshot.getStorageLocationIdentifier();
+        this.lookupTableName = TableName.valueOf(tableName);
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        Connection connection = HBaseConnection.get(kylinConfig.getStorageUrl());
+        try {
+            table = connection.getTable(lookupTableName);
+        } catch (IOException e) {
+            throw new RuntimeException("error when connect HBase", e);
+        }
+
+        String[] keyColumns = extTableSnapshot.getKeyColumns();
+        encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, extTableSnapshot.getShardNum());
+    }
+
+    @Override
+    public String[] getRow(Array<String> key) {
+        byte[] encodedKey = encoder.encodeRowKey(key.data);
+        Get get = new Get(encodedKey);
+        try {
+            Result result = table.get(get);
+            if (result.isEmpty()) {
+                return null;
+            }
+            return encoder.decode(new HBaseRow(result.getRow(), result.getFamilyMap(HBaseLookupRowEncoder.CF)));
+        } catch (IOException e) {
+            throw new RuntimeException("error when get row from hBase", e);
+        }
+    }
+
+    @Override
+    public Iterator<String[]> iterator() {
+        return new HBaseScanBasedIterator(table);
+    }
+
+    @Override
+    public void close() throws IOException {
+        table.close();
+    }
+
+    private class HBaseScanBasedIterator implements Iterator<String[]> {
+        private Iterator<Result> scannerIterator;
+        private long counter;
+
+        public HBaseScanBasedIterator(Table table) {
+            try {
+                Scan scan = new Scan();
+                scan.setCaching(1000);
+                scan.addFamily(HBaseLookupRowEncoder.CF);
+                // pass the Scan to the scanner so the caching hint actually takes effect
+                ResultScanner scanner = table.getScanner(scan);
+                scannerIterator = scanner.iterator();
+            } catch (IOException e) {
+                logger.error("error when scan HBase", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return scannerIterator.hasNext();
+        }
+
+        @Override
+        public String[] next() {
+            counter++;
+            if (counter % 100000 == 0) {
+                logger.info("scanned {} rows from hBase", counter);
+            }
+            Result result = scannerIterator.next();
+            byte[] rowKey = result.getRow();
+            NavigableMap<byte[], byte[]> qualifierValMap = result.getFamilyMap(HBaseLookupRowEncoder.CF);
+            return encoder.decode(new HBaseRow(rowKey, qualifierValMap));
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove is not supported");
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java
new file mode 100644
index 0000000..3daffa3
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Most of the code comes from {@link KeyValueSortReducer}, with added logic that checks whether a row key
+ * is duplicated; if a duplicate key is found, an IllegalStateException is thrown.
+ */
+public class KVSortReducerWithDupKeyCheck extends KeyValueSortReducer {
+    protected void reduce(
+            ImmutableBytesWritable row,
+            java.lang.Iterable<KeyValue> kvs,
+            org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+            throws java.io.IOException, InterruptedException {
+        TreeSet<KeyValue> map = new TreeSet<>(KeyValue.COMPARATOR);
+
+        TreeSet<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+        for (KeyValue kv : kvs) {
+            byte[] qualifier = CellUtil.cloneQualifier(kv);
+            if (qualifierSet.contains(qualifier)) {
+                throw new IllegalStateException("there is duplicate key:" + row);
+            }
+            qualifierSet.add(qualifier);
+            try {
+                map.add(kv.clone());
+            } catch (CloneNotSupportedException e) {
+                throw new java.io.IOException(e);
+            }
+        }
+        context.setStatus("Read " + map.getClass());
+        int index = 0;
+        for (KeyValue kv : map) {
+            context.write(row, kv);
+            if (++index % 100 == 0)
+                context.setStatus("Wrote " + index);
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java
new file mode 100644
index 0000000..5598ed9
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LookupTableHFilesBulkLoadJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(LookupTableHFilesBulkLoadJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_TABLE_NAME);
+        options.addOption(OPTION_CUBING_JOB_ID);
+        options.addOption(OPTION_LOOKUP_SNAPSHOT_ID);
+        parseOptions(options, args);
+
+        String tableName = getOptionValue(OPTION_TABLE_NAME);
+        String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID);
+        String snapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID);
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig);
+        DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(cubingJobID);
+
+        ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+        ExtTableSnapshotInfo snapshot = extTableSnapshotInfoManager.getSnapshot(tableName, snapshotID);
+        long srcTableRowCnt = Long.valueOf(job.findExtraInfoBackward(BatchConstants.LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX + tableName, "-1"));
+        logger.info("update table:{} snapshot row count:{}", tableName, srcTableRowCnt);
+        snapshot.setRowCnt(srcTableRowCnt);
+        snapshot.setLastBuildTime(System.currentTimeMillis());
+        extTableSnapshotInfoManager.updateSnapshot(snapshot);
+
+        String hTableName = snapshot.getStorageLocationIdentifier();
+        // e.g
+        // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
+        // end with "/"
+        String input = getOptionValue(OPTION_INPUT_PATH);
+
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        FsShell shell = new FsShell(conf);
+
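+        // open up permissions on the generated HFiles so the HBase process can move them into place during bulk load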
+        int exitCode = -1;
+        int retryCount = 10;
+        while (exitCode != 0 && retryCount >= 1) {
+            exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
+            retryCount--;
+            if (exitCode != 0) {
+                Thread.sleep(5000);
+            }
+        }
+
+        if (exitCode != 0) {
+            logger.error("Failed to change the file permissions: " + input);
+            throw new IOException("Failed to change the file permissions: " + input);
+        }
+
+        String[] newArgs = new String[2];
+        newArgs[0] = input;
+        newArgs[1] = hTableName;
+
+        logger.debug("Start to run LoadIncrementalHFiles");
+        int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
+        logger.debug("End to run LoadIncrementalHFiles");
+        return ret;
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new LookupTableHFilesBulkLoadJob(), args);
+        System.exit(exitCode);
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
new file mode 100644
index 0000000..aac0108
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LookupTableToHFileJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(LookupTableToHFileJob.class);
+
+    private static final String ALPHA_NUM = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+    private static final int HBASE_TABLE_LENGTH = 10;
+
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_TABLE_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_LOOKUP_SNAPSHOT_ID);
+            parseOptions(options, args);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String tableName = getOptionValue(OPTION_TABLE_NAME);
+            String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID);
+            String lookupSnapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID);
+
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+
+            TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName,
+                    cube.getProject());
+
+            ExtTableSnapshotInfoManager extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+            removeSnapshotIfExist(extSnapshotInfoManager, kylinConfig, tableName, lookupSnapshotID);
+
+            IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc);
+
+            logger.info("create HTable for source table snapshot:{}", tableName);
+            Pair<String, Integer> hTableNameAndShard = createHTable(tableName, sourceTable, kylinConfig);
+            String[] keyColumns = getLookupKeyColumns(cube, tableName);
+            ExtTableSnapshotInfo snapshot = createSnapshotResource(extSnapshotInfoManager, tableName, lookupSnapshotID,
+                    keyColumns, hTableNameAndShard.getFirst(), hTableNameAndShard.getSecond(), sourceTable);
+            logger.info("created snapshot information at:{}", snapshot.getResourcePath());
+            saveSnapshotInfoToJobContext(kylinConfig, cubingJobID, snapshot);
+
+            job = Job.getInstance(HBaseConfiguration.create(getConf()), getOptionValue(OPTION_JOB_NAME));
+
+            setJobClasspath(job, cube.getConfig());
+            // For separate HBase cluster, note the output is a qualified HDFS path if "kylin.storage.hbase.cluster-fs" is configured, ref HBaseMRSteps.getHFilePath()
+            HBaseConnection.addHBaseClusterNNHAConfiguration(job.getConfiguration());
+
+            FileOutputFormat.setOutputPath(job, output);
+
+            IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
+            tableInputFormat.configureJob(job);
+            job.setMapperClass(LookupTableToHFileMapper.class);
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, tableName);
+            // set block replication to 3 for hfiles
+            job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
+            job.getConfiguration().set(BatchConstants.CFG_SHARD_NUM, String.valueOf(hTableNameAndShard.getSecond()));
+            // add metadata to distributed cache
+            attachCubeMetadata(cube, job.getConfiguration());
+
+            Connection conn = getHBaseConnection(kylinConfig);
+            HTable htable = (HTable) conn.getTable(TableName.valueOf(hTableNameAndShard.getFirst()));
+
+            // let HFileOutputFormat2 configure the partitioner and output settings from the HTable's region boundaries
+            HFileOutputFormat2.configureIncrementalLoad(job, htable, htable.getRegionLocator());
+
+            job.setReducerClass(KVSortReducerWithDupKeyCheck.class);
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    private void removeSnapshotIfExist(ExtTableSnapshotInfoManager extSnapshotInfoManager, KylinConfig kylinConfig,
+            String tableName, String lookupSnapshotID) throws IOException {
+        ExtTableSnapshotInfo snapshotInfo = null;
+        try {
+            snapshotInfo = extSnapshotInfoManager.getSnapshot(tableName, lookupSnapshotID);
+        } catch (Exception e) {
+            // swallow the exception; it means no snapshot exists for this snapshot id
+        }
+        if (snapshotInfo == null) {
+            return;
+        }
+        logger.info("the table:{} snapshot:{} exist, remove it", tableName, lookupSnapshotID);
+        extSnapshotInfoManager.removeSnapshot(tableName, lookupSnapshotID);
+        String hTableName = snapshotInfo.getStorageLocationIdentifier();
+        logger.info("remove related HBase table:{} for snapshot:{}", hTableName, lookupSnapshotID);
+        Connection conn = getHBaseConnection(kylinConfig);
+        Admin admin = conn.getAdmin();
+        admin.deleteTable(TableName.valueOf(hTableName));
+    }
+
+    private String[] getLookupKeyColumns(CubeInstance cube, String tableName) {
+        CubeDesc cubeDesc = cube.getDescriptor();
+        DataModelDesc modelDesc = cubeDesc.getModel();
+        TableRef lookupTableRef = null;
+        for (TableRef tableRef : modelDesc.getLookupTables()) {
+            if (tableRef.getTableIdentity().equalsIgnoreCase(tableName)) {
+                lookupTableRef = tableRef;
+                break;
+            }
+        }
+        if (lookupTableRef == null) {
+            throw new IllegalStateException("cannot find table in model:" + tableName);
+        }
+        JoinDesc joinDesc = modelDesc.getJoinByPKSide(lookupTableRef);
+        TblColRef[] keyColRefs = joinDesc.getPrimaryKeyColumns();
+        String[] result = new String[keyColRefs.length];
+        for (int i = 0; i < keyColRefs.length; i++) {
+            result[i] = keyColRefs[i].getName();
+        }
+        return result;
+    }
+
+    private void saveSnapshotInfoToJobContext(KylinConfig kylinConfig, String jobID, ExtTableSnapshotInfo snapshot) {
+        ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig);
+        DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(jobID);
+        job.addExtraInfo(BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + snapshot.getTableName(),
+                snapshot.getResourcePath());
+    }
+
+    /**
+     * Create the HBase table that stores the lookup table snapshot, pre-split by shard.
+     *
+     * @param sourceTableName identity of the source lookup table
+     * @param sourceTable     readable view of the source table, used to estimate its size
+     * @param kylinConfig     current Kylin configuration
+     * @return Pair of HTable name and shard number
+     * @throws IOException
+     */
+    private Pair<String, Integer> createHTable(String sourceTableName, IReadableTable sourceTable,
+            KylinConfig kylinConfig) throws IOException {
+        TableSignature signature = sourceTable.getSignature();
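+        // derive the shard (pre-split region) count from the source table size and the configured shard size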
+        int shardNum = calculateShardNum(kylinConfig, signature.getSize());
+        Connection conn = getHBaseConnection(kylinConfig);
+        Admin admin = conn.getAdmin();
+        String hTableName = genHTableName(kylinConfig, admin, sourceTableName);
+
+        TableName tableName = TableName.valueOf(hTableName);
+        HTableDescriptor hTableDesc = new HTableDescriptor(tableName);
+        hTableDesc.setCompactionEnabled(false);
+        hTableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+        hTableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
+        hTableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
+        String commitInfo = KylinVersion.getGitCommitInfo();
+        if (!StringUtils.isEmpty(commitInfo)) {
+            hTableDesc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
+        }
+
+        HColumnDescriptor cf = CubeHTableUtil.createColumnFamily(kylinConfig, HBaseLookupRowEncoder.CF_STRING, false);
+        hTableDesc.addFamily(cf);
+
+        try {
+            if (shardNum > 1) {
+                admin.createTable(hTableDesc, getSplitsByShardNum(shardNum));
+            } else {
+                admin.createTable(hTableDesc);
+            }
+        } finally {
+            IOUtils.closeQuietly(admin);
+        }
+        return new Pair<>(hTableName, shardNum);
+    }
+
+    private int calculateShardNum(KylinConfig kylinConfig, long dataSize) {
+        long shardSize = kylinConfig.getExtTableSnapshotShardingMB() * 1024 * 1024;
+        return dataSize < shardSize ? 1 : (int) Math.ceil((double) dataSize / shardSize);
+    }
+
+    private byte[][] getSplitsByShardNum(int shardNum) {
+        byte[][] result = new byte[shardNum - 1][];
+        for (int i = 1; i < shardNum; ++i) {
+            byte[] split = new byte[RowConstants.ROWKEY_SHARDID_LEN];
+            BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
+            result[i - 1] = split;
+        }
+        return result;
+    }
+
+    private ExtTableSnapshotInfo createSnapshotResource(ExtTableSnapshotInfoManager extSnapshotInfoManager,
+            String tableName, String snapshotID, String[] keyColumns, String hTableName, int shardNum,
+            IReadableTable sourceTable) throws IOException {
+        return extSnapshotInfoManager.createSnapshot(sourceTable.getSignature(), tableName, snapshotID, keyColumns,
+                shardNum, ExtTableSnapshotInfo.STORAGE_TYPE_HBASE, hTableName);
+    }
+
+    private String genHTableName(KylinConfig kylinConfig, Admin admin, String tableName) throws IOException {
+        String namePrefix = kylinConfig.getHBaseTableNamePrefix()
+                + IRealizationConstants.LookupHbaseStorageLocationPrefix + tableName + "_";
+        String namespace = kylinConfig.getHBaseStorageNameSpace();
+        String hTableName;
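+        // append a random alphanumeric suffix and retry until the generated name does not collide with an existing table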
+        Random ran = new Random();
+        do {
+            StringBuilder sb = new StringBuilder();
+            if (!namespace.equals("default") && !namespace.isEmpty()) {
+                sb.append(namespace).append(":");
+            }
+            sb.append(namePrefix);
+            for (int i = 0; i < HBASE_TABLE_LENGTH; i++) {
+                sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
+            }
+            hTableName = sb.toString();
+        } while (hTableExists(admin, hTableName));
+
+        return hTableName;
+    }
+
+    private boolean hTableExists(Admin admin, String hTableName) throws IOException {
+        return admin.tableExists(TableName.valueOf(hTableName));
+    }
+
+    private Connection getHBaseConnection(KylinConfig kylinConfig) throws IOException {
+        return HBaseConnection.get(kylinConfig.getStorageUrl());
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new LookupTableToHFileJob(), args);
+        System.exit(exitCode);
+    }
+
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java
new file mode 100644
index 0000000..4be9533
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+
+public class LookupTableToHFileMapper<KEYIN> extends KylinMapper<KEYIN, Object, ImmutableBytesWritable, KeyValue> {
+    ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+
+    private String cubeName;
+    private CubeDesc cubeDesc;
+    private String tableName;
+    private int shardNum;
+    private IMRTableInputFormat lookupTableInputFormat;
+    private long timestamp = 0;
+    private HBaseLookupRowEncoder encoder;
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+        tableName = context.getConfiguration().get(BatchConstants.CFG_TABLE_NAME);
+        shardNum = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_SHARD_NUM));
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+        cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
+        DataModelDesc modelDesc = cubeDesc.getModel();
+        TableDesc tableDesc = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(
+                tableName, cubeDesc.getProject());
+        TableRef lookupTableRef = null;
+        for (TableRef tableRef : modelDesc.getLookupTables()) {
+            if (tableRef.getTableIdentity().equalsIgnoreCase(tableName)) {
+                lookupTableRef = tableRef;
+                break;
+            }
+        }
+        JoinDesc joinDesc = modelDesc.getJoinByPKSide(lookupTableRef);
+        TblColRef[] keyColRefs = joinDesc.getPrimaryKeyColumns();
+        String[] keyColumns = new String[keyColRefs.length];
+        for (int i = 0; i < keyColRefs.length; i++) {
+            keyColumns[i] = keyColRefs[i].getName();
+        }
+        encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, shardNum);
+        lookupTableInputFormat = MRUtil.getTableInputFormat(tableDesc);
+    }
+
+    @Override
+    public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
+        Collection<String[]> rowCollection = lookupTableInputFormat.parseMapperInput(value);
+        for (String[] row : rowCollection) {
+            HBaseRow hBaseRow = encoder.encode(row);
+
+            byte[] rowKey = hBaseRow.getRowKey();
+            Map<byte[], byte[]> qualifierValMap = hBaseRow.getQualifierValMap();
+            outputKey.set(rowKey);
+            for (Entry<byte[], byte[]> qualifierValEntry : qualifierValMap.entrySet()) {
+                KeyValue outputValue = createKeyValue(rowKey, qualifierValEntry.getKey(), qualifierValEntry.getValue());
+                context.write(outputKey, outputValue);
+            }
+        }
+    }
+
+    private KeyValue createKeyValue(byte[] keyBytes, byte[] qualifier, byte[] value) {
+        return new KeyValue(keyBytes, 0, keyBytes.length, //
+                HBaseLookupRowEncoder.CF, 0, HBaseLookupRowEncoder.CF.length, //
+                qualifier, 0, qualifier.length, //
+                timestamp, KeyValue.Type.Put, //
+                value, 0, value.length);
+    }
+
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
new file mode 100644
index 0000000..409116d
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.ExecuteResult.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateSnapshotCacheForQueryServersStep extends AbstractExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(UpdateSnapshotCacheForQueryServersStep.class);
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final String tableName = LookupExecutableUtil.getLookupTableName(this.getParams());
+        final String snapshotID = LookupExecutableUtil.getLookupSnapshotID(this.getParams());
+        final String projectName = LookupExecutableUtil.getProjectName(this.getParams());
+
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        int checkInterval = 10 * 1000;
+        int maxCheckTime = 10 * 60 * 1000;
+
+        StringWriter outputWriter = new StringWriter();
+        PrintWriter pw = new PrintWriter(outputWriter);
+        String[] restServers = config.getRestServers();
+        List<String> serversNeedCheck = Lists.newArrayList();
+        for (String restServer : restServers) {
+            logger.info("send build lookup table cache request to server: " + restServer);
+            try {
+                RestClient restClient = new RestClient(restServer);
+                restClient.buildLookupSnapshotCache(projectName, tableName, snapshotID);
+                serversNeedCheck.add(restServer);
+            } catch (IOException e) {
+                logger.error("error when send build cache request to rest server:" + restServer, e);
+                pw.println("cache build fail for rest server:" + restServer);
+            }
+        }
+        if (serversNeedCheck.isEmpty()) {
+            return new ExecuteResult(State.SUCCEED, outputWriter.toString());
+        }
+
+        List<String> completeServers = Lists.newArrayList();
+        long startTime = System.currentTimeMillis();
+        while ((System.currentTimeMillis() - startTime) < maxCheckTime) {
+            serversNeedCheck.removeAll(completeServers);
+            if (serversNeedCheck.isEmpty()) {
+                break;
+            }
+            for (String restServer : serversNeedCheck) {
+                logger.info("check lookup table cache build status for server: " + restServer);
+                try {
+                    RestClient restClient = new RestClient(restServer);
+                    String stateName = restClient.getLookupSnapshotCacheState(tableName, snapshotID);
+                    if (!stateName.equals(CacheState.IN_BUILDING.name())) {
+                        completeServers.add(restServer);
+                        pw.println("cache build complete for rest server:" + restServer + " cache state:" + stateName);
+                    }
+                } catch (IOException e) {
+                    logger.error("error when send build cache request to rest server:" + restServer, e);
+                }
+            }
+            try {
+                Thread.sleep(checkInterval);
+            } catch (InterruptedException e) {
+                logger.error("interrupted", e);
+            }
+        }
+        serversNeedCheck.removeAll(completeServers);
+        if (!serversNeedCheck.isEmpty()) {
+            pw.println();
+            pw.println("check timeout!");
+            pw.println("servers not complete:" + serversNeedCheck);
+        }
+        return new ExecuteResult(State.SUCCEED, outputWriter.toString());
+    }
+
+}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java
new file mode 100644
index 0000000..2b21ae3
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.NavigableMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HBaseLookupRowEncoderTest extends LocalFileMetadataTestCase {
+    private TableDesc tableDesc;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default");
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testEnDeCode() {
+        HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc, new String[] { "COUNTRY" }, 1);
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        HBaseRow hBaseRow = lookupRowEncoder.encode(row);
+
+        assertEquals(6, hBaseRow.getRowKey().length);
+        assertEquals(3, hBaseRow.getQualifierValMap().size());
+        NavigableMap<byte[], byte[]> qualifierMap = hBaseRow.getQualifierValMap();
+        assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1"))));
+        assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2"))));
+        String[] decodeRow = lookupRowEncoder.decode(hBaseRow);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithNullValue() {
+        HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc, new String[] { "COUNTRY" }, 1);
+        String[] row = new String[] { "AD", "42.546245", "1.601554", null };
+        HBaseRow hBaseRow = lookupRowEncoder.encode(row);
+
+        assertEquals(6, hBaseRow.getRowKey().length);
+        assertEquals(3, hBaseRow.getQualifierValMap().size());
+        NavigableMap<byte[], byte[]> qualifierMap = hBaseRow.getQualifierValMap();
+        assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1"))));
+        assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2"))));
+        String[] decodeRow = lookupRowEncoder.decode(hBaseRow);
+        assertNull(decodeRow[3]);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithMultiKeys() {
+        HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc,
+                new String[] { "COUNTRY", "NAME" }, 1);
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        HBaseRow hBaseRow = lookupRowEncoder.encode(row);
+
+        assertEquals(2, hBaseRow.getQualifierValMap().size());
+        NavigableMap<byte[], byte[]> qualifierMap = hBaseRow.getQualifierValMap();
+        assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1"))));
+        assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2"))));
+        String[] decodeRow = lookupRowEncoder.decode(hBaseRow);
+        assertArrayEquals(row, decodeRow);
+    }
+
+}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java
new file mode 100644
index 0000000..cbd0461
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+//package org.apache.kylin.storage.hbase.lookup;
+//
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.mapreduce.Job;
+//import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+//import org.apache.kylin.cube.CubeSegment;
+//import org.apache.kylin.engine.mr.CubingJob;
+//import org.apache.kylin.job.engine.JobEngineConfig;
+//import org.junit.After;
+//import org.junit.Before;
+//import org.junit.Rule;
+//import org.junit.Test;
+//import org.powermock.api.mockito.PowerMockito;
+//import org.powermock.core.classloader.annotations.PrepareForTest;
+//import org.powermock.modules.junit4.rule.PowerMockRule;
+//
+//@PrepareForTest({ LookupTableToHFileJob.class, Job.class})
+//public class LookupTableToHFileJobTest extends LocalFileMetadataTestCase {
+//
+//    @Rule
+//    public PowerMockRule rule = new PowerMockRule();
+//
+//    @Before
+//    public void setup() throws Exception {
+//        createTestMetadata();
+//    }
+//
+//    @After
+//    public void after() throws Exception {
+//        cleanupTestMetadata();
+//    }
+//
+//    @Test
+//    public void testRun() throws Exception {
+//        String cubeName = "test_kylin_cube_with_slr_1_new_segment";
+//        String segmentID = "198va32a-a33e-4b69-83dd-0bb8b1f8c53b";
+//        CubeInstance cubeInstance = CubeManager.getInstance(getTestConfig()).getCube(cubeName);
+//        CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentID);
+//
+//        Configuration conf = HadoopUtil.getCurrentConfiguration();
+//        conf.set("fs.defaultFS", "file:///");
+//        conf.set("mapreduce.framework.name", "local");
+//        conf.set("mapreduce.application.framework.path", "");
+//        conf.set("fs.file.impl.disable.cache", "true");
+//
+//        FileSystem localFileSystem = new LocalFileSystem();
+//        URI uri = URI.create("file:///");
+//        localFileSystem.initialize(uri, conf);
+//
+//        Job mockedJob = createMockMRJob(conf);
+//        PowerMockito.stub(PowerMockito.method(Job.class, "getInstance", Configuration.class, String.class))
+//                .toReturn(mockedJob);
+//        PowerMockito.stub(PowerMockito.method(Job.class, "getInstance", Configuration.class, String.class))
+//                .toReturn(mockedJob);
+//
+//        StringBuilder cmd = new StringBuilder();
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+//                "Build_Lookup_Table_For_Segment_20130331080000_20131212080000_Step");
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME,
+//                cubeName);
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID,
+//                segmentID);
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getOutputPath());
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, "EDW.TEST_SITES");
+//
+//        LookupTableToHFileJob job = new LookupTableToHFileJob();
+//        job.setConf(conf);
+//        job.setAsync(true);
+//
+//        String[] args = cmd.toString().trim().split("\\s+");
+//        job.run(args);
+//    }
+//
+//    private String getOutputPath() {
+//        return "_tmp_output";
+//    }
+//
+//    private CubingJob createMockCubingJob(CubeSegment cubeSeg) {
+//        JobEngineConfig jobEngineConfig = new JobEngineConfig(getTestConfig());
+//        CubingJob cubingJob = CubingJob.createBuildJob(cubeSeg, "unitTest", jobEngineConfig);
+//
+//        return cubingJob;
+//    }
+//
+//    private Job createMockMRJob(Configuration conf) throws Exception {
+//        Job job = PowerMockito.mock(Job.class);
+//        PowerMockito.when(job.getConfiguration()).thenReturn(conf);
+//        PowerMockito.doNothing().when(job).submit();
+//        return job;
+//    }
+
+//}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
new file mode 100644
index 0000000..e98762d
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.DefaultContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class UpdateSnapshotCacheForQueryServersStepTest extends LocalFileMetadataTestCase {
+    private KylinConfig kylinConfig;
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testExecute() throws ExecuteException {
+        UpdateSnapshotCacheForQueryServersStep step = new UpdateSnapshotCacheForQueryServersStep();
+        ExecuteResult result = step.doWork(new DefaultContext(Maps.<String, Executable>newConcurrentMap(), kylinConfig));
+        System.out.println(result.output());
+        assertTrue(result.succeed());
+    }
+}
