Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/21 00:27:35 UTC

[kylin] 03/05: KYLIN-3375 Some improvements for lookup table - build change

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch KYLIN-3221
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 45f998b368ab46f8fda05875b5945beccfd57869
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu May 10 13:00:10 2018 +0800

    KYLIN-3375 Some improvements for lookup table - build change
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  13 +
 .../apache/kylin/common/restclient/RestClient.java |  21 ++
 .../java/org/apache/kylin/cube/CubeManager.java    | 118 ++++++--
 .../java/org/apache/kylin/cube/CubeUpdate.java     |   9 +
 .../kylin/cube/cli/DictionaryGeneratorCLI.java     |   7 +-
 .../dict/lookup/AbstractLookupRowEncoder.java      | 122 +++++++++
 .../kylin/dict/lookup/IExtLookupProvider.java      |  29 +-
 .../kylin/dict/lookup/IExtLookupTableCache.java    |  28 +-
 .../org/apache/kylin/dict/lookup/ILookupTable.java |  27 +-
 .../kylin/dict/lookup/LookupProviderFactory.java   | 112 ++++++++
 .../kylin/job/constant/ExecutableConstants.java    |   5 +
 .../kylin/job/execution/AbstractExecutable.java    |   6 +-
 .../job/execution/DefaultChainedExecutable.java    |  26 ++
 .../realization/IRealizationConstants.java         |   2 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  17 ++
 .../java/org/apache/kylin/engine/mr/CubingJob.java |  26 --
 .../kylin/engine/mr/ILookupMaterializer.java       |  25 +-
 .../java/org/apache/kylin/engine/mr/MRUtil.java    |   5 +
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |   4 +
 .../kylin/engine/mr/common/BatchConstants.java     |  11 +
 .../mr/steps/UpdateCubeInfoAfterBuildStep.java     |  23 ++
 .../localmeta/cube_desc/ci_left_join_cube.json     |   8 +
 .../apache/kylin/rest/job/MetadataCleanupJob.java  |   3 +-
 .../kylin/rest/job/StorageCleanJobHbaseUtil.java   |  37 +++
 server/src/main/resources/kylinSecurity.xml        |   2 +
 storage-hbase/pom.xml                              |   6 +
 .../storage/hbase/lookup/HBaseLookupMRSteps.java   | 175 ++++++++++++
 .../hbase/lookup/HBaseLookupMaterializer.java      |  40 +++
 .../storage/hbase/lookup/HBaseLookupProvider.java  |  58 ++++
 .../hbase/lookup/HBaseLookupRowEncoder.java        | 134 +++++++++
 .../storage/hbase/lookup/HBaseLookupTable.java     | 130 +++++++++
 .../hbase/lookup/KVSortReducerWithDupKeyCheck.java |  62 +++++
 .../hbase/lookup/LookupTableHFilesBulkLoadJob.java | 106 ++++++++
 .../hbase/lookup/LookupTableToHFileJob.java        | 302 +++++++++++++++++++++
 .../hbase/lookup/LookupTableToHFileMapper.java     | 109 ++++++++
 .../UpdateSnapshotCacheForQueryServersStep.java    | 106 ++++++++
 .../hbase/lookup/HBaseLookupRowEncoderTest.java    |  98 +++++++
 .../hbase/lookup/LookupTableToHFileJobTest.java    | 110 ++++++++
 ...UpdateSnapshotCacheForQueryServersStepTest.java |  55 ++++
 39 files changed, 2052 insertions(+), 125 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7b24864..5f8172b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -414,6 +414,19 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.snapshot.max-mb", "300"));
     }
 
+    public int getExtTableSnapshotShardingMB() {
+        return Integer.parseInt(getOptional("kylin.snapshot.ext.shard-mb", "500"));
+    }
+
+    public String getExtTableSnapshotLocalCachePath() {
+        return getOptional("kylin.snapshot.ext.local.cache.path", "lookup_cache");
+    }
+
+    public double getExtTableSnapshotLocalCacheMaxSizeGB() {
+        return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
+    }
+
+
     // ============================================================================
     // CUBE
     // ============================================================================
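
For reference, the three settings introduced above can be overridden in kylin.properties; the values below are the defaults hard-coded in the getters:

    # shard size (in MB) for an ext table snapshot
    kylin.snapshot.ext.shard-mb=500
    # local directory used to cache ext lookup tables
    kylin.snapshot.ext.local.cache.path=lookup_cache
    # upper bound (in GB) on the local snapshot cache
    kylin.snapshot.ext.local.cache.max-size-gb=200
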
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 93f5e19..11284f6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -303,6 +303,27 @@ public class RestClient {
         }
     }
 
+    public void buildLookupSnapshotCache(String project, String lookupTableName, String snapshotID) throws IOException {
+        String url = baseUrl + "/tables/" + project + "/" + lookupTableName + "/" + snapshotID + "/snapshotLocalCache";
+        HttpPut put = new HttpPut(url);
+        HttpResponse response = client.execute(put);
+        getContent(response);
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+        }
+    }
+
+    public String getLookupSnapshotCacheState(String lookupTableName, String snapshotID) throws IOException {
+        String url = baseUrl + "/tables/" + lookupTableName + "/" + snapshotID + "/snapshotLocalCache/state";
+        HttpGet get = new HttpGet(url);
+        HttpResponse response = client.execute(get);
+        String content = getContent(response);
+        if (response.getStatusLine().getStatusCode() != 200) {
+            throw new IOException("Invalid response " + response.getStatusLine().getStatusCode() + " with url " + url + "\n");
+        }
+        return content;
+    }
+
     private HashMap dealResponse(HttpResponse response) throws IOException {
         if (response.getStatusLine().getStatusCode() != 200) {
             throw new IOException("Invalid response " + response.getStatusLine().getStatusCode());
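
A rough usage sketch for the two new endpoints (the constructor argument follows RestClient's existing "user:password@host:port" convention; the project, table, and snapshot values here are illustrative):

    RestClient restClient = new RestClient("ADMIN:KYLIN@localhost:7070");
    // ask a query server to build its local cache for the snapshot, then poll the state
    restClient.buildLookupSnapshotCache("my_project", "DEFAULT.TEST_CATEGORY_GROUPINGS", snapshotID);
    String state = restClient.getLookupSnapshotCacheState("DEFAULT.TEST_CATEGORY_GROUPINGS", snapshotID);
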
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index fc2ad3d..a6022aa 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -32,6 +32,7 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -44,9 +45,13 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.metadata.TableMetadataManager;
@@ -290,6 +295,18 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
+    public CubeInstance updateCubeLookupSnapshot(CubeInstance cube, String lookupTableName, String newSnapshotResPath) throws IOException {
+        try (AutoLock lock = cubeMapLock.lockForWrite()) {
+            cube = cube.latestCopyForWrite();
+
+            CubeUpdate update = new CubeUpdate(cube);
+            Map<String, String> map = Maps.newHashMap();
+            map.put(lookupTableName, newSnapshotResPath);
+            update.setUpdateTableSnapshotPath(map);
+            return updateCube(update);
+        }
+    }
+
     private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
         if (update == null || update.getCubeInstance() == null)
             throw new IllegalStateException();
@@ -353,6 +370,12 @@ public class CubeManager implements IRealizationProvider {
             cube.setCuboidsRecommend(update.getCuboidsRecommend());
         }
 
+        if (update.getUpdateTableSnapshotPath() != null) {
+            for (Map.Entry<String, String> lookupSnapshotPathEntry : update.getUpdateTableSnapshotPath().entrySet()) {
+                cube.putSnapshotResPath(lookupSnapshotPathEntry.getKey(), lookupSnapshotPathEntry.getValue());
+            }
+        }
+
         try {
             cube = crud.save(cube);
         } catch (WriteConflictException ise) {
@@ -435,6 +458,55 @@ public class CubeManager implements IRealizationProvider {
         }
     }
 
+    public ILookupTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
+        String tableName = join.getPKSide().getTableIdentity();
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(tableName);
+        if (snapshotTableDesc == null || !snapshotTableDesc.isExtSnapshotTable()) {
+            return getInMemLookupTable(cubeSegment, join, snapshotTableDesc);
+        } else {
+            return getExtLookupTable(cubeSegment, tableName, snapshotTableDesc);
+        }
+    }
+
+    private ILookupTable getInMemLookupTable(CubeSegment cubeSegment, JoinDesc join, SnapshotTableDesc snapshotTableDesc) {
+        String tableName = join.getPKSide().getTableIdentity();
+        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
+        String[] pkCols = join.getPrimaryKey();
+
+        try {
+            SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
+            TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
+            return LookupProviderFactory.getInMemLookupTable(tableDesc, pkCols, snapshot);
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                    "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+        }
+    }
+
+    private ILookupTable getExtLookupTable(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
+        String snapshotResPath = getSnapshotResPath(cubeSegment, tableName, snapshotTableDesc);
+
+        ExtTableSnapshotInfo extTableSnapshot = ExtTableSnapshotInfoManager.getInstance(config).getSnapshot(
+                snapshotResPath);
+        TableDesc tableDesc = getMetadataManager().getTableDesc(tableName, cubeSegment.getProject());
+        return LookupProviderFactory.getExtLookupTable(tableDesc, extTableSnapshot);
+    }
+
+    private String getSnapshotResPath(CubeSegment cubeSegment, String tableName, SnapshotTableDesc snapshotTableDesc) {
+        String snapshotResPath;
+        if (snapshotTableDesc == null || !snapshotTableDesc.isGlobal()) {
+            snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
+        } else {
+            snapshotResPath = cubeSegment.getCubeInstance().getSnapshotResPath(tableName);
+        }
+        if (snapshotResPath == null) {
+            throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment "
+                    + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
+        }
+        return snapshotResPath;
+    }
+
     @VisibleForTesting
     /*private*/ String generateStorageLocation() {
         String namePrefix = config.getHBaseTableNamePrefix();
@@ -972,8 +1044,8 @@ public class CubeManager implements IRealizationProvider {
         return dictAssist.buildSnapshotTable(cubeSeg, lookupTable);
     }
 
-    public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-        return dictAssist.getLookupTable(cubeSegment, join);
+    private TableMetadataManager getMetadataManager() {
+        return TableMetadataManager.getInstance(config);
     }
 
     private class DictionaryAssist {
@@ -1055,31 +1127,25 @@ public class CubeManager implements IRealizationProvider {
             IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
             SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
 
-            segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
-            CubeUpdate update = new CubeUpdate(cubeCopy);
-            update.setToUpdateSegs(segCopy);
-            updateCube(update);
-
-            return snapshot;
-        }
-
-        public LookupStringTable getLookupTable(CubeSegment cubeSegment, JoinDesc join) {
-
-            String tableName = join.getPKSide().getTableIdentity();
-            String[] pkCols = join.getPrimaryKey();
-            String snapshotResPath = cubeSegment.getSnapshotResPath(tableName);
-            if (snapshotResPath == null)
-                throw new IllegalStateException("No snapshot for table '" + tableName + "' found on cube segment"
-                        + cubeSegment.getCubeInstance().getName() + "/" + cubeSegment);
+            CubeDesc cubeDesc = cubeSeg.getCubeDesc();
+            if (!cubeDesc.isGlobalSnapshotTable(lookupTable)) {
+                segCopy.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
+                CubeUpdate update = new CubeUpdate(cubeCopy);
+                update.setToUpdateSegs(segCopy);
+                updateCube(update);
+
+                // Update the input cubeSeg after the resource store is updated
+                cubeSeg.putSnapshotResPath(lookupTable, segCopy.getSnapshotResPath(lookupTable));
+            } else {
+                CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
+                Map<String, String> map = Maps.newHashMap();
+                map.put(lookupTable, snapshot.getResourcePath());
+                cubeUpdate.setUpdateTableSnapshotPath(map);
+                updateCube(cubeUpdate);
 
-            try {
-                SnapshotTable snapshot = getSnapshotManager().getSnapshotTable(snapshotResPath);
-                TableDesc tableDesc = getTableManager().getTableDesc(tableName, cubeSegment.getProject());
-                return new LookupStringTable(tableDesc, pkCols, snapshot);
-            } catch (IOException e) {
-                throw new IllegalStateException(
-                        "Failed to load lookup table " + tableName + " from snapshot " + snapshotResPath, e);
+                cubeSeg.getCubeInstance().putSnapshotResPath(lookupTable, snapshot.getResourcePath());
             }
+            return snapshot;
         }
     }
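
A minimal sketch of consuming the new getLookupTable() dispatch (assumes the enclosing method throws IOException; ILookupTable is Closeable, so try-with-resources applies):

    CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
    try (ILookupTable lookup = cubeManager.getLookupTable(cubeSegment, joinDesc)) {
        for (String[] row : lookup) {
            // one lookup row in table column order; whether this comes from the
            // in-mem snapshot or the ext (e.g. HBase) provider is decided by the
            // table's SnapshotTableDesc
        }
    }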
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
index 378d082..62b46a9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@ -36,6 +36,7 @@ public class CubeUpdate {
     private int cost = -1;
     private Map<Long, Long> cuboids = null;
     private Set<Long> cuboidsRecommend = null;
+    private Map<String, String> updateTableSnapshotPath = null;
 
     public CubeUpdate(CubeInstance cubeInstance) {
         setCubeInstance(cubeInstance);
@@ -124,4 +125,12 @@ public class CubeUpdate {
         this.cuboidsRecommend = cuboidsRecommend;
         return this;
     }
+
+    public Map<String, String> getUpdateTableSnapshotPath() {
+        return updateTableSnapshotPath;
+    }
+
+    public void setUpdateTableSnapshotPath(Map<String, String> updateTableSnapshotPath) {
+        this.updateTableSnapshotPath = updateTableSnapshotPath;
+    }
 }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 36c06b7..2a24370 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -77,8 +77,11 @@ public class DictionaryGeneratorCLI {
         for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
             TableRef table = dim.getTableRef();
             if (cubeSeg.getModel().isLookupTable(table)) {
-                toSnapshot.add(table.getTableIdentity());
-                toCheckLookup.add(table);
+                // take the snapshot only when the snapshot desc is not the ext type
+                if (!cubeSeg.getCubeDesc().isExtSnapshotTable(table.getTableIdentity())) {
+                    toSnapshot.add(table.getTableIdentity());
+                    toCheckLookup.add(table);
+                }
             }
         }
 
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java
new file mode 100644
index 0000000..5efe129
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/AbstractLookupRowEncoder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.metadata.model.TableDesc;
+
+/**
+ * Abstract encoder/decoder for lookup table rows, splitting each row into key and value columns.
+ * 
+ */
+abstract public class AbstractLookupRowEncoder<R> {
+    protected ByteBuffer keyByteBuffer = ByteBuffer.allocate(1024 * 1024);
+
+    protected int columnsNum;
+    protected int[] keyIndexes;
+    protected int[] valueIndexes;
+
+    public AbstractLookupRowEncoder(TableDesc tableDesc, String[] keyColumns) {
+        this.columnsNum = tableDesc.getColumns().length;
+        this.keyIndexes = new int[keyColumns.length];
+        this.valueIndexes = new int[columnsNum - keyColumns.length];
+        int keyIdx = 0;
+        int valIdx = 0;
+        for (int i = 0; i < columnsNum; i++) {
+            boolean isKeyColumn = false;
+            for (String keyColumn : keyColumns) {
+                if (keyColumn.equals(tableDesc.getColumns()[i].getName())) {
+                    isKeyColumn = true;
+                    break;
+                }
+            }
+            if (isKeyColumn) {
+                keyIndexes[keyIdx] = i;
+                keyIdx++;
+            } else {
+                valueIndexes[valIdx] = i;
+                valIdx++;
+            }
+        }
+    }
+
+    abstract public R encode(String[] row);
+
+    abstract public String[] decode(R result);
+
+    public String[] getKeyData(String[] row) {
+        return extractColValues(row, keyIndexes);
+    }
+
+    public String[] getValueData(String[] row) {
+        return extractColValues(row, valueIndexes);
+    }
+
+    public byte[] encodeStringsWithLenPfx(String[] keys, boolean allowNull) {
+        keyByteBuffer.clear();
+        for (String key : keys) {
+            if (key == null && !allowNull) {
+                throw new IllegalArgumentException("key cannot be null:" + Arrays.toString(keys));
+            }
+            byte[] byteKey = toBytes(key);
+            keyByteBuffer.putShort((short) byteKey.length);
+            keyByteBuffer.put(byteKey);
+        }
+        byte[] result = new byte[keyByteBuffer.position()];
+        System.arraycopy(keyByteBuffer.array(), 0, result, 0, keyByteBuffer.position());
+        return result;
+    }
+
+    protected void decodeFromLenPfxBytes(byte[] rowKey, int[] valueIdx, String[] result) {
+        ByteBuffer byteBuffer = ByteBuffer.wrap(rowKey);
+        for (int i = 0; i < valueIdx.length; i++) {
+            short keyLen = byteBuffer.getShort();
+            byte[] keyBytes = new byte[keyLen];
+            byteBuffer.get(keyBytes);
+            result[valueIdx[i]] = fromBytes(keyBytes);
+        }
+    }
+
+    protected String[] extractColValues(String[] row, int[] indexes) {
+        String[] result = new String[indexes.length];
+        int i = 0;
+        for (int idx : indexes) {
+            result[i++] = row[idx];
+        }
+        return result;
+    }
+
+    protected byte[] toBytes(String str) {
+        if (str == null) {
+            return new byte[] { DimensionEncoding.NULL };
+        }
+        return Bytes.toBytes(str);
+    }
+
+    protected String fromBytes(byte[] bytes) {
+        if (DimensionEncoding.isNull(bytes, 0, bytes.length)) {
+            return null;
+        }
+        return Bytes.toString(bytes);
+    }
+}
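
To make the length-prefixed layout concrete: encodeStringsWithLenPfx() writes each value as a 2-byte big-endian length followed by its bytes. A sketch, as it would be called from within a subclass (the helpers are protected; the key values are made up):

    String[] key = { "CN", "2018" };
    byte[] encoded = encodeStringsWithLenPfx(key, false);
    // layout: 0x00 0x02 'C' 'N' 0x00 0x04 '2' '0' '1' '8'  (10 bytes total)
    String[] decoded = new String[2];
    decodeFromLenPfxBytes(encoded, new int[] { 0, 1 }, decoded);
    // decoded => { "CN", "2018" }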
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java
similarity index 57%
copy from core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
copy to core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java
index 2e4f400..a09a439 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupProvider.java
@@ -16,26 +16,21 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.metadata.realization;
+package org.apache.kylin.dict.lookup;
 
-/**
- */
-public class IRealizationConstants {
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface IExtLookupProvider {
+    ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot);
 
     /**
-     * For each cube htable, we leverage htable's metadata to keep track of
-     * which kylin server(represented by its kylin_metadata prefix) owns this htable
+     * @return the provider's local cache, or null if no local cache exists
      */
-    public final static String HTableTag = "KYLIN_HOST";
-
-    public final static String HTableOwner = "OWNER";
-
-    public final static String HTableUser = "USER";
-
-    public final static String HTableCreationTime = "CREATION_TIME";
-
-    public final static String HTableSegmentTag = "SEGMENT";
-
-    public final static String HTableGitTag = "GIT_COMMIT";
+    IExtLookupTableCache getLocalCache();
 
+    /**
+     * Return an adaptor that implements the specified interface, as requested by the build engine.
+     * The ILookupMaterializer, in particular, is required by the MR build engine.
+     */
+    <I> I adaptToBuildEngine(Class<I> engineInterface);
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java
similarity index 51%
copy from core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
copy to core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java
index 2e4f400..f473059 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/IExtLookupTableCache.java
@@ -16,26 +16,24 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.metadata.realization;
+package org.apache.kylin.dict.lookup;
 
-/**
- */
-public class IRealizationConstants {
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface IExtLookupTableCache {
+    enum CacheState {NONE, IN_BUILDING, AVAILABLE}
 
     /**
-     * For each cube htable, we leverage htable's metadata to keep track of
-     * which kylin server(represented by its kylin_metadata prefix) owns this htable
+     * @param tableDesc
+     * @param extTableSnapshotInfo
+     * @param buildIfNotExist if true, build the cached lookup table when it does not exist
+     * @return the cached lookup table, or null if no cached lookup table exists
      */
-    public final static String HTableTag = "KYLIN_HOST";
-
-    public final static String HTableOwner = "OWNER";
-
-    public final static String HTableUser = "USER";
-
-    public final static String HTableCreationTime = "CREATION_TIME";
+    ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, boolean buildIfNotExist);
 
-    public final static String HTableSegmentTag = "SEGMENT";
+    void buildSnapshotCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, ILookupTable sourceTable);
 
-    public final static String HTableGitTag = "GIT_COMMIT";
+    void removeSnapshotCache(ExtTableSnapshotInfo extTableSnapshotInfo);
 
+    CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo);
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
similarity index 57%
copy from core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
copy to core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
index 2e4f400..dccb7c4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/ILookupTable.java
@@ -16,26 +16,17 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.metadata.realization;
+package org.apache.kylin.dict.lookup;
 
-/**
- */
-public class IRealizationConstants {
+import org.apache.kylin.common.util.Array;
 
+import java.io.Closeable;
+
+public interface ILookupTable extends Iterable<String[]>, Closeable {
     /**
-     * For each cube htable, we leverage htable's metadata to keep track of
-     * which kylin server(represented by its kylin_metadata prefix) owns this htable
+     * get the row that matches the given key
+     * @param key the key column values
+     * @return the matching row
      */
-    public final static String HTableTag = "KYLIN_HOST";
-
-    public final static String HTableOwner = "OWNER";
-
-    public final static String HTableUser = "USER";
-
-    public final static String HTableCreationTime = "CREATION_TIME";
-
-    public final static String HTableSegmentTag = "SEGMENT";
-
-    public final static String HTableGitTag = "GIT_COMMIT";
-
+    String[] getRow(Array<String> key);
 }
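
Point lookups go through getRow(); org.apache.kylin.common.util.Array wraps the key values so they compare by content. A sketch (the key value is illustrative):

    String[] row = lookupTable.getRow(new Array<>(new String[] { "CN" }));
    if (row != null) {
        // the full row, in table column order
    }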
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java
new file mode 100644
index 0000000..64ccef5
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupProviderFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+public class LookupProviderFactory {
+    private static final Logger logger = LoggerFactory.getLogger(LookupProviderFactory.class);
+    private static Map<String, String> lookupProviderImplClassMap = Maps.newConcurrentMap();
+
+    static {
+        registerLookupProvider(ExtTableSnapshotInfo.STORAGE_TYPE_HBASE,
+                "org.apache.kylin.storage.hbase.lookup.HBaseLookupProvider");
+    }
+
+    public static void registerLookupProvider(String storageType, String implClassName) {
+        lookupProviderImplClassMap.put(storageType, implClassName);
+    }
+
+    public static IExtLookupProvider getExtLookupProvider(String storageType) {
+        String className = lookupProviderImplClassMap.get(storageType);
+        if (className == null) {
+            throw new IllegalStateException("no implementation class found for storage type:" + storageType);
+        }
+        try {
+            Class clazz = Class.forName(className);
+            Constructor constructor = clazz.getConstructor();
+            return (IExtLookupProvider) constructor.newInstance();
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException("the lookup implementation class is invalid for storage type:"
+                    + storageType, e);
+        }
+    }
+
+    public static ILookupTable getInMemLookupTable(TableDesc tableDesc, String[] pkCols, IReadableTable readableTable)
+            throws IOException {
+        return new LookupStringTable(tableDesc, pkCols, readableTable);
+    }
+
+    public static ILookupTable getExtLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        IExtLookupTableCache extLookupTableCache = getExtLookupProvider(extTableSnapshot.getStorageType()).getLocalCache();
+        if (extLookupTableCache == null) {
+            return getExtLookupTableWithoutCache(tableDesc, extTableSnapshot);
+        }
+        ILookupTable cachedLookupTable = extLookupTableCache.getCachedLookupTable(tableDesc, extTableSnapshot, true);
+        if (cachedLookupTable != null) {
+            logger.info("try to use cached lookup table:{}", extTableSnapshot.getResourcePath());
+            return cachedLookupTable;
+        }
+        logger.info("use ext lookup table:{}", extTableSnapshot.getResourcePath());
+        return getExtLookupTableWithoutCache(tableDesc, extTableSnapshot);
+    }
+
+    public static ILookupTable getExtLookupTableWithoutCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        IExtLookupProvider provider = getExtLookupProvider(extTableSnapshot.getStorageType());
+        return provider.getLookupTable(tableDesc, extTableSnapshot);
+    }
+
+    public static <T> T createEngineAdapter(String lookupStorageType, Class<T> engineInterface) {
+        IExtLookupProvider provider = getExtLookupProvider(lookupStorageType);
+        return provider.adaptToBuildEngine(engineInterface);
+    }
+
+    public static void rebuildLocalCache(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo) {
+        IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache();
+        if (tablesCache != null) {
+            tablesCache.buildSnapshotCache(tableDesc, extTableSnapshotInfo, getExtLookupTableWithoutCache(tableDesc, extTableSnapshotInfo));
+        }
+    }
+
+    public static void removeLocalCache(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache();
+        if (tablesCache != null) {
+            tablesCache.removeSnapshotCache(extTableSnapshotInfo);
+        }
+    }
+
+    public static CacheState getCacheState(ExtTableSnapshotInfo extTableSnapshotInfo) {
+        IExtLookupTableCache tablesCache = getExtLookupProvider(extTableSnapshotInfo.getStorageType()).getLocalCache();
+        if (tablesCache != null) {
+            return tablesCache.getCacheState(extTableSnapshotInfo);
+        }
+        return CacheState.NONE;
+    }
+
+}
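
A sketch of plugging in an alternative backend through the same registration path as the built-in HBase provider (the storage type and implementation class below are hypothetical):

    // register once at startup; the snapshot's storageType then selects the provider
    LookupProviderFactory.registerLookupProvider("rocksdb",
            "com.example.kylin.lookup.RocksDBLookupProvider"); // hypothetical class
    ILookupTable table = LookupProviderFactory.getExtLookupTable(tableDesc, extTableSnapshot);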
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index b9a3651..7b3c5a3 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -62,4 +62,9 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_GARBAGE_COLLECTION_HBASE = "Garbage Collection on HBase";
     public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on HDFS";
     public static final String STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE = "Redistribute Flat Hive Table";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_CONVERT_HFILE = "Convert Lookup Table to HFile";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_BULK_LOAD = "Load HFile to HBase Table";
+    public static final String STEP_NAME_LOOKUP_SNAPSHOT_CACHE_UPDATE = "Update Lookup Snapshot Cache for Query Servers";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_META_STORE = "Take Snapshot to Metadata Store";
+    public static final String STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE = "Update Cube Info";
 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index dbe11c2..1a534e1 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -399,10 +399,14 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         }
     }
 
-    protected final void addExtraInfo(String key, String value) {
+    public final void addExtraInfo(String key, String value) {
         getManager().addJobInfo(getId(), key, value);
     }
 
+    public final String getExtraInfo(String key) {
+        return getExtraInfo().get(key);
+    }
+
     protected final Map<String, String> getExtraInfo() {
         return getOutput().getExtra();
     }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 404db54..2297be7 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -18,6 +18,8 @@
 
 package org.apache.kylin.job.execution;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -203,4 +205,28 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
     public int getDefaultPriority() {
         return DEFAULT_PRIORITY;
     }
+
+    public String findExtraInfo(String key, String dft) {
+        return findExtraInfo(key, dft, false);
+    }
+
+    public String findExtraInfoBackward(String key, String dft) {
+        return findExtraInfo(key, dft, true);
+    }
+
+    private String findExtraInfo(String key, String dft, boolean backward) {
+        ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks());
+
+        if (backward) {
+            Collections.reverse(tasks);
+        }
+
+        for (AbstractExecutable child : tasks) {
+            Output output = getManager().getOutput(child.getId());
+            String value = output.getExtra().get(key);
+            if (value != null)
+                return value;
+        }
+        return dft;
+    }
 }
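
The pattern this refactoring enables: any step in a chained job can publish a value through addExtraInfo(), and the parent job scans its steps' outputs to read it back. A sketch with an invented key:

    // inside a step, after producing the snapshot:
    addExtraInfo("lookup.ext.snapshot.res.path.DEFAULT.MY_TABLE", snapshotResPath);

    // later, on the chained job (or any holder of it):
    String path = chainedJob.findExtraInfo("lookup.ext.snapshot.res.path.DEFAULT.MY_TABLE", null);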
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
index 2e4f400..ad311d9 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
@@ -22,6 +22,8 @@ package org.apache.kylin.metadata.realization;
  */
 public class IRealizationConstants {
 
+    public final static String LookupHbaseStorageLocationPrefix = "LOOKUP_";
+
     /**
      * For each cube htable, we leverage htable's metadata to keep track of
      * which kylin server(represented by its kylin_metadata prefix) owns this htable
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index faac724..a840bf7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -69,6 +70,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
 
         result.addTask(createBuildDictionaryStep(jobId));
         result.addTask(createSaveStatisticsStep(jobId));
+
+        // add steps to materialize lookup tables if needed
+        addMaterializeLookupTableSteps(result);
+
         outputSide.addStepPhase2_BuildDictionary(result);
 
         // Phase 3: Build Cube
@@ -97,6 +102,18 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return true;
     }
 
+    private void addMaterializeLookupTableSteps(final CubingJob result) {
+        CubeDesc cubeDesc = seg.getCubeDesc();
+        List<String> allSnapshotTypes = cubeDesc.getAllExtLookupSnapshotTypes();
+        if (allSnapshotTypes.isEmpty()) {
+            return;
+        }
+        for (String snapshotType : allSnapshotTypes) {
+            ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotType);
+            materializer.materializeLookupTablesForCube(result, seg.getCubeInstance());
+        }
+    }
+
     protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
         // Statistics are not yet known, so the tree cuboid scheduler cannot be determined; determine the maxLevel at runtime
         final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index c9ed359..7f7191d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -19,8 +19,6 @@
 package org.apache.kylin.engine.mr;
 
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -350,28 +348,4 @@ public class CubingJob extends DefaultChainedExecutable {
         return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0"));
     }
 
-    public String findExtraInfo(String key, String dft) {
-        return findExtraInfo(key, dft, false);
-    }
-
-    public String findExtraInfoBackward(String key, String dft) {
-        return findExtraInfo(key, dft, true);
-    }
-
-    private String findExtraInfo(String key, String dft, boolean backward) {
-        ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks());
-
-        if (backward) {
-            Collections.reverse(tasks);
-        }
-
-        for (AbstractExecutable child : tasks) {
-            Output output = getManager().getOutput(child.getId());
-            String value = output.getExtra().get(key);
-            if (value != null)
-                return value;
-        }
-        return dft;
-    }
-
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java
similarity index 56%
copy from core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java
index 2e4f400..f103da2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealizationConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ILookupMaterializer.java
@@ -16,26 +16,13 @@
  * limitations under the License.
 */
 
-package org.apache.kylin.metadata.realization;
+package org.apache.kylin.engine.mr;
 
-/**
- */
-public class IRealizationConstants {
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
-    /**
-     * For each cube htable, we leverage htable's metadata to keep track of
-     * which kylin server(represented by its kylin_metadata prefix) owns this htable
-     */
-    public final static String HTableTag = "KYLIN_HOST";
-
-    public final static String HTableOwner = "OWNER";
-
-    public final static String HTableUser = "USER";
-
-    public final static String HTableCreationTime = "CREATION_TIME";
-
-    public final static String HTableSegmentTag = "SEGMENT";
-
-    public final static String HTableGitTag = "GIT_COMMIT";
+public interface ILookupMaterializer {
+    void materializeLookupTable(DefaultChainedExecutable jobFlow, CubeInstance cube, String lookupTableName);
 
+    void materializeLookupTablesForCube(DefaultChainedExecutable jobFlow, CubeInstance cube);
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index b98608f..85a425c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
@@ -69,6 +70,10 @@ public class MRUtil {
     public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
         return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
     }
+
+    public static ILookupMaterializer getExtLookupMaterializer(String lookupStorageType) {
+        return LookupProviderFactory.createEngineAdapter(lookupStorageType, ILookupMaterializer.class);
+    }
     
     // use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-safe
     // Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 7b25354..fa3c22e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -120,6 +120,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Option OPTION_NEED_UPDATE_BASE_CUBOID_SHARD = OptionBuilder
             .withArgName(BatchConstants.ARG_UPDATE_SHARD).hasArg().isRequired(false)
             .withDescription("If need to update base cuboid shard").create(BatchConstants.ARG_UPDATE_SHARD);
+    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_TABLE_NAME).hasArg().isRequired(true).withDescription("Table name. For example, default.table1").create(BatchConstants.ARG_TABLE_NAME);
+    protected static final Option OPTION_LOOKUP_SNAPSHOT_ID = OptionBuilder.withArgName(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID).hasArg()
+            .isRequired(true).withDescription("Lookup table snapshotID")
+            .create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
 
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 64163ad..36f2566 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -72,6 +72,8 @@ public interface BatchConstants {
 
     String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum";
 
+    String CFG_SHARD_NUM = "shard.num";
+
     /**
      * command line ARGuments
      */
@@ -95,6 +97,8 @@ public interface BatchConstants {
     String ARG_LEVEL = "level";
     String ARG_CONF = "conf";
     String ARG_DICT_PATH = "dictPath";
+    String ARG_TABLE_NAME = "tableName";
+    String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
 
     /**
      * logger and counter
@@ -106,4 +110,11 @@ public interface BatchConstants {
      * dictionaries builder class
      */
     String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
+
+    /**
+     * the prefix of the ext lookup table snapshot resource path that is stored in the build context
+     */
+    String LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX = "lookup.ext.snapshot.res.path.";
+
+    String LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX = "lookup.ext.snapshot.src.record.cnt.";
 }
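
These prefixes are combined with a table identity to form per-table context keys, which is how UpdateCubeInfoAfterBuildStep (below) locates the snapshot path that a materialization step recorded:

    String contextKey = BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + "DEFAULT.TEST_CATEGORY_GROUPINGS";
    // -> "lookup.ext.snapshot.res.path.DEFAULT.TEST_CATEGORY_GROUPINGS"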
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
index d085a77..ad1245c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
@@ -21,6 +21,7 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.List;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -30,6 +31,7 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -71,6 +73,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         segment.setInputRecordsSize(sourceSizeBytes);
 
         try {
+            saveExtSnapshotIfNeeded(cubeManager, cubingJob, cube, segment);
             if (segment.isOffsetCube()) {
                 updateTimeRange(segment);
             }
@@ -83,6 +86,26 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
         }
     }
 
+    private void saveExtSnapshotIfNeeded(CubeManager cubeManager, CubingJob cubingJob, CubeInstance cube, CubeSegment segment) throws IOException {
+        List<SnapshotTableDesc> snapshotTableDescList = cube.getDescriptor().getSnapshotTableDescList();
+        for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescList) {
+            String tableName = snapshotTableDesc.getTableName();
+            if (snapshotTableDesc.isExtSnapshotTable()) {
+                String contextKey = BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + tableName;
+                String newSnapshotResPath = cubingJob.getExtraInfo(contextKey);
+                if (newSnapshotResPath == null) {
+                    continue;
+                }
+
+                if (snapshotTableDesc.isGlobal()) {
+                    cubeManager.updateCubeLookupSnapshot(cube, tableName, newSnapshotResPath);
+                } else {
+                    segment.putSnapshotResPath(tableName, newSnapshotResPath);
+                }
+            }
+        }
+    }
+
     private void updateTimeRange(CubeSegment segment) throws IOException {
         final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
 
diff --git a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
index f1a42b1..e42c522 100644
--- a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
+++ b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
@@ -582,5 +582,13 @@
   "override_kylin_properties": {
     "kylin.cube.algorithm": "INMEM"
   },
+  "snapshot_table_desc_list": [
+    {
+      "table_name": "DEFAULT.TEST_CATEGORY_GROUPINGS",
+      "storage_type": "hbase",
+      "local_cache_enable": true,
+      "global": true
+    }
+  ],
   "partition_date_start": 0
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
index 1aec429..5ee5c7a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
@@ -73,7 +73,7 @@ public class MetadataCleanupJob {
 
         // two level resources, snapshot tables and cube statistics
         for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT,
-                ResourceStore.CUBE_STATISTICS_ROOT }) {
+                ResourceStore.CUBE_STATISTICS_ROOT, ResourceStore.EXT_SNAPSHOT_RESOURCE_ROOT}) {
             for (String dir : noNull(store.listResources(resourceRoot))) {
                 for (String res : noNull(store.listResources(dir))) {
                     if (store.getResourceTimestamp(res) < newResourceTimeCut)
@@ -97,6 +97,7 @@ public class MetadataCleanupJob {
         // exclude resources in use
         Set<String> activeResources = Sets.newHashSet();
         for (CubeInstance cube : cubeManager.listAllCubes()) {
+            activeResources.addAll(cube.getSnapshots().values());
             for (CubeSegment segment : cube.getSegments()) {
                 activeResources.addAll(segment.getSnapshotPaths());
                 activeResources.addAll(segment.getDictionaryPaths());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
index 126c598..4c8c426 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanJobHbaseUtil.java
@@ -21,6 +21,7 @@ package org.apache.kylin.rest.job;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -28,6 +29,7 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -35,6 +37,8 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,10 +64,17 @@ public class StorageCleanJobHbaseUtil {
                 ? config.getHBaseTableNamePrefix()
                 : (namespace + ":" + config.getHBaseTableNamePrefix());
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
+
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
+
+        boolean hasExtLookupTable = false;
         for (HTableDescriptor desc : tableDescriptors) {
             String host = desc.getValue(IRealizationConstants.HTableTag);
             if (config.getMetadataUrlPrefix().equalsIgnoreCase(host)) {
+                // check if there is any hbase lookup table
+                if (desc.getTableName().getNameAsString().contains(IRealizationConstants.LookupHbaseStorageLocationPrefix)) {
+                    hasExtLookupTable = true;
+                }
                 // only take care of htables that belong to this instance and were created more than 2 days ago
                 allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
             }
@@ -88,6 +99,11 @@ public class StorageCleanJobHbaseUtil {
         
         logger.info(allTablesNeedToBeDropped.size() + " HTable(s) to clean up");
 
+        if (hasExtLookupTable) {
+            List<String> usedExtLookupTables = getAllUsedExtLookupTables();
+            logger.info("Exclude tables: {}, as they are referenced by snapshots.", usedExtLookupTables);
+            allTablesNeedToBeDropped.removeAll(usedExtLookupTables);
+        }
         if (delete) {
             // drop tables
             ExecutorService executorService = Executors.newSingleThreadExecutor();
@@ -115,6 +131,27 @@ public class StorageCleanJobHbaseUtil {
         return allTablesNeedToBeDropped;
     }
 
+    private static List<String> getAllUsedExtLookupTables() throws IOException {
+        List<String> result = Lists.newArrayList();
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config).getAllExtSnapshotResPaths();
+
+        for (String extSnapshotResource : activeSnapshotSet) {
+            try {
+                ExtTableSnapshotInfo extTableSnapshot = ExtTableSnapshotInfoManager.getInstance(config).getSnapshot(
+                        extSnapshotResource);
+                if (extTableSnapshot != null) {
+                    if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(extTableSnapshot.getStorageType())) {
+                        result.add(extTableSnapshot.getStorageLocationIdentifier());
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("error fetching ext table snapshot: " + extSnapshotResource, e);
+            }
+        }
+        return result;
+    }
+
     static class DeleteHTableRunnable implements Callable {
         HBaseAdmin hbaseAdmin;
         String htableName;
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index f9c0d71..c08ae70 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -250,6 +250,7 @@
             <scr:intercept-url pattern="/api/admin/public_config" access="permitAll"/>
             <scr:intercept-url pattern="/api/projects" access="permitAll"/>
             <scr:intercept-url pattern="/api/admin*/**" access="hasRole('ROLE_ADMIN')"/>
+            <scr:intercept-url pattern="/api/tables/**/snapshotLocalCache/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/**" access="isAuthenticated()"/>
 
             <scr:form-login login-page="/login" />
@@ -295,6 +296,7 @@
             <scr:intercept-url pattern="/api/admin/config" access="permitAll"/>
             <scr:intercept-url pattern="/api/projects*/*" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/admin*/**" access="hasRole('ROLE_ADMIN')"/>
+            <scr:intercept-url pattern="/api/tables/**/snapshotLocalCache/**" access="permitAll"/>
             <scr:intercept-url pattern="/api/**" access="isAuthenticated()"/>
 
             <scr:form-login login-page="/login" />
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 9bf62f0..4709c08 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -96,6 +96,12 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule-agent</artifactId>
+            <version>${powermock.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
new file mode 100644
index 0000000..757f6d0
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+public class HBaseLookupMRSteps {
+    protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupMRSteps.class);
+    private CubeInstance cube;
+    private JobEngineConfig config;
+
+    public HBaseLookupMRSteps(CubeInstance cube) {
+        this.cube = cube;
+        this.config = new JobEngineConfig(cube.getConfig());
+    }
+
+    public void addMaterializeLookupTablesSteps(DefaultChainedExecutable jobFlow) {
+        CubeDesc cubeDesc = cube.getDescriptor();
+        Set<String> allLookupTables = Sets.newHashSet();
+        for (DimensionDesc dim : cubeDesc.getDimensions()) {
+            TableRef table = dim.getTableRef();
+            if (cubeDesc.getModel().isLookupTable(table)) {
+                allLookupTables.add(table.getTableIdentity());
+            }
+        }
+        List<SnapshotTableDesc> snapshotTableDescs = cubeDesc.getSnapshotTableDescList();
+        for (SnapshotTableDesc snapshotTableDesc : snapshotTableDescs) {
+            if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(snapshotTableDesc.getStorageType())
+                    && allLookupTables.contains(snapshotTableDesc.getTableName())) {
+                addMaterializeLookupTableSteps(jobFlow, snapshotTableDesc.getTableName(), snapshotTableDesc);
+            }
+        }
+    }
+
+    public void addMaterializeLookupTableSteps(DefaultChainedExecutable jobFlow, String tableName, SnapshotTableDesc snapshotTableDesc) {
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+        TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName, cube.getProject());
+        IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc);
+        try {
+            if (extTableSnapshotInfoManager.hasLatestSnapshot(sourceTable.getSignature(), tableName)) {
+                logger.info("there is latest snapshot exist for table:{}, skip build snapshot step.", tableName);
+                return;
+            }
+        } catch (IOException ioException) {
+            throw new RuntimeException(ioException);
+        }
+        logger.info("add build snapshot steps for table:{}", tableName);
+        String snapshotID = genLookupSnapshotID();
+        addLookupTableConvertToHFilesStep(jobFlow, tableName, snapshotID);
+        addLookupTableHFilesBulkLoadStep(jobFlow, tableName, snapshotID);
+        if (snapshotTableDesc != null && snapshotTableDesc.isEnableLocalCache()) {
+            addUpdateSnapshotQueryCacheStep(jobFlow, tableName, snapshotID);
+        }
+    }
+
+    private String genLookupSnapshotID() {
+        return UUID.randomUUID().toString();
+    }
+
+    private void addLookupTableConvertToHFilesStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
+        createHFilesStep
+                .setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_CONVERT_HFILE + ":" + tableName);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, cube.getName());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT,
+                getLookupTableHFilePath(tableName, jobFlow.getId()));
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, tableName);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobFlow.getId());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_LOOKUP_SNAPSHOT_ID, snapshotID);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_LookupTable_HFile_Generator_" + tableName + "_Step");
+
+        createHFilesStep.setMapReduceParams(cmd.toString());
+        createHFilesStep.setMapReduceJobClass(LookupTableToHFileJob.class);
+        createHFilesStep.setCounterSaveAs(BatchConstants.LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX + tableName);
+
+        jobFlow.addTask(createHFilesStep);
+    }
+
+    private void addLookupTableHFilesBulkLoadStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
+        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_BULK_LOAD + ":" + tableName);
+
+        StringBuilder cmd = new StringBuilder();
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT,
+                getLookupTableHFilePath(tableName, jobFlow.getId()));
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, tableName);
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobFlow.getId());
+        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_LOOKUP_SNAPSHOT_ID, snapshotID);
+
+        bulkLoadStep.setJobParams(cmd.toString());
+        bulkLoadStep.setJobClass(LookupTableHFilesBulkLoadJob.class);
+        jobFlow.addTask(bulkLoadStep);
+    }
+
+    private void addUpdateSnapshotQueryCacheStep(DefaultChainedExecutable jobFlow, String tableName, String snapshotID) {
+        UpdateSnapshotCacheForQueryServersStep updateSnapshotCacheStep = new UpdateSnapshotCacheForQueryServersStep();
+        updateSnapshotCacheStep.setName(ExecutableConstants.STEP_NAME_LOOKUP_SNAPSHOT_CACHE_UPDATE + ":" + tableName);
+
+        LookupExecutableUtil.setProjectName(cube.getProject(), updateSnapshotCacheStep.getParams());
+        LookupExecutableUtil.setLookupTableName(tableName, updateSnapshotCacheStep.getParams());
+        LookupExecutableUtil.setLookupSnapshotID(snapshotID, updateSnapshotCacheStep.getParams());
+        jobFlow.addTask(updateSnapshotCacheStep);
+    }
+
+    private String getLookupTableHFilePath(String tableName, String jobId) {
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(JobBuilderSupport.getJobWorkingDir(config, jobId) + "/"
+                + tableName + "/hfile/");
+    }
+
+    public void appendMapReduceParameters(StringBuilder buf) {
+        appendMapReduceParameters(buf, JobEngineConfig.DEFAUL_JOB_CONF_SUFFIX);
+    }
+
+    public void appendMapReduceParameters(StringBuilder buf, String jobType) {
+        try {
+            String jobConf = config.getHadoopJobConfFilePath(jobType);
+            if (jobConf != null && jobConf.length() > 0) {
+                buf.append(" -conf ").append(jobConf);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
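
Note: the steps above run in order: convert the lookup table to HFiles, bulk load the HFiles into
the new HTable, and, when the snapshot enables the local cache, update the query servers' caches.
A minimal sketch of wiring them into a job flow, assuming a cube named "sample_cube" exists (the
cube name is illustrative):

    DefaultChainedExecutable jobFlow = new DefaultChainedExecutable();
    CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube("sample_cube");
    new HBaseLookupMRSteps(cube).addMaterializeLookupTablesSteps(jobFlow);
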
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java
new file mode 100644
index 0000000..cf28ed6
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMaterializer.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.engine.mr.ILookupMaterializer;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public class HBaseLookupMaterializer implements ILookupMaterializer {
+
+    @Override
+    public void materializeLookupTable(DefaultChainedExecutable jobFlow, CubeInstance cube, String lookupTableName) {
+        HBaseLookupMRSteps lookupMRSteps = new HBaseLookupMRSteps(cube);
+        SnapshotTableDesc snapshotTableDesc = cube.getDescriptor().getSnapshotTableDesc(lookupTableName);
+        lookupMRSteps.addMaterializeLookupTableSteps(jobFlow, lookupTableName, snapshotTableDesc);
+    }
+
+    @Override
+    public void materializeLookupTablesForCube(DefaultChainedExecutable jobFlow, CubeInstance cube) {
+        HBaseLookupMRSteps lookupMRSteps = new HBaseLookupMRSteps(cube);
+        lookupMRSteps.addMaterializeLookupTablesSteps(jobFlow);
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java
new file mode 100644
index 0000000..3e8c2c5
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.IExtLookupProvider;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.dict.lookup.cache.RocksDBLookupTableCache;
+import org.apache.kylin.engine.mr.ILookupMaterializer;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Use HBase as lookup table storage
+ */
+public class HBaseLookupProvider implements IExtLookupProvider {
+    protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupProvider.class);
+
+    @Override
+    public ILookupTable getLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        return new HBaseLookupTable(tableDesc, extTableSnapshot);
+    }
+
+    @Override
+    public IExtLookupTableCache getLocalCache() {
+        return RocksDBLookupTableCache.getInstance(KylinConfig.getInstanceFromEnv());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface == ILookupMaterializer.class) {
+            return (I) new HBaseLookupMaterializer();
+        } else {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+    }
+}
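
Note: a minimal sketch of going through the provider interface, assuming the table descriptor and
snapshot info are already at hand (in the real flow the provider is resolved by the snapshot's
storage type):

    IExtLookupProvider provider = new HBaseLookupProvider();
    ILookupTable lookup = provider.getLookupTable(tableDesc, extTableSnapshot);
    ILookupMaterializer materializer = provider.adaptToBuildEngine(ILookupMaterializer.class);
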
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java
new file mode 100644
index 0000000..9ceabd2
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoder.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ShardingHash;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.dict.lookup.AbstractLookupRowEncoder;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Encodes/decodes an original lookup table row to/from an HBase row.
+ */
+public class HBaseLookupRowEncoder extends AbstractLookupRowEncoder<HBaseRow> {
+    public static final String CF_STRING = "F";
+    public static final byte[] CF = Bytes.toBytes(CF_STRING);
+
+    private int shardNum;
+
+    public HBaseLookupRowEncoder(TableDesc tableDesc, String[] keyColumns, int shardNum) {
+        super(tableDesc, keyColumns);
+        this.shardNum = shardNum;
+    }
+
+    @Override
+    public HBaseRow encode(String[] row) {
+        String[] keys = getKeyData(row);
+        String[] values = getValueData(row);
+        byte[] rowKey = encodeRowKey(keys);
+        NavigableMap<byte[], byte[]> qualifierValMap = Maps
+                .newTreeMap(org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR);
+        for (int i = 0; i < values.length; i++) {
+            byte[] qualifier = Bytes.toBytes(String.valueOf(valueIndexes[i]));
+            byte[] byteValue = toBytes(values[i]);
+            qualifierValMap.put(qualifier, byteValue);
+        }
+        return new HBaseRow(rowKey, qualifierValMap);
+    }
+
+    @Override
+    public String[] decode(HBaseRow hBaseRow) {
+        if (hBaseRow == null) {
+            return null;
+        }
+        String[] result = new String[columnsNum];
+        fillKeys(hBaseRow.rowKey, result);
+        fillValues(hBaseRow.qualifierValMap, result);
+
+        return result;
+    }
+
+    public byte[] encodeRowKey(String[] keys) {
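+        // row key layout: [2-byte shard id][for each key column: 2-byte length prefix + key bytes]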
+        keyByteBuffer.clear();
+        for (String key : keys) {
+            if (key == null) {
+                throw new IllegalArgumentException("key cannot be null:" + Arrays.toString(keys));
+            }
+            byte[] byteKey = Bytes.toBytes(key);
+            keyByteBuffer.putShort((short) byteKey.length);
+            keyByteBuffer.put(byteKey);
+        }
+        byte[] result = new byte[RowConstants.ROWKEY_SHARDID_LEN + keyByteBuffer.position()];
+        System.arraycopy(keyByteBuffer.array(), 0, result, RowConstants.ROWKEY_SHARDID_LEN, keyByteBuffer.position());
+        short shard = ShardingHash.getShard(result, RowConstants.ROWKEY_SHARDID_LEN, result.length, shardNum);
+        BytesUtil.writeShort(shard, result, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        return result;
+    }
+
+    private void fillKeys(byte[] rowKey, String[] result) {
+        int keyNum = keyIndexes.length;
+        ByteBuffer byteBuffer = ByteBuffer.wrap(rowKey);
+        byteBuffer.getShort(); // read shard
+        for (int i = 0; i < keyNum; i++) {
+            short keyLen = byteBuffer.getShort();
+            byte[] keyBytes = new byte[keyLen];
+            byteBuffer.get(keyBytes);
+            result[keyIndexes[i]] = Bytes.toString(keyBytes);
+        }
+    }
+
+    private void fillValues(Map<byte[], byte[]> qualifierValMap, String[] result) {
+        for (Entry<byte[], byte[]> qualifierValEntry : qualifierValMap.entrySet()) {
+            byte[] qualifier = qualifierValEntry.getKey();
+            byte[] value = qualifierValEntry.getValue();
+            int valIdx = Integer.parseInt(Bytes.toString(qualifier));
+            result[valIdx] = fromBytes(value);
+        }
+    }
+
+    public static class HBaseRow {
+        private byte[] rowKey;
+        private NavigableMap<byte[], byte[]> qualifierValMap;
+
+        public HBaseRow(byte[] rowKey, NavigableMap<byte[], byte[]> qualifierValMap) {
+            this.rowKey = rowKey;
+            this.qualifierValMap = qualifierValMap;
+        }
+
+        public byte[] getRowKey() {
+            return rowKey;
+        }
+
+        public NavigableMap<byte[], byte[]> getQualifierValMap() {
+            return qualifierValMap;
+        }
+    }
+}
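
Note: a worked example of the row key encoding above; the column names and values are illustrative:

    // key columns (SELLER_ID, SITE_ID), 3 shards
    HBaseLookupRowEncoder encoder = new HBaseLookupRowEncoder(tableDesc, new String[] { "SELLER_ID", "SITE_ID" }, 3);
    byte[] rowKey = encoder.encodeRowKey(new String[] { "10000233", "0" });
    // rowKey layout: [2-byte shard id][0x0008]["10000233" bytes][0x0001]["0" bytes], 15 bytes in total
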
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java
new file mode 100644
index 0000000..78877f7
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupTable.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ILookupTable;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Use HBase as lookup table storage
+ */
+public class HBaseLookupTable implements ILookupTable {
+    protected static final Logger logger = LoggerFactory.getLogger(HBaseLookupTable.class);
+
+    private TableName lookupTableName;
+    private Table table;
+
+    private HBaseLookupRowEncoder encoder;
+
+    public HBaseLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshot) {
+        String tableName = extTableSnapshot.getStorageLocationIdentifier();
+        this.lookupTableName = TableName.valueOf(tableName);
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        Connection connection = HBaseConnection.get(kylinConfig.getStorageUrl());
+        try {
+            table = connection.getTable(lookupTableName);
+        } catch (IOException e) {
+            throw new RuntimeException("error when connect HBase", e);
+        }
+
+        String[] keyColumns = extTableSnapshot.getKeyColumns();
+        encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, extTableSnapshot.getShardNum());
+    }
+
+    @Override
+    public String[] getRow(Array<String> key) {
+        byte[] encodedKey = encoder.encodeRowKey(key.data);
+        Get get = new Get(encodedKey);
+        try {
+            Result result = table.get(get);
+            if (result.isEmpty()) {
+                return null;
+            }
+            return encoder.decode(new HBaseRow(result.getRow(), result.getFamilyMap(HBaseLookupRowEncoder.CF)));
+        } catch (IOException e) {
+            throw new RuntimeException("error when get row from hBase", e);
+        }
+    }
+
+    @Override
+    public Iterator<String[]> iterator() {
+        return new HBaseScanBasedIterator(table);
+    }
+
+    @Override
+    public void close() throws IOException{
+        table.close();
+    }
+
+    private class HBaseScanBasedIterator implements Iterator<String[]> {
+        private Iterator<Result> scannerIterator;
+        private long counter;
+
+        public HBaseScanBasedIterator(Table table) {
+            try {
+                Scan scan = new Scan();
+                scan.setCaching(1000);
+                scan.addFamily(HBaseLookupRowEncoder.CF);
+                ResultScanner scanner = table.getScanner(scan);
+                scannerIterator = scanner.iterator();
+            } catch (IOException e) {
+                // fail fast; otherwise a null iterator would throw NPE later in hasNext()
+                throw new RuntimeException("error when scanning HBase", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return scannerIterator.hasNext();
+        }
+
+        @Override
+        public String[] next() {
+            counter++;
+            if (counter % 100000 == 0) {
+                logger.info("scanned {} rows from HBase", counter);
+            }
+            Result result = scannerIterator.next();
+            byte[] rowKey = result.getRow();
+            NavigableMap<byte[], byte[]> qualifierValMap = result.getFamilyMap(HBaseLookupRowEncoder.CF);
+            return encoder.decode(new HBaseRow(rowKey, qualifierValMap));
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("remove is not supported");
+        }
+    }
+}
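
Note: a usage sketch for point lookup, assuming the table descriptor and snapshot info are
available; the key values are illustrative:

    ILookupTable lookup = new HBaseLookupTable(tableDesc, extTableSnapshot);
    try {
        String[] row = lookup.getRow(new Array<>(new String[] { "10000233", "0" }));
        // full scans go through iterator(), which streams results with scanner caching of 1000 rows
    } finally {
        lookup.close();
    }
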
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java
new file mode 100644
index 0000000..3daffa3
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/KVSortReducerWithDupKeyCheck.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Most code is copied from {@link KeyValueSortReducer}, with added logic to detect duplicate row
+ * keys: if two cells under the same row key share a qualifier, the source lookup table contains a
+ * duplicate key and an IllegalStateException is thrown.
+ */
+public class KVSortReducerWithDupKeyCheck extends KeyValueSortReducer {
+    protected void reduce(
+            ImmutableBytesWritable row,
+            java.lang.Iterable<KeyValue> kvs,
+            org.apache.hadoop.mapreduce.Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue>.Context context)
+            throws java.io.IOException, InterruptedException {
+        TreeSet<KeyValue> map = new TreeSet<>(KeyValue.COMPARATOR);
+
+        TreeSet<byte[]> qualifierSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+        for (KeyValue kv : kvs) {
+            byte[] qualifier = CellUtil.cloneQualifier(kv);
+            if (qualifierSet.contains(qualifier)) {
+                throw new IllegalStateException("there is duplicate key:" + row);
+            }
+            qualifierSet.add(qualifier);
+            try {
+                map.add(kv.clone());
+            } catch (CloneNotSupportedException e) {
+                throw new java.io.IOException(e);
+            }
+        }
+        context.setStatus("Read " + map.getClass());
+        int index = 0;
+        for (KeyValue kv : map) {
+            context.write(row, kv);
+            if (++index % 100 == 0)
+                context.setStatus("Wrote " + index);
+        }
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java
new file mode 100644
index 0000000..5598ed9
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableHFilesBulkLoadJob.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LookupTableHFilesBulkLoadJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(LookupTableHFilesBulkLoadJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        options.addOption(OPTION_INPUT_PATH);
+        options.addOption(OPTION_TABLE_NAME);
+        options.addOption(OPTION_CUBING_JOB_ID);
+        options.addOption(OPTION_LOOKUP_SNAPSHOT_ID);
+        parseOptions(options, args);
+
+        String tableName = getOptionValue(OPTION_TABLE_NAME);
+        String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID);
+        String snapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID);
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig);
+        DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(cubingJobID);
+
+        ExtTableSnapshotInfoManager extTableSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+        ExtTableSnapshotInfo snapshot = extTableSnapshotInfoManager.getSnapshot(tableName, snapshotID);
+        long srcTableRowCnt = Long.parseLong(job.findExtraInfoBackward(BatchConstants.LOOKUP_EXT_SNAPSHOT_SRC_RECORD_CNT_PFX + tableName, "-1"));
+        logger.info("update table:{} snapshot row count:{}", tableName, srcTableRowCnt);
+        snapshot.setRowCnt(srcTableRowCnt);
+        snapshot.setLastBuildTime(System.currentTimeMillis());
+        extTableSnapshotInfoManager.updateSnapshot(snapshot);
+
+        String hTableName = snapshot.getStorageLocationIdentifier();
+        // e.g. /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
+        // (ends with "/")
+        String input = getOptionValue(OPTION_INPUT_PATH);
+
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        FsShell shell = new FsShell(conf);
+
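+        // open up permissions so the HBase region servers can read and move the generated HFiles
+        // during the bulk load; the chmod is retried a few times to tolerate transient failures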
+        int exitCode = -1;
+        int retryCount = 10;
+        while (exitCode != 0 && retryCount >= 1) {
+            exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
+            retryCount--;
+            if (exitCode != 0) {
+                Thread.sleep(5000);
+            }
+        }
+
+        if (exitCode != 0) {
+            logger.error("Failed to change the file permissions: " + input);
+            throw new IOException("Failed to change the file permissions: " + input);
+        }
+
+        String[] newArgs = new String[2];
+        newArgs[0] = input;
+        newArgs[1] = hTableName;
+
+        logger.debug("Start to run LoadIncrementalHFiles");
+        int ret = MRUtil.runMRJob(new LoadIncrementalHFiles(conf), newArgs);
+        logger.debug("End to run LoadIncrementalHFiles");
+        return ret;
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new LookupTableHFilesBulkLoadJob(), args);
+        System.exit(exitCode);
+    }
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
new file mode 100644
index 0000000..aac0108
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJob.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.apache.kylin.source.SourceManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LookupTableToHFileJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(LookupTableToHFileJob.class);
+
+    private static final String ALPHA_NUM = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+
+    private static final int HBASE_TABLE_LENGTH = 10;
+
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_TABLE_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_LOOKUP_SNAPSHOT_ID);
+            parseOptions(options, args);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String tableName = getOptionValue(OPTION_TABLE_NAME);
+            String cubingJobID = getOptionValue(OPTION_CUBING_JOB_ID);
+            String lookupSnapshotID = getOptionValue(OPTION_LOOKUP_SNAPSHOT_ID);
+
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+            CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+
+            TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(tableName,
+                    cube.getProject());
+
+            ExtTableSnapshotInfoManager extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+            removeSnapshotIfExist(extSnapshotInfoManager, kylinConfig, tableName, lookupSnapshotID);
+
+            IReadableTable sourceTable = SourceManager.createReadableTable(tableDesc);
+
+            logger.info("create HTable for source table snapshot:{}", tableName);
+            Pair<String, Integer> hTableNameAndShard = createHTable(tableName, sourceTable, kylinConfig);
+            String[] keyColumns = getLookupKeyColumns(cube, tableName);
+            ExtTableSnapshotInfo snapshot = createSnapshotResource(extSnapshotInfoManager, tableName, lookupSnapshotID,
+                    keyColumns, hTableNameAndShard.getFirst(), hTableNameAndShard.getSecond(), sourceTable);
+            logger.info("created snapshot information at:{}", snapshot.getResourcePath());
+            saveSnapshotInfoToJobContext(kylinConfig, cubingJobID, snapshot);
+
+            job = Job.getInstance(HBaseConfiguration.create(getConf()), getOptionValue(OPTION_JOB_NAME));
+
+            setJobClasspath(job, cube.getConfig());
+            // For separate HBase cluster, note the output is a qualified HDFS path if "kylin.storage.hbase.cluster-fs" is configured, ref HBaseMRSteps.getHFilePath()
+            HBaseConnection.addHBaseClusterNNHAConfiguration(job.getConfiguration());
+
+            FileOutputFormat.setOutputPath(job, output);
+
+            IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
+            tableInputFormat.configureJob(job);
+            job.setMapperClass(LookupTableToHFileMapper.class);
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, tableName);
+            // set block replication to 3 for hfiles
+            job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
+            job.getConfiguration().set(BatchConstants.CFG_SHARD_NUM, String.valueOf(hTableNameAndShard.getSecond()));
+            // add metadata to distributed cache
+            attachCubeMetadata(cube, job.getConfiguration());
+
+            Connection conn = getHBaseConnection(kylinConfig);
+            HTable htable = (HTable) conn.getTable(TableName.valueOf(hTableNameAndShard.getFirst()));
+
+            // Automatic config !
+            HFileOutputFormat2.configureIncrementalLoad(job, htable, htable.getRegionLocator());
+
+            job.setReducerClass(KVSortReducerWithDupKeyCheck.class);
+
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    private void removeSnapshotIfExist(ExtTableSnapshotInfoManager extSnapshotInfoManager, KylinConfig kylinConfig,
+            String tableName, String lookupSnapshotID) throws IOException {
+        ExtTableSnapshotInfo snapshotInfo = null;
+        try {
+            snapshotInfo = extSnapshotInfoManager.getSnapshot(tableName, lookupSnapshotID);
+        } catch (Exception e) {
+            // swallow the exception; it means no snapshot exists for this snapshot id
+        }
+        if (snapshotInfo == null) {
+            return;
+        }
+        logger.info("the table:{} snapshot:{} exist, remove it", tableName, lookupSnapshotID);
+        extSnapshotInfoManager.removeSnapshot(tableName, lookupSnapshotID);
+        String hTableName = snapshotInfo.getStorageLocationIdentifier();
+        logger.info("remove related HBase table:{} for snapshot:{}", hTableName, lookupSnapshotID);
+        Connection conn = getHBaseConnection(kylinConfig);
+        Admin admin = conn.getAdmin();
+        try {
+            TableName hTable = TableName.valueOf(hTableName);
+            // HBase requires a table to be disabled before it can be deleted
+            if (!admin.isTableDisabled(hTable)) {
+                admin.disableTable(hTable);
+            }
+            admin.deleteTable(hTable);
+        } finally {
+            IOUtils.closeQuietly(admin);
+        }
+    }
+
+    private String[] getLookupKeyColumns(CubeInstance cube, String tableName) {
+        CubeDesc cubeDesc = cube.getDescriptor();
+        DataModelDesc modelDesc = cubeDesc.getModel();
+        TableRef lookupTableRef = null;
+        for (TableRef tableRef : modelDesc.getLookupTables()) {
+            if (tableRef.getTableIdentity().equalsIgnoreCase(tableName)) {
+                lookupTableRef = tableRef;
+                break;
+            }
+        }
+        if (lookupTableRef == null) {
+            throw new IllegalStateException("cannot find table in model:" + tableName);
+        }
+        JoinDesc joinDesc = modelDesc.getJoinByPKSide(lookupTableRef);
+        TblColRef[] keyColRefs = joinDesc.getPrimaryKeyColumns();
+        String[] result = new String[keyColRefs.length];
+        for (int i = 0; i < keyColRefs.length; i++) {
+            result[i] = keyColRefs[i].getName();
+        }
+        return result;
+    }
+
+    private void saveSnapshotInfoToJobContext(KylinConfig kylinConfig, String jobID, ExtTableSnapshotInfo snapshot) {
+        ExecutableManager execMgr = ExecutableManager.getInstance(kylinConfig);
+        DefaultChainedExecutable job = (DefaultChainedExecutable) execMgr.getJob(jobID);
+        job.addExtraInfo(BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + snapshot.getTableName(),
+                snapshot.getResourcePath());
+    }
+
+    /**
+     * Creates the HTable that stores the snapshot of the given source table.
+     *
+     * @return Pair of HTableName and shard number
+     */
+    private Pair<String, Integer> createHTable(String sourceTableName, IReadableTable sourceTable,
+            KylinConfig kylinConfig) throws IOException {
+        TableSignature signature = sourceTable.getSignature();
+        int shardNum = calculateShardNum(kylinConfig, signature.getSize());
+        Connection conn = getHBaseConnection(kylinConfig);
+        Admin admin = conn.getAdmin();
+        String hTableName = genHTableName(kylinConfig, admin, sourceTableName);
+
+        TableName tableName = TableName.valueOf(hTableName);
+        HTableDescriptor hTableDesc = new HTableDescriptor(tableName);
+        hTableDesc.setCompactionEnabled(false);
+        hTableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+        hTableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
+        hTableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
+        String commitInfo = KylinVersion.getGitCommitInfo();
+        if (!StringUtils.isEmpty(commitInfo)) {
+            hTableDesc.setValue(IRealizationConstants.HTableGitTag, commitInfo);
+        }
+
+        HColumnDescriptor cf = CubeHTableUtil.createColumnFamily(kylinConfig, HBaseLookupRowEncoder.CF_STRING, false);
+        hTableDesc.addFamily(cf);
+
+        try {
+            if (shardNum > 1) {
+                admin.createTable(hTableDesc, getSplitsByShardNum(shardNum));
+            } else {
+                admin.createTable(hTableDesc);
+            }
+        } finally {
+            IOUtils.closeQuietly(admin);
+        }
+        return new Pair<>(hTableName, shardNum);
+    }
+
+    private int calculateShardNum(KylinConfig kylinConfig, long dataSize) {
+        long shardSize = kylinConfig.getExtTableSnapshotShardingMB() * 1024L * 1024;
+        // use floating-point division; integer division would truncate before Math.ceil is applied
+        return dataSize < shardSize ? 1 : (int) Math.ceil((double) dataSize / shardSize);
+    }
+
+    private byte[][] getSplitsByShardNum(int shardNum) {
+        byte[][] result = new byte[shardNum - 1][];
+        for (int i = 1; i < shardNum; ++i) {
+            byte[] split = new byte[RowConstants.ROWKEY_SHARDID_LEN];
+            BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
+            result[i - 1] = split;
+        }
+        return result;
+    }
+
+    private ExtTableSnapshotInfo createSnapshotResource(ExtTableSnapshotInfoManager extSnapshotInfoManager,
+            String tableName, String snapshotID, String[] keyColumns, String hTableName, int shardNum,
+            IReadableTable sourceTable) throws IOException {
+        return extSnapshotInfoManager.createSnapshot(sourceTable.getSignature(), tableName, snapshotID, keyColumns,
+                shardNum, ExtTableSnapshotInfo.STORAGE_TYPE_HBASE, hTableName);
+    }
+
+    private String genHTableName(KylinConfig kylinConfig, Admin admin, String tableName) throws IOException {
+        String namePrefix = kylinConfig.getHBaseTableNamePrefix()
+                + IRealizationConstants.LookupHbaseStorageLocationPrefix + tableName + "_";
+        String namespace = kylinConfig.getHBaseStorageNameSpace();
+        String hTableName;
+        Random ran = new Random();
+        do {
+            StringBuilder sb = new StringBuilder();
+            if (!namespace.equals("default") && !namespace.isEmpty()) {
+                sb.append(namespace).append(":");
+            }
+            sb.append(namePrefix);
+            for (int i = 0; i < HBASE_TABLE_LENGTH; i++) {
+                sb.append(ALPHA_NUM.charAt(ran.nextInt(ALPHA_NUM.length())));
+            }
+            hTableName = sb.toString();
+        } while (hTableExists(admin, hTableName));
+
+        return hTableName;
+    }
+
+    private boolean hTableExists(Admin admin, String hTableName) throws IOException {
+        return admin.tableExists(TableName.valueOf(hTableName));
+    }
+
+    private Connection getHBaseConnection(KylinConfig kylinConfig) throws IOException {
+        return HBaseConnection.get(kylinConfig.getStorageUrl());
+    }
+
+    public static void main(String[] args) throws Exception {
+        int exitCode = ToolRunner.run(new LookupTableToHFileJob(), args);
+        System.exit(exitCode);
+    }
+
+}
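
Note: a worked example of the shard sizing above; the threshold and table size are illustrative.
With a 128 MB sharding threshold and a 300 MB source table, shardNum = ceil(300 / 128) = 3 and the
HTable is pre-split at shard ids 1 and 2:

    long dataSize = 300L * 1024 * 1024;
    long shardSize = 128L * 1024 * 1024;
    int shardNum = (int) Math.ceil((double) dataSize / shardSize); // 3
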
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java
new file mode 100644
index 0000000..4be9533
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileMapper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+
+public class LookupTableToHFileMapper<KEYIN> extends KylinMapper<KEYIN, Object, ImmutableBytesWritable, KeyValue> {
+    ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+
+    private String cubeName;
+    private CubeDesc cubeDesc;
+    private String tableName;
+    private int shardNum;
+    private IMRTableInputFormat lookupTableInputFormat;
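+    // cells are written with a fixed timestamp of 0; each snapshot build writes into its own fresh
+    // HTable, so real write timestamps are not needed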
+    private long timestamp = 0;
+    private HBaseLookupRowEncoder encoder;
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+        tableName = context.getConfiguration().get(BatchConstants.CFG_TABLE_NAME);
+        shardNum = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_SHARD_NUM));
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        CubeManager cubeMgr = CubeManager.getInstance(config);
+        cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
+        DataModelDesc modelDesc = cubeDesc.getModel();
+        TableDesc tableDesc = TableMetadataManager.getInstance(config).getTableDesc(tableName,
+                cubeDesc.getProject());
+        TableRef lookupTableRef = null;
+        for (TableRef tableRef : modelDesc.getLookupTables()) {
+            if (tableRef.getTableIdentity().equalsIgnoreCase(tableName)) {
+                lookupTableRef = tableRef;
+                break;
+            }
+        }
+        if (lookupTableRef == null) {
+            throw new IllegalStateException("cannot find table in model:" + tableName);
+        }
+        JoinDesc joinDesc = modelDesc.getJoinByPKSide(lookupTableRef);
+        TblColRef[] keyColRefs = joinDesc.getPrimaryKeyColumns();
+        String[] keyColumns = new String[keyColRefs.length];
+        for (int i = 0; i < keyColRefs.length; i++) {
+            keyColumns[i] = keyColRefs[i].getName();
+        }
+        encoder = new HBaseLookupRowEncoder(tableDesc, keyColumns, shardNum);
+        lookupTableInputFormat = MRUtil.getTableInputFormat(tableDesc);
+    }
+
+    @Override
+    public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
+        Collection<String[]> rowCollection = lookupTableInputFormat.parseMapperInput(value);
+        for (String[] row : rowCollection) {
+            HBaseRow hBaseRow = encoder.encode(row);
+
+            byte[] rowKey = hBaseRow.getRowKey();
+            Map<byte[], byte[]> qualifierValMap = hBaseRow.getQualifierValMap();
+            outputKey.set(rowKey);
+            for (Entry<byte[], byte[]> qualifierValEntry : qualifierValMap.entrySet()) {
+                KeyValue outputValue = createKeyValue(rowKey, qualifierValEntry.getKey(), qualifierValEntry.getValue());
+                context.write(outputKey, outputValue);
+            }
+        }
+    }
+
+    private KeyValue createKeyValue(byte[] keyBytes, byte[] qualifier, byte[] value) {
+        return new KeyValue(keyBytes, 0, keyBytes.length, //
+                HBaseLookupRowEncoder.CF, 0, HBaseLookupRowEncoder.CF.length, //
+                qualifier, 0, qualifier.length, //
+                timestamp, KeyValue.Type.Put, //
+                value, 0, value.length);
+    }
+
+}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
new file mode 100644
index 0000000..409116d
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStep.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.ExecuteResult.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
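+/**
+ * Job step that asks every query server to build its local cache for a newly
+ * created lookup table snapshot, then polls those servers until the cache
+ * leaves the IN_BUILDING state or a timeout expires. The step is best-effort:
+ * it reports success even if some servers fail or time out, recording the
+ * per-server outcome in the step output.
+ */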
+public class UpdateSnapshotCacheForQueryServersStep extends AbstractExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(UpdateSnapshotCacheForQueryServersStep.class);
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final String tableName = LookupExecutableUtil.getLookupTableName(this.getParams());
+        final String snapshotID = LookupExecutableUtil.getLookupSnapshotID(this.getParams());
+        final String projectName = LookupExecutableUtil.getProjectName(this.getParams());
+
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
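+        // poll each pending server every 10 seconds, for at most 10 minutes overall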
+        int checkInterval = 10 * 1000;
+        int maxCheckTime = 10 * 60 * 1000;
+
+        StringWriter outputWriter = new StringWriter();
+        PrintWriter pw = new PrintWriter(outputWriter);
+        String[] restServers = config.getRestServers();
+        List<String> serversNeedCheck = Lists.newArrayList();
+        for (String restServer : restServers) {
+            logger.info("send build lookup table cache request to server: " + restServer);
+            try {
+                RestClient restClient = new RestClient(restServer);
+                restClient.buildLookupSnapshotCache(projectName, tableName, snapshotID);
+                serversNeedCheck.add(restServer);
+            } catch (IOException e) {
+                logger.error("error when sending build cache request to rest server: " + restServer, e);
+                pw.println("cache build failed for rest server: " + restServer);
+            }
+        }
+        if (serversNeedCheck.isEmpty()) {
+            return new ExecuteResult(State.SUCCEED, outputWriter.toString());
+        }
+
+        List<String> completeServers = Lists.newArrayList();
+        long startTime = System.currentTimeMillis();
+        while ((System.currentTimeMillis() - startTime) < maxCheckTime) {
+            serversNeedCheck.removeAll(completeServers);
+            if (serversNeedCheck.isEmpty()) {
+                break;
+            }
+            for (String restServer : serversNeedCheck) {
+                logger.info("check lookup table cache build status for server: " + restServer);
+                try {
+                    RestClient restClient = new RestClient(restServer);
+                    String stateName = restClient.getLookupSnapshotCacheState(tableName, snapshotID);
+                    if (!stateName.equals(CacheState.IN_BUILDING.name())) {
+                        completeServers.add(restServer);
+                        pw.println("cache build complete for rest server:" + restServer + " cache state:" + stateName);
+                    }
+                } catch (IOException e) {
+                    logger.error("error when send build cache request to rest server:" + restServer, e);
+                }
+            }
+            try {
+                Thread.sleep(checkInterval);
+            } catch (InterruptedException e) {
+                logger.error("interrupted while waiting for cache build", e);
+                // restore the interrupt flag and stop polling early
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+        serversNeedCheck.removeAll(completeServers);
+        if (!serversNeedCheck.isEmpty()) {
+            pw.println();
+            pw.println("check timeout!");
+            pw.println("servers not complete:" + serversNeedCheck);
+        }
+        return new ExecuteResult(State.SUCCEED, outputWriter.toString());
+    }
+
+}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java
new file mode 100644
index 0000000..2b21ae3
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupRowEncoderTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.NavigableMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.storage.hbase.lookup.HBaseLookupRowEncoder.HBaseRow;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HBaseLookupRowEncoderTest extends LocalFileMetadataTestCase {
+    private TableDesc tableDesc;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        TableMetadataManager metadataManager = TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        tableDesc = metadataManager.getTableDesc("TEST_COUNTRY", "default");
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testEnDeCode() {
+        HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc, new String[] { "COUNTRY" }, 1);
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        HBaseRow hBaseRow = lookupRowEncoder.encode(row);
+
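+        // the key column COUNTRY goes into the row key; the remaining 3 columns
+        // are stored as qualifier/value pairs keyed by column index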
+        assertEquals(6, hBaseRow.getRowKey().length);
+        assertEquals(3, hBaseRow.getQualifierValMap().size());
+        NavigableMap<byte[], byte[]> qualifierMap = hBaseRow.getQualifierValMap();
+        assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1"))));
+        assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2"))));
+        String[] decodeRow = lookupRowEncoder.decode(hBaseRow);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithNullValue() {
+        HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc, new String[] { "COUNTRY" }, 1);
+        String[] row = new String[] { "AD", "42.546245", "1.601554", null };
+        HBaseRow hBaseRow = lookupRowEncoder.encode(row);
+
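+        // a null column still gets a qualifier entry and must decode back to null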
+        assertEquals(6, hBaseRow.getRowKey().length);
+        assertEquals(3, hBaseRow.getQualifierValMap().size());
+        NavigableMap<byte[], byte[]> qualifierMap = hBaseRow.getQualifierValMap();
+        assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1"))));
+        assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2"))));
+        String[] decodeRow = lookupRowEncoder.decode(hBaseRow);
+        assertNull(decodeRow[3]);
+        assertArrayEquals(row, decodeRow);
+    }
+
+    @Test
+    public void testEnDeCodeWithMultiKeys() {
+        HBaseLookupRowEncoder lookupRowEncoder = new HBaseLookupRowEncoder(tableDesc,
+                new String[] { "COUNTRY", "NAME" }, 1);
+        String[] row = new String[] { "AD", "42.546245", "1.601554", "Andorra" };
+        HBaseRow hBaseRow = lookupRowEncoder.encode(row);
+
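+        // with the composite key (COUNTRY, NAME) only 2 non-key columns remain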
+        assertEquals(2, hBaseRow.getQualifierValMap().size());
+        NavigableMap<byte[], byte[]> qualifierMap = hBaseRow.getQualifierValMap();
+        assertEquals("42.546245", Bytes.toString(qualifierMap.get(Bytes.toBytes("1"))));
+        assertEquals("1.601554", Bytes.toString(qualifierMap.get(Bytes.toBytes("2"))));
+        String[] decodeRow = lookupRowEncoder.decode(hBaseRow);
+        assertArrayEquals(row, decodeRow);
+    }
+
+}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java
new file mode 100644
index 0000000..cbd0461
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/LookupTableToHFileJobTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+//package org.apache.kylin.storage.hbase.lookup;
+//
+//import java.net.URI;
+//
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.fs.FileSystem;
+//import org.apache.hadoop.fs.LocalFileSystem;
+//import org.apache.hadoop.mapreduce.Job;
+//import org.apache.kylin.common.util.HadoopUtil;
+//import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+//import org.apache.kylin.cube.CubeInstance;
+//import org.apache.kylin.cube.CubeManager;
+//import org.apache.kylin.cube.CubeSegment;
+//import org.apache.kylin.engine.mr.CubingJob;
+//import org.apache.kylin.engine.mr.JobBuilderSupport;
+//import org.apache.kylin.engine.mr.common.BatchConstants;
+//import org.apache.kylin.job.engine.JobEngineConfig;
+//import org.junit.After;
+//import org.junit.Before;
+//import org.junit.Rule;
+//import org.junit.Test;
+//import org.powermock.api.mockito.PowerMockito;
+//import org.powermock.core.classloader.annotations.PrepareForTest;
+//import org.powermock.modules.junit4.rule.PowerMockRule;
+//
+//@PrepareForTest({ LookupTableToHFileJob.class, Job.class})
+//public class LookupTableToHFileJobTest extends LocalFileMetadataTestCase {
+//
+//    @Rule
+//    public PowerMockRule rule = new PowerMockRule();
+//
+//    @Before
+//    public void setup() throws Exception {
+//        createTestMetadata();
+//    }
+//
+//    @After
+//    public void after() throws Exception {
+//        cleanupTestMetadata();
+//    }
+//
+//    @Test
+//    public void testRun() throws Exception {
+//        String cubeName = "test_kylin_cube_with_slr_1_new_segment";
+//        String segmentID = "198va32a-a33e-4b69-83dd-0bb8b1f8c53b";
+//        CubeInstance cubeInstance = CubeManager.getInstance(getTestConfig()).getCube(cubeName);
+//        CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentID);
+//
+//        Configuration conf = HadoopUtil.getCurrentConfiguration();
+//        conf.set("fs.defaultFS", "file:///");
+//        conf.set("mapreduce.framework.name", "local");
+//        conf.set("mapreduce.application.framework.path", "");
+//        conf.set("fs.file.impl.disable.cache", "true");
+//
+//        FileSystem localFileSystem = new LocalFileSystem();
+//        URI uri = URI.create("file:///");
+//        localFileSystem.initialize(uri, conf);
+//
+//        Job mockedJob = createMockMRJob(conf);
+//        PowerMockito.stub(PowerMockito.method(Job.class, "getInstance", Configuration.class, String.class))
+//                .toReturn(mockedJob);
+//
+//        StringBuilder cmd = new StringBuilder();
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+//                "Build_Lookup_Table_For_Segment_20130331080000_20131212080000_Step");
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME,
+//                cubeName);
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID,
+//                segmentID);
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getOutputPath());
+//        JobBuilderSupport.appendExecCmdParameters(cmd, BatchConstants.ARG_TABLE_NAME, "EDW.TEST_SITES");
+//
+//        LookupTableToHFileJob job = new LookupTableToHFileJob();
+//        job.setConf(conf);
+//        job.setAsync(true);
+//
+//        String[] args = cmd.toString().trim().split("\\s+");
+//        job.run(args);
+//    }
+//
+//    private String getOutputPath() {
+//        return "_tmp_output";
+//    }
+//
+//    private CubingJob createMockCubingJob(CubeSegment cubeSeg) {
+//        JobEngineConfig jobEngineConfig = new JobEngineConfig(getTestConfig());
+//        CubingJob cubingJob = CubingJob.createBuildJob(cubeSeg, "unitTest", jobEngineConfig);
+//
+//        return cubingJob;
+//    }
+//
+//    private Job createMockMRJob(Configuration conf) throws Exception {
+//        Job job = PowerMockito.mock(Job.class);
+//        PowerMockito.when(job.getConfiguration()).thenReturn(conf);
+//        PowerMockito.doNothing().when(job).submit();
+//        return job;
+//    }
+
+//}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
new file mode 100644
index 0000000..e98762d
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/lookup/UpdateSnapshotCacheForQueryServersStepTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.lookup;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.DefaultContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class UpdateSnapshotCacheForQueryServersStepTest extends LocalFileMetadataTestCase {
+    private KylinConfig kylinConfig;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testExecute() throws ExecuteException {
+        UpdateSnapshotCacheForQueryServersStep step = new UpdateSnapshotCacheForQueryServersStep();
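+        // no query server is reachable in the local test environment, but the
+        // step is best-effort and should still succeed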
+        ExecuteResult result = step.doWork(new DefaultContext(Maps.<String, Executable>newConcurrentMap(), kylinConfig));
+        System.out.println(result.output());
+        assertTrue(result.succeed());
+    }
+}
