Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:19 UTC

[11/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
deleted file mode 100644
index eded7c7..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- */
-public class MergeCuboidJob extends CuboidJob {
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            parseOptions(options, args);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            CubeManager cubeMgr = CubeManager.getInstance(config);
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            System.out.println("Starting: " + jobName);
-            job = Job.getInstance(getConf(), jobName);
-
-            setJobClasspath(job);
-
-            // set inputs
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(MergeCuboidMapper.class);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(Text.class);
-
-            // Reducer (count decided later by setReduceTaskNum)
-            job.setReducerClass(CuboidReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-
-            // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            setReduceTaskNum(job, config, cubeName, 0);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            logger.error("error in MergeCuboidJob", e);
-            printUsage(options);
-            throw e;
-        }
-    }
-
-}
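
For context, a CuboidJob subclass like the one above is launched through Hadoop's
ToolRunner. The sketch below shows a plausible invocation; the flag spellings are
assumptions read off the OPTION_* constants in AbstractHadoopJob, and the cube name
and paths are made up for illustration only.

    import org.apache.hadoop.util.ToolRunner;

    public class MergeCuboidJobLauncher {
        public static void main(String[] ignored) throws Exception {
            // hypothetical arguments; addInputDirs() above accepts a
            // comma-separated list of input directories
            String[] args = {
                    "-jobname", "Kylin_Merge_Cuboid_sample_cube",
                    "-cubename", "sample_cube",
                    "-segmentname", "19700101000000_20150101000000",
                    "-input", "/kylin/cuboid/sample_cube/seg1,/kylin/cuboid/sample_cube/seg2",
                    "-output", "/kylin/cuboid/sample_cube_merged" };
            System.exit(ToolRunner.run(new MergeCuboidJob(), args));
        }
    }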

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
deleted file mode 100644
index 0e7a89f..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- * @author ysong1, honma
- */
-public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
-
-    private KylinConfig config;
-    private String cubeName;
-    private String segmentName;
-    private CubeManager cubeManager;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment mergedCubeSegment;
-    private CubeSegment sourceCubeSegment;// Must be unique during a mapper's
-    // life cycle
-
-    private Text outputKey = new Text();
-
-    private byte[] newKeyBuf;
-    private RowKeySplitter rowKeySplitter;
-
-    private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
-
-    private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
-
-    private Boolean checkNeedMerging(TblColRef col) throws IOException {
-        Boolean ret = dictsNeedMerging.get(col);
-        if (ret != null)
-            return ret;
-        else {
-            ret = cubeDesc.getRowkey().isUseDictionary(col);
-            if (ret) {
-                String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
-            }
-            dictsNeedMerging.put(col, ret);
-            return ret;
-        }
-    }
-
-    private String extractJobIDFromPath(String path) {
-        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
-        // check the first occurrence
-        if (matcher.find()) {
-            return matcher.group(1);
-        } else {
-            throw new IllegalStateException("Cannot extract job ID from file path: " + path);
-        }
-    }
-
-    private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
-        for (CubeSegment segment : cubeInstance.getSegments()) {
-            String lastBuildJobID = segment.getLastBuildJobID();
-            if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
-                return segment;
-            }
-        }
-
-        throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
-
-    }
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
-        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        cubeManager = CubeManager.getInstance(config);
-        cube = cubeManager.getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
-        // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
-        newKeyBuf = new byte[256];// size will auto-grow
-
-        // decide which source segment
-        InputSplit inputSplit = context.getInputSplit();
-        String filePath = ((FileSplit) inputSplit).getPath().toString();
-        System.out.println("filePath:" + filePath);
-        String jobID = extractJobIDFromPath(filePath);
-        System.out.println("jobID:" + jobID);
-        sourceCubeSegment = findSegmentWithUuid(jobID, cube);
-        System.out.println(sourceCubeSegment);
-
-        this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidID = rowKeySplitter.split(key.getBytes(), key.getBytes().length);
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
-
-        SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
-        int bufOffset = 0;
-        BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
-        bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
-        for (int i = 0; i < cuboid.getColumns().size(); ++i) {
-            TblColRef col = cuboid.getColumns().get(i);
-
-            if (this.checkNeedMerging(col)) {
-                // if dictionary on fact table column, needs rewrite
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
-                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
-                while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
-                }
-
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
-                int idInMergedDict;
-
-                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
-                if (size < 0) {
-                    idInMergedDict = mergedDict.nullId();
-                } else {
-                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
-                }
-
-                BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
-                bufOffset += mergedDict.getSizeOfId();
-            } else {
-                // keep as it is
-                while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
-                }
-
-                System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
-                bufOffset += splittedByteses[i + 1].length;
-            }
-        }
-        byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
-        outputKey.set(newKey, 0, newKey.length);
-
-        context.write(outputKey, value);
-    }
-}
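
The heart of the mapper above is the dictionary-ID rewrite: a column value encoded
against the source segment's dictionary is decoded back to raw bytes, then re-encoded
against the merged segment's dictionary. A minimal sketch of that round-trip, using
only the Dictionary methods the mapper itself calls (buffer growth and row-key
assembly omitted):

    import org.apache.kylin.common.util.BytesUtil;
    import org.apache.kylin.dict.Dictionary;

    class DictIdRewrite {
        /** Returns the number of bytes written to out at outOffset. */
        static int rewrite(Dictionary<?> sourceDict, Dictionary<?> mergedDict,
                           byte[] encoded, int offset, int len,
                           byte[] out, int outOffset) {
            int idInSource = BytesUtil.readUnsigned(encoded, offset, len);
            // decode the source-segment ID back into the raw value bytes
            int size = sourceDict.getValueBytesFromId(idInSource, out, outOffset);
            // a negative size signals NULL; map it to the merged dictionary's null ID
            int idInMerged = (size < 0) ? mergedDict.nullId()
                    : mergedDict.getIdFromValueBytes(out, outOffset, size);
            BytesUtil.writeUnsigned(idInMerged, out, outOffset, mergedDict.getSizeOfId());
            return mergedDict.getSizeOfId();
        }
    }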

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
deleted file mode 100644
index 005254b..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- */
-public class MetadataCleanupJob extends AbstractHadoopJob {
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused metadata").create("delete");
-
-    protected static final Logger log = LoggerFactory.getLogger(MetadataCleanupJob.class);
-
-    boolean delete = false;
-
-    private KylinConfig config = null;
-
-    public static final long TIME_THRESHOLD = 2 * 26 * 3600 * 1000L; // 52 hours, slightly over 2 days
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        log.info("----- jobs args: " + Arrays.toString(args));
-        try {
-            options.addOption(OPTION_DELETE);
-            parseOptions(options, args);
-
-            log.info("options: '" + getOptionsAsString() + "'");
-            log.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
-            delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
-
-            config = KylinConfig.getInstanceFromEnv();
-
-            cleanup();
-
-            return 0;
-        } catch (Exception e) {
-            e.printStackTrace(System.err);
-            throw e;
-        }
-    }
-
-    private ResourceStore getStore() {
-        return ResourceStore.getStore(config);
-    }
-
-    private boolean isOlderThanThreshold(long resourceTime) {
-        long currentTime = System.currentTimeMillis();
-
-        if (currentTime - resourceTime > TIME_THRESHOLD)
-            return true;
-        return false;
-    }
-
-    public void cleanup() throws Exception {
-        CubeManager cubeManager = CubeManager.getInstance(config);
-
-        List<String> activeResourceList = Lists.newArrayList();
-        for (org.apache.kylin.cube.CubeInstance cube : cubeManager.listAllCubes()) {
-            for (org.apache.kylin.cube.CubeSegment segment : cube.getSegments()) {
-                activeResourceList.addAll(segment.getSnapshotPaths());
-                activeResourceList.addAll(segment.getDictionaryPaths());
-                activeResourceList.add(segment.getStatisticsResourcePath());
-            }
-        }
-
-        List<String> toDeleteResource = Lists.newArrayList();
-
-        // two level resources, snapshot tables and cube statistics
-        for (String resourceRoot : new String[]{ResourceStore.SNAPSHOT_RESOURCE_ROOT, ResourceStore.CUBE_STATISTICS_ROOT}) {
-            ArrayList<String> snapshotTables = getStore().listResources(resourceRoot);
-
-            for (String snapshotTable : snapshotTables) {
-                ArrayList<String> snapshotNames = getStore().listResources(snapshotTable);
-                if (snapshotNames != null)
-                    for (String snapshot : snapshotNames) {
-                        if (!activeResourceList.contains(snapshot)) {
-                            if (isOlderThanThreshold(getStore().getResourceTimestamp(snapshot)))
-                                toDeleteResource.add(snapshot);
-                        }
-                    }
-            }
-        }
-
-        // three level resources, only dictionaries
-        ArrayList<String> dictTables = getStore().listResources(ResourceStore.DICT_RESOURCE_ROOT);
-
-        for (String table : dictTables) {
-            ArrayList<String> tableColNames = getStore().listResources(table);
-            if (tableColNames != null)
-                for (String tableCol : tableColNames) {
-                    ArrayList<String> dictionaries = getStore().listResources(tableCol);
-                    if (dictionaries != null)
-                        for (String dict : dictionaries)
-                            if (!activeResourceList.contains(dict)) {
-                                if (isOlderThanThreshold(getStore().getResourceTimestamp(dict)))
-                                    toDeleteResource.add(dict);
-                            }
-                }
-        }
-
-
-        if (toDeleteResource.size() > 0) {
-            logger.info("The following resources have no reference, will be cleaned from metadata store: \n");
-
-            for (String s : toDeleteResource) {
-                logger.info(s);
-                if (delete) {
-                    getStore().deleteResource(s);
-                }
-            }
-        } else {
-            logger.info("No resource to be cleaned up from metadata store;");
-        }
-
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new MetadataCleanupJob(), args);
-        System.exit(exitCode);
-    }
-}
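
The two walks in cleanup() above assume a fixed directory depth under each resource
root: snapshots and cube statistics sit two levels below their root, dictionaries
three. Only entries that appear in no segment's activeResourceList and are older than
TIME_THRESHOLD are collected. Roughly (paths illustrative, not verified against the
store):

    // /table_snapshot/<TABLE>/<snapshot-uuid>       -- two levels under the root
    // /cube_statistics/<CUBE>/<statistics-uuid>     -- two levels under the root
    // /dict/<TABLE>/<COLUMN>/<dictionary-uuid>      -- three levels under the root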

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java
deleted file mode 100644
index a391979..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-
-public class NDCuboidJob extends CuboidJob {
-
-    public NDCuboidJob() {
-        this.setMapperClass(NDCuboidMapper.class);
-    }
-
-    public static void main(String[] args) throws Exception {
-        CuboidJob job = new NDCuboidJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
deleted file mode 100644
index c4978bf..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.job.constant.BatchConstants;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
-
-    private static final Logger logger = LoggerFactory.getLogger(NDCuboidMapper.class);
-
-    private Text outputKey = new Text();
-    private String cubeName;
-    private String segmentName;
-    private CubeDesc cubeDesc;
-    private CuboidScheduler cuboidScheduler;
-
-    private int handleCounter;
-    private int skipCounter;
-
-    private byte[] keyBuf = new byte[4096];
-    private RowKeySplitter rowKeySplitter;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        cubeDesc = cube.getDescriptor();
-
-        // initialize CuboidScheduler
-        cuboidScheduler = new CuboidScheduler(cubeDesc);
-
-        rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
-    }
-
-    private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
-        int offset = 0;
-
-        // cuboid id
-        System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length);
-        offset += childCuboid.getBytes().length;
-
-        // rowkey columns
-        long mask = Long.highestOneBit(parentCuboid.getId());
-        long parentCuboidId = parentCuboid.getId();
-        long childCuboidId = childCuboid.getId();
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
-        int index = 1; // skip cuboidId
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            if ((mask & parentCuboidId) > 0) {// if the this bit position equals
-                                              // 1
-                if ((mask & childCuboidId) > 0) {// if the child cuboid has this
-                                                 // column
-                    System.arraycopy(splitBuffers[index].value, 0, keyBuf, offset, splitBuffers[index].length);
-                    offset += splitBuffers[index].length;
-                }
-                index++;
-            }
-            mask = mask >> 1;
-        }
-
-        return offset;
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidId = rowKeySplitter.split(key.getBytes(), key.getLength());
-        Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
-
-        Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
-
-        // if still empty or null
-        if (myChildren == null || myChildren.size() == 0) {
-            context.getCounter(BatchConstants.MAPREDUCE_COUTNER_GROUP_NAME, "Skipped records").increment(1L);
-            skipCounter++;
-            if (skipCounter % BatchConstants.COUNTER_MAX == 0) {
-                logger.info("Skipped " + skipCounter + " records!");
-            }
-            return;
-        }
-
-        context.getCounter(BatchConstants.MAPREDUCE_COUTNER_GROUP_NAME, "Processed records").increment(1L);
-
-        handleCounter++;
-        if (handleCounter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + handleCounter + " records!");
-        }
-
-        for (Long child : myChildren) {
-            Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
-            int keyLength = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
-            outputKey.set(keyBuf, 0, keyLength);
-            context.write(outputKey, value);
-        }
-
-    }
-}
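
buildKey() above walks the parent cuboid ID from its highest set bit downward and
copies a column through only when the child cuboid keeps that bit. A self-contained
sketch of the same walk, with Kylin's split buffers replaced by a plain byte[][]
(index 0 here corresponds to splitBuffers[1] above, because slot 0 of the split
buffers holds the cuboid ID itself):

    // parentCols holds one entry per set bit of parentId, in bit order
    static int projectColumns(long parentId, long childId,
                              byte[][] parentCols, byte[] out, int offset) {
        long mask = Long.highestOneBit(parentId);
        int index = 0; // position within parentCols
        while (mask != 0) {
            if ((mask & parentId) != 0) {        // parent carries this column
                if ((mask & childId) != 0) {     // child keeps it: copy through
                    System.arraycopy(parentCols[index], 0, out, offset,
                            parentCols[index].length);
                    offset += parentCols[index].length;
                }
                index++;
            }
            mask >>>= 1;
        }
        return offset; // new write offset
    }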

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
index 97b5de8..2acace4 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
@@ -45,8 +45,8 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.lookup.LookupBytesTable;
 import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.FunctionDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
index 277624a..b259e1a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
deleted file mode 100644
index 7c5e5db..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-
-/**
- * @author xjiang, ysong1
- * 
- */
-
-public class RangeKeyDistributionJob extends AbstractHadoopJob {
-    protected static final Logger log = LoggerFactory.getLogger(RangeKeyDistributionJob.class);
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-
-            parseOptions(options, args);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            job = Job.getInstance(getConf(), jobName);
-
-            setJobClasspath(job);
-
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-            // job.getConfiguration().set("dfs.block.size", "67108864");
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(RangeKeyDistributionMapper.class);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(LongWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(RangeKeyDistributionReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(LongWritable.class);
-            job.setNumReduceTasks(1);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            RealizationCapacity realizationCapacity = cube.getDescriptor().getModel().getCapacity();
-            job.getConfiguration().set(BatchConstants.CUBE_CAPACITY, realizationCapacity.toString());
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args);
-        System.exit(exitCode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
deleted file mode 100644
index e49f090..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-/**
- * @author ysong1
- * 
- */
-public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
-    private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L;
-
-    private LongWritable outputValue = new LongWritable(0);
-
-    private long bytesRead = 0;
-
-    private Text lastKey;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        lastKey = key;
-
-        int bytesLength = key.getLength() + value.getLength();
-        bytesRead += bytesLength;
-
-        if (bytesRead >= ONE_MEGA_BYTES) {
-            outputValue.set(bytesRead);
-            context.write(key, outputValue);
-
-            // reset bytesRead
-            bytesRead = 0;
-        }
-
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        if (lastKey != null) {
-            outputValue.set(bytesRead);
-            context.write(lastKey, outputValue);
-        }
-    }
-
-}
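
The mapper above emits the current key whenever at least 1 MB of key-plus-value
bytes has accumulated, and cleanup() flushes whatever remains under the last key
seen. A short trace, assuming three 0.6 MB records A, B, C arrive in order:

    // A: bytesRead = 0.6 MB          -> below threshold, nothing emitted
    // B: bytesRead = 1.2 MB >= 1 MB  -> emit (B, 1.2 MB), reset bytesRead to 0
    // C: bytesRead = 0.6 MB          -> below threshold, nothing emitted
    // cleanup(): lastKey == C        -> emit (C, 0.6 MB)
    // every byte is attributed to exactly one emitted cut-point key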

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
deleted file mode 100644
index a580a9e..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kylin.cube.model.v1.CubeDesc.CubeCapacity;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- * 
- */
-public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
-    public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
-    public static final int SMALL_CUT = 5;  //  5 GB per region
-    public static final int MEDIUM_CUT = 10; //  10 GB per region
-    public static final int LARGE_CUT = 50; // 50 GB per region
-    
-    public static final int MAX_REGION = 1000;
-
-    private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
-
-    private LongWritable outputValue = new LongWritable(0);
-
-    private int cut;
-    private long bytesRead = 0;
-    private List<Text> gbPoints = new ArrayList<Text>();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        CubeCapacity cubeCapacity = CubeCapacity.valueOf(context.getConfiguration().get(BatchConstants.CUBE_CAPACITY));
-        switch (cubeCapacity) {
-        case SMALL:
-            cut = SMALL_CUT;
-            break;
-        case MEDIUM:
-            cut = MEDIUM_CUT;
-            break;
-        case LARGE:
-            cut = LARGE_CUT;
-            break;
-        }
-
-        logger.info("Chosen cut for htable is " + cut);
-    }
-
-    @Override
-    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-        for (LongWritable v : values) {
-            bytesRead += v.get();
-        }
-        
-        if (bytesRead >= ONE_GIGA_BYTES) {
-            gbPoints.add(new Text(key));
-            bytesRead = 0; // reset bytesRead
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        int nRegion = Math.round((float) gbPoints.size() / (float) cut);
-        nRegion = Math.max(1,  nRegion);
-        nRegion = Math.min(MAX_REGION, nRegion);
-        
-        int gbPerRegion = gbPoints.size() / nRegion;
-        gbPerRegion = Math.max(1, gbPerRegion);
-        
-        System.out.println(nRegion + " regions");
-        System.out.println(gbPerRegion + " GB per region");
-        
-        for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) {
-            Text key = gbPoints.get(i);
-            outputValue.set(i);
-            System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
-            context.write(key, outputValue);
-        }
-    }
-}
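
A worked example of the split arithmetic in cleanup() above, assuming the reducer
collected 23 cut points (one per ~1 GB) for a MEDIUM cube, so cut = 10:

    int nRegion = Math.round(23f / 10f);           // = 2
    nRegion = Math.min(MAX_REGION, Math.max(1, nRegion));
    int gbPerRegion = Math.max(1, 23 / nRegion);   // = 11
    // the loop then emits gbPoints.get(11) and gbPoints.get(22) as split
    // keys, so the HTable is created with three regions (two split points)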

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
deleted file mode 100644
index faf6675..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author ysong1
- * 
- */
-public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
-
-    @SuppressWarnings("static-access")
-    protected static final Option rowKeyStatsFilePath = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath");
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(rowKeyStatsFilePath);
-
-            parseOptions(options, args);
-
-            String statsFilePath = getOptionValue(rowKeyStatsFilePath);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            job = Job.getInstance(getConf(), jobName);
-
-            setJobClasspath(job);
-
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(RowKeyDistributionCheckerMapper.class);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(LongWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(RowKeyDistributionCheckerReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(LongWritable.class);
-            job.setNumReduceTasks(1);
-
-            job.getConfiguration().set("rowKeyStatsFilePath", statsFilePath);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new RowKeyDistributionCheckerJob(), args);
-        System.exit(exitCode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
deleted file mode 100644
index de57562..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.engine.mr.KylinMapper;
-
-/**
- * @author ysong1
- * 
- */
-public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
-    String rowKeyStatsFilePath;
-    byte[][] splitKeys;
-    Map<Text, Long> resultMap;
-    List<Text> keyList;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        rowKeyStatsFilePath = context.getConfiguration().get("rowKeyStatsFilePath");
-        splitKeys = this.getSplits(context.getConfiguration(), new Path(rowKeyStatsFilePath));
-
-        resultMap = new HashMap<Text, Long>();
-        keyList = new ArrayList<Text>();
-        for (int i = 0; i < splitKeys.length; i++) {
-            Text key = new Text(splitKeys[i]);
-            resultMap.put(key, 0L);
-            keyList.add(new Text(splitKeys[i]));
-        }
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        for (Text t : keyList) {
-            if (key.compareTo(t) < 0) {
-                Long v = resultMap.get(t);
-                long length = key.getLength() + value.getLength();
-                v += length;
-                resultMap.put(t, v);
-                break;
-            }
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        LongWritable outputValue = new LongWritable();
-        for (Entry<Text, Long> kv : resultMap.entrySet()) {
-            outputValue.set(kv.getValue());
-            context.write(kv.getKey(), outputValue);
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    public byte[][] getSplits(Configuration conf, Path path) {
-        List<byte[]> rowkeyList = new ArrayList<byte[]>();
-        SequenceFile.Reader reader = null;
-        try {
-            reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
-            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-            while (reader.next(key, value)) {
-                byte[] tmp = ((Text) key).copyBytes();
-                if (rowkeyList.contains(tmp) == false) {
-                    rowkeyList.add(tmp);
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            IOUtils.closeStream(reader);
-        }
-
-        byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
-
-        return retValue;
-    }
-}
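
One caveat in getSplits() above: rowkeyList.contains(tmp) compares byte arrays with
Object.equals, i.e. by identity, so the duplicate check can never succeed and
repeated keys are all kept. A content-based de-duplication could look roughly like
this (a sketch, not part of the original patch):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    class SplitKeyDedup {
        static byte[][] dedup(List<byte[]> rowkeyList) {
            // ByteBuffer.equals()/hashCode() are content-based, unlike byte[]'s
            Set<ByteBuffer> seen = new LinkedHashSet<ByteBuffer>();
            for (byte[] key : rowkeyList)
                seen.add(ByteBuffer.wrap(key));
            List<byte[]> unique = new ArrayList<byte[]>(seen.size());
            for (ByteBuffer buf : seen)
                unique.add(buf.array());
            return unique.toArray(new byte[unique.size()][]);
        }
    }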

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
deleted file mode 100644
index c010a33..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.engine.mr.KylinReducer;
-
-/**
- * @author ysong1
- * 
- */
-public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
-    LongWritable outputValue = new LongWritable(0L);
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-    }
-
-    @Override
-    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-
-        long length = 0;
-        for (LongWritable v : values) {
-            length += v.get();
-        }
-
-        outputValue.set(length);
-        context.write(key, outputValue);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
index d88b116..1247cd3 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
@@ -35,13 +35,13 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java
deleted file mode 100644
index 1a5e690..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateDictionaryJob.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.dict;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
-import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.engine.mr.DFSFileTable;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
-
-/**
- * @author ysong1
- * 
- */
-
-public class CreateDictionaryJob extends AbstractHadoopJob {
-
-    private int returnCode = 0;
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
-            options.addOption(OPTION_INPUT_PATH);
-            parseOptions(options, args);
-
-            final String cubeName = getOptionValue(OPTION_CUBE_NAME);
-            final String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-            final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
-
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-
-            DictionaryGeneratorCLI.processSegment(config, cubeName, segmentName, new DistinctColumnValuesProvider() {
-                @Override
-                public ReadableTable getDistinctValuesFor(TblColRef col) {
-                    return new DFSFileTable(factColumnsInputPath + "/" + col.getName(), -1);
-                }
-            });
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-
-        return returnCode;
-    }
-    
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new CreateDictionaryJob(), args);
-        System.exit(exitCode);
-    }
-
-}
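
The job above wires the fact-distinct-columns MR output into the dictionary builder: for each column, distinct values are read from a file named after the column under the -input path. A sketch of a programmatic invocation (the option names and all paths below are assumptions for illustration, based on AbstractHadoopJob's conventions, not verified here):

    import org.apache.hadoop.util.ToolRunner;

    public class CreateDictionaryInvokeSketch {
        public static void main(String[] args) throws Exception {
            String[] jobArgs = {
                    "-cubename", "test_kylin_cube_with_slr",              // hypothetical cube
                    "-segmentname", "FULL_BUILD",                         // hypothetical segment
                    "-input", "/tmp/kylin-job-uuid/fact_distinct_columns" // hypothetical path
            };
            System.exit(ToolRunner.run(new CreateDictionaryJob(), jobArgs));
        }
    }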

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
index 66a1106..87ee70e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.dict.DistinctColumnValuesProvider;
 import org.apache.kylin.engine.mr.DFSFileTable;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.ReadableTable;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
deleted file mode 100644
index 7a4702e..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hbase;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.util.ToolRunner;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- * 
- */
-public class BulkLoadJob extends AbstractHadoopJob {
-
-    protected static final Logger log = LoggerFactory.getLogger(BulkLoadJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_HTABLE_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-            parseOptions(options, args);
-
-            String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-            // e.g. /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
-            // the input path must end with "/"
-            String input = getOptionValue(OPTION_INPUT_PATH);
-
-            Configuration conf = HBaseConfiguration.create(getConf());
-            FileSystem fs = FileSystem.get(conf);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            CubeManager cubeMgr = CubeManager.getInstance(config);
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            CubeDesc cubeDesc = cube.getDescriptor();
-            FsPermission permission = new FsPermission((short) 0777);
-            for (HBaseColumnFamilyDesc cf : cubeDesc.getHBaseMapping().getColumnFamily()) {
-                String cfName = cf.getName();
-                Path columnFamilyPath = new Path(input + cfName);
-
-                // File may have already been auto-loaded (in the case of MapR DB)
-                if(fs.exists(columnFamilyPath)) {
-                    fs.setPermission(columnFamilyPath, permission);
-                }
-            }
-
-            String[] newArgs = new String[2];
-            newArgs[0] = input;
-            newArgs[1] = tableName;
-
-            log.debug("Start to run LoadIncrementalHFiles");
-            int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), newArgs);
-            log.debug("End to run LoadIncrementalHFiles");
-            return ret;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new BulkLoadJob(), args);
-        System.exit(exitCode);
-    }
-}
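
Stripped of option parsing and the permission fix-up, the bulk load reduces to a single LoadIncrementalHFiles run over the HFile directory, exactly as the job does internally. A minimal standalone sketch (the path and table name are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class BulkLoadSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            String hfileDir = "/tmp/kylin-job-uuid/hfile/"; // one subdirectory per column family; must end with "/"
            String tableName = "KYLIN_EXAMPLE_HTABLE";      // hypothetical HTable name
            System.exit(ToolRunner.run(new LoadIncrementalHFiles(conf), new String[] { hfileDir, tableName }));
        }
    }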

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
deleted file mode 100644
index 304fa04..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hbase;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- */
-public class CreateHTableJob extends AbstractHadoopJob {
-
-    protected static final Logger logger = LoggerFactory.getLogger(CreateHTableJob.class);
-
-    CubeInstance cube = null;
-    CubeDesc cubeDesc = null;
-    String segmentName = null;
-    KylinConfig kylinConfig;
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        options.addOption(OPTION_CUBE_NAME);
-        options.addOption(OPTION_SEGMENT_NAME);
-        options.addOption(OPTION_PARTITION_FILE_PATH);
-        options.addOption(OPTION_HTABLE_NAME);
-        options.addOption(OPTION_STATISTICS_ENABLED);
-        parseOptions(options, args);
-
-        Path partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
-        boolean statisticsEnabled = Boolean.parseBoolean(getOptionValue(OPTION_STATISTICS_ENABLED));
-
-        String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
-        cube = cubeMgr.getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-        CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
-        String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-        Configuration conf = HBaseConfiguration.create(getConf());
-
-        try {
-
-            byte[][] splitKeys;
-            if (statisticsEnabled) {
-                List<Integer> rowkeyColumnSize = Lists.newArrayList();
-                long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-                Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-                List<TblColRef> columnList = baseCuboid.getColumns();
-
-                for (int i = 0; i < columnList.size(); i++) {
-                    logger.info("Rowkey column " + i + " length " + cubeSegment.getColumnLength(columnList.get(i)));
-                    rowkeyColumnSize.add(cubeSegment.getColumnLength(columnList.get(i)));
-                }
-
-                splitKeys = getSplitsFromCuboidStatistics(conf, kylinConfig, rowkeyColumnSize, cubeSegment);
-            } else {
-                splitKeys = getSplits(conf, partitionFilePath);
-            }
-
-            CubeHTableUtil.createHTable(cubeDesc, tableName, splitKeys);
-            return 0;
-        } catch (Exception e) {
-            printUsage(options);
-            e.printStackTrace(System.err);
-            logger.error(e.getLocalizedMessage(), e);
-            return 2;
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    public byte[][] getSplits(Configuration conf, Path path) throws Exception {
-        FileSystem fs = path.getFileSystem(conf);
-        if (!fs.exists(path)) {
-            System.err.println("Path " + path + " not found; no region split, the HTable will have a single region");
-            return null;
-        }
-
-        List<byte[]> rowkeyList = new ArrayList<byte[]>();
-        SequenceFile.Reader reader = null;
-        try {
-            reader = new SequenceFile.Reader(fs, path, conf);
-            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-            while (reader.next(key, value)) {
-                rowkeyList.add(((Text) key).copyBytes());
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        } finally {
-            IOUtils.closeStream(reader);
-        }
-
-        logger.info((rowkeyList.size() + 1) + " regions");
-        logger.info(rowkeyList.size() + " splits");
-        for (byte[] split : rowkeyList) {
-            logger.info(StringUtils.byteToHexString(split));
-        }
-
-        byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
-        return retValue.length == 0 ? null : retValue;
-    }
-
-
-    @SuppressWarnings("deprecation")
-    public static byte[][] getSplitsFromCuboidStatistics(Configuration conf, KylinConfig kylinConfig, List<Integer> rowkeyColumnSize, CubeSegment cubeSegment) throws IOException {
-
-        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
-        DataModelDesc.RealizationCapacity cubeCapacity = cubeDesc.getModel().getCapacity();
-        int cut = kylinConfig.getHBaseRegionCut(cubeCapacity.toString());
-
-        logger.info("Cube capacity " + cubeCapacity.toString() + ", chosen cut for HTable is " + cut + "GB");
-
-        Map<Long, Long> cuboidSizeMap = Maps.newHashMap();
-        long totalSizeInM = 0;
-
-        ResourceStore rs = ResourceStore.getStore(kylinConfig);
-        String fileKey = cubeSegment.getStatisticsResourcePath();
-        InputStream is = rs.getResource(fileKey);
-        File tempFile = null;
-        FileOutputStream tempFileStream = null;
-        try {
-            tempFile = File.createTempFile(cubeSegment.getUuid(), ".seq");
-            tempFileStream = new FileOutputStream(tempFile);
-            org.apache.commons.io.IOUtils.copy(is, tempFileStream);
-        } finally {
-            IOUtils.closeStream(is);
-            IOUtils.closeStream(tempFileStream);
-        }
-
-        FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
-        SequenceFile.Reader reader = null;
-        try {
-            reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
-            LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-            BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-            int samplingPercentage = 25;
-            while (reader.next(key, value)) {
-                if (key.get() == 0L) {
-                    samplingPercentage = Bytes.toInt(value.getBytes());
-                } else {
-                    HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
-                    ByteArray byteArray = new ByteArray(value.getBytes());
-                    hll.readRegisters(byteArray.asBuffer());
-
-                    cuboidSizeMap.put(key.get(), hll.getCountEstimate() * 100 / samplingPercentage);
-                }
-
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
-        } finally {
-            IOUtils.closeStream(reader);
-        }
-
-        List<Long> allCuboids = Lists.newArrayList();
-        allCuboids.addAll(cuboidSizeMap.keySet());
-        Collections.sort(allCuboids);
-
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        for (long cuboidId : allCuboids) {
-            long cuboidSize = estimateCuboidStorageSize(cubeDesc, cuboidId, cuboidSizeMap.get(cuboidId), baseCuboidId, rowkeyColumnSize);
-            cuboidSizeMap.put(cuboidId, cuboidSize);
-            totalSizeInM += cuboidSize;
-        }
-
-        int nRegion = Math.round((float) totalSizeInM / ((float) cut * 1024L));
-        nRegion = Math.max(kylinConfig.getHBaseRegionCutMin(), nRegion);
-        nRegion = Math.min(kylinConfig.getHBaseRegionCutMax(), nRegion);
-
-        int mbPerRegion = (int) (totalSizeInM / (nRegion));
-        mbPerRegion = Math.max(1, mbPerRegion);
-
-        logger.info("Total size " + totalSizeInM + "M (estimated)");
-        logger.info(nRegion + " regions (estimated)");
-        logger.info(mbPerRegion + " MB per region (estimated)");
-
-        List<Long> regionSplit = Lists.newArrayList();
-
-
-        long size = 0;
-        int regionIndex = 0;
-        int cuboidCount = 0;
-        for (int i = 0; i < allCuboids.size(); i++) {
-            long cuboidId = allCuboids.get(i);
-            if (size >= mbPerRegion || (size + cuboidSizeMap.get(cuboidId)) >= mbPerRegion * 1.2) {
-                // if the region is already at the threshold, or this cuboid would push it 20% past, cut here and start the next region
-                regionSplit.add(cuboidId);
-                logger.info("Region " + regionIndex + " will be " + size + " MB and contains " + cuboidCount + " cuboids (ids < " + cuboidId + ")");
-                size = 0;
-                cuboidCount = 0;
-                regionIndex++;
-            }
-            size += cuboidSizeMap.get(cuboidId);
-            cuboidCount++;
-        }
-
-
-        byte[][] result = new byte[regionSplit.size()][];
-        for (int i = 0; i < regionSplit.size(); i++) {
-            result[i] = Bytes.toBytes(regionSplit.get(i));
-        }
-
-        return result;
-    }
-
-    /**
-     * Estimate a cuboid's storage size.
-     *
-     * @param cubeDesc the cube descriptor
-     * @param cuboidId id of the cuboid to estimate
-     * @param rowCount estimated row count of the cuboid
-     * @param baseCuboidId id of the base (full-dimension) cuboid
-     * @param rowKeyColumnLength encoded byte length of each rowkey column
-     * @return the estimated cuboid size in megabytes
-     */
-    private static long estimateCuboidStorageSize(CubeDesc cubeDesc, long cuboidId, long rowCount, long baseCuboidId, List<Integer> rowKeyColumnLength) {
-
-        int bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
-
-        long mask = Long.highestOneBit(baseCuboidId);
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            if ((mask & cuboidId) > 0) {
-                bytesLength += rowKeyColumnLength.get(i);
-            }
-            mask = mask >> 1;
-        }
-
-        // add the measure length
-        int space = 0;
-        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
-            DataType returnType = measureDesc.getFunction().getReturnDataType();
-            if (returnType.isHLLC()) {
-                // HLL counters are compressed when exported to bytes
-                space += returnType.getSpaceEstimate() * 0.75;
-            } else {
-                space += returnType.getSpaceEstimate();
-            }
-        }
-        bytesLength += space;
-
-        logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + bytesLength + " bytes.");
-        logger.info("Cuboid " + cuboidId + " total size is " + (bytesLength * rowCount / (1024L * 1024L)) + "M.");
-        return bytesLength * rowCount / (1024L * 1024L);
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new CreateHTableJob(), args);
-        System.exit(exitCode);
-    }
-}
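
The split computation above has three steps: estimate each cuboid's size from the HLL row-count statistics, derive a per-region budget from the total size and the configured cut, then greedily pack cuboids (sorted by id) into regions. A compact sketch of just the greedy packing on made-up sizes, mirroring the 20%-overshoot rule:

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class RegionPackingSketch {
        public static void main(String[] args) {
            Map<Long, Long> cuboidSizeMb = new LinkedHashMap<Long, Long>(); // cuboid id -> estimated MB, in id order
            cuboidSizeMb.put(1L, 40L);
            cuboidSizeMb.put(3L, 250L);
            cuboidSizeMb.put(7L, 200L);
            cuboidSizeMb.put(15L, 700L);

            long mbPerRegion = 300;
            long size = 0;
            List<Long> regionSplits = new ArrayList<Long>();
            for (Map.Entry<Long, Long> e : cuboidSizeMb.entrySet()) {
                // cut when the region is full, or when adding this cuboid would overshoot by 20%
                if (size >= mbPerRegion || size + e.getValue() >= mbPerRegion * 1.2) {
                    regionSplits.add(e.getKey()); // this cuboid id becomes a region split key
                    size = 0;
                }
                size += e.getValue();
            }
            System.out.println("split keys (cuboid ids): " + regionSplits); // prints [7, 15]
        }
    }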

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
deleted file mode 100644
index 88b345b..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CubeHTableUtil.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package org.apache.kylin.job.hadoop.hbase;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.job.tools.DeployCoprocessorCLI;
-import org.apache.kylin.job.tools.LZOSupportnessChecker;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- */
-public class CubeHTableUtil {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeHTableUtil.class);
-
-    public static void createHTable(CubeDesc cubeDesc, String tableName, byte[][] splitKeys) throws IOException {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
-        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
-        // https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/regionserver/ConstantSizeRegionSplitPolicy.html
-        tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());
-        tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
-
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin admin = new HBaseAdmin(conf);
-
-        try {
-            if (User.isHBaseSecurityEnabled(conf)) {
-                // add coprocessor for bulk load
-                tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
-            }
-
-            for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
-                HColumnDescriptor cf = new HColumnDescriptor(cfDesc.getName());
-                cf.setMaxVersions(1);
-
-                if (LZOSupportnessChecker.getSupportness()) {
-                    logger.info("hbase will use lzo to compress cube data");
-                    cf.setCompressionType(Compression.Algorithm.LZO);
-                } else {
-                    logger.info("hbase will not use lzo to compress cube data");
-                }
-
-                cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
-                cf.setInMemory(false);
-                cf.setBlocksize(4 * 1024 * 1024); // set to 4MB
-                tableDesc.addFamily(cf);
-            }
-
-            if (admin.tableExists(tableName)) {
-                // admin.disableTable(tableName);
-                // admin.deleteTable(tableName);
-                throw new RuntimeException("HBase table " + tableName + " exists!");
-            }
-
-            DeployCoprocessorCLI.deployCoprocessor(tableDesc);
-
-            admin.createTable(tableDesc, splitKeys);
-            Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available for some reason");
-            logger.info("create hbase table " + tableName + " done.");
-        } catch (Exception e) {
-            logger.error("Failed to create HTable", e);
-            throw e;
-        } finally {
-            admin.close();
-        }
-
-    }
-}
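
For reference, the table and column-family settings the util applies can be read in isolation; a self-contained sketch of just that configuration (HBase 0.98-era API, as used above):

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;

    public class HTableSettingsSketch {
        public static HTableDescriptor describe(String tableName, String cfName) {
            HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
            // use ConstantSizeRegionSplitPolicy so regions split only on raw store size,
            // keeping Kylin's pre-split layout predictable
            tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName());

            HColumnDescriptor cf = new HColumnDescriptor(cfName);
            cf.setMaxVersions(1);                          // cube cells are written once
            cf.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
            cf.setInMemory(false);
            cf.setBlocksize(4 * 1024 * 1024);              // 4 MB blocks, scan-friendly
            tableDesc.addFamily(cf);
            return tableDesc;
        }
    }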

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java b/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java
deleted file mode 100644
index f710d34..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hive/SqlHiveDataTypeMapping.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.hive;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public class SqlHiveDataTypeMapping {
-
-    public static String getHiveDataType(String javaDataType) {
-        String hiveDataType = javaDataType.toLowerCase().startsWith("varchar") ? "string" : javaDataType;
-        hiveDataType = javaDataType.toLowerCase().startsWith("integer") ? "int" : hiveDataType;
-
-        return hiveDataType.toLowerCase();
-    }
-}
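
The mapping above only special-cases varchar and integer; everything else passes through lower-cased. Worked examples:

    public class SqlHiveDataTypeMappingDemo {
        public static void main(String[] args) {
            System.out.println(SqlHiveDataTypeMapping.getHiveDataType("VARCHAR(256)")); // string
            System.out.println(SqlHiveDataTypeMapping.getHiveDataType("integer"));      // int
            System.out.println(SqlHiveDataTypeMapping.getHiveDataType("BIGINT"));       // bigint
        }
    }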

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
index 8575f89..d3f6d68 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIBulkLoadJob.java
@@ -25,11 +25,11 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
index d55900b..08b7a46 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
index 3690e48..6149d27 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java
@@ -32,15 +32,15 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.tools.DeployCoprocessorCLI;
-import org.apache.kylin.job.tools.LZOSupportnessChecker;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI;
+import org.apache.kylin.storage.hbase.util.LZOSupportnessChecker;
 
 /**
  * @author George Song (ysong1)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
index b1ff6f1..1ed6b89 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsJob.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.slf4j.Logger;