Posted to commits@kylin.apache.org by li...@apache.org on 2016/02/11 13:49:49 UTC

[10/51] [partial] kylin git commit: KYLIN-1416 keep only website in document branch

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
deleted file mode 100644
index 698a978..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop;
-
-/**
- * @author George Song (ysong1)
- *
- */
-
-import static org.apache.hadoop.util.StringUtils.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.StringSplitter;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
-import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("static-access")
-public abstract class AbstractHadoopJob extends Configured implements Tool {
-    protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
-
-    protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D").create("jobname");
-    protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For example, flat_item_cube").create("cubename");
-    protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For example, some_ii").create("iiname");
-    protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name").create("segmentname");
-    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
-    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
-    protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
-    protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimiter").create("inputdelim");
-    protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
-    protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
-    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
-    protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
-
-    protected String name;
-    protected boolean isAsync = false;
-    protected OptionsHelper optionsHelper = new OptionsHelper();
-
-    protected Job job;
-
-    protected void parseOptions(Options options, String[] args) throws ParseException {
-        optionsHelper.parseOptions(options, args);
-    }
-
-    public void printUsage(Options options) {
-        optionsHelper.printUsage(getClass().getSimpleName(), options);
-    }
-
-    public Option[] getOptions() {
-        return optionsHelper.getOptions();
-    }
-
-    public String getOptionsAsString() {
-        return optionsHelper.getOptionsAsString();
-    }
-
-    protected String getOptionValue(Option option) {
-        return optionsHelper.getOptionValue(option);
-    }
-
-    protected boolean hasOption(Option option) {
-        return optionsHelper.hasOption(option);
-    }
-
-    protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
-        int retVal = 0;
-        long start = System.nanoTime();
-        if (isAsync) {
-            job.submit();
-        } else {
-            job.waitForCompletion(true);
-            retVal = job.isSuccessful() ? 0 : 1;
-            logger.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures.  Time taken ") + formatTime((System.nanoTime() - start) / 1000000L));
-        }
-        return retVal;
-    }
-
-    private static final String KYLIN_HIVE_DEPENDENCY_JARS = "[^,]*hive-exec.jar|[^,]*hive-metastore.jar|[^,]*hive-hcatalog-core[0-9.-]*jar";
-
-    String filterKylinHiveDependency(String kylinHiveDependency) {
-        if (StringUtils.isBlank(kylinHiveDependency))
-            return "";
-
-        StringBuilder jarList = new StringBuilder();
-
-        Pattern hivePattern = Pattern.compile(KYLIN_HIVE_DEPENDENCY_JARS);
-        Matcher matcher = hivePattern.matcher(kylinHiveDependency);
-
-        while (matcher.find()) {
-            if (jarList.length() > 0)
-                jarList.append(",");
-            jarList.append(matcher.group());
-        }
-
-        return jarList.toString();
-    }
-
-    private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
-
-    protected void setJobClasspath(Job job) {
-        String jarPath = KylinConfig.getInstanceFromEnv().getKylinJobJarPath();
-        File jarFile = new File(jarPath);
-        if (jarFile.exists()) {
-            job.setJar(jarPath);
-            logger.info("append job jar: " + jarPath);
-        } else {
-            job.setJarByClass(this.getClass());
-        }
-
-        String kylinHiveDependency = System.getProperty("kylin.hive.dependency");
-        String kylinHBaseDependency = System.getProperty("kylin.hbase.dependency");
-        logger.info("append kylin.hbase.dependency: " + kylinHBaseDependency + " to " + MAP_REDUCE_CLASSPATH);
-
-        Configuration jobConf = job.getConfiguration();
-        String classpath = jobConf.get(MAP_REDUCE_CLASSPATH);
-        if (classpath == null || classpath.length() == 0) {
-            logger.info("Didn't find " + MAP_REDUCE_CLASSPATH + " in job configuration, will run 'mapred classpath' to get the default value.");
-            classpath = getDefaultMapRedClasspath();
-            classpath = classpath.replace(":", ","); // yarn classpath is comma separated
-            logger.info("The default mapred classpath is: " + classpath);
-        }
-
-        if (kylinHBaseDependency != null) {
-            // yarn classpath is comma separated
-            kylinHBaseDependency = kylinHBaseDependency.replace(":", ",");
-            classpath = classpath + "," + kylinHBaseDependency;
-        }
-
-        jobConf.set(MAP_REDUCE_CLASSPATH, classpath);
-        logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH));
-
-        /*
-         *  set extra dependencies as tmpjars & tmpfiles if configured
-         */
-        StringBuilder kylinDependency = new StringBuilder();
-
-        // for hive dependencies
-        if (kylinHiveDependency != null) {
-            // yarn classpath is comma separated
-            kylinHiveDependency = kylinHiveDependency.replace(":", ",");
-
-            logger.info("Hive Dependencies Before Filtered: " + kylinHiveDependency);
-            String filteredHive = filterKylinHiveDependency(kylinHiveDependency);
-            logger.info("Hive Dependencies After Filtered: " + filteredHive);
-
-            if (kylinDependency.length() > 0)
-                kylinDependency.append(",");
-            kylinDependency.append(filteredHive);
-        }
-
-        // for KylinJobMRLibDir
-        String mrLibDir = KylinConfig.getInstanceFromEnv().getKylinJobMRLibDir();
-        if (!StringUtils.isBlank(mrLibDir)) {
-            File dirFileMRLIB = new File(mrLibDir);
-            if (dirFileMRLIB.exists()) {
-                if (kylinDependency.length() > 0)
-                    kylinDependency.append(",");
-                kylinDependency.append(mrLibDir);
-            } else {
-                logger.info("The directory '" + mrLibDir + "' for 'kylin.job.mr.lib.dir' does not exist!!!");
-            }
-        }
-
-        setJobTmpJarsAndFiles(job, kylinDependency.toString());
-    }
-
-    private void setJobTmpJarsAndFiles(Job job, String kylinDependency) {
-        if (StringUtils.isBlank(kylinDependency))
-            return;
-
-        String[] fNameList = kylinDependency.split(",");
-
-        try {
-            Configuration jobConf = job.getConfiguration();
-            FileSystem fs = FileSystem.getLocal(jobConf);
-
-            StringBuilder jarList = new StringBuilder();
-            StringBuilder fileList = new StringBuilder();
-
-            for (String fileName : fNameList) {
-                Path p = new Path(fileName);
-                if (fs.getFileStatus(p).isDirectory()) {
-                    appendTmpDir(job, fileName);
-                    continue;
-                }
-
-                StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
-                if (list.length() > 0)
-                    list.append(",");
-                list.append(fs.getFileStatus(p).getPath().toString());
-            }
-
-            appendTmpFiles(fileList.toString(), jobConf);
-            appendTmpJars(jarList.toString(), jobConf);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void appendTmpDir(Job job, String tmpDir) {
-        if (StringUtils.isBlank(tmpDir))
-            return;
-
-        try {
-            Configuration jobConf = job.getConfiguration();
-            FileSystem fs = FileSystem.getLocal(jobConf);
-            FileStatus[] fList = fs.listStatus(new Path(tmpDir));
-
-            StringBuilder jarList = new StringBuilder();
-            StringBuilder fileList = new StringBuilder();
-
-            for (FileStatus file : fList) {
-                Path p = file.getPath();
-                if (fs.getFileStatus(p).isDirectory()) {
-                    appendTmpDir(job, p.toString());
-                    continue;
-                }
-
-                StringBuilder list = (p.getName().endsWith(".jar")) ? jarList : fileList;
-                if (list.length() > 0)
-                    list.append(",");
-                list.append(fs.getFileStatus(p).getPath().toString());
-            }
-
-            appendTmpFiles(fileList.toString(), jobConf);
-            appendTmpJars(jarList.toString(), jobConf);
-
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void appendTmpJars(String jarList, Configuration conf) {
-        if (StringUtils.isBlank(jarList))
-            return;
-
-        String tmpJars = conf.get("tmpjars", null);
-        if (tmpJars == null) {
-            tmpJars = jarList;
-        } else {
-            tmpJars += "," + jarList;
-        }
-        conf.set("tmpjars", tmpJars);
-        logger.info("Job 'tmpjars' updated -- " + tmpJars);
-    }
-
-    private void appendTmpFiles(String fileList, Configuration conf) {
-        if (StringUtils.isBlank(fileList))
-            return;
-
-        String tmpFiles = conf.get("tmpfiles", null);
-        if (tmpFiles == null) {
-            tmpFiles = fileList;
-        } else {
-            tmpFiles += "," + fileList;
-        }
-        conf.set("tmpfiles", tmpFiles);
-        logger.info("Job 'tmpfiles' updated -- " + tmpFiles);
-    }
-
-    private String getDefaultMapRedClasspath() {
-
-        String classpath = "";
-        try {
-            CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
-            ShellCmdOutput output = new ShellCmdOutput();
-            executor.execute("mapred classpath", output);
-
-            classpath = output.getOutput().trim();
-        } catch (IOException e) {
-            logger.error("Failed to run: 'mapred classpath'.", e);
-        }
-
-        return classpath;
-    }
-
-    public void addInputDirs(String input, Job job) throws IOException {
-        for (String inp : StringSplitter.split(input, ",")) {
-            inp = inp.trim();
-            if (inp.endsWith("/*")) {
-                inp = inp.substring(0, inp.length() - 2);
-                FileSystem fs = FileSystem.get(job.getConfiguration());
-                Path path = new Path(inp);
-                FileStatus[] fileStatuses = fs.listStatus(path);
-                boolean hasDir = false;
-                for (FileStatus stat : fileStatuses) {
-                    if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
-                        hasDir = true;
-                        addInputDirs(stat.getPath().toString(), job);
-                    }
-                }
-                if (fileStatuses.length > 0 && !hasDir) {
-                    addInputDirs(path.toString(), job);
-                }
-            } else {
-                logger.debug("Add input " + inp);
-                FileInputFormat.addInputPath(job, new Path(inp));
-            }
-        }
-    }
-
-    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
-        File tmp = File.createTempFile("kylin_job_meta", "");
-        FileUtils.forceDelete(tmp);
-
-        File metaDir = new File(tmp, "meta");
-        metaDir.mkdirs();
-
-        // write kylin.properties
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        File kylinPropsFile = new File(metaDir, "kylin.properties");
-        kylinConfig.writeProperties(kylinPropsFile);
-
-        // write cube / model_desc / cube_desc / dict / table
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(cube.getResourcePath());
-        dumpList.add(cube.getDescriptor().getModel().getResourcePath());
-        dumpList.add(cube.getDescriptor().getResourcePath());
-        for (String tableName : cube.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
-            dumpList.add(table.getResourcePath());
-        }
-
-        for (CubeSegment segment : cube.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
-
-        dumpResources(kylinConfig, metaDir, dumpList);
-        addToHadoopDistCache(conf, metaDir);
-    }
-
-    protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
-        File tmp = File.createTempFile("kylin_job_meta", "");
-        FileUtils.forceDelete(tmp);
-
-        File metaDir = new File(tmp, "meta");
-        metaDir.mkdirs();
-
-        // write kylin.properties
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        File kylinPropsFile = new File(metaDir, "kylin.properties");
-        kylinConfig.writeProperties(kylinPropsFile);
-
-        // write II / model_desc / II_desc / dict / table
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(ii.getResourcePath());
-        dumpList.add(ii.getDescriptor().getModel().getResourcePath());
-        dumpList.add(ii.getDescriptor().getResourcePath());
-
-        for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
-            TableDesc table = MetadataManager.getInstance(kylinConfig).getTableDesc(tableName);
-            dumpList.add(table.getResourcePath());
-        }
-
-        for (IISegment segment : ii.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
-
-        dumpResources(kylinConfig, metaDir, dumpList);
-        addToHadoopDistCache(conf, metaDir);
-    }
-
-    private void addToHadoopDistCache(Configuration conf, File metaDir) {
-        // hadoop distributed cache
-        String hdfsMetaDir = OptionsHelper.convertToFileURL(metaDir.getAbsolutePath());
-        if (hdfsMetaDir.startsWith("/")) // note Path on windows is like "d:/../..."
-            hdfsMetaDir = "file://" + hdfsMetaDir;
-        else
-            hdfsMetaDir = "file:///" + hdfsMetaDir;
-        logger.info("HDFS meta dir is: " + hdfsMetaDir);
-
-        appendTmpFiles(hdfsMetaDir, conf);
-    }
-
-    private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
-        ResourceStore from = ResourceStore.getStore(kylinConfig);
-        KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
-        ResourceStore to = ResourceStore.getStore(localConfig);
-        for (String path : dumpList) {
-            RawResource res = from.getResource(path);
-            if (res == null)
-                throw new IllegalStateException("No resource found at -- " + path);
-            to.putResource(path, res.inputStream, res.timestamp);
-            res.inputStream.close();
-        }
-    }
-
-    protected void deletePath(Configuration conf, Path path) throws IOException {
-        FileSystem fs = FileSystem.get(path.toUri(), conf);
-        if (fs.exists(path)) {
-            fs.delete(path, true);
-        }
-    }
-
-    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        if (job == null) {
-            throw new JobException("Job is null");
-        }
-
-        long mapInputBytes = 0;
-        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
-        for (InputSplit split : input.getSplits(job)) {
-            mapInputBytes += split.getLength();
-        }
-        if (mapInputBytes == 0) {
-            throw new IllegalArgumentException("Map input splits are 0 bytes, something is wrong!");
-        }
-        double totalMapInputMB = (double) mapInputBytes / 1024 / 1024;
-        return totalMapInputMB;
-    }
-
-    protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
-        if (job == null) {
-            throw new JobException("Job is null");
-        }
-        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
-        return input.getSplits(job).size();
-    }
-
-    public static KylinConfig loadKylinPropsAndMetadata(Configuration conf) throws IOException {
-        File metaDir = new File("meta");
-        System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
-        logger.info("The absolute path for meta dir is " + metaDir.getAbsolutePath());
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        kylinConfig.setMetadataUrl(metaDir.getAbsolutePath());
-        return kylinConfig;
-    }
-
-    protected void cleanupTempConfFile(Configuration conf) {
-        String tempMetaFileString = conf.get("tmpfiles");
-        logger.info("tempMetaFileString is : " + tempMetaFileString);
-        if (tempMetaFileString != null) {
-            if (tempMetaFileString.startsWith("file://")) {
-                tempMetaFileString = tempMetaFileString.substring("file://".length());
-                File tempMetaFile = new File(tempMetaFileString);
-                if (tempMetaFile.exists()) {
-                    try {
-                        FileUtils.forceDelete(tempMetaFile.getParentFile());
-
-                    } catch (IOException e) {
-                        logger.warn("error when deleting " + tempMetaFile, e);
-                    }
-                } else {
-                    logger.info("" + tempMetaFileString + " does not exist");
-                }
-            } else {
-                logger.info("tempMetaFileString is not starting with file:// :" + tempMetaFileString);
-            }
-        }
-    }
-
-    public void kill() throws JobException {
-        if (job != null) {
-            try {
-                job.killJob();
-            } catch (IOException e) {
-                throw new JobException(e);
-            }
-        }
-    }
-
-    public Map<String, String> getInfo() throws JobException {
-        if (job != null) {
-            Map<String, String> status = new HashMap<String, String>();
-            if (null != job.getJobID()) {
-                status.put(JobInstance.MR_JOB_ID, job.getJobID().toString());
-            }
-            if (null != job.getTrackingURL()) {
-                status.put(JobInstance.YARN_APP_URL, job.getTrackingURL().toString());
-            }
-
-            return status;
-        } else {
-            throw new JobException("Job is null");
-        }
-    }
-
-    public Counters getCounters() throws JobException {
-        if (job != null) {
-            try {
-                return job.getCounters();
-            } catch (IOException e) {
-                throw new JobException(e);
-            }
-        } else {
-            throw new JobException("Job is null");
-        }
-    }
-
-    public void setAsync(boolean isAsync) {
-        this.isAsync = isAsync;
-    }
-
-    public Job getJob() {
-        return this.job;
-    }
-
-}
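
For context, concrete jobs in this module drive this base class roughly as in the following sketch. It is a minimal illustration, not part of this commit; MyCubingJob and its option wiring are hypothetical, but every base-class member it touches (parseOptions, setJobClasspath, addInputDirs, waitForCompletion, the OPTION_* constants, and the protected job field) appears in the deleted file above.

    import org.apache.commons.cli.Options;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.kylin.job.hadoop.AbstractHadoopJob;

    public class MyCubingJob extends AbstractHadoopJob {
        @Override
        public int run(String[] args) throws Exception {
            Options options = new Options();
            options.addOption(OPTION_JOB_NAME);    // -jobname
            options.addOption(OPTION_INPUT_PATH);  // -input
            options.addOption(OPTION_OUTPUT_PATH); // -output
            parseOptions(options, args);

            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
            setJobClasspath(job); // job jar plus filtered hive/hbase dependencies
            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
            FileOutputFormat.setOutputPath(job, new Path(getOptionValue(OPTION_OUTPUT_PATH)));
            // mapper/reducer/output-format wiring would go here
            return waitForCompletion(job); // honors the isAsync flag
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new MyCubingJob(), args));
        }
    }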

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
deleted file mode 100644
index 787181c..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.kv.RowConstants;
-
-/**
- * @author Jack
- * 
- */
-public class ColumnCardinalityMapper<T> extends KylinMapper<T, HCatRecord, IntWritable, BytesWritable> {
-
-    private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
-    public static final String DEFAULT_DELIM = ",";
-
-    private int counter = 0;
-
-    private HCatSchema schema = null;
-    private int columnSize = 0;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
-        columnSize = schema.getFields().size();
-    }
-
-    @Override
-    public void map(T key, HCatRecord value, Context context) throws IOException, InterruptedException {
-
-        HCatFieldSchema field;
-        Object fieldValue;
-        for (int m = 0; m < columnSize; m++) {
-            field = schema.get(m);
-            fieldValue = value.get(field.getName(), schema);
-            if (fieldValue == null)
-                fieldValue = "NULL";
-
-            if (counter < 5 && m < 10) {
-                System.out.println("Get row " + counter + " column '" + field.getName() + "'  value: " + fieldValue);
-            }
-
-            if (fieldValue != null)
-                getHllc(m).add(Bytes.toBytes(fieldValue.toString()));
-        }
-
-        counter++;
-    }
-
-    private HyperLogLogPlusCounter getHllc(Integer key) {
-        if (!hllcMap.containsKey(key)) {
-            hllcMap.put(key, new HyperLogLogPlusCounter());
-        }
-        return hllcMap.get(key);
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        Iterator<Integer> it = hllcMap.keySet().iterator();
-        while (it.hasNext()) {
-            int key = it.next();
-            HyperLogLogPlusCounter hllc = hllcMap.get(key);
-            ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-            buf.clear();
-            hllc.writeRegisters(buf);
-            buf.flip();
-            context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
-        }
-    }
-
-}
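
Stripped of the HCatalog plumbing, the mapper keeps one HyperLogLogPlusCounter per column index and ships its serialized registers to the reducer. A minimal sketch of that core pattern, using only calls that appear in the class above (the input strings are illustrative):

    import java.nio.ByteBuffer;
    import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
    import org.apache.kylin.common.util.Bytes;
    import org.apache.kylin.cube.kv.RowConstants;

    HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
    hllc.add(Bytes.toBytes("cell value 1")); // feed every cell of one column
    hllc.add(Bytes.toBytes("cell value 2"));

    ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
    hllc.writeRegisters(buf); // serialize the HLL registers
    buf.flip();
    // emitted as (new IntWritable(columnIndex), new BytesWritable(buf.array(), buf.limit()))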

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
deleted file mode 100644
index ab4285a..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.cube.kv.RowConstants;
-
-/**
- * @author Jack
- * 
- */
-public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWritable, IntWritable, LongWritable> {
-
-    public static final int ONE = 1;
-    private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-    }
-
-    @Override
-    public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
-        int skey = key.get();
-        for (BytesWritable v : values) {
-            ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
-            HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter();
-            hll.readRegisters(buffer);
-            getHllc(skey).merge(hll);
-            hll.clear();
-        }
-    }
-
-    private HyperLogLogPlusCounter getHllc(Integer key) {
-        if (!hllcMap.containsKey(key)) {
-            hllcMap.put(key, new HyperLogLogPlusCounter());
-        }
-        return hllcMap.get(key);
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        List<Integer> keys = new ArrayList<Integer>();
-        Iterator<Integer> it = hllcMap.keySet().iterator();
-        while (it.hasNext()) {
-            keys.add(it.next());
-        }
-        Collections.sort(keys);
-        it = keys.iterator();
-        while (it.hasNext()) {
-            int key = it.next();
-            HyperLogLogPlusCounter hllc = hllcMap.get(key);
-            ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-            buf.clear();
-            hllc.writeRegisters(buf);
-            buf.flip();
-            context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
-            // context.write(new Text("ErrorRate_" + key), new
-            // LongWritable((long)hllc.getErrorRate()));
-        }
-
-    }
-}
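
The reducer mirrors the mapper: it deserializes each mapper's registers, merges them into the per-column counter, and finally writes one cardinality estimate per column. The essential merge, again only with calls used in the class above (values is the Iterable passed to reduce()):

    HyperLogLogPlusCounter merged = new HyperLogLogPlusCounter();
    for (BytesWritable v : values) {
        HyperLogLogPlusCounter part = new HyperLogLogPlusCounter();
        part.readRegisters(ByteBuffer.wrap(v.getBytes()));
        merged.merge(part);
    }
    long cardinality = merged.getCountEstimate(); // written as (columnIndex, cardinality)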

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
deleted file mode 100644
index f27d074..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-
-/**
- * This Hadoop job scans all rows of the Hive table and calculates the cardinality of each column.
- * @author shaoshi
- *
- */
-public class HiveColumnCardinalityJob extends AbstractHadoopJob {
-    public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job";
-
-    @SuppressWarnings("static-access")
-    protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
-
-    public static final String OUTPUT_PATH = BatchConstants.CFG_KYLIN_HDFS_TEMP_DIR + "cardinality";
-
-    public HiveColumnCardinalityJob() {
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_TABLE);
-            options.addOption(OPTION_OUTPUT_PATH);
-
-            parseOptions(options, args);
-
-            // start job
-            String jobName = JOB_TITLE + getOptionsAsString();
-            System.out.println("Starting: " + jobName);
-            Configuration conf = getConf();
-
-            JobEngineConfig jobEngineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-            conf.addResource(jobEngineConfig.getHadoopJobConfFilePath(null));
-
-            job = Job.getInstance(conf, jobName);
-
-            setJobClasspath(job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-            job.getConfiguration().set("dfs.block.size", "67108864");
-
-            // Mapper
-            String table = getOptionValue(OPTION_TABLE);
-            String[] dbTableNames = HadoopUtil.parseHiveTableName(table);
-            HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-
-            job.setInputFormatClass(HCatInputFormat.class);
-            job.setMapperClass(ColumnCardinalityMapper.class);
-            job.setMapOutputKeyClass(IntWritable.class);
-            job.setMapOutputValueClass(BytesWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(ColumnCardinalityReducer.class);
-            job.setOutputFormatClass(TextOutputFormat.class);
-            job.setOutputKeyClass(IntWritable.class);
-            job.setOutputValueClass(LongWritable.class);
-            job.setNumReduceTasks(1);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            System.out.println("Going to submit HiveColumnCardinalityJob for table '" + table + "'");
-            int result = waitForCompletion(job);
-
-            return result;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-
-    }
-
-}
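
This class has no main method in this revision, so a caller drives it through ToolRunner. A hypothetical invocation (table name and output path are illustrative; the -table value uses the db.table form that HadoopUtil.parseHiveTableName() splits into database and table):

    int exitCode = ToolRunner.run(new HiveColumnCardinalityJob(),
            new String[] { "-table", "default.my_hive_table",
                           "-output", "/tmp/kylin/cardinality/my_hive_table" });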

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
deleted file mode 100644
index 7bd3814..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.MetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This job saves the cardinality result into the Kylin table metadata store.
- *
- * @author shaoshi
- */
-public class HiveColumnCardinalityUpdateJob extends AbstractHadoopJob {
-    public static final String JOB_TITLE = "Kylin Hive Column Cardinality Update Job";
-
-    @SuppressWarnings("static-access")
-    protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
-
-    private static final Logger logger = LoggerFactory.getLogger(HiveColumnCardinalityUpdateJob.class);
-    private String table;
-
-    public HiveColumnCardinalityUpdateJob() {
-
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_TABLE);
-            options.addOption(OPTION_OUTPUT_PATH);
-
-            parseOptions(options, args);
-
-            this.table = getOptionValue(OPTION_TABLE).toUpperCase();
-            // start job
-            String jobName = JOB_TITLE + getOptionsAsString();
-            logger.info("Starting: " + jobName);
-            Configuration conf = getConf();
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
-            updateKylinTableExd(table.toUpperCase(), output.toString(), conf);
-            return 0;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-
-    }
-
-    public void updateKylinTableExd(String tableName, String outPath, Configuration config) throws IOException {
-        List<String> columns = null;
-        try {
-            columns = readLines(new Path(outPath), config);
-        } catch (Exception e) {
-            logger.error("Failed to resolve cardinality for " + tableName + " from " + outPath, e);
-            return;
-        }
-
-        StringBuffer cardi = new StringBuffer();
-        Iterator<String> it = columns.iterator();
-        while (it.hasNext()) {
-            String string = it.next();
-            String[] ss = StringUtils.split(string, "\t");
-
-            if (ss.length != 2) {
-                logger.info("The hadoop cardinality value is not valid " + string);
-                continue;
-            }
-            cardi.append(ss[1]);
-            cardi.append(",");
-        }
-        String scardi = cardi.toString();
-        if (scardi.length() > 0) {
-            scardi = scardi.substring(0, scardi.length() - 1);
-            MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-            Map<String, String> tableExd = metaMgr.getTableDescExd(tableName);
-            tableExd.put(MetadataConstants.TABLE_EXD_CARDINALITY, scardi);
-            metaMgr.saveTableExd(tableName.toUpperCase(), tableExd);
-        } else {
-            throw new IllegalArgumentException("No cardinality data is collected for table " + tableName);
-        }
-    }
-
-    private static List<String> readLines(Path location, Configuration conf) throws Exception {
-        FileSystem fileSystem = FileSystem.get(location.toUri(), conf);
-        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
-        FileStatus[] items = fileSystem.listStatus(location);
-        if (items == null)
-            return new ArrayList<String>();
-        List<String> results = new ArrayList<String>();
-        for (FileStatus item : items) {
-
-            // ignoring files like _SUCCESS
-            if (item.getPath().getName().startsWith("_")) {
-                continue;
-            }
-
-            CompressionCodec codec = factory.getCodec(item.getPath());
-            InputStream stream = null;
-
-            // check if we have a compression codec we need to use
-            if (codec != null) {
-                stream = codec.createInputStream(fileSystem.open(item.getPath()));
-            } else {
-                stream = fileSystem.open(item.getPath());
-            }
-
-            StringWriter writer = new StringWriter();
-            IOUtils.copy(stream, writer, "UTF-8");
-            String raw = writer.toString();
-            for (String str : raw.split("\n")) {
-                results.add(str);
-            }
-        }
-        return results;
-    }
-
-}
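
For reference, updateKylinTableExd() consumes the reducer output of HiveColumnCardinalityJob: one "columnIndex<TAB>cardinality" line per column, as produced by TextOutputFormat over (IntWritable, LongWritable) pairs. With illustrative values, input like

    0	1523
    1	42
    2	980000

is joined into the string "1523,42,980000" and stored on the table's extended description under MetadataConstants.TABLE_EXD_CARDINALITY.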

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
deleted file mode 100644
index b600213..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * @author honma
- * 
- */
-
-public class BaseCuboidJob extends CuboidJob {
-    public BaseCuboidJob() {
-        this.setMapperClass(BaseCuboidMapper.class);
-    }
-
-    public static void main(String[] args) throws Exception {
-        CuboidJob job = new BaseCuboidJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
deleted file mode 100644
index d06963b..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesSplitter;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author George Song (ysong1)
- */
-public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text> {
-
-    private static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapper.class);
-
-    public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
-    public static final byte[] ONE = Bytes.toBytes("1");
-
-    private String cubeName;
-    private String segmentName;
-    private Cuboid baseCuboid;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment cubeSegment;
-    private List<byte[]> nullBytes;
-
-    private CubeJoinedFlatTableDesc intermediateTableDesc;
-    private String intermediateTableRowDelimiter;
-    private byte byteRowDelimiter;
-
-    private int counter;
-    private int errorRecordCounter;
-    private Text outputKey = new Text();
-    private Text outputValue = new Text();
-    protected MeasureIngester<?>[] aggrIngesters;
-    protected Map<TblColRef, Dictionary<String>> dictionaryMap;
-    private Object[] measures;
-    private byte[][] keyBytesBuf;
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
-    private BytesSplitter bytesSplitter;
-    private AbstractRowKeyEncoder rowKeyEncoder;
-    private MeasureCodec measureCodec;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-        intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
-        if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
-            throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
-        }
-
-        byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
-        cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
-
-        bytesSplitter = new BytesSplitter(200, 16384);
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
-        measureCodec = new MeasureCodec(cubeDesc.getMeasures());
-        measures = new Object[cubeDesc.getMeasures().size()];
-
-        int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
-        keyBytesBuf = new byte[colCount][];
-
-        aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
-        dictionaryMap = cubeSegment.buildDictionaryMap();
-
-        initNullBytes();
-    }
-
-    private void initNullBytes() {
-        nullBytes = Lists.newArrayList();
-        nullBytes.add(HIVE_NULL);
-        String[] nullStrings = cubeDesc.getNullStrings();
-        if (nullStrings != null) {
-            for (String s : nullStrings) {
-                nullBytes.add(Bytes.toBytes(s));
-            }
-        }
-    }
-
-    private boolean isNull(byte[] v) {
-        for (byte[] nullByte : nullBytes) {
-            if (Bytes.equals(v, nullByte))
-                return true;
-        }
-        return false;
-    }
-
-    private byte[] buildKey(SplittedBytes[] splitBuffers) {
-        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
-        for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
-            int index = rowKeyColumnIndexes[i];
-            keyBytesBuf[i] = Arrays.copyOf(splitBuffers[index].value, splitBuffers[index].length);
-            if (isNull(keyBytesBuf[i])) {
-                keyBytesBuf[i] = null;
-            }
-        }
-        return rowKeyEncoder.encode(keyBytesBuf);
-    }
-
-    private void buildValue(SplittedBytes[] splitBuffers) {
-
-        for (int i = 0; i < measures.length; i++) {
-            measures[i] = buildValueOf(i, splitBuffers);
-        }
-
-        valueBuf.clear();
-        measureCodec.encode(measures, valueBuf);
-    }
-
-    private Object buildValueOf(int idxOfMeasure, SplittedBytes[] splitBuffers) {
-        MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
-        FunctionDesc function = measure.getFunction();
-        int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
-
-        int paramCount = function.getParameterCount();
-        String[] inputToMeasure = new String[paramCount];
-
-        // pick up parameter values
-        ParameterDesc param = function.getParameter();
-        int colParamIdx = 0; // index among parameters of column type
-        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
-            String value;
-            if (function.isCount()) {
-                value = "1";
-            } else if (param.isColumnType()) {
-                value = getCell(colIdxOnFlatTable[colParamIdx++], splitBuffers);
-            } else {
-                value = param.getValue();
-            }
-            inputToMeasure[i] = value;
-        }
-
-        return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
-    }
-
-    private String getCell(int i, SplittedBytes[] splitBuffers) {
-        byte[] bytes = Arrays.copyOf(splitBuffers[i].value, splitBuffers[i].length);
-        if (isNull(bytes))
-            return null;
-        else
-            return Bytes.toString(bytes);
-    }
-
-    @Override
-    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
-        counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
-
-        try {
-            bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
-            intermediateTableDesc.sanityCheck(bytesSplitter);
-
-            byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
-            outputKey.set(rowKey, 0, rowKey.length);
-
-            buildValue(bytesSplitter.getSplitBuffers());
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
-            context.write(outputKey, outputValue);
-        } catch (Exception ex) {
-            handleErrorRecord(bytesSplitter, ex);
-        }
-    }
-
-    private void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
-
-        ex.printStackTrace(System.err);
-        System.err.println("Insane record: " + bytesSplitter);
-
-        errorRecordCounter++;
-        if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
-            if (ex instanceof IOException)
-                throw (IOException) ex;
-            else if (ex instanceof RuntimeException)
-                throw (RuntimeException) ex;
-            else
-                throw new RuntimeException("", ex);
-        }
-    }
-}
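
Distilled, the mapper's map() turns one delimited flat-table row into a (rowkey, encoded measures) pair for the base cuboid. An outline of that flow, reusing the fields declared above (not compilable on its own):

    // 1. split the row on the single-byte delimiter
    bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
    SplittedBytes[] cols = bytesSplitter.getSplitBuffers();

    // 2. copy the rowkey columns (nulls normalized) and encode them
    byte[] rowKey = buildKey(cols); // delegates to rowKeyEncoder.encode(keyBytesBuf)

    // 3. ingest and encode all measures into the shared value buffer
    buildValue(cols); // aggrIngesters fill measures[], then measureCodec.encode(measures, valueBuf)

    // 4. emit
    outputKey.set(rowKey, 0, rowKey.length);
    outputValue.set(valueBuf.array(), 0, valueBuf.position());
    context.write(outputKey, outputValue);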

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
deleted file mode 100644
index 3c1e4a5..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author George Song (ysong1)
- */
-public class CubeHFileJob extends AbstractHadoopJob {
-
-    protected static final Logger logger = LoggerFactory.getLogger(CubeHFileJob.class);
-
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_HTABLE_NAME);
-            parseOptions(options, args);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-
-            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-
-            setJobClasspath(job);
-
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-            FileOutputFormat.setOutputPath(job, output);
-
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(CubeHFileMapper.class);
-            job.setReducerClass(KeyValueSortReducer.class);
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            Configuration conf = HBaseConfiguration.create(getConf());
-            // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-            HTable htable = new HTable(conf, tableName);
-
-            // auto-configure partitioner, reducer count and output format for HFiles
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
-
-            // set block replication to 3 for HFiles; note this must go on the job's
-            // configuration, since the separate HBase conf is not the one the job reads
-            job.getConfiguration().set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
-
-            this.deletePath(job.getConfiguration(), output);
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            logger.error("error in CubeHFileJob", e);
-            printUsage(options);
-            throw e;
-        } finally {
-            if (job != null)
-                cleanupTempConfFile(job.getConfiguration());
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new CubeHFileJob(), args);
-        System.exit(exitCode);
-    }
-
-}
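
CubeHFileJob delegates most of the heavy lifting to HFileOutputFormat.configureIncrementalLoad(), which installs a total-order partitioner and sets the reducer count from the target table's regions so each region receives one sorted HFile. A stripped-down sketch of the same wiring against the pre-1.0 HBase API this code targets (paths and table name are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class HFileJobSketch {
        public static Job buildJob(Configuration base) throws Exception {
            Configuration conf = HBaseConfiguration.create(base);
            Job job = Job.getInstance(conf, "hfile-sketch");
            job.setInputFormatClass(SequenceFileInputFormat.class);
            FileInputFormat.setInputPaths(job, new Path("/input/cuboid"));  // placeholder
            FileOutputFormat.setOutputPath(job, new Path("/output/hfile")); // placeholder
            // configureIncrementalLoad() sets the partitioner, reducer count and
            // output format so one sorted HFile per region is produced.
            HTable htable = new HTable(conf, "SAMPLE_HTABLE");              // placeholder table
            HFileOutputFormat.configureIncrementalLoad(job, htable);
            return job;
        }
    }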

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
deleted file mode 100644
index f12d229..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileMapper.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public class CubeHFileMapper extends KylinMapper<Text, Text, ImmutableBytesWritable, KeyValue> {
-
-    ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
-
-    String cubeName;
-    CubeDesc cubeDesc;
-
-    MeasureCodec inputCodec;
-    Object[] inputMeasures;
-    List<KeyValueCreator> keyValueCreators;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
-        CubeManager cubeMgr = CubeManager.getInstance(config);
-        cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
-
-        inputCodec = new MeasureCodec(cubeDesc.getMeasures());
-        inputMeasures = new Object[cubeDesc.getMeasures().size()];
-        keyValueCreators = Lists.newArrayList();
-
-        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
-            }
-        }
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        outputKey.set(key.getBytes(), 0, key.getLength());
-        KeyValue outputValue;
-
-        int n = keyValueCreators.size();
-        if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for the simple full-copy case
-
-            outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength());
-            context.write(outputKey, outputValue);
-
-        } else { // normal (complex) case: distribute measures across multiple HBase columns
-
-            inputCodec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), inputMeasures);
-
-            for (int i = 0; i < n; i++) {
-                outputValue = keyValueCreators.get(i).create(key, inputMeasures);
-                context.write(outputKey, outputValue);
-            }
-        }
-    }
-
-    class KeyValueCreator {
-        byte[] cfBytes;
-        byte[] qBytes;
-        long timestamp;
-
-        int[] refIndex;
-        MeasureDesc[] refMeasures;
-
-        MeasureCodec codec;
-        Object[] colValues;
-        ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
-        boolean isFullCopy;
-
-        public KeyValueCreator(CubeDesc cubeDesc, HBaseColumnDesc colDesc) {
-
-            cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
-            qBytes = Bytes.toBytes(colDesc.getQualifier());
-            timestamp = 0; // constant timestamp for all generated KeyValues
-
-            List<MeasureDesc> measures = cubeDesc.getMeasures();
-            String[] measureNames = getMeasureNames(cubeDesc);
-            String[] refs = colDesc.getMeasureRefs();
-
-            refIndex = new int[refs.length];
-            refMeasures = new MeasureDesc[refs.length];
-            for (int i = 0; i < refs.length; i++) {
-                refIndex[i] = indexOf(measureNames, refs[i]);
-                refMeasures[i] = measures.get(refIndex[i]);
-            }
-
-            codec = new MeasureCodec(refMeasures);
-            colValues = new Object[refs.length];
-
-            isFullCopy = true;
-            for (int i = 0; i < measures.size(); i++) {
-                if (refIndex.length <= i || refIndex[i] != i)
-                    isFullCopy = false;
-            }
-        }
-
-        public KeyValue create(Text key, Object[] measureValues) {
-            for (int i = 0; i < colValues.length; i++) {
-                colValues[i] = measureValues[refIndex[i]];
-            }
-
-            valueBuf.clear();
-            codec.encode(colValues, valueBuf);
-
-            return create(key, valueBuf.array(), 0, valueBuf.position());
-        }
-
-        public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
-            return new KeyValue(key.getBytes(), 0, key.getLength(), //
-                    cfBytes, 0, cfBytes.length, //
-                    qBytes, 0, qBytes.length, //
-                    timestamp, Type.Put, //
-                    value, voffset, vlen);
-        }
-
-        private int indexOf(String[] measureNames, String ref) {
-            for (int i = 0; i < measureNames.length; i++)
-                if (measureNames[i].equalsIgnoreCase(ref))
-                    return i;
-
-            throw new IllegalArgumentException("Measure '" + ref + "' not found in " + Arrays.toString(measureNames));
-        }
-
-        private String[] getMeasureNames(CubeDesc cubeDesc) {
-            List<MeasureDesc> measures = cubeDesc.getMeasures();
-            String[] result = new String[measures.size()];
-            for (int i = 0; i < measures.size(); i++)
-                result[i] = measures.get(i).getName();
-            return result;
-        }
-
-    }
-}
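
The full-copy shortcut in map() hinges on isFullCopy: when a single HBase column stores every measure in cube order, the encoded row can be copied verbatim with no decode/re-encode round trip. A self-contained restatement of that check (slightly stricter in form, same intent):

    public class FullCopyCheck {
        // refIndex[i] = position (in the cube's measure list) of the i-th
        // measure stored in this HBase column.
        static boolean isFullCopy(int[] refIndex, int totalMeasures) {
            if (refIndex.length != totalMeasures)
                return false; // subset of measures -> must re-encode
            for (int i = 0; i < totalMeasures; i++) {
                if (refIndex[i] != i)
                    return false; // out of order -> must re-encode
            }
            return true;
        }

        public static void main(String[] args) {
            System.out.println(isFullCopy(new int[] { 0, 1, 2 }, 3)); // true: verbatim copy
            System.out.println(isFullCopy(new int[] { 0, 2 }, 3));    // false: subset only
        }
    }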

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
deleted file mode 100644
index e4875e9..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.CuboidCLI;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- */
-public class CuboidJob extends AbstractHadoopJob {
-
-    protected static final Logger logger = LoggerFactory.getLogger(CuboidJob.class);
-    private static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks";
-
-    @SuppressWarnings("rawtypes")
-    private Class<? extends Mapper> mapperClass;
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_NCUBOID_LEVEL);
-            options.addOption(OPTION_INPUT_FORMAT);
-            parseOptions(options, args);
-
-            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            CubeManager cubeMgr = CubeManager.getInstance(config);
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            logger.info("Starting: " + job.getJobName());
-            FileInputFormat.setInputPaths(job, input);
-
-            setJobClasspath(job);
-
-            // Mapper
-            if (this.mapperClass == null) {
-                throw new IllegalStateException("Mapper class is not set!");
-            }
-
-            boolean isInputTextFormat = false;
-            if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
-                isInputTextFormat = true;
-            }
-
-            if (isInputTextFormat) {
-                job.setInputFormatClass(TextInputFormat.class);
-
-            } else {
-                job.setInputFormatClass(SequenceFileInputFormat.class);
-            }
-            job.setMapperClass(this.mapperClass);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(Text.class);
-            job.setCombinerClass(CuboidReducer.class); // combiner counters base cuboid shuffle skew: some row keys aggregate far more records than others
-
-            // Reducer
-            job.setReducerClass(CuboidReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
-
-            FileOutputFormat.setOutputPath(job, output);
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-            job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel);
-
-            // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            setReduceTaskNum(job, config, cubeName, nCuboidLevel);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            logger.error("error in CuboidJob", e);
-            printUsage(options);
-            throw e;
-        } finally {
-            if (job != null)
-                cleanupTempConfFile(job.getConfiguration());
-        }
-    }
-
-    protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        Configuration jobConf = job.getConfiguration();
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
-        CubeDesc cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
-
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-
-        // total map input MB
-        double totalMapInputMB = this.getTotalMapInputMB();
-
-        // output / input ratio
-        int preLevelCuboids, thisLevelCuboids;
-        if (level == 0) { // base cuboid
-            preLevelCuboids = thisLevelCuboids = 1;
-        } else { // n-cuboid
-            int[] allLevelCount = CuboidCLI.calculateAllLevelCount(cubeDesc);
-            preLevelCuboids = allLevelCount[level - 1];
-            thisLevelCuboids = allLevelCount[level];
-        }
-
-        // total reduce input MB
-        double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids;
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
-
-        // boost reducer count for cubes with memory-hungry measures (e.g. COUNT DISTINCT)
-        if (cubeDesc.hasMemoryHungryMeasures()) {
-            numReduceTasks = numReduceTasks * 4;
-        }
-
-        // at least 1 reducer
-        numReduceTasks = Math.max(1, numReduceTasks);
-        // no more than 5000 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        jobConf.setInt(MAPRED_REDUCE_TASKS, numReduceTasks);
-
-        logger.info("Having total map input MB " + Math.round(totalMapInputMB));
-        logger.info("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids);
-        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio);
-        logger.info("Setting " + MAPRED_REDUCE_TASKS + "=" + numReduceTasks);
-    }
-
-    /**
-     * @param mapperClass
-     *            the mapperClass to set
-     */
-    @SuppressWarnings("rawtypes")
-    public void setMapperClass(Class<? extends Mapper> mapperClass) {
-        this.mapperClass = mapperClass;
-    }
-
-}
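
setReduceTaskNum() estimates reduce input by scaling total map input with the cuboid fan-out between the previous level and this one, divides by a per-reducer input budget, applies a ratio, and clamps the result. A worked restatement with illustrative numbers (the real budget, ratio and cap come from KylinConfig):

    public class ReducerCountExample {
        static int reduceTasks(double totalMapInputMB, int preLevelCuboids, int thisLevelCuboids,
                double perReduceInputMB, double reduceCountRatio, boolean memoryHungry, int maxReducers) {
            // reduce input is map input scaled by this level's cuboid fan-out
            double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids;
            int n = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
            if (memoryHungry)
                n *= 4; // memory-hungry measures (e.g. COUNT DISTINCT) get more parallelism
            return Math.min(maxReducers, Math.max(1, n)); // clamp to [1, maxReducers]
        }

        public static void main(String[] args) {
            // 10 GB of map input, 5 cuboids in the previous level, 10 in this one,
            // 500 MB per reducer, ratio 1.0, cap 5000:
            // 10240 * 10 / 5 = 20480 MB of reduce input -> 20480 / 500 = 40.96 -> 41 reducers
            System.out.println(reduceTasks(10240, 5, 10, 500, 1.0, false, 5000)); // prints 41
        }
    }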

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
deleted file mode 100644
index 3859d0e..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidReducer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.measure.MeasureAggregators;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
-
-    private static final Logger logger = LoggerFactory.getLogger(CuboidReducer.class);
-
-    private String cubeName;
-    private CubeDesc cubeDesc;
-    private List<MeasureDesc> measuresDescs;
-
-    private MeasureCodec codec;
-    private MeasureAggregators aggs;
-
-    private int counter;
-    private int cuboidLevel;
-    private boolean[] needAggr;
-    private Object[] input;
-    private Object[] result;
-
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private Text outputValue = new Text();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        // only used in Build job, not in Merge job
-        cuboidLevel = context.getConfiguration().getInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, 0);
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
-        cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
-        measuresDescs = cubeDesc.getMeasures();
-
-        codec = new MeasureCodec(measuresDescs);
-        aggs = new MeasureAggregators(measuresDescs);
-
-        input = new Object[measuresDescs.size()];
-        result = new Object[measuresDescs.size()];
-        needAggr = new boolean[measuresDescs.size()];
-
-        if (cuboidLevel > 0) {
-            for (int i = 0; i < measuresDescs.size(); i++) {
-                needAggr[i] = !measuresDescs.get(i).getFunction().getMeasureType().onlyAggrInBaseCuboid();
-            }
-        }
-    }
-
-    @Override
-    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
-        aggs.reset();
-
-        for (Text value : values) {
-            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
-            if (cuboidLevel > 0) {
-                aggs.aggregate(input, needAggr);
-            } else {
-                aggs.aggregate(input);
-            }
-        }
-        aggs.collectStates(result);
-
-        valueBuf.clear();
-        codec.encode(result, valueBuf);
-
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
-        context.write(key, outputValue);
-
-        counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
-    }
-
-}
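
The reducer above follows a decode -> aggregate -> re-encode loop per key. A minimal sketch with plain longs standing in for the MeasureCodec/MeasureAggregators state; the needAggr mask mirrors onlyAggrInBaseCuboid(), where a measure flagged false keeps its first-seen state instead of being re-aggregated at higher cuboid levels:

    import java.util.Arrays;

    public class CuboidAggregateSketch {
        public static void main(String[] args) {
            long[][] rows = { { 3, 7 }, { 4, 1 }, { 5, 2 } }; // decoded measures per input row
            boolean[] needAggr = { true, false };             // 2nd measure only aggregates in base cuboid
            long[] result = rows[0].clone();                  // seed state from the first row
            for (int r = 1; r < rows.length; r++)
                for (int i = 0; i < result.length; i++)
                    if (needAggr[i])
                        result[i] += rows[r][i];              // SUM stands in for the real aggregator
            System.out.println(Arrays.toString(result));      // [12, 7] -> would be re-encoded and written
        }
    }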

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
deleted file mode 100644
index 9792463..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsCombiner.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsCombiner extends KylinReducer<ShortWritable, Text, ShortWritable, Text> {
-
-    private Text outputValue = new Text();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-    }
-
-    @Override
-    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
-        HashSet<ByteArray> set = new HashSet<ByteArray>();
-        for (Text textValue : values) {
-            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
-            set.add(value);
-        }
-
-        for (ByteArray value : set) {
-            outputValue.set(value.data);
-            context.write(key, outputValue);
-        }
-    }
-
-}
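
Deduplicating inside this combiner is safe because the downstream reducer only needs the distinct value set per column: dropping map-side duplicates cannot change the result, it only shrinks shuffle traffic. A minimal sketch of the idea:

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class CombinerDedupSketch {
        public static void main(String[] args) {
            String[] mapOutput = { "CN", "US", "CN", "DE", "US" };
            Set<String> distinct = new LinkedHashSet<>();
            for (String v : mapOutput)
                distinct.add(v);          // combiner emits each value once per task
            System.out.println(distinct); // [CN, US, DE] -> far fewer bytes shuffled
        }
    }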