You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:12 UTC

[10/52] [abbrv] incubator-kylin git commit: KYLIN-875 Refactor core-common, remove dependency on hadoop/hbase

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java b/core-common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
deleted file mode 100644
index fb0d313..0000000
--- a/core-common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseResourceStore;
-
-/**
- * a helper class to start and shutdown hbase mini cluster
- *
- * @author shaoshi
- */
-public class HBaseMiniclusterHelper {
-
-    public static final String SHARED_STORAGE_PREFIX = "KYLIN_";
-    public static final String CUBE_STORAGE_PREFIX = "KYLIN_";
-    public static final String II_STORAGE_PREFIX = "KYLIN_II_";
-    public static final String TEST_METADATA_TABLE = "kylin_metadata";
-
-    private static final String hbaseTarLocation = "../examples/test_case_data/minicluster/hbase-export.tar.gz";
-    private static final String iiEndpointClassName = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
-
-    public static HBaseTestingUtility UTIL = new HBaseTestingUtility();
-    private static volatile boolean clusterStarted = false;
-    private static String hbaseconnectionUrl = "";
-
-    private static final Log logger = LogFactory.getLog(HBaseMiniclusterHelper.class);
-
-    static {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                shutdownMiniCluster();
-            }
-        });
-    }
-
-    /**
-     * Start the minicluster; Sub-classes should invoke this in BeforeClass method.
-     *
-     * @throws Exception
-     */
-    public static void startupMinicluster() throws Exception {
-
-        if (!clusterStarted) {
-            synchronized (HBaseMiniclusterHelper.class) {
-                if (!clusterStarted) {
-                    startupMiniClusterAndImportData();
-                    clusterStarted = true;
-                }
-            }
-        } else {
-            updateKylinConfigWithMinicluster();
-        }
-    }
-
-    private static void updateKylinConfigWithMinicluster() {
-
-        KylinConfig.getInstanceFromEnv().setMetadataUrl(TEST_METADATA_TABLE + "@" + hbaseconnectionUrl);
-        KylinConfig.getInstanceFromEnv().setStorageUrl(hbaseconnectionUrl);
-    }
-
-    private static void startupMiniClusterAndImportData() throws Exception {
-
-        logger.info("Going to start mini cluster.");
-
-        if (existInClassPath(iiEndpointClassName)) {
-            HBaseMiniclusterHelper.UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, iiEndpointClassName);
-        }
-
-        //https://issues.apache.org/jira/browse/HBASE-11711
-        UTIL.getConfiguration().setInt("hbase.master.info.port", -1);//avoid port clobbering
-
-        MiniHBaseCluster hbaseCluster = UTIL.startMiniCluster();
-
-        Configuration config = hbaseCluster.getConf();
-        String host = config.get(HConstants.ZOOKEEPER_QUORUM);
-        String port = config.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-        String parent = config.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
-
-        // see in: https://hbase.apache.org/book.html#trouble.rs.runtime.zkexpired
-        config.set("zookeeper.session.timeout", "1200000");
-        config.set("hbase.zookeeper.property.tickTime", "6000");
-        // reduce rpc retry
-        config.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
-        config.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "1");
-        config.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
-
-        hbaseconnectionUrl = "hbase:" + host + ":" + port + ":" + parent;
-        updateKylinConfigWithMinicluster();
-
-        UTIL.startMiniMapReduceCluster();
-
-        // create the metadata htables;
-        @SuppressWarnings("unused")
-        HBaseResourceStore store = new HBaseResourceStore(KylinConfig.getInstanceFromEnv());
-
-        // import the table content
-        HbaseImporter.importHBaseData(hbaseTarLocation, UTIL.getConfiguration());
-
-    }
-
-    private static boolean existInClassPath(String className) {
-        try {
-            Class.forName(className);
-        } catch (ClassNotFoundException e) {
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * Shutdown the minicluster; 
-     */
-    public static void shutdownMiniCluster() {
-
-        logger.info("Going to shutdown mini cluster.");
-
-        try {
-            UTIL.shutdownMiniMapReduceCluster();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        try {
-            UTIL.shutdownMiniCluster();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public static void main(String[] args) {
-        HBaseMiniclusterHelper t = new HBaseMiniclusterHelper();
-        logger.info(t);
-        try {
-            HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.MINICLUSTER_TEST_DATA);
-            HBaseMiniclusterHelper.startupMinicluster();
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            HBaseMiniclusterHelper.shutdownMiniCluster();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/HbaseImporter.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HbaseImporter.java b/core-common/src/test/java/org/apache/kylin/common/util/HbaseImporter.java
deleted file mode 100644
index 1647d54..0000000
--- a/core-common/src/test/java/org/apache/kylin/common/util/HbaseImporter.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.mapreduce.Import;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseConnection;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public class HbaseImporter {
-
-    private static final Log logger = LogFactory.getLog(HbaseImporter.class);
-
-    public static void importHBaseData(String hbaseTarLocation, Configuration conf) throws IOException, ClassNotFoundException, InterruptedException {
-
-        if (System.getenv("JAVA_HOME") == null) {
-            logger.error("Didn't find $JAVA_HOME, this will cause HBase data import failed. Please set $JAVA_HOME.");
-            logger.error("Skipping table import...");
-            return;
-        }
-
-        File exportFile = new File(hbaseTarLocation);
-        if (!exportFile.exists()) {
-            logger.error("Didn't find the export achieve file on " + exportFile.getAbsolutePath());
-            return;
-        }
-
-        File folder = new File("/tmp/hbase-export/");
-        if (folder.exists()) {
-            FileUtils.deleteDirectory(folder);
-        }
-        folder.mkdirs();
-        folder.deleteOnExit();
-
-        //TarGZUtil.uncompressTarGZ(exportFile, folder);
-        FileUtil.unTar(exportFile, folder);
-        String[] child = folder.list();
-        Preconditions.checkState(child.length == 1);
-        String backupFolderName = child[0];
-        File backupFolder = new File(folder, backupFolderName);
-        String[] tableNames = backupFolder.list();
-
-        for (String table : tableNames) {
-
-            if (!(table.equalsIgnoreCase(HBaseMiniclusterHelper.TEST_METADATA_TABLE) || table.startsWith(HBaseMiniclusterHelper.SHARED_STORAGE_PREFIX))) {
-                continue;
-            }
-
-            // create the htable; otherwise the import will fail.
-            if (table.startsWith(HBaseMiniclusterHelper.II_STORAGE_PREFIX)) {
-                HBaseConnection.createHTableIfNeeded(KylinConfig.getInstanceFromEnv().getStorageUrl(), table, "f");
-            } else if (table.startsWith(HBaseMiniclusterHelper.CUBE_STORAGE_PREFIX)) {
-                HBaseConnection.createHTableIfNeeded(KylinConfig.getInstanceFromEnv().getStorageUrl(), table, "F1", "F2");
-            }
-
-            // directly import from local fs, no need to copy to hdfs
-            String importLocation = "file://" + backupFolder.getAbsolutePath() + "/" + table;
-            String[] args = new String[] { table, importLocation };
-            boolean result = runImport(args, conf);
-            logger.info("importing table '" + table + "' with result:" + result);
-
-            if (!result)
-                break;
-        }
-
-    }
-
-    private static boolean runImport(String[] args, Configuration configuration) throws IOException, InterruptedException, ClassNotFoundException {
-        // need to make a copy of the configuration because to make sure different temp dirs are used.
-        GenericOptionsParser opts = new GenericOptionsParser(new Configuration(configuration), args);
-        Configuration newConf = opts.getConfiguration();
-        args = opts.getRemainingArgs();
-        Job job = Import.createSubmittableJob(newConf, args);
-        job.waitForCompletion(false);
-        return job.isSuccessful();
-    }
-
-    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-        if (args.length != 1) {
-            logger.error("Usage: HbaseImporter hbase_tar_lcoation");
-            System.exit(-1);
-        }
-
-        logger.info("The KylinConfig being used:");
-        logger.info("=================================================");
-        KylinConfig.getInstanceFromEnv().printProperties();
-        logger.info("=================================================");
-
-        importHBaseData(args[0], HBaseConfiguration.create());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/IdentityUtilTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/IdentityUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/IdentityUtilTest.java
index 7e802a4..452c5e6 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/IdentityUtilTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/IdentityUtilTest.java
@@ -1,11 +1,11 @@
 package org.apache.kylin.common.util;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.collections.CollectionUtils;
+import java.util.List;
+
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.List;
+import com.google.common.collect.Lists;
 
 /**
  */
@@ -20,9 +20,6 @@ public class IdentityUtilTest {
         List<String> c2 = Lists.newArrayList(s2);
         List<String> c3 = Lists.newArrayList(s2);
 
-        Assert.assertTrue(CollectionUtils.isEqualCollection(c1,c2));
-        Assert.assertTrue(CollectionUtils.isEqualCollection(c3,c2));
-
         Assert.assertFalse(IdentityUtils.collectionReferenceEquals(c1,c2));
         Assert.assertTrue(IdentityUtils.collectionReferenceEquals(c3,c2));
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/RandomSamplerTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/RandomSamplerTest.java b/core-common/src/test/java/org/apache/kylin/common/util/RandomSamplerTest.java
index 89d5701..1cf1e8b 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/RandomSamplerTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/RandomSamplerTest.java
@@ -23,24 +23,21 @@ import static org.junit.Assert.*;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.hadoop.io.Text;
 import org.junit.Test;
 
 /**
- * @author ysong1
- * 
  */
 public class RandomSamplerTest {
 
     @Test
     public void test() {
-        RandomSampler<Text> s = new RandomSampler<Text>();
-        List<Text> data = new ArrayList<Text>();
+        RandomSampler<String> s = new RandomSampler<String>();
+        List<String> data = new ArrayList<String>();
         for (int i = 0; i < 1000; i++) {
-            data.add(new Text(String.valueOf(i)));
+            data.add(String.valueOf(i));
         }
 
-        List<Text> result = s.sample(data, 50);
+        List<String> result = s.sample(data, 50);
         System.out.println(result);
         assertEquals(50, result.size());
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/RangeTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/RangeTest.java b/core-common/src/test/java/org/apache/kylin/common/util/RangeTest.java
index 2a9ce40..e18b3e7 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/RangeTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/RangeTest.java
@@ -9,6 +9,7 @@ import com.google.common.collect.Ranges;
 
 /**
  */
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class RangeTest {
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-common/src/test/java/org/apache/kylin/common/util/SSHClientTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/SSHClientTest.java b/core-common/src/test/java/org/apache/kylin/common/util/SSHClientTest.java
index 5680234..3f32b86 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/SSHClientTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/SSHClientTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.*;
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.junit.After;
 import org.junit.Before;
@@ -80,7 +80,9 @@ public class SSHClientTest extends LocalFileMetadataTestCase {
             return;
 
         SSHClient ssh = new SSHClient(this.hostname, this.port, this.username, this.password);
-        File tmpFile = FileUtil.createLocalTempFile(new File("/tmp/test_scp"), "temp_", false);
+        File tmpFile = File.createTempFile("test_scp", "", new File("/tmp"));
+        tmpFile.deleteOnExit();
+        FileUtils.write(tmpFile, "test_scp");
         ssh.scpFileToRemote(tmpFile.getAbsolutePath(), "/tmp");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-cube/pom.xml
----------------------------------------------------------------------
diff --git a/core-cube/pom.xml b/core-cube/pom.xml
index 711e4db..8751a78 100644
--- a/core-cube/pom.xml
+++ b/core-cube/pom.xml
@@ -43,6 +43,8 @@
             <artifactId>kylin-core-dictionary</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+
+        <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-common</artifactId>
@@ -50,8 +52,6 @@
             <scope>test</scope>
             <version>${project.parent.version}</version>
         </dependency>
-
-        <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index cc61dac..6ed4100 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -44,11 +44,11 @@ import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
 import org.apache.kylin.dict.lookup.LookupStringTable;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
 import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -161,13 +161,13 @@ public class CubeManager implements IRealizationProvider {
         return result;
     }
 
-    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, String factColumnsPath) throws IOException {
+    public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
         CubeDesc cubeDesc = cubeSeg.getCubeDesc();
         if (!cubeDesc.getRowkey().isUseDictionary(col))
             return null;
 
         DictionaryManager dictMgr = getDictionaryManager();
-        DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, factColumnsPath);
+        DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, factTableValueProvider);
 
         if (dictInfo != null) {
             cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
@@ -708,8 +708,6 @@ public class CubeManager implements IRealizationProvider {
      * - Favors big segments over the small
      */
     private List<CubeSegment> calculateToBeSegments(CubeInstance cube, boolean strictChecking, CubeSegment... newSegments) {
-        CubeDesc cubeDesc = cube.getDescriptor();
-        PartitionDesc partDesc = cubeDesc.getModel().getPartitionDesc();
 
         List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments());
         if (newSegments != null)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index feb0f1a..e375a19 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -23,11 +23,11 @@ import java.io.IOException;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -35,14 +35,14 @@ public class DictionaryGeneratorCLI {
 
     private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
 
-    public static void processSegment(KylinConfig config, String cubeName, String segmentName, String factColumnsPath) throws IOException {
+    public static void processSegment(KylinConfig config, String cubeName, String segmentName, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
 
-        processSegment(config, segment, factColumnsPath);
+        processSegment(config, segment, factTableValueProvider);
     }
 
-    private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String factColumnsPath) throws IOException {
+    private static void processSegment(KylinConfig config, CubeSegment cubeSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
         CubeManager cubeMgr = CubeManager.getInstance(config);
 
         for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
@@ -50,7 +50,7 @@ public class DictionaryGeneratorCLI {
             for (TblColRef col : dim.getColumnRefs()) {
                 if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
                     logger.info("Building dictionary for " + col);
-                    cubeMgr.buildDictionary(cubeSeg, col, factColumnsPath);
+                    cubeMgr.buildDictionary(cubeSeg, col, factTableValueProvider);
                 }
             }
 
@@ -60,8 +60,7 @@ public class DictionaryGeneratorCLI {
                 logger.info("Building snapshot of " + dim.getTable());
                 cubeMgr.buildSnapshotTable(cubeSeg, dim.getTable());
                 logger.info("Checking snapshot of " + dim.getTable());
-                cubeMgr.getLookupTable(cubeSeg, dim); // load the table for
-                                                      // sanity check
+                cubeMgr.getLookupTable(cubeSeg, dim); // load the table for sanity check
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/pom.xml
----------------------------------------------------------------------
diff --git a/core-dictionary/pom.xml b/core-dictionary/pom.xml
index 14b54a0..649f58b 100644
--- a/core-dictionary/pom.xml
+++ b/core-dictionary/pom.xml
@@ -40,6 +40,8 @@
             <artifactId>kylin-core-metadata</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+
+        <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-common</artifactId>
@@ -47,8 +49,6 @@
             <scope>test</scope>
             <version>${project.parent.version}</version>
         </dependency>
-
-        <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 611251a..a579abf 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -18,23 +18,14 @@
 
 package org.apache.kylin.dict;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.dict.lookup.FileTable;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.DataType;
@@ -195,11 +186,11 @@ public class DictionaryManager {
         }
     }
 
-    public DictionaryInfo buildDictionary(DataModelDesc model, String dict, TblColRef col, String factColumnsPath) throws IOException {
+    public DictionaryInfo buildDictionary(DataModelDesc model, String dict, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
 
         logger.info("building dictionary for " + col);
 
-        Object[] tmp = decideSourceData(model, dict, col, factColumnsPath);
+        Object[] tmp = decideSourceData(model, dict, col, factTableValueProvider);
         String srcTable = (String) tmp[0];
         String srcCol = (String) tmp[1];
         int srcColIdx = (Integer) tmp[2];
@@ -232,93 +223,42 @@ public class DictionaryManager {
      * 4. ReadableTable object
      */
 
-    public Object[] decideSourceData(DataModelDesc model, String dict, TblColRef col, String factColumnsPath) throws IOException {
+    public Object[] decideSourceData(DataModelDesc model, String dict, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
         String srcTable;
         String srcCol;
         int srcColIdx;
         ReadableTable table;
         MetadataManager metaMgr = MetadataManager.getInstance(config);
 
-        // case of full table (dict on fact table)
-        if (model == null) {
-            srcTable = col.getTable();
-            srcCol = col.getName();
-            srcColIdx = col.getColumnDesc().getZeroBasedIndex();
-            int nColumns = metaMgr.getTableDesc(col.getTable()).getColumnCount();
-            table = new FileTable(factColumnsPath + "/" + col.getName(), nColumns);
-            return new Object[] { srcTable, srcCol, srcColIdx, table };
-        }
-
         // Decide source data of dictionary:
         // 1. If 'useDict' specifies pre-defined data set, use that
         // 2. Otherwise find a lookup table to scan through
 
         // Note FK on fact table is supported by scan the related PK on lookup table
 
-        //String useDict = cube.getRowkey().getDictionary(col);
-
         // normal case, source from lookup table
         if ("true".equals(dict) || "string".equals(dict) || "number".equals(dict) || "any".equals(dict)) {
             // FK on fact table and join type is inner, use PK from lookup instead
             if (model.isFactTable(col.getTable())) {
                 TblColRef pkCol = model.findPKByFK(col, "inner");
                 if (pkCol != null)
-                    col = pkCol; // scan the counterparty PK on lookup table
-                // instead
+                    col = pkCol; // scan the counterpart PK on lookup table instead
             }
             srcTable = col.getTable();
             srcCol = col.getName();
             srcColIdx = col.getColumnDesc().getZeroBasedIndex();
             if (model.isFactTable(col.getTable())) {
-                table = new FileTable(factColumnsPath + "/" + col.getName(), -1);
+                table = factTableValueProvider.getDistinctValuesFor(col);
             } else {
                 table = TableSourceFactory.createReadableTable(metaMgr.getTableDesc(col.getTable()));
             }
         }
-        // otherwise could refer to a data set, e.g. common_indicators.txt
-        // (LEGACY PATH, since distinct values are collected from fact table)
-        else {
-            String dictDataSetPath = unpackDataSet(this.config.getTempHDFSDir(), dict);
-            if (dictDataSetPath == null)
-                throw new IllegalArgumentException("Unknown dictionary data set '" + dict + "', referred from " + col);
-            srcTable = "PREDEFINED";
-            srcCol = dict;
-            srcColIdx = 0;
-            table = new FileTable(dictDataSetPath, -1);
-        }
+        else
+            throw new IllegalArgumentException("Unknown dictionary value: " + dict);
 
         return new Object[] { srcTable, srcCol, srcColIdx, table };
     }
 
-    private String unpackDataSet(String tempHDFSDir, String dataSetName) throws IOException {
-
-        InputStream in = this.getClass().getResourceAsStream("/org/apache/kylin/dict/" + dataSetName + ".txt");
-        if (in == null) // data set resource not found
-            return null;
-
-        ByteArrayOutputStream buf = new ByteArrayOutputStream();
-        IOUtils.copy(in, buf);
-        in.close();
-        byte[] bytes = buf.toByteArray();
-
-        Path tmpDataSetPath = new Path(tempHDFSDir + "/dict/temp_dataset/" + dataSetName + "_" + bytes.length + ".txt");
-
-        FileSystem fs = HadoopUtil.getFileSystem(tempHDFSDir);
-        boolean writtenNewFile = false;
-        if (fs.exists(tmpDataSetPath) == false || fs.getFileStatus(tmpDataSetPath).getLen() != bytes.length) {
-            fs.mkdirs(tmpDataSetPath.getParent());
-            FSDataOutputStream out = fs.create(tmpDataSetPath);
-            IOUtils.copy(new ByteArrayInputStream(bytes), out);
-            out.close();
-            writtenNewFile = true;
-        }
-
-        String qualifiedPath = tmpDataSetPath.makeQualified(fs.getUri(), new Path("/")).toString();
-        if (writtenNewFile)
-            logger.info("Dictionary temp data set file written to " + qualifiedPath);
-        return qualifiedPath;
-    }
-
     private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException {
         ResourceStore store = MetadataManager.getInstance(config).getStore();
         ArrayList<String> existings = store.listResources(dictInfo.getResourceDir());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java
new file mode 100644
index 0000000..66511be
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DistinctColumnValuesProvider.java
@@ -0,0 +1,17 @@
+package org.apache.kylin.dict;
+
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ * To build a dictionary, we need the list of distinct values of a column.
+ * For a column on a lookup table, simply scan the whole table since the table is small.
+ * For a column on a fact table, the table is too big to iterate. So the build
+ * engine will first extract the distinct values (by an MR job, for example), and
+ * implement this interface to provide the result to DictionaryManager.
+ */
+public interface DistinctColumnValuesProvider {
+
+    /** Returns a ReadableTable that contains only one column, each row being a distinct value. */
+    public ReadableTable getDistinctValuesFor(TblColRef col);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
deleted file mode 100644
index 59eca4a..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.source.ReadableTable;
-
-/**
- */
-public class FileTable implements ReadableTable {
-
-    public static final String DELIM_AUTO = "auto";
-    public static final String DELIM_COMMA = ",";
-
-    String path;
-    String delim;
-    int nColumns;
-
-    public FileTable(String path, int nColumns) {
-        this(path, DELIM_AUTO, nColumns);
-    }
-
-    public FileTable(String path, String delim, int nColumns) {
-        this.path = path;
-        this.delim = delim;
-        this.nColumns = nColumns;
-    }
-
-    public String getColumnDelimeter() {
-        return delim;
-    }
-
-    @Override
-    public TableReader getReader() throws IOException {
-        return new FileTableReader(path, delim, nColumns);
-    }
-
-    @Override
-    public TableSignature getSignature() throws IOException {
-        try {
-            Pair<Long, Long> sizeAndLastModified = getSizeAndLastModified(path);
-            return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond());
-        } catch (FileNotFoundException ex) {
-            return null;
-        }
-    }
-
-    @Override
-    public String toString() {
-        return path;
-    }
-
-    public static Pair<Long, Long> getSizeAndLastModified(String path) throws IOException {
-        FileSystem fs = HadoopUtil.getFileSystem(path);
-
-        // get all contained files if path is directory
-        ArrayList<FileStatus> allFiles = new ArrayList<>();
-        FileStatus status = fs.getFileStatus(new Path(path));
-        if (status.isFile()) {
-            allFiles.add(status);
-        } else {
-            FileStatus[] listStatus = fs.listStatus(new Path(path));
-            allFiles.addAll(Arrays.asList(listStatus));
-        }
-
-        long size = 0;
-        long lastModified = 0;
-        for (FileStatus file : allFiles) {
-            size += file.getLen();
-            lastModified = Math.max(lastModified, file.getModificationTime());
-        }
-
-        return new Pair<Long, Long>(size, lastModified);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
deleted file mode 100644
index 4e04c93..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.BufferedReader;
-import java.io.Closeable;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.source.ReadableTable.TableReader;
-
-/**
- * Tables are typically CSV or SEQ file.
- * 
- * @author yangli9
- */
-public class FileTableReader implements TableReader {
-
-    private static final Logger logger = LoggerFactory.getLogger(FileTableReader.class);
-    private static final char CSV_QUOTE = '"';
-    private static final String[] DETECT_DELIMS = new String[] { "\177", "|", "\t", "," };
-
-    private String filePath;
-    private String delim;
-    private RowReader reader;
-
-    private String curLine;
-    private String[] curColumns;
-    private int expectedColumnNumber = -1; // helps delimiter detection
-
-    public FileTableReader(String filePath, int expectedColumnNumber) throws IOException {
-        this(filePath, FileTable.DELIM_AUTO, expectedColumnNumber);
-    }
-    
-    public FileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException {
-        filePath = HadoopUtil.fixWindowsPath(filePath);
-        this.filePath = filePath;
-        this.delim = delim;
-        this.expectedColumnNumber = expectedColumnNumber;
-
-        FileSystem fs = HadoopUtil.getFileSystem(filePath);
-
-        try {
-            this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
-
-        } catch (IOException e) {
-            if (isExceptionSayingNotSeqFile(e) == false)
-                throw e;
-
-            this.reader = new CsvRowReader(fs, filePath);
-        }
-    }
-
-    private boolean isExceptionSayingNotSeqFile(IOException e) {
-        if (e.getMessage() != null && e.getMessage().contains("not a SequenceFile"))
-            return true;
-
-        if (e instanceof EOFException) // in case the file is very very small
-            return true;
-
-        return false;
-    }
-
-    @Override
-    public boolean next() throws IOException {
-        curLine = reader.nextLine();
-        curColumns = null;
-        return curLine != null;
-    }
-
-    public String getLine() {
-        return curLine;
-    }
-
-    @Override
-    public String[] getRow() {
-        if (curColumns == null) {
-            if (FileTable.DELIM_AUTO.equals(delim))
-                delim = autoDetectDelim(curLine);
-
-            if (delim == null)
-                curColumns = new String[] { curLine };
-            else
-                curColumns = split(curLine, delim);
-        }
-        return curColumns;
-    }
-
-    private String[] split(String line, String delim) {
-        // FIXME CVS line should be parsed considering escapes
-        String str[] = StringSplitter.split(line, delim);
-
-        // un-escape CSV
-        if (FileTable.DELIM_COMMA.equals(delim)) {
-            for (int i = 0; i < str.length; i++) {
-                str[i] = unescapeCsv(str[i]);
-            }
-        }
-
-        return str;
-    }
-
-    private String unescapeCsv(String str) {
-        if (str == null || str.length() < 2)
-            return str;
-
-        str = StringEscapeUtils.unescapeCsv(str);
-
-        // unescapeCsv may not remove the outer most quotes
-        if (str.charAt(0) == CSV_QUOTE && str.charAt(str.length() - 1) == CSV_QUOTE)
-            str = str.substring(1, str.length() - 1);
-
-        return str;
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (reader != null)
-            reader.close();
-    }
-
-    private String autoDetectDelim(String line) {
-        if (expectedColumnNumber > 0) {
-            for (String delim : DETECT_DELIMS) {
-                if (StringSplitter.split(line, delim).length == expectedColumnNumber) {
-                    logger.info("Auto detect delim to be '" + delim + "', split line to " + expectedColumnNumber + " columns -- " + line);
-                    return delim;
-                }
-            }
-        }
-
-        logger.info("Auto detect delim to be null, will take THE-WHOLE-LINE as a single value, for " + filePath);
-        return null;
-    }
-
-    // ============================================================================
-
-    private interface RowReader extends Closeable {
-        String nextLine() throws IOException; // return null on EOF
-    }
-
-    private class SeqRowReader implements RowReader {
-        Reader reader;
-        Writable key;
-        Text value;
-
-        SeqRowReader(Configuration hconf, FileSystem fs, String path) throws IOException {
-            reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
-            key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
-            value = new Text();
-        }
-
-        @Override
-        public String nextLine() throws IOException {
-            boolean hasNext = reader.next(key, value);
-            if (hasNext)
-                return Bytes.toString(value.getBytes(), 0, value.getLength());
-            else
-                return null;
-        }
-
-        @Override
-        public void close() throws IOException {
-            reader.close();
-        }
-    }
-
-    private class CsvRowReader implements RowReader {
-        BufferedReader reader;
-
-        CsvRowReader(FileSystem fs, String path) throws IOException {
-            FSDataInputStream in = fs.open(new Path(path));
-            reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
-        }
-
-        @Override
-        public String nextLine() throws IOException {
-            return reader.readLine();
-        }
-
-        @Override
-        public void close() throws IOException {
-            reader.close();
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java
deleted file mode 100644
index 86fa635..0000000
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/LookupTableTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import java.io.File;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.dict.lookup.FileTable;
-import org.apache.kylin.dict.lookup.LookupBytesTable;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-
-/**
- * @author yangli9
- */
-public class LookupTableTest extends LocalFileMetadataTestCase {
-
-    @Before
-    public void setup() throws Exception {
-        createTestMetadata();
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-
-    @Test
-    public void testBasic() throws Exception {
-        TableDesc siteTable = MetadataManager.getInstance(getTestConfig()).getTableDesc("EDW.TEST_SITES");
-        TableDesc categoryTable = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.test_category_groupings");
-        LookupBytesTable lookup;
-
-        System.out.println("============================================================================");
-
-        File f = new File(LOCALMETA_TEST_DATA + "/data/EDW.TEST_SITES.csv");
-        lookup = new LookupBytesTable(siteTable, new String[] { "SITE_ID" }, new FileTable("file://" + f.getAbsolutePath(), 10));
-        lookup.dump();
-
-        System.out.println("============================================================================");
-
-        f = new File(LOCALMETA_TEST_DATA + "/data/DEFAULT.TEST_CATEGORY_GROUPINGS.csv");
-        lookup = new LookupBytesTable(categoryTable, new String[] { "leaf_categ_id", "site_id" }, new FileTable("file://" + f.getAbsolutePath(), 36));
-        lookup.dump();
-
-        System.out.println("============================================================================");
-
-        ByteArray k1 = new ByteArray(Bytes.toBytes("533"));
-        ByteArray k2 = new ByteArray(Bytes.toBytes("0"));
-        Array<ByteArray> key = new Array<ByteArray>(new ByteArray[] { k1, k2 });
-        System.out.println(lookup.getRow(key));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
deleted file mode 100644
index cfecaee..0000000
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/TableReaderTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-
-import org.apache.kylin.dict.lookup.FileTable;
-import org.apache.kylin.dict.lookup.FileTableReader;
-import org.junit.Test;
-
-/**
- * @author yangli9
- * 
- */
-public class TableReaderTest {
-
-    @Test
-    public void testBasicReader() throws IOException {
-        File f = new File("src/test/resources/dict/DW_SITES");
-        FileTableReader reader = new FileTableReader("file://" + f.getAbsolutePath(), FileTable.DELIM_AUTO, 10);
-        while (reader.next()) {
-            assertEquals("[-1, Korea Auction.co.kr, S, 48, 0, 111, 2009-02-11, , DW_OFFPLAT, ]", Arrays.toString(reader.getRow()));
-            break;
-        }
-        reader.close();
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
new file mode 100644
index 0000000..490e3f7
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/JobLock.java
@@ -0,0 +1,9 @@
+package org.apache.kylin.job.lock;
+
+/**
+ */
+public interface JobLock {
+    boolean lock();
+
+    void unlock();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
new file mode 100644
index 0000000..f5cea4e
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/MockJobLock.java
@@ -0,0 +1,15 @@
+package org.apache.kylin.job.lock;
+
+/**
+ */
+public class MockJobLock implements JobLock {
+    @Override
+    public boolean lock() {
+        return true;
+    }
+
+    @Override
+    public void unlock() {
+        return;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/core-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/core-metadata/pom.xml b/core-metadata/pom.xml
index 3c4f180..71457bc 100644
--- a/core-metadata/pom.xml
+++ b/core-metadata/pom.xml
@@ -40,6 +40,8 @@
             <artifactId>kylin-core-common</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+
+        <!-- Env & Test -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-common</artifactId>
@@ -47,9 +49,6 @@
             <scope>test</scope>
             <version>${project.parent.version}</version>
         </dependency>
-
-        <!-- Env & Test -->
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/pom.xml
----------------------------------------------------------------------
diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index 748a9df..a226e80 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -53,6 +53,46 @@
 
         <!-- Env & Test -->
         <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>provided</scope>
+            <!-- protobuf version conflict with hbase -->
+            <exclusions>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-app</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
new file mode 100644
index 0000000..152b4af
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ */
+public class DFSFileTable implements ReadableTable {
+
+    public static final String DELIM_AUTO = "auto";
+    public static final String DELIM_COMMA = ",";
+
+    String path;
+    String delim;
+    int nColumns;
+
+    public DFSFileTable(String path, int nColumns) {
+        this(path, DELIM_AUTO, nColumns);
+    }
+
+    public DFSFileTable(String path, String delim, int nColumns) {
+        this.path = path;
+        this.delim = delim;
+        this.nColumns = nColumns;
+    }
+
+    public String getColumnDelimeter() {
+        return delim;
+    }
+
+    @Override
+    public TableReader getReader() throws IOException {
+        return new DFSFileTableReader(path, delim, nColumns);
+    }
+
+    @Override
+    public TableSignature getSignature() throws IOException {
+        try {
+            Pair<Long, Long> sizeAndLastModified = getSizeAndLastModified(path);
+            return new TableSignature(path, sizeAndLastModified.getFirst(), sizeAndLastModified.getSecond());
+        } catch (FileNotFoundException ex) {
+            return null;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return path;
+    }
+
+    public static Pair<Long, Long> getSizeAndLastModified(String path) throws IOException {
+        FileSystem fs = HadoopUtil.getFileSystem(path);
+
+        // get all contained files if path is directory
+        ArrayList<FileStatus> allFiles = new ArrayList<>();
+        FileStatus status = fs.getFileStatus(new Path(path));
+        if (status.isFile()) {
+            allFiles.add(status);
+        } else {
+            FileStatus[] listStatus = fs.listStatus(new Path(path));
+            allFiles.addAll(Arrays.asList(listStatus));
+        }
+
+        long size = 0;
+        long lastModified = 0;
+        for (FileStatus file : allFiles) {
+            size += file.getLen();
+            lastModified = Math.max(lastModified, file.getModificationTime());
+        }
+
+        return new Pair<Long, Long>(size, lastModified);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
new file mode 100644
index 0000000..51fb734
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.source.ReadableTable.TableReader;
+
+/**
+ * Tables are typically CSV or SEQ files.
+ * 
+ * @author yangli9
+ */
+public class DFSFileTableReader implements TableReader {
+
+    private static final Logger logger = LoggerFactory.getLogger(DFSFileTableReader.class);
+    private static final char CSV_QUOTE = '"';
+    private static final String[] DETECT_DELIMS = new String[] { "\177", "|", "\t", "," };
+
+    private String filePath;
+    private String delim;
+    private RowReader reader;
+
+    private String curLine;
+    private String[] curColumns;
+    private int expectedColumnNumber = -1; // helps delimiter detection
+
+    public DFSFileTableReader(String filePath, int expectedColumnNumber) throws IOException {
+        this(filePath, DFSFileTable.DELIM_AUTO, expectedColumnNumber);
+    }
+    
+    public DFSFileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException {
+        filePath = HadoopUtil.fixWindowsPath(filePath);
+        this.filePath = filePath;
+        this.delim = delim;
+        this.expectedColumnNumber = expectedColumnNumber;
+
+        FileSystem fs = HadoopUtil.getFileSystem(filePath);
+
+        try {
+            this.reader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, filePath);
+
+        } catch (IOException e) {
+            if (isExceptionSayingNotSeqFile(e) == false)
+                throw e;
+
+            this.reader = new CsvRowReader(fs, filePath);
+        }
+    }
+
+    private boolean isExceptionSayingNotSeqFile(IOException e) {
+        if (e.getMessage() != null && e.getMessage().contains("not a SequenceFile"))
+            return true;
+
+        if (e instanceof EOFException) // in case the file is very very small
+            return true;
+
+        return false;
+    }
+
+    @Override
+    public boolean next() throws IOException {
+        curLine = reader.nextLine();
+        curColumns = null;
+        return curLine != null;
+    }
+
+    public String getLine() {
+        return curLine;
+    }
+
+    @Override
+    public String[] getRow() {
+        if (curColumns == null) {
+            if (DFSFileTable.DELIM_AUTO.equals(delim))
+                delim = autoDetectDelim(curLine);
+
+            if (delim == null)
+                curColumns = new String[] { curLine };
+            else
+                curColumns = split(curLine, delim);
+        }
+        return curColumns;
+    }
+
+    private String[] split(String line, String delim) {
+        // FIXME: a CSV line should be parsed with quoting and escapes taken into account
+        String str[] = StringSplitter.split(line, delim);
+
+        // un-escape CSV
+        if (DFSFileTable.DELIM_COMMA.equals(delim)) {
+            for (int i = 0; i < str.length; i++) {
+                str[i] = unescapeCsv(str[i]);
+            }
+        }
+
+        return str;
+    }
+
+    /** Un-escapes one CSV field and strips any enclosing quotes that remain; null or 1-char input is returned as is. */
+    private String unescapeCsv(String str) {
+        if (str == null || str.length() < 2)
+            return str;
+
+        str = StringEscapeUtils.unescapeCsv(str);
+        // unescapeCsv may keep the outermost quotes. Re-check the length first: the field """"
+        // un-escapes to a single '"', and stripping that would call substring(1, 0) and throw.
+        if (str.length() >= 2 && str.charAt(0) == CSV_QUOTE && str.charAt(str.length() - 1) == CSV_QUOTE)
+            str = str.substring(1, str.length() - 1);
+        return str;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (reader != null)
+            reader.close();
+    }
+
+    private String autoDetectDelim(String line) {
+        if (expectedColumnNumber > 0) {
+            for (String delim : DETECT_DELIMS) {
+                if (StringSplitter.split(line, delim).length == expectedColumnNumber) {
+                    logger.info("Auto detect delim to be '" + delim + "', split line to " + expectedColumnNumber + " columns -- " + line);
+                    return delim;
+                }
+            }
+        }
+
+        logger.info("Auto detect delim to be null, will take THE-WHOLE-LINE as a single value, for " + filePath);
+        return null;
+    }
+
+    // ============================================================================
+
+    private interface RowReader extends Closeable {
+        String nextLine() throws IOException; // return null on EOF
+    }
+
+    private class SeqRowReader implements RowReader {
+        Reader reader;
+        Writable key;
+        Text value;
+
+        SeqRowReader(Configuration hconf, FileSystem fs, String path) throws IOException {
+            reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
+            key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
+            value = new Text();
+        }
+
+        @Override
+        public String nextLine() throws IOException {
+            boolean hasNext = reader.next(key, value);
+            if (hasNext)
+                return Bytes.toString(value.getBytes(), 0, value.getLength());
+            else
+                return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+    }
+
+    private class CsvRowReader implements RowReader {
+        BufferedReader reader;
+
+        CsvRowReader(FileSystem fs, String path) throws IOException {
+            FSDataInputStream in = fs.open(new Path(path));
+            reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+        }
+
+        @Override
+        public String nextLine() throws IOException {
+            return reader.readLine();
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
new file mode 100644
index 0000000..1c00993
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Static helpers around Hadoop {@link Configuration}, {@link FileSystem} and {@link Writable}
+ * objects.
+ */
+public class HadoopUtil {
+
+    private static final String FILE_SCHEME_TWO_SLASHES = "file://";
+    private static final String FILE_SCHEME_THREE_SLASHES = "file:///";
+
+    /** Configuration bound to the current thread; filled lazily by {@link #getCurrentConfiguration()}. */
+    private static final ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
+
+    /**
+     * Binds {@code conf} to the calling thread, so that later calls to
+     * {@link #getCurrentConfiguration()} on the same thread return it.
+     */
+    public static void setCurrentConfiguration(Configuration conf) {
+        hadoopConfig.set(conf);
+    }
+
+    /**
+     * @return the configuration bound to the calling thread; when none is bound, a new
+     *         {@link Configuration} is created, bound to the thread and returned
+     */
+    public static Configuration getCurrentConfiguration() {
+        Configuration conf = hadoopConfig.get();
+        if (conf == null) {
+            conf = new Configuration();
+            hadoopConfig.set(conf);
+        }
+        return conf;
+    }
+
+    /**
+     * Looks up the file system addressed by {@code path}, using the current thread's configuration.
+     *
+     * @throws IOException if the file system cannot be obtained
+     * @throws IllegalArgumentException if {@code path} is not a valid URI
+     */
+    public static FileSystem getFileSystem(String path) throws IOException {
+        return FileSystem.get(makeURI(path), getCurrentConfiguration());
+    }
+
+    /**
+     * Converts {@code filePath} into a {@link URI} after applying {@link #fixWindowsPath(String)}.
+     *
+     * @throws IllegalArgumentException if the result is not a syntactically valid URI
+     */
+    public static URI makeURI(String filePath) {
+        try {
+            return new URI(fixWindowsPath(filePath));
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Cannot create FileSystem from URI: " + filePath, e);
+        }
+    }
+
+    /**
+     * Normalizes a {@code file:} URL that carries a Windows path.
+     * <ul>
+     * <li>{@code file://C:\dir\f} gets the third slash: {@code file:///C:\dir\f}</li>
+     * <li>any path starting with {@code file:///} has its backslashes turned into forward slashes</li>
+     * </ul>
+     * Every other path is returned unchanged.
+     */
+    public static String fixWindowsPath(String path) {
+        boolean twoSlashWindowsPath = path.startsWith(FILE_SCHEME_TWO_SLASHES) //
+                && !path.startsWith(FILE_SCHEME_THREE_SLASHES) //
+                && path.contains(":\\");
+        if (twoSlashWindowsPath) {
+            // Rewrite the scheme prefix only; a later "file://" inside the path must stay untouched.
+            path = FILE_SCHEME_THREE_SLASHES + path.substring(FILE_SCHEME_TWO_SLASHES.length());
+        }
+        if (path.startsWith(FILE_SCHEME_THREE_SLASHES)) {
+            path = path.replace('\\', '/');
+        }
+        return path;
+    }
+
+    /**
+     * @return a new {@link Configuration} with
+     *         {@link DFSConfigKeys#DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY} set to "8"
+     */
+    public static Configuration newHadoopJobConfiguration() {
+        Configuration conf = new Configuration();
+        conf.set(DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, "8");
+        return conf;
+    }
+
+    /**
+     * Splits a hive table identifier at its first dot.
+     *
+     * @param table the identifier of hive table, in format {@code db_name.table_name};
+     *              when there is no dot the database defaults to {@code "DEFAULT"}
+     * @return a string array with 2 elements: {"db_name", "table_name"}, both trimmed
+     */
+    public static String[] parseHiveTableName(String table) {
+        int cut = table.indexOf('.');
+        if (cut < 0) {
+            return new String[] { "DEFAULT", table.trim() };
+        }
+        String database = table.substring(0, cut).trim();
+        String tableName = table.substring(cut + 1).trim();
+        return new String[] { database, tableName };
+    }
+
+    /**
+     * Recursively deletes {@code path} if it exists.
+     * <p>
+     * The file system is resolved from the path itself, so a fully qualified path on a file
+     * system other than the configured default is deleted where it actually lives. A path
+     * without scheme still resolves to the default file system of {@code conf}.
+     *
+     * @throws IOException if the existence check or the delete fails
+     */
+    public static void deletePath(Configuration conf, Path path) throws IOException {
+        FileSystem fs = path.getFileSystem(conf);
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+        }
+    }
+
+    /**
+     * Serializes {@code writable} through {@link Writable#write(java.io.DataOutput)}.
+     *
+     * @return the bytes written by the writable
+     * @throws RuntimeException wrapping any {@link IOException} raised while writing
+     */
+    public static byte[] toBytes(Writable writable) {
+        ByteArrayOutputStream bout = new ByteArrayOutputStream();
+        // try-with-resources closes (and thereby flushes) the data stream even when write() throws
+        try (DataOutputStream out = new DataOutputStream(bout)) {
+            writable.write(out);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return bout.toByteArray();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
new file mode 100644
index 0000000..a678e70
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * Base {@link Mapper} that gives subclasses a hook for publishing the job's
+ * {@link Configuration} through {@link HadoopUtil}.
+ */
+public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+    
+    /**
+     * Hands {@code conf} to {@link HadoopUtil#setCurrentConfiguration(Configuration)}, which
+     * binds it to the calling thread.
+     */
+    protected void bindCurrentConfiguration(Configuration conf) {
+        HadoopUtil.setCurrentConfiguration(conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
new file mode 100644
index 0000000..846c849
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/KylinReducer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Base {@link Reducer} that gives subclasses a hook for publishing the job's
+ * {@link Configuration} through {@link HadoopUtil}.
+ */
+public class KylinReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+    /**
+     * Hands {@code conf} to {@link HadoopUtil#setCurrentConfiguration(Configuration)}, which
+     * binds it to the calling thread.
+     */
+    protected void bindCurrentConfiguration(Configuration conf) {
+        HadoopUtil.setCurrentConfiguration(conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/test/java/org/apache/kylin/engine/mr/LookupTableTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/LookupTableTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/LookupTableTest.java
new file mode 100644
index 0000000..87f9133
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/LookupTableTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.io.File;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.dict.lookup.LookupBytesTable;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Builds {@link LookupBytesTable}s from local CSV test data and prints their content.
+ * The test makes no assertions; it only fails when building, dumping or looking up throws.
+ *
+ * @author yangli9
+ */
+public class LookupTableTest extends LocalFileMetadataTestCase {
+
+    private static final String SEPARATOR = "============================================================================";
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    /**
+     * Loads the sites table and the category groupings table, dumps both, then prints the
+     * category row found for the key ("533", "0").
+     */
+    @Test
+    public void testBasic() throws Exception {
+        TableDesc siteTable = MetadataManager.getInstance(getTestConfig()).getTableDesc("EDW.TEST_SITES");
+        TableDesc categoryTable = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.test_category_groupings");
+
+        System.out.println(SEPARATOR);
+        loadAndDump(siteTable, "EDW.TEST_SITES.csv", 10, "SITE_ID");
+
+        System.out.println(SEPARATOR);
+        LookupBytesTable categoryLookup = loadAndDump(categoryTable, "DEFAULT.TEST_CATEGORY_GROUPINGS.csv", 36, "leaf_categ_id", "site_id");
+
+        System.out.println(SEPARATOR);
+        ByteArray[] keyParts = { new ByteArray(Bytes.toBytes("533")), new ByteArray(Bytes.toBytes("0")) };
+        Array<ByteArray> key = new Array<ByteArray>(keyParts);
+        System.out.println(categoryLookup.getRow(key));
+    }
+
+    /**
+     * Builds a lookup table over {@code csvFileName} (relative to the test data folder), keyed by
+     * {@code keyColumns}, dumps it and returns it.
+     *
+     * @param tableArg second constructor argument of {@link DFSFileTable}; presumably the column
+     *                 count of the file -- TODO confirm against DFSFileTable
+     */
+    private LookupBytesTable loadAndDump(TableDesc tableDesc, String csvFileName, int tableArg, String... keyColumns) throws Exception {
+        File csv = new File(LOCALMETA_TEST_DATA + "/data/" + csvFileName);
+        DFSFileTable source = new DFSFileTable("file://" + csv.getAbsolutePath(), tableArg);
+        LookupBytesTable result = new LookupBytesTable(tableDesc, keyColumns, source);
+        result.dump();
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java
new file mode 100644
index 0000000..2db139a
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/TableReaderTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+/**
+ * Checks that {@link DFSFileTableReader} reads the first row of a local test file.
+ *
+ * @author yangli9
+ */
+public class TableReaderTest {
+
+    /**
+     * Opens {@code src/test/resources/dict/DW_SITES} with automatic delimiter detection and
+     * compares the first row against its known content.
+     *
+     * @throws IOException if the file cannot be opened or read
+     */
+    @Test
+    public void testBasicReader() throws IOException {
+        File f = new File("src/test/resources/dict/DW_SITES");
+        DFSFileTableReader reader = new DFSFileTableReader("file://" + f.getAbsolutePath(), DFSFileTable.DELIM_AUTO, 10);
+        try {
+            // Require a row: looping over next() would let the test pass without checking anything
+            // when nothing is read.
+            assertTrue("no rows read from " + f.getAbsolutePath(), reader.next());
+            assertEquals("[-1, Korea Auction.co.kr, S, 48, 0, 111, 2009-02-11, , DW_OFFPLAT, ]", Arrays.toString(reader.getRow()));
+        } finally {
+            // Release the reader even when an assertion above fails.
+            reader.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/invertedindex/pom.xml
----------------------------------------------------------------------
diff --git a/invertedindex/pom.xml b/invertedindex/pom.xml
index 2fc0220..2eec2ce 100644
--- a/invertedindex/pom.xml
+++ b/invertedindex/pom.xml
@@ -30,18 +30,15 @@
 
     <dependencies>
 
+        <!--Kylin Jar -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-common</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
+            <artifactId>kylin-core-metadata</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
-
-        <!--Kylin Jar -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-metadata</artifactId>
+            <artifactId>kylin-core-dictionary</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
@@ -51,48 +48,18 @@
         </dependency>
 
         <dependency>
-            <groupId>commons-cli</groupId>
-            <artifactId>commons-cli</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-lang</groupId>
-            <artifactId>commons-lang</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-configuration</groupId>
-            <artifactId>commons-configuration</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-databind</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>commons-httpclient</groupId>
-            <artifactId>commons-httpclient</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.ning</groupId>
-            <artifactId>compress-lzf</artifactId>
-        </dependency>
-        <dependency>
             <groupId>com.n3twork.druid</groupId>
             <artifactId>extendedset</artifactId>
         </dependency>
 
         <!-- Env & Test -->
-
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <version>${project.parent.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index faf9079..a8a2244 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -18,25 +18,29 @@
 
 package org.apache.kylin.invertedindex;
 
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonManagedReference;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.*;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.LookupDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.metadata.realization.SQLDigest;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonManagedReference;
+import com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
  * @author honma

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f37095c6/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index f478caf..547ff15 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -20,6 +20,7 @@ package org.apache.kylin.invertedindex;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -30,6 +31,7 @@ import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.DistinctColumnValuesProvider;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -130,7 +132,7 @@ public class IIManager implements IRealizationProvider {
         return result;
     }
 
-    public void buildInvertedIndexDictionary(IISegment iiSeg, String factColumnsPath) throws IOException {
+    public void buildInvertedIndexDictionary(IISegment iiSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
         logger.info("Start building ii dictionary");
         DictionaryManager dictMgr = getDictionaryManager();
         IIDesc iiDesc = iiSeg.getIIInstance().getDescriptor();
@@ -140,7 +142,7 @@ public class IIManager implements IRealizationProvider {
                 continue;
             }
 
-            DictionaryInfo dict = dictMgr.buildDictionary(iiDesc.getModel(), "true", column, factColumnsPath);
+            DictionaryInfo dict = dictMgr.buildDictionary(iiDesc.getModel(), "true", column, factTableValueProvider);
             iiSeg.putDictResPath(column, dict.getResourcePath());
         }
         updateII(iiSeg.getIIInstance());