You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/15 08:33:44 UTC

[1/3] incubator-kylin git commit: KYLIN-877 Abstract Input Source interface and make hive a default impl

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 111d8d206 -> d109e27bb


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
new file mode 100644
index 0000000..975d69f
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITSnapshotManagerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.*;
+
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
+import org.apache.kylin.source.TableSourceFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
+
+    SnapshotManager snapshotMgr;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        snapshotMgr = SnapshotManager.getInstance(getTestConfig());
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void basicTest() throws Exception {
+        String tableName = "EDW.TEST_SITES";
+        TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
+        ReadableTable hiveTable = TableSourceFactory.createReadableTable(tableDesc);
+        String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
+
+        snapshotMgr.wipeoutCache();
+
+        SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath);
+
+        // compare hive & snapshot
+        TableReader hiveReader = hiveTable.getReader();
+        TableReader snapshotReader = snapshot.getReader();
+
+        while (true) {
+            boolean hiveNext = hiveReader.next();
+            boolean snapshotNext = snapshotReader.next();
+            assertEquals(hiveNext, snapshotNext);
+
+            if (hiveNext == false)
+                break;
+
+            String[] hiveRow = hiveReader.getRow();
+            String[] snapshotRow = snapshotReader.getRow();
+            assertArrayEquals(hiveRow, snapshotRow);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index e1ab0af..0124b9b 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -24,7 +24,7 @@ import java.util.List;
  */
 public interface IJoinedFlatTableDesc {
 
-    public String getTableName(String jobUUID);
+    public String getTableName();
 
     public List<IntermediateColumnDesc> getColumnList();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java b/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
deleted file mode 100644
index fe5c2b3..0000000
--- a/metadata/src/main/java/org/apache/kylin/metadata/util/HiveSourceTableLoader.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.util;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.HiveClient;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-
-/**
- * Management class to sync hive table metadata with command See main method for
- * how to use the class
- *
- * @author jianliu
- */
-public class HiveSourceTableLoader {
-
-    @SuppressWarnings("unused")
-    private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
-
-    public static final String OUTPUT_SURFIX = "json";
-    public static final String TABLE_FOLDER_NAME = "table";
-    public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
-
-    public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
-
-        Map<String, Set<String>> db2tables = Maps.newHashMap();
-        for (String table : hiveTables) {
-            String[] parts = HadoopUtil.parseHiveTableName(table);
-            Set<String> set = db2tables.get(parts[0]);
-            if (set == null) {
-                set = Sets.newHashSet();
-                db2tables.put(parts[0], set);
-            }
-            set.add(parts[1]);
-        }
-
-        // extract from hive
-        Set<String> loadedTables = Sets.newHashSet();
-        for (String database : db2tables.keySet()) {
-            List<String> loaded = extractHiveTables(database, db2tables.get(database), config);
-            loadedTables.addAll(loaded);
-        }
-
-        return loadedTables;
-    }
-
-    private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
-
-        List<String> loadedTables = Lists.newArrayList();
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-        for (String tableName : tables) {
-            Table table = null;
-            HiveClient hiveClient = new HiveClient();
-            List<FieldSchema> partitionFields = null;
-            List<FieldSchema> fields = null;
-            try {
-                table = hiveClient.getHiveTable(database, tableName);
-                partitionFields = table.getPartitionKeys();
-                fields = hiveClient.getHiveTableFields(database, tableName);
-            } catch (Exception e) {
-                e.printStackTrace();
-                throw new IOException(e);
-            }
-
-            if (fields != null && partitionFields != null && partitionFields.size() > 0) {
-                fields.addAll(partitionFields);
-            }
-
-            long tableSize = hiveClient.getFileSizeForTable(table);
-            long tableFileNum = hiveClient.getFileNumberForTable(table);
-            TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
-            if (tableDesc == null) {
-                tableDesc = new TableDesc();
-                tableDesc.setDatabase(database.toUpperCase());
-                tableDesc.setName(tableName.toUpperCase());
-                tableDesc.setUuid(UUID.randomUUID().toString());
-                tableDesc.setLastModified(0);
-            }
-
-            int columnNumber = fields.size();
-            List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
-            for (int i = 0; i < columnNumber; i++) {
-                FieldSchema field = fields.get(i);
-                ColumnDesc cdesc = new ColumnDesc();
-                cdesc.setName(field.getName().toUpperCase());
-                cdesc.setDatatype(field.getType());
-                cdesc.setId(String.valueOf(i + 1));
-                columns.add(cdesc);
-            }
-            tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
-
-            StringBuffer partitionColumnString = new StringBuffer();
-            for (int i = 0, n = partitionFields.size(); i < n; i++) {
-                if (i > 0)
-                    partitionColumnString.append(", ");
-                partitionColumnString.append(partitionFields.get(i).getName().toUpperCase());
-            }
-
-            Map<String, String> map = metaMgr.getTableDescExd(tableDesc.getIdentity());
-
-            if (map == null) {
-                map = Maps.newHashMap();
-            }
-            map.put(MetadataConstants.TABLE_EXD_TABLENAME, table.getTableName());
-            map.put(MetadataConstants.TABLE_EXD_LOCATION, table.getSd().getLocation());
-            map.put(MetadataConstants.TABLE_EXD_IF, table.getSd().getInputFormat());
-            map.put(MetadataConstants.TABLE_EXD_OF, table.getSd().getOutputFormat());
-            map.put(MetadataConstants.TABLE_EXD_OWNER, table.getOwner());
-            map.put(MetadataConstants.TABLE_EXD_LAT, String.valueOf(table.getLastAccessTime()));
-            map.put(MetadataConstants.TABLE_EXD_PC, partitionColumnString.toString());
-            map.put(MetadataConstants.TABLE_EXD_TFS, String.valueOf(tableSize));
-            map.put(MetadataConstants.TABLE_EXD_TNF, String.valueOf(tableFileNum));
-            map.put(MetadataConstants.TABLE_EXD_PARTITIONED, Boolean.valueOf(partitionFields != null && partitionFields.size() > 0).toString());
-
-            metaMgr.saveSourceTable(tableDesc);
-            metaMgr.saveTableExd(tableDesc.getIdentity(), map);
-            loadedTables.add(tableDesc.getIdentity());
-        }
-
-
-        return loadedTables;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/ITableSource.java b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
new file mode 100644
index 0000000..b2cd3b8
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/ITableSource.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.metadata.model.TableDesc;
+
+public interface ITableSource {
+
+    public <I> I adaptToBuildEngine(Class<I> engineInterface);
+    
+    public ReadableTable createReadableTable(TableDesc tableDesc);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
new file mode 100644
index 0000000..19c7790
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/ReadableTable.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+
+/**
+ */
+public interface ReadableTable {
+
+    /** Returns a reader to read the table. */
+    public TableReader getReader() throws IOException;
+
+    /** Mainly used to detect table modifications. Returns null if the table does not exist. */
+    public TableSignature getSignature() throws IOException;
+
+    public interface TableReader extends Closeable {
+
+        /** Moves to the next row; returns false when there are no more records. */
+        public boolean next() throws IOException;
+
+        /** Get the current row. */
+        public String[] getRow();
+        
+    }
+    
+    // ============================================================================
+    
+    @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+    public class TableSignature {
+
+        @JsonProperty("path")
+        private String path;
+        @JsonProperty("size")
+        private long size;
+        @JsonProperty("last_modified_time")
+        private long lastModifiedTime;
+
+        // for JSON serialization
+        public TableSignature() {
+        }
+
+        public TableSignature(String path, long size, long lastModifiedTime) {
+            super();
+            this.path = path;
+            this.size = size;
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public void setPath(String path) {
+            this.path = path;
+        }
+
+        public void setSize(long size) {
+            this.size = size;
+        }
+
+        public void setLastModifiedTime(long lastModifiedTime) {
+            this.lastModifiedTime = lastModifiedTime;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public long getLastModifiedTime() {
+            return lastModifiedTime;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32));
+            result = prime * result + ((path == null) ? 0 : path.hashCode());
+            result = prime * result + (int) (size ^ (size >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            TableSignature other = (TableSignature) obj;
+            if (lastModifiedTime != other.lastModifiedTime)
+                return false;
+            if (path == null) {
+                if (other.path != null)
+                    return false;
+            } else if (!path.equals(other.path))
+                return false;
+            if (size != other.size)
+                return false;
+            return true;
+        }
+
+        @Override
+        public String toString() { // label uses this class's own name so log output identifies the right type
+            return "TableSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]";
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
new file mode 100644
index 0000000..ec75c44
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/source/TableSourceFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source;
+
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.metadata.model.TableDesc;
+
+public class TableSourceFactory {
+
+    private static ITableSource dft = (ITableSource) ClassUtil.newInstance("org.apache.kylin.source.hive.HiveTableSource");
+    
+    public static ReadableTable createReadableTable(TableDesc table) {
+        return dft.createReadableTable(table);
+    }
+    
+    public static <T> T createEngineAdapter(TableDesc table, Class<T> engineInterface) {
+        return dft.adaptToBuildEngine(engineInterface);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java b/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
deleted file mode 100644
index f357855..0000000
--- a/metadata/src/test/java/org/apache/kylin/metadata/tool/ITHiveSourceTableLoaderTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metadata.tool;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.metadata.util.HiveSourceTableLoader;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
-
-    @Before
-    public void setup() throws Exception {
-        super.createTestMetadata();
-    }
-
-    @After
-    public  void after() throws Exception {
-        super.cleanupTestMetadata();
-    }
-
-    @Test
-    public void test() throws IOException {
-        if (!useSandbox())
-            return;
-
-        KylinConfig config = getTestConfig();
-        String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
-        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
-
-        assertTrue(loaded.size() == toLoad.length);
-        for (String str : toLoad)
-            assertTrue(loaded.contains(str));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 92ea678..6dc9fb7 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -56,13 +56,13 @@ import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.metadata.util.HiveSourceTableLoader;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.MetricsRequest;
 import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.response.MetricsResponse;
 import org.apache.kylin.rest.security.AclPermission;
+import org.apache.kylin.source.hive.HiveSourceTableLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;


[3/3] incubator-kylin git commit: KYLIN-877 Abstract Input Source interface and make hive a default impl

Posted by li...@apache.org.
KYLIN-877 Abstract Input Source interface and make hive a default impl


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d109e27b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d109e27b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d109e27b

Branch: refs/heads/0.8
Commit: d109e27bb37f0dac6bdc318d472b1f739c7df794
Parents: 111d8d2
Author: Li, Yang <ya...@ebay.com>
Authored: Wed Jul 15 14:32:14 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Jul 15 14:33:11 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeBuilder.java |   2 -
 .../java/org/apache/kylin/cube/CubeManager.java |  38 ++--
 .../org/apache/kylin/cube/model/CubeDesc.java   |   2 +-
 .../cube/model/CubeJoinedFlatTableDesc.java     |   5 +-
 .../apache/kylin/dict/DictionaryGenerator.java  |   4 +-
 .../org/apache/kylin/dict/DictionaryInfo.java   |   2 +-
 .../apache/kylin/dict/DictionaryManager.java    |   8 +-
 .../org/apache/kylin/dict/lookup/FileTable.java |   1 +
 .../kylin/dict/lookup/FileTableReader.java      |   2 +-
 .../org/apache/kylin/dict/lookup/HiveTable.java | 100 -----------
 .../kylin/dict/lookup/HiveTableReader.java      | 175 ------------------
 .../kylin/dict/lookup/LookupBytesTable.java     |   2 +-
 .../kylin/dict/lookup/LookupStringTable.java    |   1 +
 .../apache/kylin/dict/lookup/LookupTable.java   |   3 +-
 .../apache/kylin/dict/lookup/ReadableTable.java | 133 --------------
 .../apache/kylin/dict/lookup/SnapshotCLI.java   |   3 +-
 .../kylin/dict/lookup/SnapshotManager.java      |   3 +-
 .../apache/kylin/dict/lookup/SnapshotTable.java |   2 +
 .../kylin/dict/ITHiveTableReaderTest.java       |  51 ------
 .../kylin/dict/ITSnapshotManagerTest.java       |  81 ---------
 .../model/IIJoinedFlatTableDesc.java            |   4 +-
 .../apache/kylin/engine/BuildEngineFactory.java |   1 -
 .../kylin/engine/mr/BatchCubingJobBuilder.java  |  23 +--
 .../kylin/engine/mr/GarbageCollectionStep.java  |   1 +
 .../org/apache/kylin/engine/mr/IMRInput.java    |  26 ++-
 .../kylin/engine/mr/JobBuilderSupport.java      |  36 +---
 .../kylin/engine/mr/MRBatchCubingEngine.java    |  27 +++
 .../apache/kylin/job/AbstractJobBuilder.java    |  64 -------
 .../org/apache/kylin/job/JoinedFlatTable.java   |  18 +-
 .../kylin/job/hadoop/AbstractHadoopJob.java     |  81 +++++----
 .../cardinality/ColumnCardinalityMapper.java    |  48 ++---
 .../cardinality/HiveColumnCardinalityJob.java   |  15 +-
 .../job/hadoop/cube/FactDistinctColumnsJob.java |  25 ++-
 .../cube/FactDistinctColumnsMapperBase.java     |  23 ++-
 .../cube/FactDistinctHiveColumnsMapper.java     |  48 +++--
 .../cube/FactDistinctIIColumnsMapper.java       | 126 -------------
 .../job/hadoop/cube/NewBaseCuboidMapper.java    |  21 +--
 .../kylin/job/hadoop/cubev2/InMemCuboidJob.java |  24 +--
 .../job/hadoop/cubev2/InMemCuboidMapper.java    |  17 +-
 .../hadoop/invertedindex/IIFlattenHiveJob.java  |  94 ----------
 .../kylin/job/invertedindex/IIJobBuilder.java   |   6 +-
 .../kylin/job/streaming/CubeStreamConsumer.java |   2 +-
 .../org/apache/kylin/source/ITableSource.java   |  24 ---
 .../apache/kylin/source/hive/HiveMRInput.java   | 150 +++++++++++++++-
 .../source/hive/HiveSourceTableLoader.java      | 155 ++++++++++++++++
 .../org/apache/kylin/source/hive/HiveTable.java | 100 +++++++++++
 .../kylin/source/hive/HiveTableReader.java      | 176 +++++++++++++++++++
 .../kylin/source/hive/HiveTableSource.java      |   7 +
 .../apache/kylin/job/BuildIIWithStreamTest.java |  45 +++--
 .../job/hadoop/cube/MergeCuboidMapperTest.java  |   2 +-
 .../job/hadoop/hive/JoinedFlatTableTest.java    |   6 +-
 .../kylin/job/hadoop/invertedindex/IITest.java  |  67 +++----
 .../job/tools/ColumnCardinalityJobTest.java     |  63 -------
 .../job/tools/ColumnCardinalityMapperTest.java  | 126 -------------
 .../job/tools/ColumnCardinalityReducerTest.java |  12 +-
 .../hive/ITHiveSourceTableLoaderTest.java       |  59 +++++++
 .../source/hive/ITHiveTableReaderTest.java      |  51 ++++++
 .../source/hive/ITSnapshotManagerTest.java      |  82 +++++++++
 .../metadata/model/IJoinedFlatTableDesc.java    |   2 +-
 .../metadata/util/HiveSourceTableLoader.java    | 155 ----------------
 .../org/apache/kylin/source/ITableSource.java   |  28 +++
 .../org/apache/kylin/source/ReadableTable.java  | 133 ++++++++++++++
 .../apache/kylin/source/TableSourceFactory.java |  35 ++++
 .../tool/ITHiveSourceTableLoaderTest.java       |  59 -------
 .../apache/kylin/rest/service/CubeService.java  |   2 +-
 65 files changed, 1308 insertions(+), 1579 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java b/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
index 538f9c9..a828252 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
@@ -20,8 +20,6 @@ package org.apache.kylin.cube;
 
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 
-import java.util.List;
-
 /**
  */
 public class CubeBuilder {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index ff4a0ae..90fade3 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -18,11 +18,19 @@
 
 package org.apache.kylin.cube;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.Nullable;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -36,7 +44,6 @@ import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.HiveTable;
 import org.apache.kylin.dict.lookup.LookupStringTable;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
@@ -46,14 +53,21 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.realization.*;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.metadata.realization.IRealizationProvider;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.TableSourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 
 /**
  * @author yangli9
@@ -192,8 +206,8 @@ public class CubeManager implements IRealizationProvider {
         MetadataManager metaMgr = getMetadataManager();
         SnapshotManager snapshotMgr = getSnapshotManager();
 
-        HiveTable hiveTable = new HiveTable(metaMgr, lookupTable);
         TableDesc tableDesc = metaMgr.getTableDesc(lookupTable);
+        ReadableTable hiveTable = TableSourceFactory.createReadableTable(tableDesc);
         SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
 
         cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 24f9db4..56bb78f 100644
--- a/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -326,7 +326,7 @@ public class CubeDesc extends RootPersistentEntity {
     public String getFactTable() {
         return model.getFactTable().toUpperCase();
     }
-
+    
     public String[] getNullStrings() {
         return nullStrings;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 0a8b8ed..960ddca 100644
--- a/cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -11,7 +11,6 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * @author George Song (ysong1)
  */
 public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
 
@@ -127,8 +126,8 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
     }
 
     @Override
-    public String getTableName(String jobUUID) {
-        return tableName + "_" + jobUUID.replace("-", "_");
+    public String getTableName() {
+        return tableName;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 60b2a8d..d022885 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -31,9 +31,9 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
 import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index 802f6e7..2eca3ea 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
+import org.apache.kylin.source.ReadableTable.TableSignature;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class DictionaryInfo extends RootPersistentEntity {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 2e1e3db..611251a 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -35,13 +35,13 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.dict.lookup.FileTable;
-import org.apache.kylin.dict.lookup.HiveTable;
-import org.apache.kylin.dict.lookup.ReadableTable;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableSignature;
+import org.apache.kylin.source.TableSourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -272,7 +272,7 @@ public class DictionaryManager {
             if (model.isFactTable(col.getTable())) {
                 table = new FileTable(factColumnsPath + "/" + col.getName(), -1);
             } else {
-                table = new HiveTable(metaMgr, col.getTable());
+                table = TableSourceFactory.createReadableTable(metaMgr.getTableDesc(col.getTable()));
             }
         }
         // otherwise could refer to a data set, e.g. common_indicators.txt

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
index 92f0c10..59eca4a 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTable.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.source.ReadableTable;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
index bf46963..4e04c93 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/FileTableReader.java
@@ -39,7 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
+import org.apache.kylin.source.ReadableTable.TableReader;
 
 /**
  * Tables are typically CSV or SEQ file.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
deleted file mode 100644
index 68e9b82..0000000
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.IOException;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HiveClient;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class HiveTable implements ReadableTable {
-
-    private static final Logger logger = LoggerFactory.getLogger(HiveTable.class);
-
-    final private String database;
-    final private String hiveTable;
-    
-    private HiveClient hiveClient;
-
-    public HiveTable(MetadataManager metaMgr, String table) {
-        TableDesc tableDesc = metaMgr.getTableDesc(table);
-        this.database = tableDesc.getDatabase();
-        this.hiveTable = tableDesc.getName();
-    }
-
-    @Override
-    public TableReader getReader() throws IOException {
-        return new HiveTableReader(database, hiveTable);
-    }
-
-    @Override
-    public TableSignature getSignature() throws IOException {
-        try {
-            String path = computeHDFSLocation();
-            Pair<Long, Long> sizeAndLastModified = FileTable.getSizeAndLastModified(path);
-            long size = sizeAndLastModified.getFirst();
-            long lastModified = sizeAndLastModified.getSecond();
-
-            // for non-native hive table, cannot rely on size & last modified on HDFS
-            if (getHiveClient().isNativeTable(database, hiveTable) == false) {
-                lastModified = System.currentTimeMillis(); // assume table is ever changing
-            }
-
-            return new TableSignature(path, size, lastModified);
-
-        } catch (Exception e) {
-            if (e instanceof IOException)
-                throw (IOException) e;
-            else
-                throw new IOException(e);
-        }
-    }
-
-    private String computeHDFSLocation() throws Exception {
-
-        String override = KylinConfig.getInstanceFromEnv().getOverrideHiveTableLocation(hiveTable);
-        if (override != null) {
-            logger.debug("Override hive table location " + hiveTable + " -- " + override);
-            return override;
-        }
-
-        return getHiveClient().getHiveTableLocation(database, hiveTable);
-    }
-
-    public HiveClient getHiveClient() {
-
-        if (hiveClient == null) {
-            hiveClient = new HiveClient();
-        }
-        return hiveClient;
-    }
-
-    @Override
-    public String toString() {
-        return "hive: database=[" + database + "], table=[" + hiveTable + "]";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
deleted file mode 100644
index 96aa0d1..0000000
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTableReader.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
-import org.apache.hive.hcatalog.data.transfer.HCatReader;
-import org.apache.hive.hcatalog.data.transfer.ReadEntity;
-import org.apache.hive.hcatalog.data.transfer.ReaderContext;
-import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
-
-/**
- * An implementation of TableReader with HCatalog for Hive table.
- */
-public class HiveTableReader implements TableReader {
-
-    private String dbName;
-    private String tableName;
-    private int currentSplit = -1;
-    private ReaderContext readCntxt = null;
-    private Iterator<HCatRecord> currentHCatRecordItr = null;
-    private HCatRecord currentHCatRecord;
-    private int numberOfSplits = 0;
-    private Map<String, String> partitionKV = null;
-
-    /**
-     * Constructor for reading whole hive table
-     * @param dbName
-     * @param tableName
-     * @throws IOException
-     */
-    public HiveTableReader(String dbName, String tableName) throws IOException {
-        this(dbName, tableName, null);
-    }
-
-    /**
-     * Constructor for reading a partition of the hive table
-     * @param dbName
-     * @param tableName
-     * @param partitionKV key-value pairs condition on the partition
-     * @throws IOException
-     */
-    public HiveTableReader(String dbName, String tableName, Map<String, String> partitionKV) throws IOException {
-        this.dbName = dbName;
-        this.tableName = tableName;
-        this.partitionKV = partitionKV;
-        initialize();
-    }
-
-    private void initialize() throws IOException {
-        try {
-            this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new IOException(e);
-        }
-
-        this.numberOfSplits = readCntxt.numSplits();
-
-//        HCatTableInfo tableInfo = HCatTableInfo.
-//        HCatSchema schema = HCatBaseInputFormat.getTableSchema(context.getConfiguration);
-    }
-
-    @Override
-    public boolean next() throws IOException {
-
-        while (currentHCatRecordItr == null || !currentHCatRecordItr.hasNext()) {
-            currentSplit++;
-            if (currentSplit == numberOfSplits) {
-                return false;
-            }
-
-            currentHCatRecordItr = loadHCatRecordItr(readCntxt, currentSplit);
-        }
-
-        currentHCatRecord = currentHCatRecordItr.next();
-
-        return true;
-    }
-
-    @Override
-    public String[] getRow() {
-        return getRowAsStringArray(currentHCatRecord);
-    }
-
-    public List<String> getRowAsList() {
-        return getRowAsList(currentHCatRecord);
-    }
-
-    public static List<String> getRowAsList(HCatRecord record) {
-        List<String> rowValues = new ArrayList<String>(record.size());
-        return getRowAsList(record, rowValues);
-    }
-
-    public static List<String> getRowAsList(HCatRecord record, List<String> rowValues) {
-        List<Object> allFields = record.getAll();
-        for (Object o : allFields) {
-            rowValues.add(o != null ? o.toString() : null);
-        }
-
-        return rowValues;
-    }
-
-    public static String[] getRowAsStringArray(HCatRecord record) {
-        List<String> row = getRowAsList(record);
-
-        return row.toArray(new String[row.size()]);
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.readCntxt = null;
-        this.currentHCatRecordItr = null;
-        this.currentHCatRecord = null;
-        this.currentSplit = -1;
-    }
-
-    public String toString() {
-        return "hive table reader for: " + dbName + "." + tableName;
-    }
-
-    private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception {
-        HiveConf hiveConf = new HiveConf(HiveTableReader.class);
-        Iterator<Entry<String, String>> itr = hiveConf.iterator();
-        Map<String, String> map = new HashMap<String, String>();
-        while (itr.hasNext()) {
-            Entry<String, String> kv = itr.next();
-            map.put(kv.getKey(), kv.getValue());
-        }
-
-        ReadEntity entity;
-        if (partitionKV == null || partitionKV.size() == 0) {
-            entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build();
-        } else {
-            entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV).build();
-        }
-
-        HCatReader reader = DataTransferFactory.getHCatReader(entity, map);
-        ReaderContext cntxt = reader.prepareRead();
-
-        return cntxt;
-    }
-
-    private static Iterator<HCatRecord> loadHCatRecordItr(ReaderContext readCntxt, int dataSplit) throws HCatException {
-        HCatReader currentHCatReader = DataTransferFactory.getHCatReader(readCntxt, dataSplit);
-
-        return currentHCatReader.read();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
index ee71e78..c5a75f5 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupBytesTable.java
@@ -21,9 +21,9 @@ package org.apache.kylin.dict.lookup;
 import java.io.IOException;
 
 import org.apache.kylin.common.util.Bytes;
-
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
index f9b9779..2d92d68 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.dict.lookup;
 import java.io.IOException;
 
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index 7e83197..fd0c37f 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -30,8 +30,9 @@ import org.apache.kylin.common.util.Pair;
 import com.google.common.collect.Sets;
 
 import org.apache.kylin.common.util.Array;
-import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
 
 /**
  * An in-memory lookup table, in which each cell is an object of type T. The

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
deleted file mode 100644
index 2e6af14..0000000
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/ReadableTable.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-
-/**
- */
-public interface ReadableTable {
-
-    /** Returns a reader to read the table. */
-    public TableReader getReader() throws IOException;
-
-    /** Used to detect table modifications mainly. Return null in case table does not exist. */
-    public TableSignature getSignature() throws IOException;
-
-    public interface TableReader extends Closeable {
-
-        /** Move to the next row, return false if no more record. */
-        public boolean next() throws IOException;
-
-        /** Get the current row. */
-        public String[] getRow();
-        
-    }
-    
-    // ============================================================================
-    
-    @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-    public class TableSignature {
-
-        @JsonProperty("path")
-        private String path;
-        @JsonProperty("size")
-        private long size;
-        @JsonProperty("last_modified_time")
-        private long lastModifiedTime;
-
-        // for JSON serialization
-        public TableSignature() {
-        }
-
-        public TableSignature(String path, long size, long lastModifiedTime) {
-            super();
-            this.path = path;
-            this.size = size;
-            this.lastModifiedTime = lastModifiedTime;
-        }
-
-        public void setPath(String path) {
-            this.path = path;
-        }
-
-        public void setSize(long size) {
-            this.size = size;
-        }
-
-        public void setLastModifiedTime(long lastModifiedTime) {
-            this.lastModifiedTime = lastModifiedTime;
-        }
-
-        public String getPath() {
-            return path;
-        }
-
-        public long getSize() {
-            return size;
-        }
-
-        public long getLastModifiedTime() {
-            return lastModifiedTime;
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + (int) (lastModifiedTime ^ (lastModifiedTime >>> 32));
-            result = prime * result + ((path == null) ? 0 : path.hashCode());
-            result = prime * result + (int) (size ^ (size >>> 32));
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (obj == null)
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            TableSignature other = (TableSignature) obj;
-            if (lastModifiedTime != other.lastModifiedTime)
-                return false;
-            if (path == null) {
-                if (other.path != null)
-                    return false;
-            } else if (!path.equals(other.path))
-                return false;
-            if (size != other.size)
-                return false;
-            return true;
-        }
-
-        @Override
-        public String toString() {
-            return "FileSignature [path=" + path + ", size=" + size + ", lastModifiedTime=" + lastModifiedTime + "]";
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
index bb0f9ce..01c4fbd 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotCLI.java
@@ -5,6 +5,7 @@ import java.io.IOException;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.TableSourceFactory;
 
 public class SnapshotCLI {
 
@@ -22,7 +23,7 @@ public class SnapshotCLI {
         if (tableDesc == null)
             throw new IllegalArgumentException("Not table found by " + table);
         
-        SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(new HiveTable(metaMgr, table), tableDesc, overwriteUUID);
+        SnapshotTable snapshot = snapshotMgr.rebuildSnapshot(TableSourceFactory.createReadableTable(tableDesc), tableDesc, overwriteUUID);
         System.out.println("resource path updated: " + snapshot.getResourcePath());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index e9d74bb..7822154 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -26,9 +26,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableSignature;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index aa46212..e2205b9 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.dict.lookup;
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -30,6 +31,7 @@ import org.apache.kylin.dict.StringBytesConverter;
 import org.apache.kylin.dict.TrieDictionary;
 import org.apache.kylin.dict.TrieDictionaryBuilder;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
 
 import java.io.DataInput;
 import java.io.DataOutput;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/test/java/org/apache/kylin/dict/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/ITHiveTableReaderTest.java b/dictionary/src/test/java/org/apache/kylin/dict/ITHiveTableReaderTest.java
deleted file mode 100644
index 8559f8b..0000000
--- a/dictionary/src/test/java/org/apache/kylin/dict/ITHiveTableReaderTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.dict.lookup.HiveTableReader;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-
-/**
- * This test case need the hive runtime; Please run it with sandbox;
- * @author shaoshi
- *
- * It is in the exclude list of default profile in pom.xml
- */
-public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
-
-
-    @Test
-    public void test() throws IOException {
-        HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
-        int rowNumber = 0;
-        while (reader.next()) {
-            String[] row = reader.getRow();
-            Assert.assertEquals(9, row.length);
-            //System.out.println(ArrayUtils.toString(row));
-            rowNumber++;
-        }
-
-        reader.close();
-        Assert.assertEquals(10000, rowNumber);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java b/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
deleted file mode 100644
index 7fc37a6..0000000
--- a/dictionary/src/test/java/org/apache/kylin/dict/ITSnapshotManagerTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import static org.junit.Assert.*;
-
-import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.dict.lookup.HiveTable;
-import org.apache.kylin.dict.lookup.ReadableTable.TableReader;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * @author yangli9
- * 
- */
-public class ITSnapshotManagerTest extends HBaseMetadataTestCase {
-
-    SnapshotManager snapshotMgr;
-
-    @Before
-    public void setup() throws Exception {
-        createTestMetadata();
-        snapshotMgr = SnapshotManager.getInstance(getTestConfig());
-    }
-
-    @After
-    public void after() throws Exception {
-        cleanupTestMetadata();
-    }
-
-    @Test
-    public void basicTest() throws Exception {
-        String tableName = "EDW.TEST_SITES";
-        HiveTable hiveTable = new HiveTable(MetadataManager.getInstance(getTestConfig()), tableName);
-        TableDesc tableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc(tableName);
-        String snapshotPath = snapshotMgr.buildSnapshot(hiveTable, tableDesc).getResourcePath();
-
-        snapshotMgr.wipeoutCache();
-
-        SnapshotTable snapshot = snapshotMgr.getSnapshotTable(snapshotPath);
-
-        // compare hive & snapshot
-        TableReader hiveReader = hiveTable.getReader();
-        TableReader snapshotReader = snapshot.getReader();
-
-        while (true) {
-            boolean hiveNext = hiveReader.next();
-            boolean snapshotNext = snapshotReader.next();
-            assertEquals(hiveNext, snapshotNext);
-
-            if (hiveNext == false)
-                break;
-
-            String[] hiveRow = hiveReader.getRow();
-            String[] snapshotRow = snapshotReader.getRow();
-            assertArrayEquals(hiveRow, snapshotRow);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
index b01ffa7..4ee1ccb 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIJoinedFlatTableDesc.java
@@ -51,8 +51,8 @@ public class IIJoinedFlatTableDesc implements IJoinedFlatTableDesc {
     }
 
     @Override
-    public String getTableName(String jobUUID) {
-        return tableName + "_" + jobUUID.replace("-", "_");
+    public String getTableName() {
+        return tableName + "_" + "II_Flat";
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
index 0e31f6b..c536deb 100644
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -36,5 +36,4 @@ public class BuildEngineFactory {
         return defaultBatch.createBatchMergeJob(mergeSegment, submitter);
     }
     
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 8f16775..c1524e7 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
 import org.apache.kylin.job.common.HadoopShellExecutable;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -31,17 +32,12 @@ import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
 import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
 
 public class BatchCubingJobBuilder extends JobBuilderSupport {
-    
-    public static interface IMRBuildInputParticipant {
-        
-    }
-    
-    public static interface IMRBuildOutputParticipant {
-        
-    }
+
+    private final IMRBatchCubingInputSide inputSide;
     
     public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
         super(newSegment, submitter);
+        this.inputSide = MRBatchCubingEngine.getBatchCubingInputSide(seg);
     }
 
     public CubingJob build() {
@@ -50,7 +46,8 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         final String cuboidRootPath = getCuboidRootPath(jobId);
         final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
 
-        result.addTask(createFlatHiveTableStep(flatHiveTableDesc, jobId));
+        // Phase 1: Create Flat Table
+        inputSide.addStepPhase1_CreateFlatTable(result);
         
         result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
         result.addTask(createBuildDictionaryStep(jobId));
@@ -85,9 +82,9 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
             result.addTask(createBulkLoadStep(jobId));
         }
 
+        // Phase 4: Update Metadata & Cleanup
         result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
-
-        result.addTask(createGarbageCollectionStep(null, flatHiveTableDesc.getTableName(jobId)));
+        inputSide.addStepPhase4_UpdateMetadataAndCleanup(result);
 
         return result;
     }
@@ -105,7 +102,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
         appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
         appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
-        appendExecCmdParameters(cmd, "tablename", flatHiveTableDesc.getTableName(jobId));
 
         result.setMapReduceParams(cmd.toString());
         return result;
@@ -192,7 +188,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
         appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
         appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
         appendExecCmdParameters(cmd, "level", "0");
-        appendExecCmdParameters(cmd, "tablename", flatHiveTableDesc.getTableName(jobId));
         appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
@@ -211,7 +206,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
     }
     
     private String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName(jobId);
+        return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
     }
     
     private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
index 2f7bf21..d79f35d 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
@@ -44,6 +44,7 @@ import com.google.common.collect.Lists;
 /**
  * Drop the resources that is no longer needed, including intermediate hive table (after cube build) and hbase tables (after cube merge)
  */
+@Deprecated // only exists for backward compatibility
 public class GarbageCollectionStep extends AbstractExecutable {
 
     private static final String OLD_HTABLES = "oldHTables";

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 3097a4b..c170b47 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -18,8 +18,30 @@
 
 package org.apache.kylin.engine.mr;
 
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.TableDesc;
+
 public interface IMRInput {
-    
-    public IMRJobFlowParticipant createBuildFlowParticipant();
 
+    public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg);
+
+    public IMRTableInputFormat getTableInputFormat(TableDesc table);
+    
+    public interface IMRTableInputFormat {
+        
+        public void configureJob(Job job);
+        
+        public String[] parseMapperInput(Object mapperInput);
+    }
+    
+    public interface IMRBatchCubingInputSide {
+        
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
+        
+        public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow);
+        
+        public IMRTableInputFormat getFlatTableInputFormat();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index b1f3c4e..79430f6 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -23,18 +23,14 @@ import java.util.List;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.common.HadoopShellExecutable;
 import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
 import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
 import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
 import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 
 import com.google.common.base.Preconditions;
 
@@ -54,10 +50,6 @@ abstract public class JobBuilderSupport {
         this.submitter = submitter;
     }
     
-    protected AbstractExecutable createFlatHiveTableStep(IJoinedFlatTableDesc flatTableDesc, String jobId) {
-        return createFlatHiveTableStep(config, flatTableDesc, jobId);
-    }
-    
     protected MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
         MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
         rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
@@ -174,35 +166,9 @@ abstract public class JobBuilderSupport {
     }
 
     // ============================================================================
-    // static methods also shared by IIJobBuilder
+    // static methods also shared by other job flow participants
     // ----------------------------------------------------------------------------
 
-    public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
-
-        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc, jobId);
-        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, getJobWorkingDir(conf, jobId), jobId);
-        String insertDataHqls;
-        try {
-            insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, jobId, conf);
-        } catch (IOException e1) {
-            e1.printStackTrace();
-            throw new RuntimeException("Failed to generate insert data SQL for intermediate table.");
-        }
-
-        ShellExecutable step = new ShellExecutable();
-        StringBuffer buf = new StringBuffer();
-        buf.append("hive -e \"");
-        buf.append(dropTableHql + "\n");
-        buf.append(createTableHql + "\n");
-        buf.append(insertDataHqls + "\n");
-        buf.append("\"");
-
-        step.setCmd(buf.toString());
-        step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-
-        return step;
-    }
-
     public static String getJobWorkingDir(JobEngineConfig conf, String jobId) {
         return conf.getHdfsWorkingDirectory() + "/" + "kylin-" + jobId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
index 332f20e..b374a99 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -18,9 +18,15 @@
 
 package org.apache.kylin.engine.mr;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.IBatchCubingEngine;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.TableSourceFactory;
 
 public class MRBatchCubingEngine implements IBatchCubingEngine {
 
@@ -33,5 +39,26 @@ public class MRBatchCubingEngine implements IBatchCubingEngine {
     public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
         return new BatchMergeJobBuilder(mergeSegment, submitter).build();
     }
+    
+    public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+        TableDesc tableDesc = getTableDesc(seg.getCubeDesc().getFactTable());
+        return getMRInput(tableDesc).getBatchCubingInputSide(seg);
+    }
+
+    public static IMRTableInputFormat getTableInputFormat(String tableName) {
+        return getTableInputFormat(getTableDesc(tableName));
+    }
+
+    public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
+        return getMRInput(tableDesc).getTableInputFormat(tableDesc);
+    }
+
+    private static IMRInput getMRInput(TableDesc tableDesc) {
+        return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class);
+    }
+
+    private static TableDesc getTableDesc(String tableName) {
+        return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
deleted file mode 100644
index 48766b7..0000000
--- a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.io.IOException;
-
-import org.apache.kylin.job.common.ShellExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-public abstract class AbstractJobBuilder {
-
-    protected static final String JOB_WORKING_DIR_PREFIX = "kylin-";
-
-    protected JobEngineConfig engineConfig;
-    protected String submitter;
-    
-    public AbstractJobBuilder(JobEngineConfig engineConfig) {
-        this.engineConfig = engineConfig;
-    }
-
-    public AbstractJobBuilder setSubmitter(String submitter) {
-        this.submitter = submitter;
-        return this;
-    }
-
-    public String getSubmitter() {
-        return submitter;
-    }
-
-    protected StringBuilder appendExecCmdParameters(StringBuilder cmd, String paraName, String paraValue) {
-        return cmd.append(" -").append(paraName).append(" ").append(paraValue);
-    }
-
-    protected String getIntermediateHiveTableName(IJoinedFlatTableDesc intermediateTableDesc, String jobUuid) {
-        return intermediateTableDesc.getTableName(jobUuid);
-    }
-
-    protected String getIntermediateHiveTableLocation(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID) {
-        return getJobWorkingDir(jobUUID) + "/" + intermediateTableDesc.getTableName(jobUUID);
-    }
-
-    protected String getJobWorkingDir(String uuid) {
-        return engineConfig.getHdfsWorkingDirectory() + "/" + JOB_WORKING_DIR_PREFIX + uuid;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 283f462..eba6bd4 100644
--- a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -56,14 +56,14 @@ public class JoinedFlatTable {
 
     public static final String LOOKUP_TABLE_ALAIS_PREFIX = "LOOKUP_";
 
-    public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir, String jobUUID) {
-        return storageDfsDir + "/" + intermediateTableDesc.getTableName(jobUUID);
+    public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
+        return storageDfsDir + "/" + intermediateTableDesc.getTableName();
     }
 
-    public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir, String jobUUID) {
+    public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
         StringBuilder ddl = new StringBuilder();
 
-        ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName(jobUUID) + "\n");
+        ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName() + "\n");
 
         ddl.append("(" + "\n");
         for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
@@ -77,19 +77,19 @@ public class JoinedFlatTable {
 
         ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
         ddl.append("STORED AS SEQUENCEFILE" + "\n");
-        ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName(jobUUID) + "';").append("\n");
+        ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
         // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
         // ";\n");
         return ddl.toString();
     }
 
-    public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID) {
+    public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
         StringBuilder ddl = new StringBuilder();
-        ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName(jobUUID) + ";").append("\n");
+        ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName() + ";").append("\n");
         return ddl.toString();
     }
 
-    public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, String jobUUID, JobEngineConfig engineConfig) throws IOException {
+    public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) throws IOException {
         StringBuilder sql = new StringBuilder();
 
         File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
@@ -117,7 +117,7 @@ public class JoinedFlatTable {
             }
         }
 
-        sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName(jobUUID) + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
+        sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
 
         return sql.toString();
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
index 919af67..7d69fd9 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
@@ -232,52 +232,44 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
     }
 
-    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
-        File tmp = File.createTempFile("kylin_job_meta", "");
-        tmp.delete(); // we need a directory, so delete the file first
-
-        File metaDir = new File(tmp, "meta");
-        metaDir.mkdirs();
-        metaDir.getParentFile().deleteOnExit();
-
-        // write kylin.properties
+    public static KylinConfig loadKylinPropsAndMetadata() throws IOException {
+        File metaDir = new File("meta");
+        System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
+        logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        File kylinPropsFile = new File(metaDir, "kylin.properties");
-        kylinConfig.writeProperties(kylinPropsFile);
+        kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
+        return kylinConfig;
+    }
 
+    protected void attachKylinPropsAndMetadata(TableDesc table, Configuration conf) throws IOException {
+        ArrayList<String> dumpList = new ArrayList<String>();
+        dumpList.add(table.getResourcePath());
+        attachKylinPropsAndMetadata(dumpList, conf);
+    }
+    
+    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
+        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        
         // write cube / model_desc / cube_desc / dict / table
         ArrayList<String> dumpList = new ArrayList<String>();
         dumpList.add(cube.getResourcePath());
         dumpList.add(cube.getDescriptor().getModel().getResourcePath());
         dumpList.add(cube.getDescriptor().getResourcePath());
+        
         for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
+            TableDesc table = metaMgr.getTableDesc(tableName);
             dumpList.add(table.getResourcePath());
         }
-
         for (CubeSegment segment : cube.getSegments()) {
             dumpList.addAll(segment.getDictionaryPaths());
         }
-
-        dumpResources(kylinConfig, metaDir, dumpList);
-
-        // hadoop distributed cache
-        conf.set("tmpfiles", "file:///" + OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()));
+        
+        attachKylinPropsAndMetadata(dumpList, conf);
     }
 
     protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
-        File tmp = File.createTempFile("kylin_job_meta", "");
-        tmp.delete(); // we need a directory, so delete the file first
-
-        File metaDir = new File(tmp, "meta");
-        metaDir.mkdirs();
-        metaDir.getParentFile().deleteOnExit();
-
-        // write kylin.properties
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        File kylinPropsFile = new File(metaDir, "kylin.properties");
-        kylinConfig.writeProperties(kylinPropsFile);
-
+        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        
         // write II / model_desc / II_desc / dict / table
         ArrayList<String> dumpList = new ArrayList<String>();
         dumpList.add(ii.getResourcePath());
@@ -285,14 +277,30 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         dumpList.add(ii.getDescriptor().getResourcePath());
 
         for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
+            TableDesc table = metaMgr.getTableDesc(tableName);
             dumpList.add(table.getResourcePath());
         }
-
         for (IISegment segment : ii.getSegments()) {
             dumpList.addAll(segment.getDictionaryPaths());
         }
 
+        attachKylinPropsAndMetadata(dumpList, conf);
+    }
+
+    private void attachKylinPropsAndMetadata(ArrayList<String> dumpList, Configuration conf) throws IOException {
+        File tmp = File.createTempFile("kylin_job_meta", "");
+        tmp.delete(); // we need a directory, so delete the file first
+
+        File metaDir = new File(tmp, "meta");
+        metaDir.mkdirs();
+        metaDir.getParentFile().deleteOnExit();
+
+        // write kylin.properties
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        File kylinPropsFile = new File(metaDir, "kylin.properties");
+        kylinConfig.writeProperties(kylinPropsFile);
+
+        // write resources
         dumpResources(kylinConfig, metaDir, dumpList);
 
         // hadoop distributed cache
@@ -346,15 +354,6 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         return input.getSplits(job).size();
     }
 
-    public static KylinConfig loadKylinPropsAndMetadata() throws IOException {
-        File metaDir = new File("meta");
-        System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
-        logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
-        return kylinConfig;
-    }
-
     public void kill() throws JobException {
         if (job != null) {
             try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
index b32baf9..8da6cde 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -24,52 +24,60 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.kylin.common.util.Bytes;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.mr.KylinMapper;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
 
 /**
  * @author Jack
  * 
  */
-public class ColumnCardinalityMapper<T> extends KylinMapper<T, HCatRecord, IntWritable, BytesWritable> {
+public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritable, BytesWritable> {
 
     private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
     public static final String DEFAULT_DELIM = ",";
 
     private int counter = 0;
     
-    private HCatSchema schema = null;
-    private int columnSize = 0;
+    private TableDesc tableDesc;
+    private IMRTableInputFormat tableInputFormat;
     
     @Override
     protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
-        columnSize = schema.getFields().size();
+        Configuration conf = context.getConfiguration();
+        bindCurrentConfiguration(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        
+        String tableName = conf.get(BatchConstants.TABLE_NAME);
+        tableDesc = MetadataManager.getInstance(config).getTableDesc(tableName);
+        tableInputFormat = MRBatchCubingEngine.getTableInputFormat(tableDesc);
     }
 
     @Override
-    public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException {
-
-        HCatFieldSchema field;
-        Object fieldValue;
-        for (int m = 0; m < columnSize; m++) {
-            field = schema.get(m);
-            fieldValue = value.get(field.getName(), schema);
+    public void map(T key, Object value, Context context) throws IOException, InterruptedException {
+        ColumnDesc[] columns = tableDesc.getColumns();
+        String[] values = tableInputFormat.parseMapperInput(value);
+        
+        for (int m = 0; m < columns.length; m++) {
+            String field = columns[m].getName();
+            String fieldValue = values[m];
             if (fieldValue == null)
                 fieldValue = "NULL";
             
             if (counter < 5 && m < 10) {
-                System.out.println("Get row " + counter + " column '" + field.getName() + "'  value: " + fieldValue);
+                System.out.println("Get row " + counter + " column '" + field + "'  value: " + fieldValue);
             }
 
             if (fieldValue != null)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
index abf3e2f..58fd509 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-
-import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 
 /**
@@ -69,16 +69,17 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob {
 
             setJobClasspath(job);
             
+            String table = getOptionValue(OPTION_TABLE);
+            job.getConfiguration().set(BatchConstants.TABLE_NAME, table);
+            
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             FileOutputFormat.setOutputPath(job, output);
             job.getConfiguration().set("dfs.block.size", "67108864");
 
             // Mapper
-            String table = getOptionValue(OPTION_TABLE);
-            String[] dbTableNames = HadoopUtil.parseHiveTableName(table);
-            HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
+            IMRTableInputFormat tableInputFormat = MRBatchCubingEngine.getTableInputFormat(table);
+            tableInputFormat.configureJob(job);
 
-            job.setInputFormatClass(HCatInputFormat.class);
             job.setMapperClass(ColumnCardinalityMapper.class);
             job.setMapOutputKeyClass(IntWritable.class);
             job.setMapOutputValueClass(BytesWritable.class);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
index 5ec963e..77c4f7e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
@@ -29,18 +29,19 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author yangli9
  */
 public class FactDistinctColumnsJob extends AbstractHadoopJob {
     protected static final Logger log = LoggerFactory.getLogger(FactDistinctColumnsJob.class);
@@ -53,7 +54,6 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             options.addOption(OPTION_JOB_NAME);
             options.addOption(OPTION_CUBE_NAME);
             options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_TABLE_NAME);
             options.addOption(OPTION_SEGMENT_NAME);
             options.addOption(OPTION_STATISTICS_ENABLED);
             options.addOption(OPTION_STATISTICS_OUTPUT);
@@ -63,8 +63,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             String cubeName = getOptionValue(OPTION_CUBE_NAME);
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
-            ;
+            
             String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
             String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
             String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
@@ -73,7 +72,7 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
             // ----------------------------------------------------------------------------
             // add metadata to distributed cache
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
 
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
@@ -84,11 +83,11 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
 
             setJobClasspath(job);
 
-            setupMapper(intermediateTable);
+            setupMapper(cube.getSegment(segmentName, SegmentStatusEnum.NEW));
             setupReducer(output);
 
             // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
-            attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration());
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
             return waitForCompletion(job);
 
@@ -100,12 +99,10 @@ public class FactDistinctColumnsJob extends AbstractHadoopJob {
 
     }
 
-    private void setupMapper(String intermediateTable) throws IOException {
-        String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
-        HCatInputFormat.setInput(job, dbTableNames[0],
-                dbTableNames[1]);
+    private void setupMapper(CubeSegment cubeSeg) throws IOException {
+        IMRTableInputFormat flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+        flatTableInputFormat.configureJob(job);
 
-        job.setInputFormatClass(HCatInputFormat.class);
         job.setMapperClass(FactDistinctHiveColumnsMapper.class);
         job.setCombinerClass(FactDistinctColumnsCombiner.class);
         job.setMapOutputKeyClass(LongWritable.class);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
index 6bd25a8..ee8ab3e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
@@ -1,35 +1,41 @@
 package org.apache.kylin.job.hadoop.cube;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinMapper;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.RowKeyDesc;
 import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  */
 public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, LongWritable, Text> {
 
     protected String cubeName;
     protected CubeInstance cube;
+    protected CubeSegment cubeSeg;
     protected CubeDesc cubeDesc;
     protected long baseCuboidId;
     protected List<TblColRef> columns;
     protected ArrayList<Integer> factDictCols;
+    protected IMRTableInputFormat flatTableInputFormat;
 
     protected LongWritable outputKey = new LongWritable();
     protected Text outputValue = new Text();
@@ -43,6 +49,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
 
         cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeSeg = cube.getSegment(conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME), SegmentStatusEnum.NEW);
         cubeDesc = cube.getDescriptor();
         baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         columns = Cuboid.findById(cubeDesc, baseCuboidId).getColumns();
@@ -60,11 +67,13 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
                 factDictCols.add(i);
             }
         }
+        
+        flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
     }
 
-    protected void handleErrorRecord(HCatRecord record, Exception ex) throws IOException {
+    protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
 
-        System.err.println("Insane record: " + record.getAll());
+        System.err.println("Insane record: " + Arrays.toString(record));
         ex.printStackTrace(System.err);
 
         errorRecordCounter++;


[2/3] incubator-kylin git commit: KYLIN-877 Abstract Input Source interface and make hive a default impl

Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
index a884465..c19af69 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -18,35 +18,30 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.lookup.HiveTableReader;
 import org.apache.kylin.job.constant.BatchConstants;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.List;
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 
 /**
  * @author yangli9
  */
-public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, HCatRecord> {
+public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
 
-    private HCatSchema schema = null;
     private CubeJoinedFlatTableDesc intermediateTableDesc;
 
     protected boolean collectStatistics = false;
@@ -55,7 +50,6 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     private Integer[][] allCuboidsBitSet = null;
     private HyperLogLogPlusCounter[] allCuboidsHLL = null;
     private Long[] cuboidIds;
-    private List<String> rowArray;
     private HashFunction hf = null;
     private int rowCount = 0;
     private int SAMPING_PERCENTAGE = 5;
@@ -64,9 +58,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     @Override
     protected void setup(Context context) throws IOException {
         super.setup(context);
-        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
         intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-        rowArray = new ArrayList<String>(schema.getFields().size());
         collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
         if (collectStatistics) {
             SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
@@ -116,13 +108,12 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     }
 
     @Override
-    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-        rowArray.clear();
-        HiveTableReader.getRowAsList(record, rowArray);
+    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        String[] row = flatTableInputFormat.parseMapperInput(record);
         try {
             for (int i : factDictCols) {
                 outputKey.set((long) i);
-                String fieldValue = rowArray.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+                String fieldValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
                 if (fieldValue == null)
                     continue;
                 byte[] bytes = Bytes.toBytes(fieldValue);
@@ -130,24 +121,25 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
                 context.write(outputKey, outputValue);
             }
         } catch (Exception ex) {
-            handleErrorRecord(record, ex);
+            handleErrorRecord(row, ex);
         }
 
         if (collectStatistics && rowCount < SAMPING_PERCENTAGE) {
-            putRowKeyToHLL(rowArray);
+            putRowKeyToHLL(row);
         }
 
         if (rowCount++ == 100)
             rowCount = 0;
     }
 
-    private void putRowKeyToHLL(List<String> row) {
+    private void putRowKeyToHLL(String[] row) {
 
         //generate hash for each row key column
         for (int i = 0; i < nRowKey; i++) {
             Hasher hc = hf.newHasher();
-            if (row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]) != null) {
-                row_hashcodes[i].set(hc.putString(row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i])).hash().asBytes());
+            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+            if (colValue != null) {
+                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
             } else {
                 row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
deleted file mode 100644
index d3c3249..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.FIFOIterable;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.*;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<ImmutableBytesWritable, Result> {
-
-    private Queue<IIRow> buffer = Lists.newLinkedList();
-    private Iterator<Slice> slices;
-
-    private int[] baseCuboidCol2FlattenTableCol;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.setup(context);
-
-        Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        String iiName = conf.get(BatchConstants.CFG_II_NAME);
-        IIInstance ii = IIManager.getInstance(config).getII(iiName);
-        IIDesc iiDesc = ii.getDescriptor();
-
-        IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(iiDesc);
-        TableRecordInfo info = new TableRecordInfo(iiDesc);
-        KeyValueCodec codec = new IIKeyValueCodecWithState(info.getDigest());
-        slices = codec.decodeKeyValue(new FIFOIterable<IIRow>(buffer)).iterator();
-
-        baseCuboidCol2FlattenTableCol = new int[factDictCols.size()];
-        for (int i = 0; i < factDictCols.size(); ++i) {
-            int index = findTblCol(intermediateTableDesc.getColumnList(), columns.get(factDictCols.get(i)));
-            baseCuboidCol2FlattenTableCol[i] = index;
-        }
-    }
-
-    private int findTblCol(List<IntermediateColumnDesc> columns, final TblColRef col) {
-        return Iterators.indexOf(columns.iterator(), new Predicate<IntermediateColumnDesc>() {
-            @Override
-            public boolean apply(IntermediateColumnDesc input) {
-                return input.getColRef().equals(col);
-            }
-        });
-    }
-
-    @Override
-    public void map(ImmutableBytesWritable key, Result cells, Context context) throws IOException, InterruptedException {
-        IIRow iiRow = new IIRow();
-        for (Cell c : cells.rawCells()) {
-            iiRow.updateWith(c);
-        }
-        buffer.add(iiRow);
-
-        if (slices.hasNext()) {
-            byte[] vBytesBuffer = null;
-            Slice slice = slices.next();
-
-            for (RawTableRecord record : slice) {
-                for (int i = 0; i < factDictCols.size(); ++i) {
-                    int baseCuboidIndex = factDictCols.get(i);
-                    outputKey.set((short) baseCuboidIndex);
-                    int indexInRecord = baseCuboidCol2FlattenTableCol[i];
-
-                    Dictionary<?> dictionary = slice.getLocalDictionaries()[indexInRecord];
-                    if (vBytesBuffer == null || dictionary.getSizeOfValue() > vBytesBuffer.length) {
-                        vBytesBuffer = new byte[dictionary.getSizeOfValue() * 2];
-                    }
-
-                    int vid = record.getValueID(indexInRecord);
-                    if (vid == dictionary.nullId()) {
-                        continue;
-                    }
-                    int vBytesSize = dictionary.getValueBytesFromId(vid, vBytesBuffer, 0);
-
-                    outputValue.set(vBytesBuffer, 0, vBytesSize);
-                    context.write(outputKey, outputValue);
-                }
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
index f5ee5b9..20f2474 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
@@ -27,19 +27,14 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
 import org.apache.hadoop.io.Text;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinMapper;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -49,8 +44,9 @@ import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.HiveTable;
 import org.apache.kylin.dict.lookup.LookupBytesTable;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -60,6 +56,10 @@ import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.TableSourceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author George Song (ysong1),honma
@@ -170,8 +170,9 @@ public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, T
 
                 // load lookup tables
                 if (!lookupTables.containsKey(lookupTableName)) {
-                    HiveTable htable = new HiveTable(metadataManager, lookupTableName);
-                    LookupBytesTable btable = new LookupBytesTable(metadataManager.getTableDesc(lookupTableName), join.getPrimaryKey(), htable);
+                    TableDesc tableDesc = metadataManager.getTableDesc(lookupTableName);
+                    ReadableTable htable = TableSourceFactory.createReadableTable(tableDesc);
+                    LookupBytesTable btable = new LookupBytesTable(tableDesc, join.getPrimaryKey(), htable);
                     lookupTables.put(lookupTableName, btable);
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
index db690b9..f89b143 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
@@ -31,21 +31,21 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author shaoshi
  */
-
 public class InMemCuboidJob extends AbstractHadoopJob {
 
     protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class);
@@ -62,7 +62,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_NCUBOID_LEVEL);
             options.addOption(OPTION_INPUT_FORMAT);
-            options.addOption(OPTION_TABLE_NAME);
             options.addOption(OPTION_HTABLE_NAME);
             options.addOption(OPTION_STATISTICS_OUTPUT);
             parseOptions(options, args);
@@ -71,14 +70,12 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
             String htableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
 
-
             KylinConfig config = KylinConfig.getInstanceFromEnv();
             CubeManager cubeMgr = CubeManager.getInstance(config);
             CubeInstance cube = cubeMgr.getCube(cubeName);
-
+            CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             logger.info("Starting: " + job.getJobName());
@@ -89,13 +86,9 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             DataModelDesc.RealizationCapacity realizationCapacity = cube.getDescriptor().getModel().getCapacity();
             job.getConfiguration().set(BatchConstants.CUBE_CAPACITY, realizationCapacity.toString());
 
-            String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
-            HCatInputFormat.setInput(job, dbTableNames[0],
-                    dbTableNames[1]);
-
-            job.setInputFormatClass(HCatInputFormat.class);
-
             // set Mapper
+            IMRTableInputFormat flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
             job.setMapperClass(InMemCuboidMapper.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);
             job.setMapOutputValueClass(Text.class);
@@ -112,8 +105,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
             HTable htable = new HTable(conf, htableName);
-            HFileOutputFormat.configureIncrementalLoad(job,
-                    htable);
+            HFileOutputFormat.configureIncrementalLoad(job, htable);
 
 
             // set Reducer; This need be after configureIncrementalLoad, to overwrite the default reducer class

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index ba87dfe..3eb29df 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -2,6 +2,7 @@ package org.apache.kylin.job.hadoop.cubev2;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -16,7 +17,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinMapper;
 import org.apache.kylin.cube.CubeInstance;
@@ -25,7 +25,8 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.job.inmemcubing.DoggedCubeBuilder;
@@ -36,12 +37,13 @@ import com.google.common.collect.Maps;
 
 /**
  */
-public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ImmutableBytesWritable, Text> {
+public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ImmutableBytesWritable, Text> {
 
     private static final Log logger = LogFactory.getLog(InMemCuboidMapper.class);
     private CubeInstance cube;
     private CubeDesc cubeDesc;
     private CubeSegment cubeSegment;
+    private IMRTableInputFormat flatTableInputFormat;
 
     private int counter;
     private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
@@ -59,6 +61,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
         cubeDesc = cube.getDescriptor();
         String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
         cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
 
         Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
 
@@ -83,11 +86,13 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
     }
 
     @Override
-    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
         // put each row to the queue
-        List<String> row = HiveTableReader.getRowAsList(record);
+        String[] row = flatTableInputFormat.parseMapperInput(record);
+        List<String> rowAsList = Arrays.asList(row);
+        
         while (!future.isDone()) {
-            if (queue.offer(row, 1, TimeUnit.SECONDS)) {
+            if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
                 counter++;
                 if (counter % BatchConstants.COUNTER_MAX == 0) {
                     logger.info("Handled " + counter + " records!");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java
deleted file mode 100644
index c25b164..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.invertedindex.IIDescManager;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.cmd.ICommandOutput;
-import org.apache.kylin.job.cmd.ShellCmd;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-
-/**
- */
-public class IIFlattenHiveJob extends AbstractHadoopJob {
-
-    protected static final Logger logger = LoggerFactory.getLogger(IIFlattenHiveJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-        try {
-            options.addOption(OPTION_II_NAME);
-            parseOptions(options, args);
-
-            String iiname = getOptionValue(OPTION_II_NAME);
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-
-            IIInstance iiInstance = IIManager.getInstance(config).getII(iiname);
-            IIDesc iidesc = IIDescManager.getInstance(config).getIIDesc(iiInstance.getDescName());
-
-            String jobUUID = "00bf87b5-c7b5-4420-a12a-07f6b37b3187";
-            JobEngineConfig engineConfig = new JobEngineConfig(config);
-            IJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(iidesc);
-            String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobUUID);
-            String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, //
-                    JobInstance.getJobWorkingDir(jobUUID, engineConfig.getHdfsWorkingDirectory()), jobUUID);
-            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobUUID, engineConfig);
-
-            StringBuffer buf = new StringBuffer();
-            buf.append("hive -e \"");
-            buf.append(dropTableHql + "\n");
-            buf.append(createTableHql + "\n");
-            buf.append(insertDataHqls + "\n");
-            buf.append("\"");
-            
-            System.out.println(buf.toString());
-            System.out.println("========================");
-
-            ShellCmd cmd = new ShellCmd(buf.toString(), null, 0, null, null, false);
-            ICommandOutput output = cmd.execute();
-            System.out.println(output.getOutput());
-            System.out.println(output.getExitCode());
-            
-            return 0;
-        } catch (Exception e) {
-            logger.error("error execute IIFlattenHiveJob", e);
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        IIFlattenHiveJob job = new IIFlattenHiveJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
index 1ae101d..cc68e1b 100644
--- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
@@ -23,7 +23,6 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.TimeZone;
 
-import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.job.common.HadoopShellExecutable;
@@ -38,6 +37,7 @@ import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
 import org.apache.kylin.job.hadoop.invertedindex.IIDistinctColumnsJob;
 import org.apache.kylin.job.hadoop.invertedindex.InvertedIndexJob;
 import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
+import org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide;
 
 import com.google.common.base.Preconditions;
 
@@ -57,7 +57,7 @@ public final class IIJobBuilder {
         IIJob result = initialJob(seg, "BUILD", submitter);
         final String jobId = result.getId();
         final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
-        final String intermediateHiveTableName = intermediateTableDesc.getTableName(jobId);
+        final String intermediateHiveTableName = intermediateTableDesc.getTableName();
         final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
         final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
         final String iiPath = iiRootPath + "*";
@@ -84,7 +84,7 @@ public final class IIJobBuilder {
     }
 
     private AbstractExecutable createFlatHiveTableStep(IIJoinedFlatTableDesc intermediateTableDesc, String jobId) {
-        return JobBuilderSupport.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
+        return BatchCubingInputSide.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
     }
 
     private IIJob initialJob(IISegment seg, String type, String submitter) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 980b375..5637a09 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -43,7 +43,6 @@ import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
 import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
@@ -52,6 +51,7 @@ import org.apache.kylin.job.inmemcubing.ICuboidWriter;
 import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable.TableSignature;
 import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
 import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.streaming.MicroStreamBatch;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/ITableSource.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/ITableSource.java b/job/src/main/java/org/apache/kylin/source/ITableSource.java
deleted file mode 100644
index ae1dccc..0000000
--- a/job/src/main/java/org/apache/kylin/source/ITableSource.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source;
-
-public interface ITableSource {
-
-    public <I> I adaptToBuildEngine(Class<I> engineInterface);
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index a0cd62e..4b01f24 100644
--- a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -18,15 +18,157 @@
 
 package org.apache.kylin.source.hive;
 
-import org.apache.kylin.engine.mr.IMRJobFlowParticipant;
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
 
 public class HiveMRInput implements IMRInput {
 
     @Override
-    public IMRJobFlowParticipant createBuildFlowParticipant() {
-        // TODO Auto-generated method stub
-        return null;
+    public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+        return new BatchCubingInputSide(seg);
+    }
+    
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table) {
+        return new HiveTableInputFormat(table.getIdentity());
+    }
+    
+    public static class HiveTableInputFormat implements IMRTableInputFormat {
+        final String dbName;
+        final String tableName;
+
+        public HiveTableInputFormat(String hiveTable) {
+            String[] parts = HadoopUtil.parseHiveTableName(hiveTable);
+            dbName = parts[0];
+            tableName = parts[1];
+        }
+
+        @Override
+        public void configureJob(Job job) {
+            try {
+                HCatInputFormat.setInput(job, dbName, tableName);
+                job.setInputFormatClass(HCatInputFormat.class);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public String[] parseMapperInput(Object mapperInput) {
+            return HiveTableReader.getRowAsStringArray((HCatRecord) mapperInput);
+        }
+        
+    }
+
+    public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
+        
+        final JobEngineConfig conf;
+        final CubeSegment seg;
+        final CubeJoinedFlatTableDesc flatHiveTableDesc;
+
+        public BatchCubingInputSide(CubeSegment seg) {
+            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            this.seg = seg;
+            this.flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+            jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId()));
+        }
+        
+        public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
+
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+            String insertDataHqls;
+            try {
+                insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e);
+            }
+
+            ShellExecutable step = new ShellExecutable();
+            StringBuffer buf = new StringBuffer();
+            buf.append("hive -e \"");
+            buf.append(dropTableHql + "\n");
+            buf.append(createTableHql + "\n");
+            buf.append(insertDataHqls + "\n");
+            buf.append("\"");
+
+            step.setCmd(buf.toString());
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+
+            return step;
+        }
+
+        @Override
+        public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow) {
+            GarbageCollectionStep step = new GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+            step.setOldHiveTable(flatHiveTableDesc.getTableName());
+            jobFlow.addTask(step);
+        }
+
+        @Override
+        public IMRTableInputFormat getFlatTableInputFormat() {
+            return new HiveTableInputFormat(flatHiveTableDesc.getTableName());
+        }
+        
+    }
+    
+    public static class GarbageCollectionStep extends AbstractExecutable {
+
+        @Override
+        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+            StringBuffer output = new StringBuffer();
+
+            final String hiveTable = this.getOldHiveTable();
+            if (StringUtils.isNotEmpty(hiveTable)) {
+                final String dropHiveCMD = "hive -e \"DROP TABLE IF EXISTS  " + hiveTable + ";\"";
+                ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
+                try {
+                    context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
+                    output.append("Hive table " + hiveTable + " is dropped. \n");
+                } catch (IOException e) {
+                    logger.error("job:" + getId() + " execute finished with exception", e);
+                    output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage());
+                    return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+                }
+            }
+
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+        }
+
+        public void setOldHiveTable(String hiveTable) {
+            setParam("oldHiveTable", hiveTable);
+        }
+
+        private String getOldHiveTable() {
+            return getParam("oldHiveTable");
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
new file mode 100644
index 0000000..76b9bab
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.HiveClient;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Loads Hive table metadata into the Kylin metadata store: for every requested
+ * table the column list, partition keys and storage details are read through
+ * HiveClient and saved via MetadataManager. The entry point is
+ * {@link #reloadHiveTables(String[], KylinConfig)}.
+ *
+ * @author jianliu
+ */
+public class HiveSourceTableLoader {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
+
+    public static final String OUTPUT_SURFIX = "json";
+    public static final String TABLE_FOLDER_NAME = "table";
+    public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
+
+    /**
+     * Reloads the metadata of the given Hive tables into Kylin.
+     *
+     * @param hiveTables table names in the form accepted by HadoopUtil.parseHiveTableName
+     * @param config     Kylin config whose MetadataManager stores the loaded metadata
+     * @return identities of all tables that were loaded and saved
+     * @throws IOException if loading any of the tables fails
+     */
+    public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
+
+        // group the requested tables by database (parts[0] = database, parts[1] = table)
+        Map<String, Set<String>> db2tables = Maps.newHashMap();
+        for (String table : hiveTables) {
+            String[] parts = HadoopUtil.parseHiveTableName(table);
+            Set<String> set = db2tables.get(parts[0]);
+            if (set == null) {
+                set = Sets.newHashSet();
+                db2tables.put(parts[0], set);
+            }
+            set.add(parts[1]);
+        }
+
+        // extract from hive
+        Set<String> loadedTables = Sets.newHashSet();
+        for (Map.Entry<String, Set<String>> entry : db2tables.entrySet()) {
+            loadedTables.addAll(extractHiveTables(entry.getKey(), entry.getValue(), config));
+        }
+
+        return loadedTables;
+    }
+
+    /**
+     * Reads each table of one Hive database through HiveClient and saves its TableDesc
+     * and extended properties with the MetadataManager of {@code config}.
+     *
+     * @return identities of the tables that were saved
+     * @throws IOException if a table's Hive metadata cannot be read
+     */
+    private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
+
+        List<String> loadedTables = Lists.newArrayList();
+        // use the MetadataManager of the caller-supplied config, not the env-wide instance
+        MetadataManager metaMgr = MetadataManager.getInstance(config);
+        for (String tableName : tables) {
+            HiveClient hiveClient = new HiveClient();
+            Table table;
+            List<FieldSchema> partitionFields;
+            List<FieldSchema> fields;
+            try {
+                table = hiveClient.getHiveTable(database, tableName);
+                partitionFields = table.getPartitionKeys();
+                fields = hiveClient.getHiveTableFields(database, tableName);
+            } catch (Exception e) {
+                // the cause is preserved, so no separate stack trace dump is needed
+                throw new IOException("Failed to read Hive table " + database + "." + tableName, e);
+            }
+
+            if (fields == null) {
+                throw new IOException("Hive returned no fields for table " + database + "." + tableName);
+            }
+            if (partitionFields == null) {
+                // no partition keys reported: handle as an unpartitioned table
+                partitionFields = Collections.emptyList();
+            }
+            if (!partitionFields.isEmpty()) {
+                // partition columns are appended after the regular columns
+                fields.addAll(partitionFields);
+            }
+
+            long tableSize = hiveClient.getFileSizeForTable(table);
+            long tableFileNum = hiveClient.getFileNumberForTable(table);
+            TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
+            if (tableDesc == null) {
+                tableDesc = new TableDesc();
+                tableDesc.setDatabase(database.toUpperCase());
+                tableDesc.setName(tableName.toUpperCase());
+                tableDesc.setUuid(UUID.randomUUID().toString());
+                tableDesc.setLastModified(0);
+            }
+
+            int columnNumber = fields.size();
+            List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
+            for (int i = 0; i < columnNumber; i++) {
+                FieldSchema field = fields.get(i);
+                ColumnDesc cdesc = new ColumnDesc();
+                cdesc.setName(field.getName().toUpperCase());
+                cdesc.setDatatype(field.getType());
+                // column ids are 1-based
+                cdesc.setId(String.valueOf(i + 1));
+                columns.add(cdesc);
+            }
+            tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
+
+            // comma separated, upper-cased partition column names
+            StringBuilder partitionColumnString = new StringBuilder();
+            for (int i = 0, n = partitionFields.size(); i < n; i++) {
+                if (i > 0)
+                    partitionColumnString.append(", ");
+                partitionColumnString.append(partitionFields.get(i).getName().toUpperCase());
+            }
+
+            Map<String, String> map = metaMgr.getTableDescExd(tableDesc.getIdentity());
+            if (map == null) {
+                map = Maps.newHashMap();
+            }
+            map.put(MetadataConstants.TABLE_EXD_TABLENAME, table.getTableName());
+            map.put(MetadataConstants.TABLE_EXD_LOCATION, table.getSd().getLocation());
+            map.put(MetadataConstants.TABLE_EXD_IF, table.getSd().getInputFormat());
+            map.put(MetadataConstants.TABLE_EXD_OF, table.getSd().getOutputFormat());
+            map.put(MetadataConstants.TABLE_EXD_OWNER, table.getOwner());
+            map.put(MetadataConstants.TABLE_EXD_LAT, String.valueOf(table.getLastAccessTime()));
+            map.put(MetadataConstants.TABLE_EXD_PC, partitionColumnString.toString());
+            map.put(MetadataConstants.TABLE_EXD_TFS, String.valueOf(tableSize));
+            map.put(MetadataConstants.TABLE_EXD_TNF, String.valueOf(tableFileNum));
+            map.put(MetadataConstants.TABLE_EXD_PARTITIONED, String.valueOf(!partitionFields.isEmpty()));
+
+            metaMgr.saveSourceTable(tableDesc);
+            metaMgr.saveTableExd(tableDesc.getIdentity(), map);
+            loadedTables.add(tableDesc.getIdentity());
+        }
+
+        return loadedTables;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
new file mode 100644
index 0000000..02e7c45
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HiveClient;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.dict.lookup.FileTable;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link ReadableTable} backed by a Hive table, identified by the database and
+ * table name taken from a {@link TableDesc}.
+ */
+public class HiveTable implements ReadableTable {
+
+    private static final Logger logger = LoggerFactory.getLogger(HiveTable.class);
+
+    private final String database;
+    private final String hiveTable;
+
+    // created on first use by getHiveClient(); the lazy creation is not synchronized
+    private HiveClient hiveClient;
+
+    /**
+     * @param tableDesc descriptor supplying the Hive database and table name
+     */
+    public HiveTable(TableDesc tableDesc) {
+        this.database = tableDesc.getDatabase();
+        this.hiveTable = tableDesc.getName();
+    }
+
+    /**
+     * Opens a new {@link HiveTableReader} over the whole table.
+     */
+    @Override
+    public TableReader getReader() throws IOException {
+        return new HiveTableReader(database, hiveTable);
+    }
+
+    /**
+     * Builds the table signature from the table location plus the size and
+     * last-modified time reported by {@code FileTable.getSizeAndLastModified}.
+     * For a non-native Hive table the last-modified time is replaced by the
+     * current time, because such a table is assumed to be ever changing.
+     *
+     * @throws IOException on any failure; non-IO exceptions are wrapped as the cause
+     */
+    @Override
+    public TableSignature getSignature() throws IOException {
+        try {
+            String path = computeHDFSLocation();
+            Pair<Long, Long> sizeAndLastModified = FileTable.getSizeAndLastModified(path);
+            long size = sizeAndLastModified.getFirst();
+            long lastModified = sizeAndLastModified.getSecond();
+
+            // for non-native hive table, cannot rely on size & last modified on HDFS
+            if (!getHiveClient().isNativeTable(database, hiveTable)) {
+                lastModified = System.currentTimeMillis(); // assume table is ever changing
+            }
+
+            return new TableSignature(path, size, lastModified);
+
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Resolves the table location: a location override configured in KylinConfig
+     * for this table name wins, otherwise the Hive client is asked for it.
+     */
+    private String computeHDFSLocation() throws Exception {
+        String override = KylinConfig.getInstanceFromEnv().getOverrideHiveTableLocation(hiveTable);
+        if (override != null) {
+            logger.debug("Override hive table location {} -- {}", hiveTable, override);
+            return override;
+        }
+
+        return getHiveClient().getHiveTableLocation(database, hiveTable);
+    }
+
+    /**
+     * Returns the Hive client of this table, creating it on first use.
+     */
+    public HiveClient getHiveClient() {
+        if (hiveClient == null) {
+            hiveClient = new HiveClient();
+        }
+        return hiveClient;
+    }
+
+    @Override
+    public String toString() {
+        return "hive: database=[" + database + "], table=[" + hiveTable + "]";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
new file mode 100644
index 0000000..35e24fe
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
+import org.apache.hive.hcatalog.data.transfer.ReadEntity;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.apache.kylin.source.ReadableTable.TableReader;
+
+/**
+ * An implementation of TableReader with HCatalog for Hive table.
+ * Records are read split by split; the cursor state lives in plain,
+ * unsynchronized fields.
+ */
+public class HiveTableReader implements TableReader {
+
+    private final String dbName;
+    private final String tableName;
+    private int currentSplit = -1;
+    private ReaderContext readCntxt = null;
+    private Iterator<HCatRecord> currentHCatRecordItr = null;
+    private HCatRecord currentHCatRecord;
+    private int numberOfSplits = 0;
+    private final Map<String, String> partitionKV;
+
+    /**
+     * Constructor for reading whole hive table
+     * @param dbName hive database name
+     * @param tableName hive table name
+     * @throws IOException if the HCatalog read context cannot be prepared
+     */
+    public HiveTableReader(String dbName, String tableName) throws IOException {
+        this(dbName, tableName, null);
+    }
+
+    /**
+     * Constructor for reading a partition of the hive table
+     * @param dbName hive database name
+     * @param tableName hive table name
+     * @param partitionKV key-value pairs condition on the partition; null or empty reads the whole table
+     * @throws IOException if the HCatalog read context cannot be prepared
+     */
+    public HiveTableReader(String dbName, String tableName, Map<String, String> partitionKV) throws IOException {
+        this.dbName = dbName;
+        this.tableName = tableName;
+        this.partitionKV = partitionKV;
+        initialize();
+    }
+
+    /**
+     * Prepares the HCatalog read context and records its number of splits.
+     */
+    private void initialize() throws IOException {
+        try {
+            this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV);
+        } catch (Exception e) {
+            // the cause travels inside the IOException, so no separate stack trace dump
+            throw new IOException(e);
+        }
+
+        this.numberOfSplits = readCntxt.numSplits();
+    }
+
+    /**
+     * Moves to the next record, switching to the following split whenever the
+     * current one is used up; splits without records are skipped.
+     *
+     * @return true if a record is available via getRow(); false once all splits
+     *         are exhausted, and on every later call as well
+     */
+    @Override
+    public boolean next() throws IOException {
+
+        while (currentHCatRecordItr == null || !currentHCatRecordItr.hasNext()) {
+            // check before incrementing so the cursor never moves past the last split;
+            // calling next() again after the end then keeps returning false
+            if (currentSplit + 1 >= numberOfSplits) {
+                return false;
+            }
+
+            currentSplit++;
+            currentHCatRecordItr = loadHCatRecordItr(readCntxt, currentSplit);
+        }
+
+        currentHCatRecord = currentHCatRecordItr.next();
+
+        return true;
+    }
+
+    /**
+     * Returns the fields of the current record as strings; null fields stay null.
+     * Only valid after next() has returned true.
+     */
+    @Override
+    public String[] getRow() {
+        return getRowAsStringArray(currentHCatRecord);
+    }
+
+    /**
+     * Same as getRow(), as a fixed-size list.
+     */
+    public List<String> getRowAsList() {
+        return getRowAsList(currentHCatRecord);
+    }
+
+    /**
+     * Appends the string form of every field of the record to rowValues
+     * (null for a null field) and returns rowValues.
+     */
+    public static List<String> getRowAsList(HCatRecord record, List<String> rowValues) {
+        List<Object> allFields = record.getAll();
+        for (Object o : allFields) {
+            rowValues.add((o == null) ? null : o.toString());
+        }
+        return rowValues;
+    }
+
+    /**
+     * Returns the fields of the record as a fixed-size list of strings.
+     */
+    public static List<String> getRowAsList(HCatRecord record) {
+        return Arrays.asList(getRowAsStringArray(record));
+    }
+
+    /**
+     * Converts every field of the record with toString(); null fields stay null.
+     */
+    public static String[] getRowAsStringArray(HCatRecord record) {
+        String[] arr = new String[record.size()];
+        for (int i = 0; i < arr.length; i++) {
+            Object o = record.get(i);
+            arr[i] = (o == null) ? null : o.toString();
+        }
+        return arr;
+    }
+
+    /**
+     * Drops the read context and the cursor state; afterwards next() returns false.
+     */
+    @Override
+    public void close() throws IOException {
+        this.readCntxt = null;
+        this.currentHCatRecordItr = null;
+        this.currentHCatRecord = null;
+        this.currentSplit = -1;
+        // with no splits left, next() cannot try to load one through the dropped context
+        this.numberOfSplits = 0;
+    }
+
+    @Override
+    public String toString() {
+        return "hive table reader for: " + dbName + "." + tableName;
+    }
+
+    /**
+     * Copies the HiveConf entries into a map and asks HCatalog to prepare a read of
+     * the table, restricted to the partition when partitionKV is non-empty.
+     */
+    private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception {
+        HiveConf hiveConf = new HiveConf(HiveTableReader.class);
+        Iterator<Entry<String, String>> itr = hiveConf.iterator();
+        Map<String, String> map = new HashMap<String, String>();
+        while (itr.hasNext()) {
+            Entry<String, String> kv = itr.next();
+            map.put(kv.getKey(), kv.getValue());
+        }
+
+        ReadEntity entity;
+        if (partitionKV == null || partitionKV.size() == 0) {
+            entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build();
+        } else {
+            entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV).build();
+        }
+
+        HCatReader reader = DataTransferFactory.getHCatReader(entity, map);
+        ReaderContext cntxt = reader.prepareRead();
+
+        return cntxt;
+    }
+
+    /**
+     * Opens the record iterator of one split of the prepared read.
+     */
+    private static Iterator<HCatRecord> loadHCatRecordItr(ReaderContext readCntxt, int dataSplit) throws HCatException {
+        HCatReader currentHCatReader = DataTransferFactory.getHCatReader(readCntxt, dataSplit);
+
+        return currentHCatReader.read();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
index 4265519..a9e95a4 100644
--- a/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
@@ -19,7 +19,9 @@
 package org.apache.kylin.source.hive;
 
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.ITableSource;
+import org.apache.kylin.source.ReadableTable;
 
 public class HiveTableSource implements ITableSource {
 
@@ -33,4 +35,9 @@ public class HiveTableSource implements ITableSource {
         }
     }
 
+    @Override
+    public ReadableTable createReadableTable(TableDesc tableDesc) {
+        return new HiveTable(tableDesc);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 9693771..1f91b25 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -34,18 +34,28 @@
 
 package org.apache.kylin.job;
 
-import com.google.common.collect.Lists;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.dict.lookup.HiveTableReader;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
@@ -58,6 +68,7 @@ import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
 import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.source.hive.HiveTableReader;
 import org.apache.kylin.streaming.StreamBuilder;
 import org.apache.kylin.streaming.StreamMessage;
 import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
@@ -68,16 +79,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
 
 /**
  */
@@ -141,11 +143,11 @@ public class BuildIIWithStreamTest {
         IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
         JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
         final String uuid = UUID.randomUUID().toString();
-        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, uuid);
-        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, jobEngineConfig.getHdfsWorkingDirectory() + "/kylin-" + uuid, uuid);
+        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
+        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, jobEngineConfig.getHdfsWorkingDirectory() + "/kylin-" + uuid);
         String insertDataHqls;
         try {
-            insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, uuid, jobEngineConfig);
+            insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobEngineConfig);
         } catch (IOException e1) {
             e1.printStackTrace();
             throw new RuntimeException("Failed to generate insert data SQL for intermediate table.");
@@ -163,7 +165,7 @@ public class BuildIIWithStreamTest {
         logger.info(step.getCmd());
         step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
         kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
-        return intermediateTableDesc.getTableName(uuid);
+        return intermediateTableDesc.getTableName();
     }
 
     private void clearSegment(String iiName) throws Exception {
@@ -194,10 +196,7 @@ public class BuildIIWithStreamTest {
         final IIDesc desc = iiManager.getII(iiName).getDescriptor();
         final String tableName = createIntermediateTable(desc, kylinConfig);
         logger.info("intermediate table name:" + tableName);
-        final Configuration conf = new Configuration();
-        HCatInputFormat.setInput(conf, "default", tableName);
-        final HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);
-        logger.info(StringUtils.join(tableSchema.getFieldNames(), "\n"));
+        
         HiveTableReader reader = new HiveTableReader("default", tableName);
         final List<TblColRef> tblColRefs = desc.listAllColumns();
         for (TblColRef tblColRef : tblColRefs) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
index 5886324..32e5aff 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
@@ -39,11 +39,11 @@ import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.TrieDictionary;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.source.ReadableTable.TableSignature;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
index 15fe340..21f6a71 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
@@ -63,7 +63,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testGenCreateTableDDL() {
-        String ddl = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, "/tmp", fakeJobUUID);
+        String ddl = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, "/tmp");
         System.out.println(ddl);
 
         System.out.println("The length for the ddl is " + ddl.length());
@@ -71,14 +71,14 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testGenDropTableDDL() {
-        String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, fakeJobUUID);
+        String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
         System.out.println(ddl);
         assertEquals(107, ddl.length());
     }
 
     @Test
     public void testGenerateInsertSql() throws IOException {
-        String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, fakeJobUUID, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+        String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
         System.out.println(sqls);
 
         int length = sqls.length();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 9a28ea4..811948b 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -1,21 +1,19 @@
 package org.apache.kylin.job.hadoop.invertedindex;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
-import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.types.Pair;
 import org.apache.kylin.common.util.FIFOIterable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
@@ -23,9 +21,11 @@ import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.model.*;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.cube.FactDistinctIIColumnsMapper;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.invertedindex.model.KeyValueCodec;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -41,16 +41,20 @@ import org.apache.kylin.storage.hbase.coprocessor.endpoint.ClearTextDictionary;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointAggregators;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.*;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.ParsedStreamMessage;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.StreamParser;
+import org.apache.kylin.streaming.StringStreamParser;
 import org.apache.kylin.streaming.invertedindex.SliceBuilder;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.*;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  */
@@ -233,29 +237,4 @@ public class IITest extends LocalFileMetadataTestCase {
         }
     }
 
-    @Test
-    public void factDistinctIIColumnsMapperTest() throws IOException {
-        MapDriver<ImmutableBytesWritable, Result, LongWritable, Text> mapDriver;
-        FactDistinctIIColumnsMapper mapper = new FactDistinctIIColumnsMapper();
-        mapDriver = MapDriver.newMapDriver(mapper);
-
-        mapDriver.getConfiguration().set(BatchConstants.CFG_II_NAME, iiName);
-        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-        mapDriver.getConfiguration().setStrings("io.serializations", mapDriver.getConfiguration().get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName());
-        mapDriver.addAll(Lists.newArrayList(Collections2.transform(iiRows, new Function<IIRow, Pair<ImmutableBytesWritable, Result>>() {
-            @Nullable
-            @Override
-            public Pair<ImmutableBytesWritable, Result> apply(@Nullable IIRow input) {
-                return new Pair<ImmutableBytesWritable, Result>(new ImmutableBytesWritable(new byte[] { 1 }), Result.create(input.makeCells()));
-            }
-        })));
-
-        List<Pair<LongWritable, Text>> result = mapDriver.run();
-        Set<String> lstgNames = Sets.newHashSet("FP-non GTC", "ABIN");
-        for (Pair<LongWritable, Text> pair : result) {
-            Assert.assertEquals(pair.getFirst().get(), 6);
-            Assert.assertTrue(lstgNames.contains(pair.getSecond().toString()));
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java
deleted file mode 100644
index c9ef366..0000000
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.kylin.job.hadoop.cardinality.HiveColumnCardinalityJob;
-
-/**
- * @author ysong1
- * 
- */
-@Ignore("This test is invalid now as the mapper uses HCatalog to fetch the data which need a hive env")
-public class ColumnCardinalityJobTest {
-
-    private Configuration conf;
-
-    @Before
-    public void setup() throws IOException {
-        conf = new Configuration();
-        conf.set("fs.default.name", "file:///");
-        conf.set("mapred.job.tracker", "local");
-    }
-
-    @Test
-    @Ignore("not maintaining")
-    public void testJob() throws Exception {
-        final String input = "src/test/resources/data/test_cal_dt/";
-        final String output = "target/test-output/column-cardinality/";
-
-        FileUtil.fullyDelete(new File(output));
-
-        String[] args = { "-input", input, "-output", output, "-cols", "1,2,3,4,5,6,9,0" };
-        assertEquals("Job failed", 0, ToolRunner.run(new HiveColumnCardinalityJob(), args));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
deleted file mode 100644
index e13289a..0000000
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import static org.junit.Assert.*;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.job.hadoop.cardinality.ColumnCardinalityMapper;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-
-/**
- * @author ysong1
- * 
- */
-@Ignore("This test is invalid now as the mapper uses HCatalog to fetch the data which need a hive env")
-public class ColumnCardinalityMapperTest {
-
-    @SuppressWarnings("rawtypes")
-    MapDriver mapDriver;
-    String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Before
-    public void setUp() {
-        ColumnCardinalityMapper mapper = new ColumnCardinalityMapper();
-        mapDriver = MapDriver.newMapDriver(mapper);
-    }
-
-    public final static String strArr = "abc,tests,test,test,as,sts,test,tss,sets";
-
-    @SuppressWarnings({ "unchecked" })
-    @Test
-    @Ignore
-    public void testMapperOn177() throws IOException {
-        mapDriver.clearInput();
-        File file = new File("src/test/resources/data/test_cal_dt/part-r-00000");
-        FileReader reader = new FileReader(file);
-        BufferedReader breader = new BufferedReader(reader);
-        String s = breader.readLine();
-        int i = 0;
-        while (s != null) {
-            LongWritable inputKey = new LongWritable(i++);
-            mapDriver.addInput(inputKey, new Text(s));
-            s = breader.readLine();
-        }
-        // breader.close();
-        List<Pair<IntWritable, BytesWritable>> result = mapDriver.run();
-        breader.close();
-        assertEquals(9, result.size());
-
-        int key1 = result.get(0).getFirst().get();
-        BytesWritable value1 = result.get(0).getSecond();
-        byte[] bytes = value1.getBytes();
-        HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
-        hllc.readRegisters(ByteBuffer.wrap(bytes));
-        assertTrue(key1 > 0);
-        assertEquals(8, hllc.getCountEstimate());
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testMapperOnComma() throws IOException {
-        mapDriver.clearInput();
-        LongWritable inputKey1 = new LongWritable(1);
-        LongWritable inputKey2 = new LongWritable(2);
-        LongWritable inputKey3 = new LongWritable(3);
-        LongWritable inputKey4 = new LongWritable(4);
-        LongWritable inputKey5 = new LongWritable(5);
-        LongWritable inputKey6 = new LongWritable(6);
-        LongWritable inputKey7 = new LongWritable(7);
-
-        mapDriver.addInput(inputKey1, new Text());
-        mapDriver.addInput(inputKey2, new Text(strArr));
-        mapDriver.addInput(inputKey3, new Text(strArr));
-        mapDriver.addInput(inputKey4, new Text(strArr));
-        mapDriver.addInput(inputKey5, new Text(strArr));
-        mapDriver.addInput(inputKey6, new Text(strArr));
-        mapDriver.addInput(inputKey7, new Text(strArr));
-
-        List<Pair<IntWritable, BytesWritable>> result = mapDriver.run();
-
-        assertEquals(9, result.size());
-
-        int key1 = result.get(0).getFirst().get();
-        BytesWritable value1 = result.get(0).getSecond();
-        byte[] bytes = value1.getBytes();
-        HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
-        hllc.readRegisters(ByteBuffer.wrap(bytes));
-        System.out.println("ab\177ab".length());
-        assertTrue(key1 > 0);
-        assertEquals(1, hllc.getCountEstimate());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
index 2c01fb6..867ee82 100644
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
@@ -46,6 +46,8 @@ import org.apache.kylin.cube.kv.RowConstants;
  * 
  */
 public class ColumnCardinalityReducerTest {
+    
+    public final static String strArr = "abc,tests,test,test,as,sts,test,tss,sets";
 
     ReduceDriver<IntWritable, BytesWritable, IntWritable, LongWritable> reduceDriver;
     String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
@@ -76,23 +78,23 @@ public class ColumnCardinalityReducerTest {
     public void testReducer() throws IOException {
         IntWritable key1 = new IntWritable(1);
         List<BytesWritable> values1 = new ArrayList<BytesWritable>();
-        values1.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr)));
+        values1.add(new BytesWritable(getBytes(strArr)));
 
         IntWritable key2 = new IntWritable(2);
         List<BytesWritable> values2 = new ArrayList<BytesWritable>();
-        values2.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " x")));
+        values2.add(new BytesWritable(getBytes(strArr + " x")));
 
         IntWritable key3 = new IntWritable(3);
         List<BytesWritable> values3 = new ArrayList<BytesWritable>();
-        values3.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " xx")));
+        values3.add(new BytesWritable(getBytes(strArr + " xx")));
 
         IntWritable key4 = new IntWritable(4);
         List<BytesWritable> values4 = new ArrayList<BytesWritable>();
-        values4.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " xxx")));
+        values4.add(new BytesWritable(getBytes(strArr + " xxx")));
 
         IntWritable key5 = new IntWritable(5);
         List<BytesWritable> values5 = new ArrayList<BytesWritable>();
-        values5.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " xxxx")));
+        values5.add(new BytesWritable(getBytes(strArr + " xxxx")));
 
         reduceDriver.withInput(key1, values1);
         reduceDriver.withInput(key2, values2);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
new file mode 100644
index 0000000..de6df2c
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.source.hive.HiveSourceTableLoader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
+
+    @Before
+    public void setup() throws Exception {
+        super.createTestMetadata();
+    }
+
+    @After
+    public  void after() throws Exception {
+        super.cleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws IOException {
+        if (!useSandbox())
+            return;
+
+        KylinConfig config = getTestConfig();
+        String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
+        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
+
+        assertTrue(loaded.size() == toLoad.length);
+        for (String str : toLoad)
+            assertTrue(loaded.contains(str));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
new file mode 100644
index 0000000..624f158
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.source.hive.HiveTableReader;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * This test case needs the Hive runtime; please run it with the sandbox.
+ * @author shaoshi
+ *
+ * It is in the exclude list of the default profile in pom.xml.
+ */
+public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
+
+
+    @Test
+    public void test() throws IOException {
+        HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
+        int rowNumber = 0;
+        while (reader.next()) {
+            String[] row = reader.getRow();
+            Assert.assertEquals(9, row.length);
+            //System.out.println(ArrayUtils.toString(row));
+            rowNumber++;
+        }
+
+        reader.close();
+        Assert.assertEquals(10000, rowNumber);
+    }
+}