You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/21 00:27:36 UTC

[kylin] 04/05: KYLIN-3376 Some improvements for lookup table - query change

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3221
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1d7ec09cb0e0e2c126a5cbe3b83491ce04278a69
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu May 10 13:09:11 2018 +0800

    KYLIN-3376 Some improvements for lookup table - query change
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../apache/kylin/cube/CubeCapabilityChecker.java   |   2 +-
 core-dictionary/pom.xml                            |   5 +
 .../kylin/dict/lookup/LookupStringTable.java       |  12 +-
 .../org/apache/kylin/dict/lookup/LookupTable.java  |   4 +-
 .../dict/lookup/cache/RocksDBLookupBuilder.java    |  83 ++++
 .../dict/lookup/cache/RocksDBLookupRowEncoder.java |  70 ++++
 .../dict/lookup/cache/RocksDBLookupTable.java      | 113 ++++++
 .../dict/lookup/cache/RocksDBLookupTableCache.java | 420 +++++++++++++++++++++
 .../lookup/cache/RocksDBLookupRowEncoderTest.java  |  80 ++++
 .../lookup/cache/RocksDBLookupTableCacheTest.java  | 220 +++++++++++
 .../dict/lookup/cache/RocksDBLookupTableTest.java  | 161 ++++++++
 .../org/apache/kylin/job/SelfStopExecutable.java   |   2 +-
 .../kylin/storage/gtrecord/CubeTupleConverter.java |  63 ++--
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |  14 +-
 .../kylin/storage/gtrecord/ITupleConverter.java    |   3 +-
 .../storage/gtrecord/SegmentCubeTupleIterator.java |   1 +
 .../storage/translate/DerivedFilterTranslator.java |   6 +-
 pom.xml                                            |   6 +
 .../query/enumerator/LookupTableEnumerator.java    |  19 +-
 19 files changed, 1224 insertions(+), 60 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 5dffd96..e12b689 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -75,7 +75,7 @@ public class CubeCapabilityChecker {
             }
         } else {
             //for non query-on-facttable 
-            if (cube.getSegments().get(0).getSnapshots().containsKey(digest.factTable)) {
+            if (cube.getSegments().get(0).getSnapshots().containsKey(digest.factTable) || cube.getSnapshots().containsKey(digest.factTable)) {
 
                 Set<TblColRef> dimCols = Sets.newHashSet(cube.getModel().findFirstTable(digest.factTable).getColumns());
 
diff --git a/core-dictionary/pom.xml b/core-dictionary/pom.xml
index 7e8bee1..d82d204 100644
--- a/core-dictionary/pom.xml
+++ b/core-dictionary/pom.xml
@@ -38,6 +38,11 @@
             <artifactId>kylin-core-metadata</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.rocksdb</groupId>
+            <artifactId>rocksdbjni</artifactId>
+        </dependency>
+
         <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
index 886de22..1d0348a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict.lookup;
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.Iterator;
 
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.datatype.DataType;
@@ -31,7 +32,7 @@ import org.apache.kylin.source.IReadableTable;
  * @author yangli9
  * 
  */
-public class LookupStringTable extends LookupTable<String> {
+public class LookupStringTable extends LookupTable<String> implements ILookupTable{
 
     private static final Comparator<String> dateStrComparator = new Comparator<String>() {
         @Override
@@ -109,4 +110,13 @@ public class LookupStringTable extends LookupTable<String> {
         return String.class;
     }
 
+    @Override
+    public Iterator<String[]> iterator() {
+        return data.values().iterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+    }
 }
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index a99ef29..e1123fa 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -22,10 +22,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.util.Array;
@@ -53,7 +53,7 @@ abstract public class LookupTable<T> {
         this.tableDesc = tableDesc;
         this.keyColumns = keyColumns;
         this.table = table;
-        this.data = new ConcurrentHashMap<Array<T>, T[]>();
+        this.data = new HashMap<Array<T>, T[]>();
         init();
     }
 
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java
new file mode 100644
index 0000000..269684e
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupBuilder.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.File;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.util.SizeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocksDBLookupBuilder {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupBuilder.class);
+
+    static {
+        RocksDB.loadLibrary();
+    }
+    private Options options;
+    private String dbPath;
+    private TableDesc tableDesc;
+    private RocksDBLookupRowEncoder encoder;
+    private int writeBatchSize;
+
+    public RocksDBLookupBuilder(TableDesc tableDesc, String[] keyColumns, String dbPath) {
+        this.tableDesc = tableDesc;
+        this.encoder = new RocksDBLookupRowEncoder(tableDesc, keyColumns);
+        this.dbPath = dbPath;
+        this.writeBatchSize = 500;
+        this.options = new Options();
+        options.setCreateIfMissing(true).setWriteBufferSize(8 * SizeUnit.KB).setMaxWriteBufferNumber(3)
+                .setMaxBackgroundCompactions(5).setCompressionType(CompressionType.SNAPPY_COMPRESSION)
+                .setCompactionStyle(CompactionStyle.UNIVERSAL);
+
+    }
+
+    public void build(ILookupTable srcLookupTable) {
+        File dbFolder = new File(dbPath);
+        if (dbFolder.exists()) {
+            logger.info("remove rocksdb folder:{} to rebuild table cache:{}", dbPath, tableDesc.getIdentity());
+            FileUtils.deleteQuietly(dbFolder);
+        } else {
+            logger.info("create new rocksdb folder:{} for table cache:{}", dbPath, tableDesc.getIdentity());
+            dbFolder.mkdirs();
+        }
+        logger.info("start to build lookup table:{} to rocks db:{}", tableDesc.getIdentity(), dbPath);
+        try (RocksDB rocksDB = RocksDB.open(options, dbPath)) {
+            // todo use batch may improve write performance
+            for (String[] row : srcLookupTable) {
+                KV kv = encoder.encode(row);
+
+                rocksDB.put(kv.getKey(), kv.getValue());
+            }
+        } catch (RocksDBException e) {
+            throw new RuntimeException("error when write data to rocks db", e);
+        }
+
+        logger.info("source table:{} has been written to rocks db:{}", tableDesc.getIdentity(), dbPath);
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java
new file mode 100644
index 0000000..7455a7b
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import org.apache.kylin.dict.lookup.AbstractLookupRowEncoder;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * encode/decode original table row to rocksDB KV
+ * 
+ */
+public class RocksDBLookupRowEncoder extends AbstractLookupRowEncoder<KV>{
+
+    public RocksDBLookupRowEncoder(TableDesc tableDesc, String[] keyColumns) {
+        super(tableDesc, keyColumns);
+    }
+
+    public KV encode(String[] row) {
+        String[] keys = getKeyData(row);
+        String[] values = getValueData(row);
+        byte[] encodeKey = encodeStringsWithLenPfx(keys, false);
+        byte[] encodeValue = encodeStringsWithLenPfx(values, true);
+
+        return new KV(encodeKey, encodeValue);
+    }
+
+    public String[] decode(KV kv) {
+        String[] result = new String[columnsNum];
+
+        decodeFromLenPfxBytes(kv.key, keyIndexes, result);
+        decodeFromLenPfxBytes(kv.value, valueIndexes, result);
+
+        return result;
+    }
+
+    public static class KV {
+        private byte[] key;
+        private byte[] value;
+
+        public KV(byte[] key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public byte[] getKey() {
+            return key;
+        }
+
+        public byte[] getValue() {
+            return value;
+        }
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java
new file mode 100644
index 0000000..4f0c816
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTable.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RocksDBLookupTable implements ILookupTable {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupTable.class);
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    private TableDesc tableDesc;
+    private RocksDB rocksDB;
+    private Options options;
+
+    private RocksDBLookupRowEncoder rowEncoder;
+
+    public RocksDBLookupTable(TableDesc tableDesc, String[] keyColumns, String dbPath) {
+        this.tableDesc = tableDesc;
+        this.options = new Options();
+        this.rowEncoder = new RocksDBLookupRowEncoder(tableDesc, keyColumns);
+        try {
+            this.rocksDB = RocksDB.openReadOnly(options, dbPath);
+        } catch (RocksDBException e) {
+            throw new RuntimeException("cannot open rocks db in path:" + dbPath, e);
+        }
+    }
+
+    @Override
+    public String[] getRow(Array<String> key) {
+        byte[] encodeKey = rowEncoder.encodeStringsWithLenPfx(key.data, false);
+        try {
+            byte[] value = rocksDB.get(encodeKey);
+            if (value == null) {
+                return null;
+            }
+            return rowEncoder.decode(new KV(encodeKey, value));
+        } catch (RocksDBException e) {
+            throw new RuntimeException("error when get key from rocksdb", e);
+        }
+    }
+
+    @Override
+    public Iterator<String[]> iterator() {
+        final RocksIterator rocksIterator = rocksDB.newIterator();
+        rocksIterator.seekToFirst();
+
+        return new Iterator<String[]>() {
+            int counter;
+            @Override
+            public boolean hasNext() {
+                boolean valid = rocksIterator.isValid();
+                if (!valid) {
+                    rocksIterator.close();
+                }
+                return valid;
+            }
+
+            @Override
+            public String[] next() {
+                counter ++;
+                if (counter % 100000 == 0) {
+                    logger.info("scanned {} rows from rocksDB", counter);
+                }
+                String[] result = rowEncoder.decode(new KV(rocksIterator.key(), rocksIterator.value()));
+                rocksIterator.next();
+                return result;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("not support operation");
+            }
+        };
+    }
+
+    @Override
+    public void close() throws IOException {
+        options.close();
+        if (rocksDB != null) {
+            rocksDB.close();
+        }
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java
new file mode 100644
index 0000000..460c448
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Predicate;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+
+public class RocksDBLookupTableCache implements IExtLookupTableCache {
+    private static final Logger logger = LoggerFactory.getLogger(RocksDBLookupTableCache.class);
+
+    private static final String CACHE_TYPE_ROCKSDB = "rocksdb";
+    private static final String STATE_FILE = "STATE";
+    private static final String DB_FILE = "db";
+
+    private String basePath;
+    private long maxCacheSizeInKB;
+    private Cache<String, CachedTableInfo> tablesCache;
+
+    private ConcurrentMap<String, Boolean> inBuildingTables = Maps.newConcurrentMap();
+
+    private ExecutorService cacheBuildExecutor;
+    private ScheduledExecutorService cacheStateCheckExecutor;
+    private CacheStateChecker cacheStateChecker;
+
+    // static cached instances
+    private static final ConcurrentMap<KylinConfig, RocksDBLookupTableCache> SERVICE_CACHE = new ConcurrentHashMap<>();
+
+    public static RocksDBLookupTableCache getInstance(KylinConfig config) {
+        RocksDBLookupTableCache r = SERVICE_CACHE.get(config);
+        if (r == null) {
+            synchronized (RocksDBLookupTableCache.class) {
+                r = SERVICE_CACHE.get(config);
+                if (r == null) {
+                    r = new RocksDBLookupTableCache(config);
+                    SERVICE_CACHE.put(config, r);
+                    if (SERVICE_CACHE.size() > 1) {
+                        logger.warn("More than one singleton exist");
+                    }
+                }
+            }
+        }
+        return r;
+    }
+
+    public static void clearCache() {
+        synchronized (SERVICE_CACHE) {
+            SERVICE_CACHE.clear();
+        }
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
+
+    private RocksDBLookupTableCache(KylinConfig config) {
+        this.config = config;
+        init();
+    }
+
+    private void init() {
+        this.basePath = getCacheBasePath(config);
+
+        this.maxCacheSizeInKB = (long) (config.getExtTableSnapshotLocalCacheMaxSizeGB() * 1024 * 1024);
+        this.tablesCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, CachedTableInfo>() {
+            @Override
+            public void onRemoval(RemovalNotification<String, CachedTableInfo> notification) {
+                logger.warn(notification.getValue() + " is removed " + "because of " + notification.getCause());
+                notification.getValue().cleanStorage();
+            }
+        }).maximumWeight(maxCacheSizeInKB).weigher(new Weigher<String, CachedTableInfo>() {
+            @Override
+            public int weigh(String key, CachedTableInfo value) {
+                return value.getSizeInKB();
+            }
+        }).build();
+        restoreCacheState();
+        cacheStateChecker = new CacheStateChecker();
+        initExecutors();
+    }
+
+    protected static String getCacheBasePath(KylinConfig config) {
+        String basePath = config.getExtTableSnapshotLocalCachePath();
+        if (!basePath.startsWith("/")) {
+            basePath = KylinConfig.getKylinHome() + File.separator + basePath;
+        }
+        return basePath + File.separator + CACHE_TYPE_ROCKSDB;
+    }
+
+    private void restoreCacheState() {
+        File dbBaseFolder = new File(basePath);
+        if (!dbBaseFolder.exists()) {
+            dbBaseFolder.mkdirs();
+        }
+        Map<String, File[]> tableSnapshotsFileMap = getCachedTableSnapshotsFolders(dbBaseFolder);
+        for (Entry<String, File[]> tableSnapshotsEntry : tableSnapshotsFileMap.entrySet()) {
+            for (File snapshotFolder : tableSnapshotsEntry.getValue()) {
+                initSnapshotState(tableSnapshotsEntry.getKey(), snapshotFolder);
+            }
+        }
+    }
+
+    private Map<String, File[]> getCachedTableSnapshotsFolders(File dbBaseFolder) {
+        Map<String, File[]> result = Maps.newHashMap();
+        File[] tableFolders = dbBaseFolder.listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File file) {
+                return file.isDirectory();
+            }
+        });
+        if (tableFolders == null) {
+            return result;
+        }
+
+        for (File tableFolder : tableFolders) {
+            String tableName = tableFolder.getName();
+            File[] snapshotFolders = tableFolder.listFiles(new FileFilter() {
+                @Override
+                public boolean accept(File snapshotFile) {
+                    return snapshotFile.isDirectory();
+                }
+            });
+            result.put(tableName, snapshotFolders);
+        }
+        return result;
+    }
+
+    private void initSnapshotState(String tableName, File snapshotCacheFolder) {
+        String snapshotID = snapshotCacheFolder.getName();
+        File stateFile = getCacheStateFile(snapshotCacheFolder.getAbsolutePath());
+        if (stateFile.exists()) {
+            try {
+                String stateStr = Files.toString(stateFile, Charsets.UTF_8);
+                String resourcePath = ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID);
+                if (CacheState.AVAILABLE.name().equals(stateStr)) {
+                    tablesCache.put(resourcePath, new CachedTableInfo(snapshotCacheFolder.getAbsolutePath()));
+                }
+            } catch (IOException e) {
+                logger.error("error to read state file:" + stateFile.getAbsolutePath());
+            }
+        }
+    }
+
+    private void initExecutors() {
+        this.cacheBuildExecutor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("lookup-cache-build-thread"));
+        this.cacheStateCheckExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(
+                "lookup-cache-state-checker"));
+        cacheStateCheckExecutor.scheduleAtFixedRate(cacheStateChecker, 10, 10 * 60, TimeUnit.SECONDS); // check every 10 minutes
+    }
+
+    @Override
+    public ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, boolean buildIfNotExist) {
+        String resourcePath = extTableSnapshotInfo.getResourcePath();
+        if (inBuildingTables.containsKey(resourcePath)) {
+            logger.info("cache is in building for snapshot:" + resourcePath);
+            return null;
+        }
+        CachedTableInfo cachedTableInfo = tablesCache.getIfPresent(resourcePath);
+        if (cachedTableInfo == null) {
+            if (buildIfNotExist) {
+                buildSnapshotCache(tableDesc, extTableSnapshotInfo, getSourceLookupTable(tableDesc, extTableSnapshotInfo));
+            }
+            logger.info("no available cache ready for the table snapshot:" + extTableSnapshotInfo.getResourcePath());
+            return null;
+        }
+
+        String[] keyColumns = extTableSnapshotInfo.getKeyColumns();
+        String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        return new RocksDBLookupTable(tableDesc, keyColumns, dbPath);
+    }
+
+    private ILookupTable getSourceLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo) {
+        return LookupProviderFactory.getExtLookupTableWithoutCache(tableDesc, extTableSnapshotInfo);
+    }
+
+    @Override
+    public void buildSnapshotCache(final TableDesc tableDesc, final ExtTableSnapshotInfo extTableSnapshotInfo, final ILookupTable sourceTable) {
+        if (extTableSnapshotInfo.getSignature().getSize() / 1024 > maxCacheSizeInKB * 2 / 3) {
+            logger.warn("the size is to large to build to cache for snapshot:{}, size:{}, skip cache building",
+                    extTableSnapshotInfo.getResourcePath(), extTableSnapshotInfo.getSignature().getSize());
+            return;
+        }
+        final String[] keyColumns = extTableSnapshotInfo.getKeyColumns();
+        final String cachePath = getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        final String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+        final String snapshotResPath = extTableSnapshotInfo.getResourcePath();
+
+        if (inBuildingTables.containsKey(snapshotResPath)) {
+            logger.info("there is already snapshot cache in building for snapshot:{}, skip it", snapshotResPath);
+            return;
+        }
+        if (inBuildingTables.putIfAbsent(snapshotResPath, true) == null) {
+            cacheBuildExecutor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, keyColumns, dbPath);
+                        builder.build(sourceTable);
+                    } finally {
+                        inBuildingTables.remove(snapshotResPath);
+                    }
+                    saveSnapshotCacheState(extTableSnapshotInfo, cachePath);
+                }
+            });
+
+        } else {
+            logger.info("there is already snapshot cache in building for snapshot:{}, skip it", snapshotResPath);
+        }
+    }
+
+    @Override
+    public void removeSnapshotCache(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        tablesCache.invalidate(extTableSnapshotInfo.getResourcePath());
+    }
+
+    @Override
+    public CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        String resourcePath = extTableSnapshotInfo.getResourcePath();
+        if (inBuildingTables.containsKey(resourcePath)) {
+            return CacheState.IN_BUILDING;
+        }
+        File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(),
+                extTableSnapshotInfo.getId()));
+        if (!stateFile.exists()) {
+            return CacheState.NONE;
+        }
+        try {
+            String stateString = Files.toString(stateFile, Charsets.UTF_8);
+            return CacheState.valueOf(stateString);
+        } catch (IOException e) {
+            logger.error("error when read state file", e);
+        }
+        return CacheState.NONE;
+    }
+
+    public long getTotalCacheSize() {
+        return FileUtils.sizeOfDirectory(new File(getCacheBasePath(config)));
+    }
+
+    public void checkCacheState() {
+        cacheStateChecker.run();
+    }
+
+    private void saveSnapshotCacheState(ExtTableSnapshotInfo extTableSnapshotInfo, String cachePath) {
+        File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(),
+                extTableSnapshotInfo.getId()));
+        try {
+            Files.write(CacheState.AVAILABLE.name(), stateFile, Charsets.UTF_8);
+            tablesCache.put(extTableSnapshotInfo.getResourcePath(), new CachedTableInfo(cachePath));
+        } catch (IOException e) {
+            throw new RuntimeException("error when write cache state for snapshot:"
+                    + extTableSnapshotInfo.getResourcePath());
+        }
+    }
+
+    private File getCacheStateFile(String snapshotCacheFolder) {
+        String stateFilePath = snapshotCacheFolder + File.separator + STATE_FILE;
+        return new File(stateFilePath);
+    }
+
+    protected String getSnapshotStorePath(String tableName, String snapshotID) {
+        return getSnapshotCachePath(tableName, snapshotID) + File.separator + DB_FILE;
+    }
+
+    protected String getSnapshotCachePath(String tableName, String snapshotID) {
+        return basePath + File.separator + tableName + File.separator + snapshotID;
+    }
+
+    private class CacheStateChecker implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                String cacheBasePath = getCacheBasePath(config);
+                logger.info("check snapshot local cache state, local path:{}", cacheBasePath);
+                File baseFolder = new File(cacheBasePath);
+                if (!baseFolder.exists()) {
+                    return;
+                }
+                Map<String, File[]> tableSnapshotsFileMap = getCachedTableSnapshotsFolders(baseFolder);
+                List<Pair<String, File>> allCachedSnapshots = Lists.newArrayList();
+                for (Entry<String, File[]> tableSnapshotsEntry : tableSnapshotsFileMap.entrySet()) {
+                    String tableName = tableSnapshotsEntry.getKey();
+                    for (File file : tableSnapshotsEntry.getValue()) {
+                        String snapshotID = file.getName();
+                        allCachedSnapshots
+                                .add(new Pair<>(ExtTableSnapshotInfo.getResourcePath(tableName, snapshotID), file));
+                    }
+                }
+
+                final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config).getAllExtSnapshotResPaths();
+
+                List<Pair<String, File>> toRemovedCachedSnapshots = Lists.newArrayList(FluentIterable.from(
+                        allCachedSnapshots).filter(new Predicate<Pair<String, File>>() {
+                    @Override
+                    public boolean apply(@Nullable Pair<String, File> input) {
+                        return !activeSnapshotSet.contains(input.getFirst());
+                    }
+                }));
+                for (Pair<String, File> toRemovedCachedSnapshot : toRemovedCachedSnapshots) {
+                    File snapshotCacheFolder = toRemovedCachedSnapshot.getSecond();
+                    logger.info("removed cache file:{}, it is not referred by any cube",
+                            snapshotCacheFolder.getAbsolutePath());
+                    try {
+                        FileUtils.deleteDirectory(snapshotCacheFolder);
+                    } catch (IOException e) {
+                        logger.error("fail to remove folder:" + snapshotCacheFolder.getAbsolutePath(), e);
+                    }
+                    tablesCache.invalidate(toRemovedCachedSnapshot.getFirst());
+                }
+            } catch (Exception e) {
+                logger.error("error happens when check cache state", e);
+            }
+
+        }
+    }
+
+    private static class CachedTableInfo {
+        private String cachePath;
+
+        private long dbSize;
+
+        public CachedTableInfo(String cachePath) {
+            this.cachePath = cachePath;
+            this.dbSize = FileUtils.sizeOfDirectory(new File(cachePath));
+        }
+
+        public int getSizeInKB() {
+            return (int) (dbSize / 1024);
+        }
+
+        public void cleanStorage() {
+            logger.info("clean cache storage for path:" + cachePath);
+            try {
+                FileUtils.deleteDirectory(new File(cachePath));
+            } catch (IOException e) {
+                logger.error("file delete fail:" + cachePath, e);
+            }
+        }
+    }
+
+    /**
+     * A simple named thread factory.
+     */
+    private static class NamedThreadFactory implements ThreadFactory {
+        private final ThreadGroup group;
+        private final AtomicInteger threadNumber = new AtomicInteger(1);
+        private final String namePrefix;
+
+        public NamedThreadFactory(String threadPrefix) {
+            final SecurityManager s = System.getSecurityManager();
+            this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+            this.namePrefix = threadPrefix + "-";
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            final Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+            t.setDaemon(true);
+            return t;
+        }
+    }
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java
new file mode 100644
index 0000000..ff5fdff
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupRowEncoderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupRowEncoder.KV;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RocksDBLookupRowEncoderTest extends LocalFileMetadataTestCase {
+    private TableDesc tableDesc;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default");
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testEnDeCode() {
+        RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY" });
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        KV kv = lookupRowEncoder.encode(row);
+
+        String[] decodeRow = lookupRowEncoder.decode(kv);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithNullValue() {
+        RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY" });
+        String[] row = new String[] { "AD", "42.546245", "1.601554", null };
+        KV kv = lookupRowEncoder.encode(row);
+
+        String[] decodeRow = lookupRowEncoder.decode(kv);
+        assertNull(decodeRow[3]);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithMultiKeys() {
+        RocksDBLookupRowEncoder lookupRowEncoder = new RocksDBLookupRowEncoder(tableDesc, new String[] { "COUNTRY",
+                "NAME" });
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        KV kv = lookupRowEncoder.encode(row);
+
+        String[] decodeRow = lookupRowEncoder.decode(kv);
+        assertArrayEquals(row, decodeRow);
+    }
+
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java
new file mode 100644
index 0000000..55b25b2
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCacheTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.IExtLookupProvider;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+/**
+ */
+public class RocksDBLookupTableCacheTest extends LocalFileMetadataTestCase {
+    private static final String TABLE_COUNTRY = "DEFAULT.TEST_COUNTRY";
+    private static final String MOCK_EXT_LOOKUP = "mock";
+    private TableDesc tableDesc;
+    private KylinConfig kylinConfig;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        System.setProperty("KYLIN_HOME", ".");
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        kylinConfig = getTestConfig();
+        tableDesc = metadataManager.getTableDesc(TABLE_COUNTRY, "default");
+        cleanCache();
+        LookupProviderFactory.registerLookupProvider(MOCK_EXT_LOOKUP, MockedLookupProvider.class.getName());
+    }
+
+    private void cleanCache() {
+        FileUtils.deleteQuietly(new File(kylinConfig.getExtTableSnapshotLocalCachePath()));
+    }
+
+    @After
+    public void tearDown() {
+        cleanupTestMetadata();
+        cleanCache();
+    }
+
+    @Test
+    public void testBuildTableCache() throws Exception {
+        String snapshotID = UUID.randomUUID().toString();
+        ExtTableSnapshotInfo snapshotInfo = buildSnapshotCache(snapshotID, 100000);
+        assertEquals(CacheState.AVAILABLE, RocksDBLookupTableCache.getInstance(kylinConfig).getCacheState(snapshotInfo));
+    }
+
+    private ExtTableSnapshotInfo buildSnapshotCache(String snapshotID, int rowCnt) throws Exception {
+        ExtTableSnapshotInfo snapshotInfo = new ExtTableSnapshotInfo();
+        snapshotInfo.setTableName(TABLE_COUNTRY);
+        snapshotInfo.setUuid(snapshotID);
+        snapshotInfo.setStorageType(MOCK_EXT_LOOKUP);
+        snapshotInfo.setKeyColumns(new String[] { "COUNTRY" });
+        snapshotInfo.setRowCnt(rowCnt);
+        snapshotInfo.setSignature(new TableSignature("/test", rowCnt, System.currentTimeMillis()));
+
+        RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+        cache.buildSnapshotCache(tableDesc, snapshotInfo, getLookupTableWithRandomData(rowCnt));
+
+        while (cache.getCacheState(snapshotInfo) == CacheState.IN_BUILDING) {
+            Thread.sleep(500);
+        }
+        return snapshotInfo;
+    }
+
+    @Test
+    public void testRestoreCacheFromFiles() throws Exception {
+        String snapshotID = UUID.randomUUID().toString();
+        String snapshotCacheBasePath = RocksDBLookupTableCache.getCacheBasePath(kylinConfig) + File.separator
+                + TABLE_COUNTRY + File.separator + snapshotID;
+        String dbPath = snapshotCacheBasePath + File.separator + "db";
+        RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, new String[] { "COUNTRY" }, dbPath);
+        builder.build(getLookupTableWithRandomData(10000));
+        String stateFilePath = snapshotCacheBasePath + File.separator + "STATE";
+        Files.write(CacheState.AVAILABLE.name(), new File(stateFilePath), Charsets.UTF_8);
+
+        RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+        ExtTableSnapshotInfo snapshotInfo = new ExtTableSnapshotInfo();
+        snapshotInfo.setTableName(TABLE_COUNTRY);
+        snapshotInfo.setUuid(snapshotID);
+        snapshotInfo.setStorageType(MOCK_EXT_LOOKUP);
+        snapshotInfo.setKeyColumns(new String[] { "COUNTRY" });
+        ILookupTable lookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+        int rowCnt = 0;
+        for (String[] strings : lookupTable) {
+            rowCnt++;
+        }
+        lookupTable.close();
+        assertEquals(10000, rowCnt);
+    }
+
+    @Test
+    public void testEvict() throws Exception {
+        kylinConfig.setProperty("kylin.snapshot.ext.local.cache.max-size-gb", "0.005");
+        int snapshotNum = 10;
+        int snapshotRowCnt = 100000;
+        for (int i = 0; i < snapshotNum; i++) {
+            buildSnapshotCache(UUID.randomUUID().toString(), snapshotRowCnt);
+        }
+        assertTrue(RocksDBLookupTableCache.getInstance(kylinConfig).getTotalCacheSize() < 0.006 * 1024 * 1024 * 1024);
+    }
+
+    @Test
+    public void testCheckCacheState() throws Exception {
+        ExtTableSnapshotInfo snapshotInfo = buildSnapshotCache(UUID.randomUUID().toString(), 1000);
+        RocksDBLookupTableCache cache = RocksDBLookupTableCache.getInstance(kylinConfig);
+        ILookupTable cachedLookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+        assertNotNull(cachedLookupTable);
+        cachedLookupTable.close();
+        cache.checkCacheState();
+        String cacheLocalPath = cache.getSnapshotCachePath(snapshotInfo.getTableName(), snapshotInfo.getId());
+        assertFalse(new File(cacheLocalPath).exists());
+        cachedLookupTable = cache.getCachedLookupTable(tableDesc, snapshotInfo, false);
+        assertNull(cachedLookupTable);
+    }
+
+    public static class MockedLookupProvider implements IExtLookupProvider {
+
+        @Override
+        public ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+            return getLookupTableWithRandomData(extTableSnapshot.getRowCnt());
+        }
+
+        @Override
+        public IExtLookupTableCache getLocalCache() {
+            return null;
+        }
+
+        @Override
+        public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+            return null;
+        }
+    }
+
+    private static ILookupTable getLookupTableWithRandomData(final long rowNum) {
+        return new ILookupTable() {
+            Random random = new Random();
+
+            @Override
+            public String[] getRow(Array<String> key) {
+                return new String[0];
+            }
+
+            @Override
+            public void close() throws IOException {
+
+            }
+
+            @Override
+            public Iterator<String[]> iterator() {
+                return new Iterator<String[]>() {
+                    private int iterCnt = 0;
+
+                    @Override
+                    public boolean hasNext() {
+                        return iterCnt < rowNum;
+                    }
+
+                    @Override
+                    public String[] next() {
+                        iterCnt++;
+                        return genRandomRow(iterCnt);
+                    }
+
+                    @Override
+                    public void remove() {
+
+                    }
+                };
+            }
+
+            private String[] genRandomRow(int id) {
+                return new String[] { "keyyyyy" + id, String.valueOf(random.nextDouble()),
+                        String.valueOf(random.nextDouble()), "Andorra" + id };
+            }
+        };
+    }
+
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java
new file mode 100644
index 0000000..d5bbd40
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup.cache;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class RocksDBLookupTableTest extends LocalFileMetadataTestCase {
+
+    private TableDesc tableDesc;
+    private RocksDBLookupTable lookupTable;
+    private Random random;
+    private int sourceRowNum;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        this.random = new Random();
+        tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default");
+        sourceRowNum = 10000;
+        genTestData();
+        lookupTable = new RocksDBLookupTable(tableDesc, new String[] { "COUNTRY" }, "lookup_cache/TEST_COUNTRY");
+    }
+
+    private void genTestData() {
+        removeTestDataIfExists();
+        File file = new File("lookup_cache/TEST_COUNTRY");
+        file.mkdirs();
+        RocksDBLookupBuilder builder = new RocksDBLookupBuilder(tableDesc, new String[] { "COUNTRY" },
+                "lookup_cache/TEST_COUNTRY");
+        long start = System.currentTimeMillis();
+        builder.build(getLookupTableWithRandomData(sourceRowNum));
+        long take = System.currentTimeMillis() - start;
+        System.out.println("take:" + take + " ms to complete build");
+    }
+
+    private void removeTestDataIfExists() {
+        FileUtils.deleteQuietly(new File("lookup_cache"));
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        cleanupTestMetadata();
+        removeTestDataIfExists();
+        lookupTable.close();
+    }
+
+    @Test
+    public void testIterator() throws Exception {
+        System.out.println("start iterator table");
+        long start = System.currentTimeMillis();
+        Iterator<String[]> iter = lookupTable.iterator();
+        int count = 0;
+        while (iter.hasNext()) {
+            iter.next();
+            count++;
+        }
+        long take = System.currentTimeMillis() - start;
+        System.out.println("scan " + count + " rows, take " + take + " ms");
+
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        int getNum = 3000;
+        List<String[]> keys = Lists.newArrayList();
+        for (int i = 0; i < getNum; i++) {
+            String[] keyi = new String[] { "keyyyyy" + random.nextInt(sourceRowNum) };
+            keys.add(keyi);
+        }
+
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < getNum; i++) {
+            String[] row = lookupTable.getRow(new Array<>(keys.get(i)));
+            if (row == null) {
+                System.out.println("null value for key:" + Arrays.toString(keys.get(i)));
+            }
+        }
+        long take = System.currentTimeMillis() - start;
+        System.out.println("muliti get " + getNum + " rows, take " + take + " ms");
+    }
+
+    private ILookupTable getLookupTableWithRandomData(final int rowNum) {
+        return new ILookupTable() {
+            @Override
+            public String[] getRow(Array<String> key) {
+                return new String[0];
+            }
+
+            @Override
+            public void close() throws IOException {
+
+            }
+
+            @Override
+            public Iterator<String[]> iterator() {
+                return new Iterator<String[]>() {
+                    private int iterCnt = 0;
+
+                    @Override
+                    public boolean hasNext() {
+                        return iterCnt < rowNum;
+                    }
+
+                    @Override
+                    public String[] next() {
+                        iterCnt++;
+                        return genRandomRow(iterCnt);
+                    }
+
+                    @Override
+                    public void remove() {
+
+                    }
+                };
+            }
+        };
+    }
+
+    private String[] genRandomRow(int id) {
+        return new String[] { "keyyyyy" + id, String.valueOf(random.nextDouble()), String.valueOf(random.nextDouble()),
+                "Andorra" + id };
+    }
+
+}
diff --git a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
index 09f5a7a..ddeeafa 100644
--- a/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
+++ b/core-job/src/test/java/org/apache/kylin/job/SelfStopExecutable.java
@@ -36,7 +36,7 @@ public class SelfStopExecutable extends BaseTestExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         doingWork = true;
         try {
-            for (int i = 0; i < 20; i++) {
+            for (int i = 0; i < 60; i++) {
                 sleepOneSecond();
                 
                 if (isDiscarded())
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 8af9e3f..05fc90f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -19,30 +19,26 @@
 package org.apache.kylin.storage.gtrecord;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
-import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.source.IReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +63,7 @@ public class CubeTupleConverter implements ITupleConverter {
 
     public final List<IAdvMeasureFiller> advMeasureFillers;
     public final List<Integer> advMeasureIndexInGTValues;
+    private List<ILookupTable> usedLookupTables;
 
     public final int nSelectedDims;
 
@@ -86,6 +83,7 @@ public class CubeTupleConverter implements ITupleConverter {
 
         advMeasureFillers = Lists.newArrayListWithCapacity(1);
         advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
+        usedLookupTables = Lists.newArrayList();
 
         ////////////
 
@@ -178,6 +176,17 @@ public class CubeTupleConverter implements ITupleConverter {
         }
     }
 
+    @Override
+    public void close() throws IOException {
+        for (ILookupTable usedLookupTable : usedLookupTables) {
+            try {
+                usedLookupTable.close();
+            } catch (Exception e) {
+                logger.error("error when close lookup table:" + usedLookupTable);
+            }
+        }
+    }
+
     protected interface IDerivedColumnFiller {
         public void fillDerivedColumns(Object[] gtValues, Tuple tuple);
     }
@@ -204,7 +213,7 @@ public class CubeTupleConverter implements ITupleConverter {
         switch (deriveInfo.type) {
         case LOOKUP:
             return new IDerivedColumnFiller() {
-                LookupStringTable lookupTable = getLookupTable(cubeSeg, deriveInfo.join);
+                ILookupTable lookupTable = getAndAddLookupTable(cubeSeg, deriveInfo.join);
                 int[] derivedColIdx = initDerivedColIdx();
                 Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
 
@@ -263,40 +272,10 @@ public class CubeTupleConverter implements ITupleConverter {
         return -1;
     }
 
-    public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-        long ts = System.currentTimeMillis();
-
-        TableMetadataManager metaMgr = TableMetadataManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-        SnapshotManager snapshotMgr = SnapshotManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-
-        String tableName = join.getPKSide().getTableIdentity();
-        String[] pkCols = join.getPrimaryKey();
-        String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
-        if (snapshotResPath == null)
-            throw new IllegalStateException("No snaphot for table '" + tableName + "' found on cube segment" + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
-
-        try {
-            SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotResPath);
-            TableDesc tableDesc = metaMgr.getTableDesc(tableName, cubeSegment.getProject());
-            EnhancedStringLookupTable enhancedStringLookupTable = new EnhancedStringLookupTable(tableDesc, pkCols, snapshot);
-            logger.info("Time to get lookup up table for {} is {} ", join.getPKSide().getTableName(), (System.currentTimeMillis() - ts));
-            return enhancedStringLookupTable;
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
-        }
-    }
-
-    public static class EnhancedStringLookupTable extends LookupStringTable {
-
-        public EnhancedStringLookupTable(TableDesc tableDesc, String[] keyColumns, IReadableTable table) throws IOException {
-            super(tableDesc, keyColumns, table);
-        }
-
-        @Override
-        protected void init() throws IOException {
-            this.data = new HashMap<>();
-            super.init();
-        }
+    public ILookupTable getAndAddLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+        ILookupTable lookupTable = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getLookupTable(cubeSegment, join);
+        usedLookupTables.add(lookupTable);
+        return lookupTable;
     }
 
     private static String toString(Object o) {
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 2f69b76..5e11e91 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.storage.gtrecord;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -36,7 +37,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.RowKeyColDesc;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.gridtable.StorageLimitLevel;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.bitmap.BitmapMeasureType;
@@ -363,9 +364,16 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
             return compf;
 
         DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
-        LookupStringTable lookup = cubeDesc.getHostInfo(derived).type == CubeDesc.DeriveType.PK_FK ? null
+        ILookupTable lookup = cubeDesc.getHostInfo(derived).type == CubeDesc.DeriveType.PK_FK ? null
                 : getLookupStringTableForDerived(derived, hostInfo);
         Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+        try {
+            if (lookup != null) {
+                lookup.close();
+            }
+        } catch (IOException e) {
+            logger.error("error when close lookup table.", e);
+        }
         TupleFilter translatedFilter = translated.getFirst();
         boolean loosened = translated.getSecond();
         if (loosened) {
@@ -375,7 +383,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
     }
 
     @SuppressWarnings("unchecked")
-    protected LookupStringTable getLookupStringTableForDerived(TblColRef derived, DeriveInfo hostInfo) {
+    protected ILookupTable getLookupStringTableForDerived(TblColRef derived, DeriveInfo hostInfo) {
         CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
         CubeSegment seg = cubeInstance.getLatestReadySegment();
         return cubeMgr.getLookupTable(seg, hostInfo.join);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
index dd48e4d..72e6848 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/ITupleConverter.java
@@ -18,12 +18,13 @@
 
 package org.apache.kylin.storage.gtrecord;
 
+import java.io.Closeable;
 import java.util.List;
 
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.tuple.Tuple;
 
-public interface ITupleConverter {
+public interface ITupleConverter extends Closeable {
 
     public List<IAdvMeasureFiller> translateResult(Object[] gtValues, Tuple tuple);
 }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
index 3bac5ec..a6e0bc1 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -202,6 +202,7 @@ public class SegmentCubeTupleIterator implements ITupleIterator {
     protected void close(CubeSegmentScanner scanner) {
         try {
             scanner.close();
+            cubeTupleConverter.close();
         } catch (IOException e) {
             logger.error("Exception when close CubeScanner", e);
         }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
index f4150fe..7fa426f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/DerivedFilterTranslator.java
@@ -27,7 +27,7 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.kv.RowKeyColumnOrder;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.CubeDesc.DeriveType;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -50,7 +50,7 @@ public class DerivedFilterTranslator {
 
     private static final Logger logger = LoggerFactory.getLogger(DerivedFilterTranslator.class);
 
-    public static Pair<TupleFilter, Boolean> translate(LookupStringTable lookup, DeriveInfo hostInfo, CompareTupleFilter compf) {
+    public static Pair<TupleFilter, Boolean> translate(ILookupTable lookup, DeriveInfo hostInfo, CompareTupleFilter compf) {
 
         TblColRef derivedCol = compf.getColumn();
         TblColRef[] hostCols = hostInfo.columns;
@@ -76,7 +76,7 @@ public class DerivedFilterTranslator {
 
         Set<Array<String>> satisfyingHostRecords = Sets.newHashSet();
         SingleColumnTuple tuple = new SingleColumnTuple(derivedCol);
-        for (String[] row : lookup.getAllRows()) {
+        for (String[] row : lookup) {
             tuple.value = row[di];
             if (compf.evaluate(tuple, FilterCodeSystemFactory.getFilterCodeSystem(derivedCol.getColumnDesc().getType()))) {
                 collect(row, pi, satisfyingHostRecords);
diff --git a/pom.xml b/pom.xml
index a8e8312..a5cc1fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
         <tomcat.version>7.0.85</tomcat.version>
         <t-digest.version>3.1</t-digest.version>
         <freemarker.version>2.3.23</freemarker.version>
+        <rocksdb.version>5.9.2</rocksdb.version>
         <!--metric-->
         <dropwizard.version>3.1.2</dropwizard.version>
         <!-- REST Service, ref https://github.com/spring-projects/spring-boot/blob/v1.3.8.RELEASE/spring-boot-dependencies/pom.xml -->
@@ -655,6 +656,11 @@
                 <artifactId>freemarker</artifactId>
                 <version>${freemarker.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.rocksdb</groupId>
+                <artifactId>rocksdbjni</artifactId>
+                <version>${rocksdb.version}</version>
+            </dependency>
 
             <!-- Logging -->
             <dependency>
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
index 1500def..c3407bc 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/LookupTableEnumerator.java
@@ -18,8 +18,8 @@
 
 package org.apache.kylin.query.enumerator;
 
+import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
@@ -27,19 +27,22 @@ import org.apache.calcite.linq4j.Enumerator;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ILookupTable;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.schema.OLAPTable;
 import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
 public class LookupTableEnumerator implements Enumerator<Object[]> {
+    private final static Logger logger = LoggerFactory.getLogger(LookupTableEnumerator.class);
 
-    private final Collection<String[]> allRows;
+    private final ILookupTable lookupTable;
     private final List<ColumnDesc> colDescs;
     private final Object[] current;
     private Iterator<String[]> iterator;
@@ -67,8 +70,7 @@ public class LookupTableEnumerator implements Enumerator<Object[]> {
             throw new IllegalStateException("No dimension with derived columns found for lookup table " + lookupTableName + ", cube desc " + cube.getDescriptor());
 
         CubeManager cubeMgr = CubeManager.getInstance(cube.getConfig());
-        LookupStringTable table = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
-        this.allRows = table.getAllRows();
+        this.lookupTable = cubeMgr.getLookupTable(cube.getLatestReadySegment(), dim.getJoin());
 
         OLAPTable olapTable = (OLAPTable) olapContext.firstTableScan.getOlapTable();
         this.colDescs = olapTable.getSourceColumns();
@@ -103,11 +105,16 @@ public class LookupTableEnumerator implements Enumerator<Object[]> {
 
     @Override
     public void reset() {
-        this.iterator = allRows.iterator();
+        this.iterator = lookupTable.iterator();
     }
 
     @Override
     public void close() {
+        try {
+            lookupTable.close();
+        } catch (IOException e) {
+            logger.error("error when close lookup table", e);
+        }
     }
 
 }

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.