You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 09:13:11 UTC

[2/4] incubator-kylin git commit: KYLIN-1112 Reorganize InvertedIndex source codes into plug-in architecture

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
deleted file mode 100644
index 042678e..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsJob extends AbstractHadoopJob {
-    protected static final Logger logger = LoggerFactory.getLogger(IIDistinctColumnsJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_TABLE_NAME);
-            options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_OUTPUT_PATH);
-            parseOptions(options, args);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            String tableName = getOptionValue(OPTION_TABLE_NAME).toUpperCase();
-            String iiName = getOptionValue(OPTION_II_NAME);
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
-            // ----------------------------------------------------------------------------
-
-            logger.info("Starting: " + job.getJobName() + " on table " + tableName);
-
-            IIManager iiMgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-            IIInstance ii = iiMgr.getII(iiName);
-            job.getConfiguration().set(BatchConstants.TABLE_NAME, tableName);
-            job.getConfiguration().set(BatchConstants.TABLE_COLUMNS, getColumns(ii));
-
-            setJobClasspath(job);
-
-            setupMapper();
-            setupReducer(output);
-
-            return waitForCompletion(job);
-
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-
-    }
-
-    private String getColumns(IIInstance ii) {
-        IIJoinedFlatTableDesc iiflat = new IIJoinedFlatTableDesc(ii.getDescriptor());
-        StringBuilder buf = new StringBuilder();
-        for (IntermediateColumnDesc col : iiflat.getColumnList()) {
-            if (buf.length() > 0)
-                buf.append(",");
-            buf.append(col.getColumnName());
-        }
-        return buf.toString();
-    }
-
-    private void setupMapper() throws IOException {
-
-        String tableName = job.getConfiguration().get(BatchConstants.TABLE_NAME);
-        String[] dbTableNames = HadoopUtil.parseHiveTableName(tableName);
-
-        logger.info("setting hcat input format, db name {} , table name {}", dbTableNames[0], dbTableNames[1]);
-
-        HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-
-        job.setInputFormatClass(HCatInputFormat.class);
-
-        job.setMapperClass(IIDistinctColumnsMapper.class);
-        job.setCombinerClass(IIDistinctColumnsCombiner.class);
-        job.setMapOutputKeyClass(ShortWritable.class);
-        job.setMapOutputValueClass(Text.class);
-    }
-
-    private void setupReducer(Path output) throws IOException {
-        job.setReducerClass(IIDistinctColumnsReducer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setOutputKeyClass(NullWritable.class);
-        job.setOutputValueClass(Text.class);
-
-        FileOutputFormat.setOutputPath(job, output);
-        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
-        job.setNumReduceTasks(1);
-
-        deletePath(job.getConfiguration(), output);
-    }
-
-    public static void main(String[] args) throws Exception {
-        IIDistinctColumnsJob job = new IIDistinctColumnsJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
deleted file mode 100644
index 3418a57..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsMapper.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.KylinMapper;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ShortWritable, Text> {
-
-    private ShortWritable outputKey = new ShortWritable();
-    private Text outputValue = new Text();
-    private HCatSchema schema = null;
-    private int columnSize = 0;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
-        columnSize = schema.getFields().size();
-    }
-
-    @Override
-    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-
-        HCatFieldSchema fieldSchema = null;
-        for (short i = 0; i < columnSize; i++) {
-            outputKey.set(i);
-            fieldSchema = schema.get(i);
-            Object fieldValue = record.get(fieldSchema.getName(), schema);
-            if (fieldValue == null)
-                continue;
-            byte[] bytes = Bytes.toBytes(fieldValue.toString());
-            outputValue.set(bytes, 0, bytes.length);
-            context.write(outputKey, outputValue);
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
deleted file mode 100644
index fcb4dd5..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-
-/**
- * @author yangli9
- */
-public class IIDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
-
-    private String[] columns;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-        this.columns = conf.get(BatchConstants.TABLE_COLUMNS).split(",");
-    }
-
-    @Override
-    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-        String columnName = columns[key.get()];
-
-        HashSet<ByteArray> set = new HashSet<ByteArray>();
-        for (Text textValue : values) {
-            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
-            set.add(value);
-        }
-
-        Configuration conf = context.getConfiguration();
-        FileSystem fs = FileSystem.get(conf);
-        String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
-        FSDataOutputStream out = fs.create(new Path(outputPath, columnName));
-
-        try {
-            for (ByteArray value : set) {
-                out.write(value.array(), value.offset(), value.length());
-                out.write('\n');
-            }
-        } finally {
-            out.close();
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
deleted file mode 100644
index c9ad448..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexJob extends AbstractHadoopJob {
-    protected static final Logger logger = LoggerFactory.getLogger(InvertedIndexJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_TABLE_NAME);
-            options.addOption(OPTION_OUTPUT_PATH);
-            parseOptions(options, args);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            String iiname = getOptionValue(OPTION_II_NAME);
-            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
-            // ----------------------------------------------------------------------------
-
-            System.out.println("Starting: " + job.getJobName());
-
-            IIInstance ii = getII(iiname);
-            short sharding = ii.getDescriptor().getSharding();
-
-            setJobClasspath(job);
-
-            setupMapper(intermediateTable);
-            setupReducer(output, sharding);
-            attachMetadata(ii);
-
-            return waitForCompletion(job);
-
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        } finally {
-            if (job != null)
-                cleanupTempConfFile(job.getConfiguration());
-        }
-
-    }
-
-    private IIInstance getII(String iiName) {
-        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-        IIInstance ii = mgr.getII(iiName);
-        if (ii == null)
-            throw new IllegalArgumentException("No Inverted Index found by name " + iiName);
-        return ii;
-    }
-
-    private void attachMetadata(IIInstance ii) throws IOException {
-
-        Configuration conf = job.getConfiguration();
-        attachKylinPropsAndMetadata(ii, conf);
-
-        IISegment seg = ii.getFirstSegment();
-        conf.set(BatchConstants.CFG_II_NAME, ii.getName());
-        conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName());
-    }
-
-    protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        // write II / model_desc / II_desc / dict / table
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(ii.getResourcePath());
-        dumpList.add(ii.getDescriptor().getModel().getResourcePath());
-        dumpList.add(ii.getDescriptor().getResourcePath());
-
-        for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = metaMgr.getTableDesc(tableName);
-            dumpList.add(table.getResourcePath());
-        }
-        for (IISegment segment : ii.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
-
-        attachKylinPropsAndMetadata(dumpList, conf);
-    }
-
-    private void setupMapper(String intermediateTable) throws IOException {
-
-        String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
-        HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-
-        job.setInputFormatClass(HCatInputFormat.class);
-
-        job.setMapperClass(InvertedIndexMapper.class);
-        job.setMapOutputKeyClass(LongWritable.class);
-        job.setMapOutputValueClass(ImmutableBytesWritable.class);
-        job.setPartitionerClass(InvertedIndexPartitioner.class);
-    }
-
-    private void setupReducer(Path output, short sharding) throws IOException {
-        job.setReducerClass(InvertedIndexReducer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(ImmutableBytesWritable.class);
-
-        job.setNumReduceTasks(sharding);
-
-        FileOutputFormat.setOutputPath(job, output);
-
-        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
-        deletePath(job.getConfiguration(), output);
-    }
-
-    public static void main(String[] args) throws Exception {
-        InvertedIndexJob job = new InvertedIndexJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
deleted file mode 100644
index bc43b65..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, LongWritable, ImmutableBytesWritable> {
-
-    private TableRecordInfo info;
-    private TableRecord rec;
-
-    private LongWritable outputKey;
-    private ImmutableBytesWritable outputValue;
-    private HCatSchema schema = null;
-    private List<HCatFieldSchema> fields;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-        IIManager mgr = IIManager.getInstance(config);
-        IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
-        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
-        this.info = new TableRecordInfo(seg);
-        this.rec = this.info.createTableRecord();
-
-        outputKey = new LongWritable();
-        outputValue = new ImmutableBytesWritable(rec.getBytes());
-
-        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
-
-        fields = schema.getFields();
-    }
-
-    @Override
-    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-
-        rec.reset();
-        for (int i = 0; i < fields.size(); i++) {
-            Object fieldValue = record.get(i);
-            rec.setValueString(i, fieldValue == null ? null : fieldValue.toString());
-        }
-
-        outputKey.set(rec.getTimestamp());
-        // outputValue's backing bytes array is the same as rec
-
-        context.write(outputKey, outputValue);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
deleted file mode 100644
index 396c221..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> implements Configurable {
-
-    private Configuration conf;
-    private TableRecordInfo info;
-    private TableRecord rec;
-
-    @Override
-    public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) {
-        rec.setBytes(value.get(), value.getOffset(), value.getLength());
-        return rec.getShard();
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-        try {
-            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-            IIManager mgr = IIManager.getInstance(config);
-            IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
-            IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
-            this.info = new TableRecordInfo(seg);
-            this.rec = this.info.createTableRecord();
-        } catch (IOException e) {
-            throw new RuntimeException("", e);
-        }
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
deleted file mode 100644
index 5a69eec..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.IncrementalSliceMaker;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
-
-    private TableRecordInfo info;
-    private TableRecord rec;
-    private IncrementalSliceMaker builder;
-    private IIKeyValueCodec kv;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-        IIManager mgr = IIManager.getInstance(config);
-        IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
-        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
-        info = new TableRecordInfo(seg);
-        rec = info.createTableRecord();
-        builder = null;
-        kv = new IIKeyValueCodec(info.getDigest());
-    }
-
-    @Override
-    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
-            throws IOException, InterruptedException {
-        for (ImmutableBytesWritable v : values) {
-            rec.setBytes(v.get(), v.getOffset(), v.getLength());
-
-            if (builder == null) {
-                builder = new IncrementalSliceMaker(info, rec.getShard());
-            }
-
-            //TODO: to delete this log
-            System.out.println(rec.getShard() + " - " + rec);
-
-            Slice slice = builder.append(rec);
-            if (slice != null) {
-                output(slice, context);
-            }
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        Slice slice = builder.close();
-        if (slice != null) {
-            output(slice, context);
-        }
-    }
-
-    private void output(Slice slice, Context context) throws IOException, InterruptedException {
-        for (IIRow pair : kv.encodeKeyValue(slice)) {
-            context.write(pair.getKey(), pair.getValue());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
deleted file mode 100644
index 0af846b..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.invertedindex;
-
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-/**
- * Chained executable for an inverted-index (II) job. The II instance name and the segment id are
- * kept as job parameters under the keys {@code iiName} and {@code segmentId}.
- */
-public class IIJob extends DefaultChainedExecutable {
-
-    /** Parameter key under which the II instance name is stored. */
-    private static final String II_INSTANCE_NAME = "iiName";
-    /** Parameter key under which the segment id is stored. */
-    private static final String SEGMENT_ID = "segmentId";
-
-    public IIJob() {
-        super();
-    }
-
-    /** @return the II instance name stored under {@code iiName} */
-    public String getIIName() {
-        return getParam(II_INSTANCE_NAME);
-    }
-
-    /** Stores the II instance name under {@code iiName}. */
-    void setIIName(String iiName) {
-        setParam(II_INSTANCE_NAME, iiName);
-    }
-
-    /** @return the segment id stored under {@code segmentId} */
-    public String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    /** Stores the segment id under {@code segmentId}. */
-    void setSegmentId(String segmentUuid) {
-        setParam(SEGMENT_ID, segmentUuid);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
deleted file mode 100644
index 4bd06c5..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.invertedindex;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-
-import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.hadoop.dict.CreateInvertedIndexDictionaryJob;
-import org.apache.kylin.job.hadoop.invertedindex.IIBulkLoadJob;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHFileJob;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
-import org.apache.kylin.job.hadoop.invertedindex.IIDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.invertedindex.InvertedIndexJob;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-import org.apache.kylin.source.hive.HiveMRInput.BatchCubingInputSide;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Assembles the chained BUILD job for an inverted-index (II) segment.
- * <p>
- * Steps, in order: create the flat Hive table, extract distinct column values (MR), build the
- * dictionaries, build the inverted index (MR), create the HBase table, convert the index to HFiles
- * (MR) and bulk-load the HFiles. The distinct-column, index and HFile outputs are written under the
- * per-job working directory.
- */
-public final class IIJobBuilder {
-
-    final JobEngineConfig engineConfig;
-
-    public IIJobBuilder(JobEngineConfig engineConfig) {
-        this.engineConfig = engineConfig;
-    }
-
-    /**
-     * Creates the BUILD job for the given segment. The job is only assembled here, not submitted.
-     *
-     * @param seg the II segment to build; must not be null
-     * @param submitter recorded as the job's submitter
-     * @return the assembled job
-     * @throws NullPointerException if {@code seg} or the engine config is null
-     */
-    public IIJob buildJob(IISegment seg, String submitter) {
-        checkPreconditions(seg);
-
-        IIJob result = initialJob(seg, "BUILD", submitter);
-        final String jobId = result.getId();
-        final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
-        final String intermediateTableIdentity = getIntermediateTableIdentity(intermediateTableDesc);
-        final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
-        final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
-        // glob over the index step's output directory; used as input of the HFile conversion step
-        final String iiPath = iiRootPath + "*";
-
-        final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId);
-        result.addTask(intermediateHiveTableStep);
-
-        result.addTask(createFactDistinctColumnsStep(seg, intermediateTableIdentity, jobId, factDistinctColumnsPath));
-
-        result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
-
-        result.addTask(createInvertedIndexStep(seg, intermediateTableIdentity, iiRootPath));
-
-        // create htable step
-        result.addTask(createCreateHTableStep(seg));
-
-        // generate hfiles step
-        result.addTask(createConvertToHfileStep(seg, iiPath, jobId));
-
-        // bulk load step
-        result.addTask(createBulkLoadStep(seg, jobId));
-
-        return result;
-    }
-
-    // flat-table creation is delegated to the Hive input side's shared step builder
-    private AbstractExecutable createFlatHiveTableStep(IIJoinedFlatTableDesc intermediateTableDesc, String jobId) {
-        return BatchCubingInputSide.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
-    }
-
-    /**
-     * Creates an empty job carrying the II name and segment uuid as parameters.
-     * <p>
-     * The job is named "{ii} - {segment} - {type} - {time}", with the time rendered in the engine's
-     * configured time zone.
-     */
-    private IIJob initialJob(IISegment seg, String type, String submitter) {
-        IIJob result = new IIJob();
-        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-        format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
-        result.setIIName(seg.getIIInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
-        result.setSubmitter(submitter);
-        return result;
-    }
-
-    // fails fast with NullPointerException when the segment or the engine config is missing
-    private void checkPreconditions(IISegment seg) {
-        Preconditions.checkNotNull(seg, "segment cannot be null");
-        Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
-    }
-
-    /**
-     * Appends {@code -conf <file>} for the MEDIUM-capacity Hadoop job configuration file, if that path
-     * is non-empty. An IOException from the lookup is rethrown wrapped in a RuntimeException.
-     * Note: the {@code engineConfig} parameter shadows the field of the same name; every caller in
-     * this class passes the field itself.
-     */
-    private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) {
-        try {
-            String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM);
-            if (jobConf != null && jobConf.length() > 0) {
-                builder.append(" -conf ").append(jobConf);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    // <job working dir>/<ii name>/ii_distinct_columns
-    private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) {
-        return getJobWorkingDir(jobUuid) + "/" + seg.getIIInstance().getName() + "/ii_distinct_columns";
-    }
-
-    // <job working dir>/<ii name>/hfile/ — shared by the HFile conversion and bulk-load steps
-    private String getHFilePath(IISegment seg, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/hfile/";
-    }
-
-    /**
-     * Configures the MR step running IIDistinctColumnsJob over the flat table {@code factTableName},
-     * writing its result to {@code output}. {@code jobId} is not used by this method.
-     */
-    private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) {
-        MapReduceExecutable result = new MapReduceExecutable();
-        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        result.setMapReduceJobClass(IIDistinctColumnsJob.class);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, engineConfig);
-        appendExecCmdParameters(cmd, "tablename", factTableName);
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "output", output);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step");
-
-        result.setMapReduceParams(cmd.toString());
-        return result;
-    }
-
-    private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) {
-        // dictionary step: runs CreateInvertedIndexDictionaryJob over the distinct-column output
-        // (a HadoopShellExecutable, so no -conf MR parameters are appended)
-        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-
-        buildDictionaryStep.setJobParams(cmd.toString());
-        buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
-        return buildDictionaryStep;
-    }
-
-    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
-        // inverted-index MR step: reads the flat Hive table and writes the index to iiOutputTempPath
-        MapReduceExecutable buildIIStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, engineConfig);
-
-        buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
-
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
-        appendExecCmdParameters(cmd, "output", iiOutputTempPath);
-        appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
-
-        buildIIStep.setMapReduceParams(cmd.toString());
-        buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
-        return buildIIStep;
-    }
-
-    // runs IICreateHTableJob for the segment's storage location (HBase table name)
-    private HadoopShellExecutable createCreateHTableStep(IISegment seg) {
-        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
-        createHtableStep.setJobParams(cmd.toString());
-        createHtableStep.setJobClass(IICreateHTableJob.class);
-
-        return createHtableStep;
-    }
-
-    // MR step converting the index rows under inputPath into HFiles under getHFilePath(seg, jobId)
-    private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) {
-        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, engineConfig);
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step");
-
-        createHFilesStep.setMapReduceParams(cmd.toString());
-        createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
-
-        return createHFilesStep;
-    }
-
-    // runs IIBulkLoadJob on the HFile directory produced by the conversion step
-    private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) {
-        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-
-        bulkLoadStep.setJobParams(cmd.toString());
-        bulkLoadStep.setJobClass(IIBulkLoadJob.class);
-
-        return bulkLoadStep;
-
-    }
-
-    // appends " -<paraName> <paraValue>"; the value is neither quoted nor escaped
-    private StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
-        return buf.append(" -").append(paraName).append(" ").append(paraValue);
-    }
-
-    // no separator is inserted: assumes getHdfsWorkingDirectory() ends with '/' — TODO confirm
-    private String getJobWorkingDir(String uuid) {
-        return engineConfig.getHdfsWorkingDirectory() + "kylin-" + uuid;
-    }
-
-    // "<intermediate hive database>.<flat table name>"
-    private String getIntermediateTableIdentity(IIJoinedFlatTableDesc intermediateTableDesc) {
-        return engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + "." + intermediateTableDesc.getTableName();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/invertedindex/src/main/java/org/apache/kylin/job/tools/IICLI.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/job/tools/IICLI.java b/invertedindex/src/main/java/org/apache/kylin/job/tools/IICLI.java
deleted file mode 100644
index 8c39aa1..0000000
--- a/invertedindex/src/main/java/org/apache/kylin/job/tools/IICLI.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.index.RawTableRecord;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.invertedindex.model.IIRow;
-
-/**
- * Command-line dump tool: reads a sequence file of inverted-index key/value rows and prints every
- * decoded record, one per line, followed by the total count.
- * <p>
- * Usage: {@code IICLI <iiName> <sequenceFilePath>}
- *
- * @author yangli9
- */
-public class IICLI {
-
-    public static void main(String[] args) throws IOException {
-        if (args.length < 2) {
-            throw new IllegalArgumentException("Usage: IICLI <iiName> <sequenceFilePath>");
-        }
-        Configuration hconf = HadoopUtil.getCurrentConfiguration();
-        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        String iiName = args[0];
-        IIInstance ii = mgr.getII(iiName);
-        // NOTE(review): assumes getII returns null for an unknown name; fail with a clear message
-        // instead of a NullPointerException further down
-        if (ii == null) {
-            throw new IllegalArgumentException("Inverted index not found: " + iiName);
-        }
-
-        String path = args[1];
-        System.out.println("Reading from " + path + " ...");
-
-        // records are decoded with the layout of the II's first segment
-        TableRecordInfo info = new TableRecordInfo(ii.getFirstSegment());
-        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
-        int count = 0;
-        for (Slice slice : codec.decodeKeyValue(readSequenceKVs(hconf, path))) {
-            for (RawTableRecord rec : slice) {
-                // println rather than printf: record text may contain '%' and must not be parsed as a
-                // format string; println also puts each record on its own line
-                System.out.println(new TableRecord(rec, info).toString());
-                count++;
-            }
-        }
-        System.out.println("Total " + count + " records");
-    }
-
-    /**
-     * Exposes the key/value pairs of a sequence file as a single-pass iterable of {@link IIRow}.
-     * <p>
-     * The file is opened by this method and closed once iteration reaches its end or a read fails.
-     * Every {@code next()} returns the same {@link IIRow} instance, whose key and value buffers are
-     * overwritten by the following read, so callers must consume or copy each row before advancing.
-     * All iterators obtained from the returned iterable share the one underlying reader.
-     *
-     * @throws IOException if the sequence file cannot be opened
-     */
-    public static Iterable<IIRow> readSequenceKVs(Configuration hconf, String path) throws IOException {
-        final Reader reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
-        return new Iterable<IIRow>() {
-            @Override
-            public Iterator<IIRow> iterator() {
-                return new Iterator<IIRow>() {
-                    ImmutableBytesWritable k = new ImmutableBytesWritable();
-                    ImmutableBytesWritable v = new ImmutableBytesWritable();
-                    IIRow pair = new IIRow(k, v, null);
-                    // true while k/v hold a row that was read but not yet handed out by next()
-                    boolean fetched = false;
-                    // true once the reader reported end of input (or failed) and was closed
-                    boolean exhausted = false;
-
-                    @Override
-                    public boolean hasNext() {
-                        // idempotent: repeated calls must not advance the reader and skip rows
-                        if (fetched) {
-                            return true;
-                        }
-                        if (exhausted) {
-                            return false;
-                        }
-                        boolean hasNext = false;
-                        try {
-                            hasNext = reader.next(k, v);
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        } finally {
-                            if (!hasNext) {
-                                exhausted = true;
-                                IOUtils.closeQuietly(reader);
-                            }
-                        }
-                        fetched = hasNext;
-                        return hasNext;
-                    }
-
-                    @Override
-                    public IIRow next() {
-                        if (!hasNext()) {
-                            throw new java.util.NoSuchElementException();
-                        }
-                        fetched = false;
-                        return pair;
-                    }
-
-                    @Override
-                    public void remove() {
-                        throw new UnsupportedOperationException();
-                    }
-                };
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/jdbc/kylin_jdbc.log.2014-12-22
----------------------------------------------------------------------
diff --git a/jdbc/kylin_jdbc.log.2014-12-22 b/jdbc/kylin_jdbc.log.2014-12-22
deleted file mode 100644
index 7c5e001..0000000
--- a/jdbc/kylin_jdbc.log.2014-12-22
+++ /dev/null
@@ -1,18 +0,0 @@
-[main]:[2014-12-22 11:47:05,477][DEBUG][com.kylinolap.jdbc.KylinConnectionImpl.<init>(KylinConnectionImpl.java:68)] - Kylin base url test_url, project name test_db
-[main]:[2014-12-22 11:47:05,486][DEBUG][com.kylinolap.jdbc.Driver$1.onConnectionInit(Driver.java:111)] - Connection inited.
-[main]:[2014-12-22 11:47:05,487][DEBUG][com.kylinolap.jdbc.KylinMetaImpl.getTables(KylinMetaImpl.java:108)] - Get tables with conn KylinConnectionImpl [baseUrl=test_url, project=test_db, metaProject=com.kylinolap.jdbc.KylinMetaImpl$MetaProject@21300853]
-[main]:[2014-12-22 11:47:05,487][DEBUG][com.kylinolap.jdbc.KylinMetaImpl$MetaProject.getMetaTables(KylinMetaImpl.java:489)] - getMetaTables with catalog:null, schema:null, table:null
-[main]:[2014-12-22 11:47:05,534][DEBUG][com.kylinolap.jdbc.KylinConnectionImpl.<init>(KylinConnectionImpl.java:68)] - Kylin base url test_url, project name test_db
-[main]:[2014-12-22 11:47:05,534][DEBUG][com.kylinolap.jdbc.Driver$1.onConnectionInit(Driver.java:111)] - Connection inited.
-[main]:[2014-12-22 15:57:49,077][DEBUG][com.kylinolap.jdbc.KylinConnectionImpl.<init>(KylinConnectionImpl.java:68)] - Kylin base url test_url, project name test_db
-[main]:[2014-12-22 15:57:49,086][DEBUG][com.kylinolap.jdbc.Driver$1.onConnectionInit(Driver.java:111)] - Connection inited.
-[main]:[2014-12-22 15:57:49,087][DEBUG][com.kylinolap.jdbc.KylinMetaImpl.getTables(KylinMetaImpl.java:108)] - Get tables with conn KylinConnectionImpl [baseUrl=test_url, project=test_db, metaProject=com.kylinolap.jdbc.KylinMetaImpl$MetaProject@3add750e]
-[main]:[2014-12-22 15:57:49,087][DEBUG][com.kylinolap.jdbc.KylinMetaImpl$MetaProject.getMetaTables(KylinMetaImpl.java:489)] - getMetaTables with catalog:null, schema:null, table:null
-[main]:[2014-12-22 15:57:49,134][DEBUG][com.kylinolap.jdbc.KylinConnectionImpl.<init>(KylinConnectionImpl.java:68)] - Kylin base url test_url, project name test_db
-[main]:[2014-12-22 15:57:49,135][DEBUG][com.kylinolap.jdbc.Driver$1.onConnectionInit(Driver.java:111)] - Connection inited.
-[main]:[2014-12-22 17:13:45,081][DEBUG][com.kylinolap.jdbc.KylinConnectionImpl.<init>(KylinConnectionImpl.java:68)] - Kylin base url test_url, project name test_db
-[main]:[2014-12-22 17:13:45,090][DEBUG][com.kylinolap.jdbc.Driver$1.onConnectionInit(Driver.java:111)] - Connection inited.
-[main]:[2014-12-22 17:13:45,091][DEBUG][com.kylinolap.jdbc.KylinMetaImpl.getTables(KylinMetaImpl.java:108)] - Get tables with conn KylinConnectionImpl [baseUrl=test_url, project=test_db, metaProject=com.kylinolap.jdbc.KylinMetaImpl$MetaProject@21300853]
-[main]:[2014-12-22 17:13:45,091][DEBUG][com.kylinolap.jdbc.KylinMetaImpl$MetaProject.getMetaTables(KylinMetaImpl.java:489)] - getMetaTables with catalog:null, schema:null, table:null
-[main]:[2014-12-22 17:13:45,141][DEBUG][com.kylinolap.jdbc.KylinConnectionImpl.<init>(KylinConnectionImpl.java:68)] - Kylin base url test_url, project name test_db
-[main]:[2014-12-22 17:13:45,141][DEBUG][com.kylinolap.jdbc.Driver$1.onConnectionInit(Driver.java:111)] - Connection inited.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/server/pom.xml
----------------------------------------------------------------------
diff --git a/server/pom.xml b/server/pom.xml
index d51638f..f2f9e32 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -58,6 +58,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-source-hive</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-source-kafka</artifactId>
             <version>${project.parent.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 4571852..1eb2683 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -32,6 +32,8 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -43,11 +45,12 @@ import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
 
 public class HiveMRInput implements IMRInput {
 
     @Override
-    public IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
+    public IMRBatchCubingInputSide getBatchCubingInputSide(IRealizationSegment seg) {
+        // widened from CubeSegment so any realization segment can reuse the Hive batch input side
         return new BatchCubingInputSide(seg);
     }
 
@@ -90,13 +93,13 @@ public class HiveMRInput implements IMRInput {
     public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
 
         final JobEngineConfig conf;
-        final CubeSegment seg;
-        final CubeJoinedFlatTableDesc flatHiveTableDesc;
+        final IRealizationSegment seg;
+        final IJoinedFlatTableDesc flatHiveTableDesc;
 
-        public BatchCubingInputSide(CubeSegment seg) {
+        public BatchCubingInputSide(IRealizationSegment seg) {
             this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
             this.seg = seg;
-            this.flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+            // the segment supplies its own flat-table description instead of a hard-wired CubeJoinedFlatTableDesc
+            this.flatHiveTableDesc = seg.getJoinedFlatTableDesc();
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java
new file mode 100644
index 0000000..8cf921a
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IIBulkLoadJob.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.ii;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+
+/**
+ * Bulk-loads the HFiles generated for an inverted-index segment into its HBase table using
+ * {@link LoadIncrementalHFiles}.
+ */
+public class IIBulkLoadJob extends AbstractHadoopJob {
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
+            // accepted on the command line; its value is not read by this job
+            options.addOption(OPTION_II_NAME);
+            parseOptions(options, args);
+
+            // Upper-case with a fixed locale: under a default locale such as Turkish, 'i' would map to
+            // a dotted capital I and the name would no longer match the HBase table.
+            String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(java.util.Locale.ROOT);
+            String input = getOptionValue(OPTION_INPUT_PATH);
+
+            Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+            FileSystem fs = FileSystem.get(conf);
+
+            Path columnFamilyPath = new Path(input, IIDesc.HBASE_FAMILY);
+
+            // File may have already been auto-loaded (in the case of MapR DB)
+            if (fs.exists(columnFamilyPath)) {
+                // NOTE(review): opened up presumably so HBase, possibly running as a different user,
+                // can move the HFiles out of this directory — confirm
+                FsPermission permission = new FsPermission((short) 0777);
+                fs.setPermission(columnFamilyPath, permission);
+            }
+
+            return ToolRunner.run(new LoadIncrementalHFiles(conf), new String[] { input, tableName });
+
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileJob.java
new file mode 100644
index 0000000..4781f2b
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileJob.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.ii;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MapReduce job that reads the inverted-index sequence files through {@link IICreateHFileMapper}
+ * and writes HFiles for the target HBase table, ready for bulk loading.
+ *
+ * @author yangli9
+ */
+public class IICreateHFileJob extends AbstractHadoopJob {
+
+    protected static final Logger logger = LoggerFactory.getLogger(IICreateHFileJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_HTABLE_NAME);
+            parseOptions(options, args);
+
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+
+            setJobClasspath(job);
+
+            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
+            FileOutputFormat.setOutputPath(job, output);
+
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            job.setMapperClass(IICreateHFileMapper.class);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+            job.setMapOutputValueClass(KeyValue.class);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            // The HTable is only needed while configureIncrementalLoad inspects the table to set up the
+            // job; close it afterwards instead of leaking its connection resources.
+            HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName);
+            try {
+                HFileOutputFormat.configureIncrementalLoad(job, htable);
+            } finally {
+                htable.close();
+            }
+
+            // presumably clears any output left at this path (e.g. by a previous attempt) before running
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
new file mode 100644
index 0000000..e4b688f
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.ii;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.invertedindex.model.IIDesc;
+
+/**
+ * Mapper that turns every inverted-index record into exactly one HBase {@link KeyValue}:
+ * the record key becomes the row, the record value becomes the cell value, and the cell is
+ * written under the II column family and qualifier declared by {@link IIDesc}.
+ *
+ * @author yangli9
+ */
+public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable, KeyValue> {
+
+    /** Timestamp stamped on every cell this mapper emits; captured once in {@link #setup}. */
+    long timestamp;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        timestamp = System.currentTimeMillis();
+    }
+
+    @Override
+    protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
+        final byte[] family = IIDesc.HBASE_FAMILY_BYTES;
+        final byte[] qualifier = IIDesc.HBASE_QUALIFIER_BYTES;
+
+        // row and value are given as (array, offset, length) slices of the writables' backing buffers
+        KeyValue cell = new KeyValue( //
+                key.get(), key.getOffset(), key.getLength(), //
+                family, 0, family.length, //
+                qualifier, 0, qualifier.length, //
+                timestamp, Type.Put, //
+                value.get(), value.getOffset(), value.getLength());
+
+        context.write(key, cell);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
new file mode 100644
index 0000000..0a72a91
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.ii;
+
+import java.util.Locale;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.util.IIDeployCoprocessorCLI;
+
+/**
+ * Hadoop tool that (re)creates the HBase table holding an inverted-index segment. An existing
+ * table with the same name is dropped first; the new table is pre-split into one region per shard
+ * and has automatic region splitting disabled.
+ *
+ * @author George Song (ysong1)
+ */
+public class IICreateHTableJob extends AbstractHadoopJob {
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_II_NAME);
+            options.addOption(OPTION_HTABLE_NAME);
+            parseOptions(options, args);
+
+            String tableName = getOptionValue(OPTION_HTABLE_NAME);
+            String iiName = getOptionValue(OPTION_II_NAME);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            IIManager iiManager = IIManager.getInstance(config);
+            IIInstance ii = iiManager.getII(iiName);
+            if (ii == null) {
+                // fail with a clear message instead of a NullPointerException below
+                throw new IllegalArgumentException("Inverted index not found: " + iiName);
+            }
+            int sharding = ii.getDescriptor().getSharding();
+
+            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
+            HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
+            cf.setMaxVersions(1);
+            setCompression(cf, config.getHbaseDefaultCompressionCodec());
+            cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
+            tableDesc.addFamily(cf);
+            tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix());
+            tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
+            // regions are laid out one per shard up front (see getSplits), so automatic splitting is disabled
+            tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
+
+            Configuration conf = HBaseConfiguration.create(getConf());
+            if (User.isHBaseSecurityEnabled(conf)) {
+                // add coprocessor for bulk load
+                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+            }
+
+            IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
+
+            HBaseAdmin admin = new HBaseAdmin(conf);
+            try {
+                // drop the table first
+                if (admin.tableExists(tableName)) {
+                    // disableTable() throws if the table is already disabled
+                    if (admin.isTableEnabled(tableName)) {
+                        admin.disableTable(tableName);
+                    }
+                    admin.deleteTable(tableName);
+                }
+
+                // create table
+                byte[][] splitKeys = getSplits(sharding);
+                if (splitKeys.length == 0)
+                    splitKeys = null;
+                admin.createTable(tableDesc, splitKeys);
+                if (splitKeys != null) {
+                    for (int i = 0; i < splitKeys.length; i++) {
+                        System.out.println("split key " + i + ": " + BytesUtil.toHex(splitKeys[i]));
+                    }
+                }
+                System.out.println("create hbase table " + tableName + " done.");
+            } finally {
+                // release the admin connection even when dropping or creating the table fails
+                admin.close();
+            }
+
+            return 0;
+        } catch (Exception e) {
+            printUsage(options);
+            throw e;
+        }
+    }
+
+    /**
+     * Sets the column family compression from a codec name, matched case-insensitively against
+     * "snappy", "lzo", "gz"/"gzip" and "lz4". Any other name, or {@code null}, leaves the
+     * family's compression setting untouched.
+     */
+    private void setCompression(HColumnDescriptor cf, String codecName) {
+        // Locale.ROOT keeps the match locale-independent: under a Turkish default locale
+        // "GZIP".toLowerCase() yields "gzıp" and would silently fall through to no compression.
+        String codec = codecName == null ? "" : codecName.toLowerCase(Locale.ROOT);
+
+        switch (codec) {
+        case "snappy": {
+            logger.info("hbase will use snappy to compress data");
+            cf.setCompressionType(Compression.Algorithm.SNAPPY);
+            break;
+        }
+        case "lzo": {
+            logger.info("hbase will use lzo to compress data");
+            cf.setCompressionType(Compression.Algorithm.LZO);
+            break;
+        }
+        case "gz":
+        case "gzip": {
+            logger.info("hbase will use gzip to compress data");
+            cf.setCompressionType(Compression.Algorithm.GZ);
+            break;
+        }
+        case "lz4": {
+            logger.info("hbase will use lz4 to compress data");
+            cf.setCompressionType(Compression.Algorithm.LZ4);
+            break;
+        }
+        default: {
+            logger.info("hbase will not use any compression codec to compress data");
+            break;
+        }
+        }
+    }
+
+    /**
+     * Computes region split keys so that each shard gets its own region. For {@code shard} shards
+     * this returns {@code shard - 1} keys: the shard ids 1 .. shard-1, each written with
+     * {@link BytesUtil#writeUnsigned} into {@link IIKeyValueCodec#SHARD_LEN} bytes.
+     *
+     * @param shard number of shards; must be at least 1
+     * @return the split keys; empty when there is a single shard
+     * @throws IllegalArgumentException if {@code shard} is less than 1
+     */
+    private byte[][] getSplits(int shard) {
+        if (shard < 1) {
+            // fail with a clear message instead of a NegativeArraySizeException below
+            throw new IllegalArgumentException("Sharding must be at least 1, but was " + shard);
+        }
+        byte[][] result = new byte[shard - 1][];
+        for (int i = 1; i < shard; ++i) {
+            byte[] split = new byte[IIKeyValueCodec.SHARD_LEN];
+            BytesUtil.writeUnsigned(i, split, 0, IIKeyValueCodec.SHARD_LEN);
+            result[i - 1] = split;
+        }
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
index c634a1d..ff8b659 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.steps;
 
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMROutput;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
 public class HBaseMROutput implements IMROutput {
@@ -42,6 +43,23 @@ public class HBaseMROutput implements IMROutput {
     }
 
     @Override
+    public IMRBatchInvertedIndexingOutputSide getBatchInvertedIndexingOutputSide(final IISegment seg) {
+        // HBase output side for batch inverted indexing: phase 3 saves the built II into an HTable,
+        // phase 4 appends the garbage-collection steps.
+        return new IMRBatchInvertedIndexingOutputSide() {
+            private final HBaseMRSteps iiSteps = new HBaseMRSteps(seg);
+
+            @Override
+            public void addStepPhase3_BuildII(DefaultChainedExecutable jobFlow, String rootPath) {
+                iiSteps.addSaveIIToHTableSteps(jobFlow, rootPath);
+            }
+
+            @Override
+            public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+                // NOTE(review): reuses the cubing garbage-collection steps for an II segment -- confirm they apply here
+                iiSteps.addCubingGarbageCollectionSteps(jobFlow);
+            }
+        };
+    }
+
+    @Override
     public IMRBatchMergeOutputSide getBatchMergeOutputSide(final CubeSegment seg) {
         return new IMRBatchMergeOutputSide() {
             HBaseMRSteps steps = new HBaseMRSteps(seg);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f8590d25/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 43cbb70..1267d2d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -3,14 +3,19 @@ package org.apache.kylin.storage.hbase.steps;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.storage.hbase.ii.IIBulkLoadJob;
+import org.apache.kylin.storage.hbase.ii.IICreateHFileJob;
+import org.apache.kylin.storage.hbase.ii.IICreateHTableJob;
 import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 
 import com.google.common.base.Preconditions;
@@ -18,7 +23,7 @@ import com.google.common.collect.Lists;
 
 public class HBaseMRSteps extends JobBuilderSupport {
 
-    public HBaseMRSteps(CubeSegment seg) {
+    public HBaseMRSteps(IRealizationSegment seg) {
+        // accepts any realization segment, e.g. a CubeSegment or an IISegment
         super(seg, null);
     }
 
@@ -42,11 +47,11 @@ public class HBaseMRSteps extends JobBuilderSupport {
         rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
         StringBuilder cmd = new StringBuilder();
 
-        appendMapReduceParameters(cmd, seg);
+        appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
         appendExecCmdParameters(cmd, "input", inputPath);
         appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(jobId));
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+        appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getRealization().getName() + "_Step");
 
         rowkeyDistributionStep.setMapReduceParams(cmd.toString());
         rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
@@ -65,7 +70,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
         HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
         StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "segmentname", seg.getName());
         appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
         appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
@@ -84,12 +89,12 @@ public class HBaseMRSteps extends JobBuilderSupport {
         createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
         StringBuilder cmd = new StringBuilder();
 
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
         appendExecCmdParameters(cmd, "input", inputPath);
         appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
         appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
+        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getRealization().getName() + "_Step");
 
         createHFilesStep.setMapReduceParams(cmd.toString());
         createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
@@ -105,7 +110,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
         StringBuilder cmd = new StringBuilder();
         appendExecCmdParameters(cmd, "input", getHFilePath(jobId));
         appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
 
         bulkLoadStep.setJobParams(cmd.toString());
         bulkLoadStep.setJobClass(BulkLoadJob.class);
@@ -121,7 +126,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
     }
 
     public List<String> getMergingHTables() {
-        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance)seg.getRealization()).getMergingSegments((CubeSegment)seg);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingHTables = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
@@ -131,7 +136,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
     }
 
     public List<String> getMergingHDFSPaths() {
-        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
+        final List<CubeSegment> mergingSegments = ((CubeInstance)seg.getRealization()).getMergingSegments((CubeSegment)seg);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingHDFSPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
@@ -141,11 +146,11 @@ public class HBaseMRSteps extends JobBuilderSupport {
     }
 
     public String getHFilePath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
+        // <job working dir>/<realization name>/hfile/, passed through makeQualifiedPathInHBaseCluster
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
     }
 
     public String getRowkeyDistributionOutputPath(String jobId) {
-        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
+        // <job working dir>/<realization name>/rowkey_stats, passed through makeQualifiedPathInHBaseCluster
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
     }
 
     public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
@@ -177,4 +182,67 @@ public class HBaseMRSteps extends JobBuilderSupport {
 
         jobFlow.addTask(step);
     }
+
+    public void addSaveIIToHTableSteps(DefaultChainedExecutable jobFlow, String rootPath) {
+        // Appends, in order: create the HTable, convert the II files matching rootPath* into HFiles,
+        // then bulk-load those HFiles into the table.
+        jobFlow.addTask(createCreateIIHTableStep(seg));
+
+        final String jobId = jobFlow.getId();
+        final String iiFilesGlob = rootPath + "*";
+        jobFlow.addTask(createConvertIIToHfileStep(seg, iiFilesGlob, jobId));
+        jobFlow.addTask(createIIBulkLoadStep(seg, jobId));
+    }
+
+
+    /**
+     * Builds the shell step that runs IICreateHTableJob with the segment's realization name and
+     * storage table name.
+     */
+    private HadoopShellExecutable createCreateIIHTableStep(IRealizationSegment seg) {
+        StringBuilder params = new StringBuilder();
+        appendExecCmdParameters(params, "iiname", seg.getRealization().getName());
+        appendExecCmdParameters(params, "htablename", seg.getStorageLocationIdentifier());
+
+        HadoopShellExecutable step = new HadoopShellExecutable();
+        step.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
+        step.setJobParams(params.toString());
+        step.setJobClass(IICreateHTableJob.class);
+        return step;
+    }
+
+    /**
+     * Builds the MapReduce step that runs IICreateHFileJob over {@code inputPath}, writing its
+     * output to {@link #getHFilePath(String)} for the given job id.
+     */
+    private MapReduceExecutable createConvertIIToHfileStep(IRealizationSegment seg, String inputPath, String jobId) {
+        final String iiName = seg.getRealization().getName();
+        StringBuilder params = new StringBuilder();
+
+        appendMapReduceParameters(params, seg.getRealization().getDataModelDesc());
+        appendExecCmdParameters(params, "iiname", iiName);
+        appendExecCmdParameters(params, "input", inputPath);
+        appendExecCmdParameters(params, "output", getHFilePath(jobId));
+        appendExecCmdParameters(params, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(params, "jobname", "Kylin_HFile_Generator_" + iiName + "_Step");
+
+        MapReduceExecutable step = new MapReduceExecutable();
+        step.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE);
+        step.setMapReduceParams(params.toString());
+        step.setMapReduceJobClass(IICreateHFileJob.class);
+        return step;
+    }
+
+    /**
+     * Builds the shell step that runs IIBulkLoadJob on the HFiles under
+     * {@link #getHFilePath(String)} for the segment's storage table.
+     */
+    private HadoopShellExecutable createIIBulkLoadStep(IRealizationSegment seg, String jobId) {
+        StringBuilder params = new StringBuilder();
+        appendExecCmdParameters(params, "input", getHFilePath(jobId));
+        appendExecCmdParameters(params, "htablename", seg.getStorageLocationIdentifier());
+        appendExecCmdParameters(params, "iiname", seg.getRealization().getName());
+
+        HadoopShellExecutable step = new HadoopShellExecutable();
+        step.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
+        step.setJobParams(params.toString());
+        step.setJobClass(IIBulkLoadJob.class);
+        return step;
+    }
+    
 }