Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/16 05:27:49 UTC

[09/11] incubator-kylin git commit: KYLIN-877 Abstract Input Source interface and make hive a default impl

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
index a884465..c19af69 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -18,35 +18,30 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hasher;
-import com.google.common.hash.Hashing;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.List;
+
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.lookup.HiveTableReader;
 import org.apache.kylin.job.constant.BatchConstants;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.List;
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
 
 /**
  * @author yangli9
  */
-public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, HCatRecord> {
+public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMapperBase<KEYIN, Object> {
 
-    private HCatSchema schema = null;
     private CubeJoinedFlatTableDesc intermediateTableDesc;
 
     protected boolean collectStatistics = false;
@@ -55,7 +50,6 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     private Integer[][] allCuboidsBitSet = null;
     private HyperLogLogPlusCounter[] allCuboidsHLL = null;
     private Long[] cuboidIds;
-    private List<String> rowArray;
     private HashFunction hf = null;
     private int rowCount = 0;
     private int SAMPING_PERCENTAGE = 5;
@@ -64,9 +58,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     @Override
     protected void setup(Context context) throws IOException {
         super.setup(context);
-        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
         intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-        rowArray = new ArrayList<String>(schema.getFields().size());
         collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
         if (collectStatistics) {
             SAMPING_PERCENTAGE = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "5"));
@@ -116,13 +108,12 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     }
 
     @Override
-    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-        rowArray.clear();
-        HiveTableReader.getRowAsList(record, rowArray);
+    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        String[] row = flatTableInputFormat.parseMapperInput(record);
         try {
             for (int i : factDictCols) {
                 outputKey.set((long) i);
-                String fieldValue = rowArray.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+                String fieldValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
                 if (fieldValue == null)
                     continue;
                 byte[] bytes = Bytes.toBytes(fieldValue);
@@ -130,24 +121,25 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
                 context.write(outputKey, outputValue);
             }
         } catch (Exception ex) {
-            handleErrorRecord(record, ex);
+            handleErrorRecord(row, ex);
         }
 
         if (collectStatistics && rowCount < SAMPING_PERCENTAGE) {
-            putRowKeyToHLL(rowArray);
+            putRowKeyToHLL(row);
         }
 
         if (rowCount++ == 100)
             rowCount = 0;
     }
 
-    private void putRowKeyToHLL(List<String> row) {
+    private void putRowKeyToHLL(String[] row) {
 
         //generate hash for each row key column
         for (int i = 0; i < nRowKey; i++) {
             Hasher hc = hf.newHasher();
-            if (row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]) != null) {
-                row_hashcodes[i].set(hc.putString(row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i])).hash().asBytes());
+            String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
+            if (colValue != null) {
+                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
             } else {
                 row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
             }

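Note on the change above: the mapper no longer binds to HCatRecord; it receives an opaque Object and delegates parsing to the IMRTableInputFormat supplied by the input side. As a rough sketch of what this contract enables (the class and delimiter below are hypothetical; only the two interface methods come from this patch), a plain-text source could plug in like so:

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;

    public class TextTableInputFormat implements IMRTableInputFormat {

        @Override
        public void configureJob(Job job) {
            // any Hadoop InputFormat will do; the mapper never sees it directly
            job.setInputFormatClass(TextInputFormat.class);
        }

        @Override
        public String[] parseMapperInput(Object mapperInput) {
            // TextInputFormat hands the mapper one Text value per record;
            // "\177" is an assumed field delimiter, not from this patch
            return mapperInput.toString().split("\177", -1);
        }
    }
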
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
deleted file mode 100644
index d3c3249..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctIIColumnsMapper.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Queue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.FIFOIterable;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.*;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- */
-public class FactDistinctIIColumnsMapper extends FactDistinctColumnsMapperBase<ImmutableBytesWritable, Result> {
-
-    private Queue<IIRow> buffer = Lists.newLinkedList();
-    private Iterator<Slice> slices;
-
-    private int[] baseCuboidCol2FlattenTableCol;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.setup(context);
-
-        Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        String iiName = conf.get(BatchConstants.CFG_II_NAME);
-        IIInstance ii = IIManager.getInstance(config).getII(iiName);
-        IIDesc iiDesc = ii.getDescriptor();
-
-        IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(iiDesc);
-        TableRecordInfo info = new TableRecordInfo(iiDesc);
-        KeyValueCodec codec = new IIKeyValueCodecWithState(info.getDigest());
-        slices = codec.decodeKeyValue(new FIFOIterable<IIRow>(buffer)).iterator();
-
-        baseCuboidCol2FlattenTableCol = new int[factDictCols.size()];
-        for (int i = 0; i < factDictCols.size(); ++i) {
-            int index = findTblCol(intermediateTableDesc.getColumnList(), columns.get(factDictCols.get(i)));
-            baseCuboidCol2FlattenTableCol[i] = index;
-        }
-    }
-
-    private int findTblCol(List<IntermediateColumnDesc> columns, final TblColRef col) {
-        return Iterators.indexOf(columns.iterator(), new Predicate<IntermediateColumnDesc>() {
-            @Override
-            public boolean apply(IntermediateColumnDesc input) {
-                return input.getColRef().equals(col);
-            }
-        });
-    }
-
-    @Override
-    public void map(ImmutableBytesWritable key, Result cells, Context context) throws IOException, InterruptedException {
-        IIRow iiRow = new IIRow();
-        for (Cell c : cells.rawCells()) {
-            iiRow.updateWith(c);
-        }
-        buffer.add(iiRow);
-
-        if (slices.hasNext()) {
-            byte[] vBytesBuffer = null;
-            Slice slice = slices.next();
-
-            for (RawTableRecord record : slice) {
-                for (int i = 0; i < factDictCols.size(); ++i) {
-                    int baseCuboidIndex = factDictCols.get(i);
-                    outputKey.set((short) baseCuboidIndex);
-                    int indexInRecord = baseCuboidCol2FlattenTableCol[i];
-
-                    Dictionary<?> dictionary = slice.getLocalDictionaries()[indexInRecord];
-                    if (vBytesBuffer == null || dictionary.getSizeOfValue() > vBytesBuffer.length) {
-                        vBytesBuffer = new byte[dictionary.getSizeOfValue() * 2];
-                    }
-
-                    int vid = record.getValueID(indexInRecord);
-                    if (vid == dictionary.nullId()) {
-                        continue;
-                    }
-                    int vBytesSize = dictionary.getValueBytesFromId(vid, vBytesBuffer, 0);
-
-                    outputValue.set(vBytesBuffer, 0, vBytesSize);
-                    context.write(outputKey, outputValue);
-                }
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
index f5ee5b9..20f2474 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
@@ -27,19 +27,14 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
 import org.apache.hadoop.io.Text;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinMapper;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -49,8 +44,9 @@ import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.HiveTable;
 import org.apache.kylin.dict.lookup.LookupBytesTable;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -60,6 +56,10 @@ import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.TableSourceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author George Song (ysong1),honma
@@ -170,8 +170,9 @@ public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, T
 
                 // load lookup tables
                 if (!lookupTables.containsKey(lookupTableName)) {
-                    HiveTable htable = new HiveTable(metadataManager, lookupTableName);
-                    LookupBytesTable btable = new LookupBytesTable(metadataManager.getTableDesc(lookupTableName), join.getPrimaryKey(), htable);
+                    TableDesc tableDesc = metadataManager.getTableDesc(lookupTableName);
+                    ReadableTable htable = TableSourceFactory.createReadableTable(tableDesc);
+                    LookupBytesTable btable = new LookupBytesTable(tableDesc, join.getPrimaryKey(), htable);
                     lookupTables.put(lookupTableName, btable);
                 }
 

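With the lookup path above now going through ReadableTable, any source that can produce a TableReader can back a lookup table. The consuming pattern, pieced together from the interfaces that appear later in this patch, is roughly:

    // sketch; assumes only the ReadableTable/TableReader methods visible in this patch
    ReadableTable table = TableSourceFactory.createReadableTable(tableDesc);
    ReadableTable.TableReader reader = table.getReader();
    try {
        while (reader.next()) {
            String[] row = reader.getRow();   // one lookup row as strings
            // ... feed into LookupBytesTable or similar
        }
    } finally {
        reader.close();
    }
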
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
index db690b9..f89b143 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
@@ -31,21 +31,21 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author shaoshi
  */
-
 public class InMemCuboidJob extends AbstractHadoopJob {
 
     protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class);
@@ -62,7 +62,6 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_NCUBOID_LEVEL);
             options.addOption(OPTION_INPUT_FORMAT);
-            options.addOption(OPTION_TABLE_NAME);
             options.addOption(OPTION_HTABLE_NAME);
             options.addOption(OPTION_STATISTICS_OUTPUT);
             parseOptions(options, args);
@@ -71,14 +70,12 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
             String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
             String htableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
 
-
             KylinConfig config = KylinConfig.getInstanceFromEnv();
             CubeManager cubeMgr = CubeManager.getInstance(config);
             CubeInstance cube = cubeMgr.getCube(cubeName);
-
+            CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             logger.info("Starting: " + job.getJobName());
@@ -89,13 +86,9 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             DataModelDesc.RealizationCapacity realizationCapacity = cube.getDescriptor().getModel().getCapacity();
             job.getConfiguration().set(BatchConstants.CUBE_CAPACITY, realizationCapacity.toString());
 
-            String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
-            HCatInputFormat.setInput(job, dbTableNames[0],
-                    dbTableNames[1]);
-
-            job.setInputFormatClass(HCatInputFormat.class);
-
             // set Mapper
+            IMRTableInputFormat flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
             job.setMapperClass(InMemCuboidMapper.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);
             job.setMapOutputValueClass(Text.class);
@@ -112,8 +105,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
             HTable htable = new HTable(conf, htableName);
-            HFileOutputFormat.configureIncrementalLoad(job,
-                    htable);
+            HFileOutputFormat.configureIncrementalLoad(job, htable);
 
 
             // set Reducer; This need be after configureIncrementalLoad, to overwrite the default reducer class

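The wiring above inverts the old dependency: InMemCuboidJob no longer hard-codes HCatInputFormat; the input side chooses the InputFormat. In sketch form (only the names from this patch are real):

    IMRTableInputFormat flatTableInputFormat =
            MRBatchCubingEngine.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
    flatTableInputFormat.configureJob(job);       // for Hive this sets HCatInputFormat
    job.setMapperClass(InMemCuboidMapper.class);  // mapper stays source-agnostic
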
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
index ba87dfe..3eb29df 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
@@ -2,6 +2,7 @@ package org.apache.kylin.job.hadoop.cubev2;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -16,7 +17,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.mr.KylinMapper;
 import org.apache.kylin.cube.CubeInstance;
@@ -25,7 +25,8 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.job.inmemcubing.DoggedCubeBuilder;
@@ -36,12 +37,13 @@ import com.google.common.collect.Maps;
 
 /**
  */
-public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ImmutableBytesWritable, Text> {
+public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ImmutableBytesWritable, Text> {
 
     private static final Log logger = LogFactory.getLog(InMemCuboidMapper.class);
     private CubeInstance cube;
     private CubeDesc cubeDesc;
     private CubeSegment cubeSegment;
+    private IMRTableInputFormat flatTableInputFormat;
 
     private int counter;
     private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
@@ -59,6 +61,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
         cubeDesc = cube.getDescriptor();
         String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
         cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+        flatTableInputFormat = MRBatchCubingEngine.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
 
         Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
 
@@ -83,11 +86,13 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Imm
     }
 
     @Override
-    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
         // put each row to the queue
-        List<String> row = HiveTableReader.getRowAsList(record);
+        String[] row = flatTableInputFormat.parseMapperInput(record);
+        List<String> rowAsList = Arrays.asList(row);
+        
         while (!future.isDone()) {
-            if (queue.offer(row, 1, TimeUnit.SECONDS)) {
+            if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
                 counter++;
                 if (counter % BatchConstants.COUNTER_MAX == 0) {
                     logger.info("Handled " + counter + " records!");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java
deleted file mode 100644
index c25b164..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIFlattenHiveJob.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.invertedindex.IIDescManager;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.cmd.ICommandOutput;
-import org.apache.kylin.job.cmd.ShellCmd;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-
-/**
- */
-public class IIFlattenHiveJob extends AbstractHadoopJob {
-
-    protected static final Logger logger = LoggerFactory.getLogger(IIFlattenHiveJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-        try {
-            options.addOption(OPTION_II_NAME);
-            parseOptions(options, args);
-
-            String iiname = getOptionValue(OPTION_II_NAME);
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-
-            IIInstance iiInstance = IIManager.getInstance(config).getII(iiname);
-            IIDesc iidesc = IIDescManager.getInstance(config).getIIDesc(iiInstance.getDescName());
-
-            String jobUUID = "00bf87b5-c7b5-4420-a12a-07f6b37b3187";
-            JobEngineConfig engineConfig = new JobEngineConfig(config);
-            IJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(iidesc);
-            String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobUUID);
-            String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, //
-                    JobInstance.getJobWorkingDir(jobUUID, engineConfig.getHdfsWorkingDirectory()), jobUUID);
-            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobUUID, engineConfig);
-
-            StringBuffer buf = new StringBuffer();
-            buf.append("hive -e \"");
-            buf.append(dropTableHql + "\n");
-            buf.append(createTableHql + "\n");
-            buf.append(insertDataHqls + "\n");
-            buf.append("\"");
-            
-            System.out.println(buf.toString());
-            System.out.println("========================");
-
-            ShellCmd cmd = new ShellCmd(buf.toString(), null, 0, null, null, false);
-            ICommandOutput output = cmd.execute();
-            System.out.println(output.getOutput());
-            System.out.println(output.getExitCode());
-            
-            return 0;
-        } catch (Exception e) {
-            logger.error("error execute IIFlattenHiveJob", e);
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        IIFlattenHiveJob job = new IIFlattenHiveJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
index 1ae101d..cc68e1b 100644
--- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
@@ -23,7 +23,6 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.TimeZone;
 
-import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.job.common.HadoopShellExecutable;
@@ -38,6 +37,7 @@ import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
 import org.apache.kylin.job.hadoop.invertedindex.IIDistinctColumnsJob;
 import org.apache.kylin.job.hadoop.invertedindex.InvertedIndexJob;
 import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
+import org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide;
 
 import com.google.common.base.Preconditions;
 
@@ -57,7 +57,7 @@ public final class IIJobBuilder {
         IIJob result = initialJob(seg, "BUILD", submitter);
         final String jobId = result.getId();
         final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
-        final String intermediateHiveTableName = intermediateTableDesc.getTableName(jobId);
+        final String intermediateHiveTableName = intermediateTableDesc.getTableName();
         final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
         final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
         final String iiPath = iiRootPath + "*";
@@ -84,7 +84,7 @@ public final class IIJobBuilder {
     }
 
     private AbstractExecutable createFlatHiveTableStep(IIJoinedFlatTableDesc intermediateTableDesc, String jobId) {
-        return JobBuilderSupport.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
+        return BatchCubingInputSide.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
     }
 
     private IIJob initialJob(IISegment seg, String type, String submitter) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 980b375..5637a09 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -43,7 +43,6 @@ import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 import org.apache.kylin.job.constant.BatchConstants;
 import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
 import org.apache.kylin.job.hadoop.cubev2.InMemKeyValueCreator;
@@ -52,6 +51,7 @@ import org.apache.kylin.job.inmemcubing.ICuboidWriter;
 import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.ReadableTable.TableSignature;
 import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
 import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.streaming.MicroStreamBatch;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/ITableSource.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/ITableSource.java b/job/src/main/java/org/apache/kylin/source/ITableSource.java
deleted file mode 100644
index ae1dccc..0000000
--- a/job/src/main/java/org/apache/kylin/source/ITableSource.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.source;
-
-public interface ITableSource {
-
-    public <I> I adaptToBuildEngine(Class<I> engineInterface);
-}

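Although the deletion above removes ITableSource from the job module, the interface itself survives: HiveTableSource below still implements it and now also overrides createReadableTable. Its presumed shape after this commit, inferred from those call sites rather than shown verbatim in this diff:

    public interface ITableSource {

        <I> I adaptToBuildEngine(Class<I> engineInterface);

        ReadableTable createReadableTable(TableDesc tableDesc);
    }
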
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index a0cd62e..4b01f24 100644
--- a/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -18,15 +18,157 @@
 
 package org.apache.kylin.source.hive;
 
-import org.apache.kylin.engine.mr.IMRJobFlowParticipant;
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.TableDesc;
 
 public class HiveMRInput implements IMRInput {
 
     @Override
-    public IMRJobFlowParticipant createBuildFlowParticipant() {
-        // TODO Auto-generated method stub
-        return null;
+    public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+        return new BatchCubingInputSide(seg);
+    }
+    
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table) {
+        return new HiveTableInputFormat(table.getIdentity());
+    }
+    
+    public static class HiveTableInputFormat implements IMRTableInputFormat {
+        final String dbName;
+        final String tableName;
+
+        public HiveTableInputFormat(String hiveTable) {
+            String[] parts = HadoopUtil.parseHiveTableName(hiveTable);
+            dbName = parts[0];
+            tableName = parts[1];
+        }
+
+        @Override
+        public void configureJob(Job job) {
+            try {
+                HCatInputFormat.setInput(job, dbName, tableName);
+                job.setInputFormatClass(HCatInputFormat.class);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public String[] parseMapperInput(Object mapperInput) {
+            return HiveTableReader.getRowAsStringArray((HCatRecord) mapperInput);
+        }
+        
+    }
+
+    public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
+        
+        final JobEngineConfig conf;
+        final CubeSegment seg;
+        final CubeJoinedFlatTableDesc flatHiveTableDesc;
+
+        public BatchCubingInputSide(CubeSegment seg) {
+            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            this.seg = seg;
+            this.flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+            jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId()));
+        }
+        
+        public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) {
+
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+            String insertDataHqls;
+            try {
+                insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to generate insert data SQL for intermediate table.", e);
+            }
+
+            ShellExecutable step = new ShellExecutable();
+            StringBuffer buf = new StringBuffer();
+            buf.append("hive -e \"");
+            buf.append(dropTableHql + "\n");
+            buf.append(createTableHql + "\n");
+            buf.append(insertDataHqls + "\n");
+            buf.append("\"");
+
+            step.setCmd(buf.toString());
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+
+            return step;
+        }
+
+        @Override
+        public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow) {
+            GarbageCollectionStep step = new GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+            step.setOldHiveTable(flatHiveTableDesc.getTableName());
+            jobFlow.addTask(step);
+        }
+
+        @Override
+        public IMRTableInputFormat getFlatTableInputFormat() {
+            return new HiveTableInputFormat(flatHiveTableDesc.getTableName());
+        }
+        
+    }
+    
+    public static class GarbageCollectionStep extends AbstractExecutable {
+
+        @Override
+        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+            StringBuffer output = new StringBuffer();
+
+            final String hiveTable = this.getOldHiveTable();
+            if (StringUtils.isNotEmpty(hiveTable)) {
+                final String dropHiveCMD = "hive -e \"DROP TABLE IF EXISTS  " + hiveTable + ";\"";
+                ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
+                try {
+                    context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
+                    output.append("Hive table " + hiveTable + " is dropped. \n");
+                } catch (IOException e) {
+                    logger.error("job:" + getId() + " execute finished with exception", e);
+                    output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage());
+                    return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+                }
+            }
+
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+        }
+
+        public void setOldHiveTable(String hiveTable) {
+            setParam("oldHiveTable", hiveTable);
+        }
+
+        private String getOldHiveTable() {
+            return getParam("oldHiveTable");
+        }
     }
 
 }

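HiveMRInput above is the heart of the commit: Hive becomes just one IMRInput implementation behind the abstract input-source interface. A sketch of the calling side (jobFlow construction is simplified; names not in this patch are assumptions):

    IMRBatchCubingInputSide inputSide = new HiveMRInput().getBatchCubingInputSide(seg);
    DefaultChainedExecutable jobFlow = new DefaultChainedExecutable();

    inputSide.addStepPhase1_CreateFlatTable(jobFlow);            // hive -e "DROP/CREATE/INSERT ..."
    // ... the build engine contributes its cubing steps here ...
    inputSide.addStepPhase4_UpdateMetadataAndCleanup(jobFlow);   // drops the intermediate table
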
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
new file mode 100644
index 0000000..76b9bab
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.HiveClient;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Management class that syncs Hive table metadata into Kylin's metadata
+ * store.
+ *
+ * @author jianliu
+ */
+public class HiveSourceTableLoader {
+
+    @SuppressWarnings("unused")
+    private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
+
+    public static final String OUTPUT_SURFIX = "json";
+    public static final String TABLE_FOLDER_NAME = "table";
+    public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
+
+    public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
+
+        Map<String, Set<String>> db2tables = Maps.newHashMap();
+        for (String table : hiveTables) {
+            String[] parts = HadoopUtil.parseHiveTableName(table);
+            Set<String> set = db2tables.get(parts[0]);
+            if (set == null) {
+                set = Sets.newHashSet();
+                db2tables.put(parts[0], set);
+            }
+            set.add(parts[1]);
+        }
+
+        // extract from hive
+        Set<String> loadedTables = Sets.newHashSet();
+        for (String database : db2tables.keySet()) {
+            List<String> loaded = extractHiveTables(database, db2tables.get(database), config);
+            loadedTables.addAll(loaded);
+        }
+
+        return loadedTables;
+    }
+
+    private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
+
+        List<String> loadedTables = Lists.newArrayList();
+        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        for (String tableName : tables) {
+            Table table = null;
+            HiveClient hiveClient = new HiveClient();
+            List<FieldSchema> partitionFields = null;
+            List<FieldSchema> fields = null;
+            try {
+                table = hiveClient.getHiveTable(database, tableName);
+                partitionFields = table.getPartitionKeys();
+                fields = hiveClient.getHiveTableFields(database, tableName);
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw new IOException(e);
+            }
+
+            if (fields != null && partitionFields != null && partitionFields.size() > 0) {
+                fields.addAll(partitionFields);
+            }
+
+            long tableSize = hiveClient.getFileSizeForTable(table);
+            long tableFileNum = hiveClient.getFileNumberForTable(table);
+            TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
+            if (tableDesc == null) {
+                tableDesc = new TableDesc();
+                tableDesc.setDatabase(database.toUpperCase());
+                tableDesc.setName(tableName.toUpperCase());
+                tableDesc.setUuid(UUID.randomUUID().toString());
+                tableDesc.setLastModified(0);
+            }
+
+            int columnNumber = fields.size();
+            List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
+            for (int i = 0; i < columnNumber; i++) {
+                FieldSchema field = fields.get(i);
+                ColumnDesc cdesc = new ColumnDesc();
+                cdesc.setName(field.getName().toUpperCase());
+                cdesc.setDatatype(field.getType());
+                cdesc.setId(String.valueOf(i + 1));
+                columns.add(cdesc);
+            }
+            tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
+
+            StringBuffer partitionColumnString = new StringBuffer();
+            for (int i = 0, n = partitionFields.size(); i < n; i++) {
+                if (i > 0)
+                    partitionColumnString.append(", ");
+                partitionColumnString.append(partitionFields.get(i).getName().toUpperCase());
+            }
+
+            Map<String, String> map = metaMgr.getTableDescExd(tableDesc.getIdentity());
+
+            if (map == null) {
+                map = Maps.newHashMap();
+            }
+            map.put(MetadataConstants.TABLE_EXD_TABLENAME, table.getTableName());
+            map.put(MetadataConstants.TABLE_EXD_LOCATION, table.getSd().getLocation());
+            map.put(MetadataConstants.TABLE_EXD_IF, table.getSd().getInputFormat());
+            map.put(MetadataConstants.TABLE_EXD_OF, table.getSd().getOutputFormat());
+            map.put(MetadataConstants.TABLE_EXD_OWNER, table.getOwner());
+            map.put(MetadataConstants.TABLE_EXD_LAT, String.valueOf(table.getLastAccessTime()));
+            map.put(MetadataConstants.TABLE_EXD_PC, partitionColumnString.toString());
+            map.put(MetadataConstants.TABLE_EXD_TFS, String.valueOf(tableSize));
+            map.put(MetadataConstants.TABLE_EXD_TNF, String.valueOf(tableFileNum));
+            map.put(MetadataConstants.TABLE_EXD_PARTITIONED, Boolean.valueOf(partitionFields != null && partitionFields.size() > 0).toString());
+
+            metaMgr.saveSourceTable(tableDesc);
+            metaMgr.saveTableExd(tableDesc.getIdentity(), map);
+            loadedTables.add(tableDesc.getIdentity());
+        }
+
+
+        return loadedTables;
+    }
+
+
+}

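Usage of the loader above is a one-liner; the table names here are hypothetical:

    KylinConfig config = KylinConfig.getInstanceFromEnv();
    Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(
            new String[] { "DEFAULT.FACT_TABLE", "EDW.LOOKUP_TABLE" }, config);
    // returns the identities of the tables now present in Kylin metadata
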
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
new file mode 100644
index 0000000..02e7c45
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTable.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HiveClient;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.dict.lookup.FileTable;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class HiveTable implements ReadableTable {
+
+    private static final Logger logger = LoggerFactory.getLogger(HiveTable.class);
+
+    final private String database;
+    final private String hiveTable;
+    
+    private HiveClient hiveClient;
+
+    public HiveTable(TableDesc tableDesc) {
+        this.database = tableDesc.getDatabase();
+        this.hiveTable = tableDesc.getName();
+    }
+
+    @Override
+    public TableReader getReader() throws IOException {
+        return new HiveTableReader(database, hiveTable);
+    }
+
+    @Override
+    public TableSignature getSignature() throws IOException {
+        try {
+            String path = computeHDFSLocation();
+            Pair<Long, Long> sizeAndLastModified = FileTable.getSizeAndLastModified(path);
+            long size = sizeAndLastModified.getFirst();
+            long lastModified = sizeAndLastModified.getSecond();
+
+            // for non-native hive table, cannot rely on size & last modified on HDFS
+            if (!getHiveClient().isNativeTable(database, hiveTable)) {
+                lastModified = System.currentTimeMillis(); // assume table is ever changing
+            }
+
+            return new TableSignature(path, size, lastModified);
+
+        } catch (Exception e) {
+            if (e instanceof IOException)
+                throw (IOException) e;
+            else
+                throw new IOException(e);
+        }
+    }
+
+    private String computeHDFSLocation() throws Exception {
+
+        String override = KylinConfig.getInstanceFromEnv().getOverrideHiveTableLocation(hiveTable);
+        if (override != null) {
+            logger.debug("Override hive table location " + hiveTable + " -- " + override);
+            return override;
+        }
+
+        return getHiveClient().getHiveTableLocation(database, hiveTable);
+    }
+
+    public HiveClient getHiveClient() {
+
+        if (hiveClient == null) {
+            hiveClient = new HiveClient();
+        }
+        return hiveClient;
+    }
+
+    @Override
+    public String toString() {
+        return "hive: database=[" + database + "], table=[" + hiveTable + "]";
+    }
+
+}

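The TableSignature returned above (path, size, last-modified) is what downstream caching compares to decide whether a dictionary or snapshot must be rebuilt; note how the code deliberately fakes lastModified for non-native tables so they always look changed. A sketch of the comparison (the caching side is not part of this diff):

    ReadableTable table = new HiveTable(tableDesc);
    TableSignature current = table.getSignature();
    if (!current.equals(lastKnownSignature)) {
        // source changed (or is non-native): rebuild dictionary / snapshot
    }
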
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
new file mode 100644
index 0000000..35e24fe
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
+import org.apache.hive.hcatalog.data.transfer.HCatReader;
+import org.apache.hive.hcatalog.data.transfer.ReadEntity;
+import org.apache.hive.hcatalog.data.transfer.ReaderContext;
+import org.apache.kylin.source.ReadableTable.TableReader;
+
+/**
+ * A TableReader implementation that reads Hive tables through HCatalog.
+ */
+public class HiveTableReader implements TableReader {
+
+    private String dbName;
+    private String tableName;
+    private int currentSplit = -1;
+    private ReaderContext readCntxt = null;
+    private Iterator<HCatRecord> currentHCatRecordItr = null;
+    private HCatRecord currentHCatRecord;
+    private int numberOfSplits = 0;
+    private Map<String, String> partitionKV = null;
+
+    /**
+     * Constructor for reading a whole Hive table
+     * @param dbName
+     * @param tableName
+     * @throws IOException
+     */
+    public HiveTableReader(String dbName, String tableName) throws IOException {
+        this(dbName, tableName, null);
+    }
+
+    /**
+     * Constructor for reading a partition of the hive table
+     * @param dbName
+     * @param tableName
+     * @param partitionKV key-value pairs condition on the partition
+     * @throws IOException
+     */
+    public HiveTableReader(String dbName, String tableName, Map<String, String> partitionKV) throws IOException {
+        this.dbName = dbName;
+        this.tableName = tableName;
+        this.partitionKV = partitionKV;
+        initialize();
+    }
+
+    private void initialize() throws IOException {
+        try {
+            this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new IOException(e);
+        }
+
+        this.numberOfSplits = readCntxt.numSplits();
+    }
+
+    @Override
+    public boolean next() throws IOException {
+
+        while (currentHCatRecordItr == null || !currentHCatRecordItr.hasNext()) {
+            currentSplit++;
+            if (currentSplit == numberOfSplits) {
+                return false;
+            }
+
+            currentHCatRecordItr = loadHCatRecordItr(readCntxt, currentSplit);
+        }
+
+        currentHCatRecord = currentHCatRecordItr.next();
+
+        return true;
+    }
+
+    @Override
+    public String[] getRow() {
+        return getRowAsStringArray(currentHCatRecord);
+    }
+
+    public List<String> getRowAsList() {
+        return getRowAsList(currentHCatRecord);
+    }
+
+    public static List<String> getRowAsList(HCatRecord record, List<String> rowValues) {
+        List<Object> allFields = record.getAll();
+        for (Object o : allFields) {
+            rowValues.add((o == null) ? null : o.toString());
+        }
+        return rowValues;
+    }
+
+    public static List<String> getRowAsList(HCatRecord record) {
+        return Arrays.asList(getRowAsStringArray(record));
+    }
+
+    public static String[] getRowAsStringArray(HCatRecord record) {
+        String[] arr = new String[record.size()];
+        for (int i = 0; i < arr.length; i++) {
+            Object o = record.get(i);
+            arr[i] = (o == null) ? null : o.toString();
+        }
+        return arr;
+    }
+
+    @Override
+    public void close() throws IOException {
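+        // Nothing is physically closed here; dropping the references lets the
+        // read context and current record be garbage collected.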
+        this.readCntxt = null;
+        this.currentHCatRecordItr = null;
+        this.currentHCatRecord = null;
+        this.currentSplit = -1;
+    }
+
+    @Override
+    public String toString() {
+        return "hive table reader for: " + dbName + "." + tableName;
+    }
+
+    private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception {
+        HiveConf hiveConf = new HiveConf(HiveTableReader.class);
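+        // Flatten the HiveConf into a plain key-value map, the form taken by
+        // DataTransferFactory.getHCatReader below.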
+        Iterator<Entry<String, String>> itr = hiveConf.iterator();
+        Map<String, String> map = new HashMap<String, String>();
+        while (itr.hasNext()) {
+            Entry<String, String> kv = itr.next();
+            map.put(kv.getKey(), kv.getValue());
+        }
+
+        ReadEntity entity;
+        if (partitionKV == null || partitionKV.size() == 0) {
+            entity = new ReadEntity.Builder().withDatabase(database).withTable(table).build();
+        } else {
+            entity = new ReadEntity.Builder().withDatabase(database).withTable(table).withPartition(partitionKV).build();
+        }
+
+        HCatReader reader = DataTransferFactory.getHCatReader(entity, map);
+        ReaderContext cntxt = reader.prepareRead();
+
+        return cntxt;
+    }
+
+    private static Iterator<HCatRecord> loadHCatRecordItr(ReaderContext readCntxt, int dataSplit) throws HCatException {
+        HCatReader currentHCatReader = DataTransferFactory.getHCatReader(readCntxt, dataSplit);
+
+        return currentHCatReader.read();
+    }
+}
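
For orientation, a minimal usage sketch of the new reader (a sketch only:
"test_kylin_fact" is the sample table the tests use, and a hive-site.xml on
the classpath is assumed so HiveConf can locate the metastore):

    import org.apache.kylin.source.hive.HiveTableReader;

    public class HiveTableReaderExample {
        public static void main(String[] args) throws Exception {
            // requires a live Hive metastore; table name is illustrative
            HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
            try {
                while (reader.next()) {
                    String[] row = reader.getRow(); // every field rendered as a String
                }
            } finally {
                reader.close();
            }
        }
    }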

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
index 4265519..a9e95a4 100644
--- a/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
+++ b/job/src/main/java/org/apache/kylin/source/hive/HiveTableSource.java
@@ -19,7 +19,9 @@
 package org.apache.kylin.source.hive;
 
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.ITableSource;
+import org.apache.kylin.source.ReadableTable;
 
 public class HiveTableSource implements ITableSource {
 
@@ -33,4 +35,9 @@ public class HiveTableSource implements ITableSource {
         }
     }
 
+    @Override
+    public ReadableTable createReadableTable(TableDesc tableDesc) {
+        return new HiveTable(tableDesc);
+    }
+
 }
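
A sketch of the new factory method in use; the TableDesc is assumed to come
from Kylin metadata (its lookup is omitted here):

    import org.apache.kylin.metadata.model.TableDesc;
    import org.apache.kylin.source.ITableSource;
    import org.apache.kylin.source.ReadableTable;

    // Callers program against ReadableTable instead of Hive classes directly.
    ITableSource source = new HiveTableSource();
    ReadableTable table = source.createReadableTable(tableDesc); // tableDesc from metadata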

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 9693771..1f91b25 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -34,18 +34,28 @@
 
 package org.apache.kylin.job;
 
-import com.google.common.collect.Lists;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
-import org.apache.kylin.dict.lookup.HiveTableReader;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
@@ -58,6 +68,7 @@ import org.apache.kylin.job.hadoop.cube.StorageCleanupJob;
 import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.source.hive.HiveTableReader;
 import org.apache.kylin.streaming.StreamBuilder;
 import org.apache.kylin.streaming.StreamMessage;
 import org.apache.kylin.streaming.invertedindex.IIStreamConsumer;
@@ -68,16 +79,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
 
 /**
  */
@@ -141,11 +143,11 @@ public class BuildIIWithStreamTest {
         IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(desc);
         JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig);
         final String uuid = UUID.randomUUID().toString();
-        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, uuid);
-        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, jobEngineConfig.getHdfsWorkingDirectory() + "/kylin-" + uuid, uuid);
+        final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
+        final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, jobEngineConfig.getHdfsWorkingDirectory() + "/kylin-" + uuid);
         String insertDataHqls;
         try {
-            insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, uuid, jobEngineConfig);
+            insertDataHqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, jobEngineConfig);
         } catch (IOException e1) {
             e1.printStackTrace();
             throw new RuntimeException("Failed to generate insert data SQL for intermediate table.");
@@ -163,7 +165,7 @@ public class BuildIIWithStreamTest {
         logger.info(step.getCmd());
         step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
         kylinConfig.getCliCommandExecutor().execute(step.getCmd(), null);
-        return intermediateTableDesc.getTableName(uuid);
+        return intermediateTableDesc.getTableName();
     }
 
     private void clearSegment(String iiName) throws Exception {
@@ -194,10 +196,7 @@ public class BuildIIWithStreamTest {
         final IIDesc desc = iiManager.getII(iiName).getDescriptor();
         final String tableName = createIntermediateTable(desc, kylinConfig);
         logger.info("intermediate table name:" + tableName);
-        final Configuration conf = new Configuration();
-        HCatInputFormat.setInput(conf, "default", tableName);
-        final HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);
-        logger.info(StringUtils.join(tableSchema.getFieldNames(), "\n"));
+
         HiveTableReader reader = new HiveTableReader("default", tableName);
         final List<TblColRef> tblColRefs = desc.listAllColumns();
         for (TblColRef tblColRef : tblColRefs) {
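
The schema probe through HCatInputFormat is no longer needed because the
reader is self-contained. A rough before/after, using only the calls visible
in this hunk:

    // before: the schema had to be fetched through HCatInputFormat
    Configuration conf = new Configuration();
    HCatInputFormat.setInput(conf, "default", tableName);
    HCatSchema tableSchema = HCatInputFormat.getTableSchema(conf);

    // after: HCatalog setup is encapsulated behind the TableReader API
    HiveTableReader reader = new HiveTableReader("default", tableName);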

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
index 5886324..32e5aff 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
@@ -39,11 +39,11 @@ import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.TrieDictionary;
-import org.apache.kylin.dict.lookup.ReadableTable.TableSignature;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.source.ReadableTable.TableSignature;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
index 15fe340..21f6a71 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/hive/JoinedFlatTableTest.java
@@ -63,7 +63,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testGenCreateTableDDL() {
-        String ddl = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, "/tmp", fakeJobUUID);
+        String ddl = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, "/tmp");
         System.out.println(ddl);
 
         System.out.println("The length for the ddl is " + ddl.length());
@@ -71,14 +71,14 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testGenDropTableDDL() {
-        String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, fakeJobUUID);
+        String ddl = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc);
         System.out.println(ddl);
         assertEquals(107, ddl.length());
     }
 
     @Test
     public void testGenerateInsertSql() throws IOException {
-        String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, fakeJobUUID, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+        String sqls = JoinedFlatTable.generateInsertDataStatement(intermediateTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
         System.out.println(sqls);
 
         int length = sqls.length();
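
The signature change is mechanical: the job UUID no longer parameterizes the
flat-table DDL. A before/after sketch of the three generators (variable names
abbreviated):

    // before: every statement carried the job UUID
    JoinedFlatTable.generateCreateTableStatement(desc, "/tmp", uuid);
    JoinedFlatTable.generateDropTableStatement(desc, uuid);
    JoinedFlatTable.generateInsertDataStatement(desc, uuid, engineConfig);

    // after: the intermediate table name is derived from the descriptor alone
    JoinedFlatTable.generateCreateTableStatement(desc, "/tmp");
    JoinedFlatTable.generateDropTableStatement(desc);
    JoinedFlatTable.generateInsertDataStatement(desc, engineConfig);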

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index 9a28ea4..811948b 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -1,21 +1,19 @@
 package org.apache.kylin.job.hadoop.invertedindex;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
-import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.types.Pair;
 import org.apache.kylin.common.util.FIFOIterable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
@@ -23,9 +21,11 @@ import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.invertedindex.model.*;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.cube.FactDistinctIIColumnsMapper;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodecWithState;
+import org.apache.kylin.invertedindex.model.IIRow;
+import org.apache.kylin.invertedindex.model.KeyValueCodec;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
@@ -41,16 +41,20 @@ import org.apache.kylin.storage.hbase.coprocessor.endpoint.ClearTextDictionary;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.EndpointAggregators;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint;
 import org.apache.kylin.storage.hbase.coprocessor.endpoint.generated.IIProtos;
-import org.apache.kylin.streaming.*;
+import org.apache.kylin.streaming.MicroStreamBatch;
+import org.apache.kylin.streaming.ParsedStreamMessage;
+import org.apache.kylin.streaming.StreamMessage;
+import org.apache.kylin.streaming.StreamParser;
+import org.apache.kylin.streaming.StringStreamParser;
 import org.apache.kylin.streaming.invertedindex.SliceBuilder;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.*;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  */
@@ -233,29 +237,4 @@ public class IITest extends LocalFileMetadataTestCase {
         }
     }
 
-    @Test
-    public void factDistinctIIColumnsMapperTest() throws IOException {
-        MapDriver<ImmutableBytesWritable, Result, LongWritable, Text> mapDriver;
-        FactDistinctIIColumnsMapper mapper = new FactDistinctIIColumnsMapper();
-        mapDriver = MapDriver.newMapDriver(mapper);
-
-        mapDriver.getConfiguration().set(BatchConstants.CFG_II_NAME, iiName);
-        mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-        mapDriver.getConfiguration().setStrings("io.serializations", mapDriver.getConfiguration().get("io.serializations"), MutationSerialization.class.getName(), ResultSerialization.class.getName());
-        mapDriver.addAll(Lists.newArrayList(Collections2.transform(iiRows, new Function<IIRow, Pair<ImmutableBytesWritable, Result>>() {
-            @Nullable
-            @Override
-            public Pair<ImmutableBytesWritable, Result> apply(@Nullable IIRow input) {
-                return new Pair<ImmutableBytesWritable, Result>(new ImmutableBytesWritable(new byte[] { 1 }), Result.create(input.makeCells()));
-            }
-        })));
-
-        List<Pair<LongWritable, Text>> result = mapDriver.run();
-        Set<String> lstgNames = Sets.newHashSet("FP-non GTC", "ABIN");
-        for (Pair<LongWritable, Text> pair : result) {
-            Assert.assertEquals(pair.getFirst().get(), 6);
-            Assert.assertTrue(lstgNames.contains(pair.getSecond().toString()));
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java
deleted file mode 100644
index c9ef366..0000000
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityJobTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.kylin.job.hadoop.cardinality.HiveColumnCardinalityJob;
-
-/**
- * @author ysong1
- * 
- */
-@Ignore("This test is invalid now as the mapper uses HCatalog to fetch the data which need a hive env")
-public class ColumnCardinalityJobTest {
-
-    private Configuration conf;
-
-    @Before
-    public void setup() throws IOException {
-        conf = new Configuration();
-        conf.set("fs.default.name", "file:///");
-        conf.set("mapred.job.tracker", "local");
-    }
-
-    @Test
-    @Ignore("not maintaining")
-    public void testJob() throws Exception {
-        final String input = "src/test/resources/data/test_cal_dt/";
-        final String output = "target/test-output/column-cardinality/";
-
-        FileUtil.fullyDelete(new File(output));
-
-        String[] args = { "-input", input, "-output", output, "-cols", "1,2,3,4,5,6,9,0" };
-        assertEquals("Job failed", 0, ToolRunner.run(new HiveColumnCardinalityJob(), args));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
deleted file mode 100644
index e13289a..0000000
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import static org.junit.Assert.*;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.job.hadoop.cardinality.ColumnCardinalityMapper;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-
-/**
- * @author ysong1
- * 
- */
-@Ignore("This test is invalid now as the mapper uses HCatalog to fetch the data which need a hive env")
-public class ColumnCardinalityMapperTest {
-
-    @SuppressWarnings("rawtypes")
-    MapDriver mapDriver;
-    String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
-
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    @Before
-    public void setUp() {
-        ColumnCardinalityMapper mapper = new ColumnCardinalityMapper();
-        mapDriver = MapDriver.newMapDriver(mapper);
-    }
-
-    public final static String strArr = "abc,tests,test,test,as,sts,test,tss,sets";
-
-    @SuppressWarnings({ "unchecked" })
-    @Test
-    @Ignore
-    public void testMapperOn177() throws IOException {
-        mapDriver.clearInput();
-        File file = new File("src/test/resources/data/test_cal_dt/part-r-00000");
-        FileReader reader = new FileReader(file);
-        BufferedReader breader = new BufferedReader(reader);
-        String s = breader.readLine();
-        int i = 0;
-        while (s != null) {
-            LongWritable inputKey = new LongWritable(i++);
-            mapDriver.addInput(inputKey, new Text(s));
-            s = breader.readLine();
-        }
-        // breader.close();
-        List<Pair<IntWritable, BytesWritable>> result = mapDriver.run();
-        breader.close();
-        assertEquals(9, result.size());
-
-        int key1 = result.get(0).getFirst().get();
-        BytesWritable value1 = result.get(0).getSecond();
-        byte[] bytes = value1.getBytes();
-        HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
-        hllc.readRegisters(ByteBuffer.wrap(bytes));
-        assertTrue(key1 > 0);
-        assertEquals(8, hllc.getCountEstimate());
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void testMapperOnComma() throws IOException {
-        mapDriver.clearInput();
-        LongWritable inputKey1 = new LongWritable(1);
-        LongWritable inputKey2 = new LongWritable(2);
-        LongWritable inputKey3 = new LongWritable(3);
-        LongWritable inputKey4 = new LongWritable(4);
-        LongWritable inputKey5 = new LongWritable(5);
-        LongWritable inputKey6 = new LongWritable(6);
-        LongWritable inputKey7 = new LongWritable(7);
-
-        mapDriver.addInput(inputKey1, new Text());
-        mapDriver.addInput(inputKey2, new Text(strArr));
-        mapDriver.addInput(inputKey3, new Text(strArr));
-        mapDriver.addInput(inputKey4, new Text(strArr));
-        mapDriver.addInput(inputKey5, new Text(strArr));
-        mapDriver.addInput(inputKey6, new Text(strArr));
-        mapDriver.addInput(inputKey7, new Text(strArr));
-
-        List<Pair<IntWritable, BytesWritable>> result = mapDriver.run();
-
-        assertEquals(9, result.size());
-
-        int key1 = result.get(0).getFirst().get();
-        BytesWritable value1 = result.get(0).getSecond();
-        byte[] bytes = value1.getBytes();
-        HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
-        hllc.readRegisters(ByteBuffer.wrap(bytes));
-        System.out.println("ab\177ab".length());
-        assertTrue(key1 > 0);
-        assertEquals(1, hllc.getCountEstimate());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
index 2c01fb6..867ee82 100644
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
@@ -46,6 +46,8 @@ import org.apache.kylin.cube.kv.RowConstants;
  * 
  */
 public class ColumnCardinalityReducerTest {
+    
+    public final static String strArr = "abc,tests,test,test,as,sts,test,tss,sets";
 
     ReduceDriver<IntWritable, BytesWritable, IntWritable, LongWritable> reduceDriver;
     String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
@@ -76,23 +78,23 @@ public class ColumnCardinalityReducerTest {
     public void testReducer() throws IOException {
         IntWritable key1 = new IntWritable(1);
         List<BytesWritable> values1 = new ArrayList<BytesWritable>();
-        values1.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr)));
+        values1.add(new BytesWritable(getBytes(strArr)));
 
         IntWritable key2 = new IntWritable(2);
         List<BytesWritable> values2 = new ArrayList<BytesWritable>();
-        values2.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " x")));
+        values2.add(new BytesWritable(getBytes(strArr + " x")));
 
         IntWritable key3 = new IntWritable(3);
         List<BytesWritable> values3 = new ArrayList<BytesWritable>();
-        values3.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " xx")));
+        values3.add(new BytesWritable(getBytes(strArr + " xx")));
 
         IntWritable key4 = new IntWritable(4);
         List<BytesWritable> values4 = new ArrayList<BytesWritable>();
-        values4.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " xxx")));
+        values4.add(new BytesWritable(getBytes(strArr + " xxx")));
 
         IntWritable key5 = new IntWritable(5);
         List<BytesWritable> values5 = new ArrayList<BytesWritable>();
-        values5.add(new BytesWritable(getBytes(ColumnCardinalityMapperTest.strArr + " xxxx")));
+        values5.add(new BytesWritable(getBytes(strArr + " xxxx")));
 
         reduceDriver.withInput(key1, values1);
         reduceDriver.withInput(key2, values2);
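
For reference, the deleted mapper test above shows how a serialized counter is
decoded; the reducer side works on the same representation (a sketch using only
calls visible in this commit):

    // restore an HLL counter from the bytes carried in a BytesWritable
    byte[] bytes = value.getBytes();
    HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
    hllc.readRegisters(ByteBuffer.wrap(bytes)); // rebuild register state
    long estimate = hllc.getCountEstimate();    // approximate distinct count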

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
new file mode 100644
index 0000000..de6df2c
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.source.hive.HiveSourceTableLoader;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
+
+    @Before
+    public void setup() throws Exception {
+        super.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        super.cleanupTestMetadata();
+    }
+
+    @Test
+    public void test() throws IOException {
+        if (!useSandbox())
+            return;
+
+        KylinConfig config = getTestConfig();
+        String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
+        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
+
+        assertTrue(loaded.size() == toLoad.length);
+        for (String str : toLoad)
+            assertTrue(loaded.contains(str));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d109e27b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
new file mode 100644
index 0000000..624f158
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.source.hive;
+
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.source.hive.HiveTableReader;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * This test case needs the Hive runtime; please run it against the sandbox.
+ * It is in the exclude list of the default profile in pom.xml.
+ *
+ * @author shaoshi
+ */
+public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
+
+    @Test
+    public void test() throws IOException {
+        HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
+        int rowNumber = 0;
+        while (reader.next()) {
+            String[] row = reader.getRow();
+            Assert.assertEquals(9, row.length);
+            rowNumber++;
+        }
+
+        reader.close();
+        Assert.assertEquals(10000, rowNumber);
+    }
+}