You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/12 06:22:40 UTC

[20/97] [abbrv] [partial] incubator-kylin git commit: cleanup for migration from github.com

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityReducer.java
deleted file mode 100644
index 06c98ee..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityReducer.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.kylinolap.common.hll.HyperLogLogPlusCounter;
-import com.kylinolap.cube.kv.RowConstants;
-
-/**
- * @author Jack
- * 
- */
-public class ColumnCardinalityReducer extends Reducer<IntWritable, BytesWritable, IntWritable, LongWritable> {
-
-    public static final int ONE = 1;
-    private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
-
-    @Override
-    public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
-        for (BytesWritable v : values) {
-            int skey = key.get();
-            ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
-            HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter();
-            hll.readRegisters(buffer);
-            getHllc(skey).merge(hll);
-            hll.clear();
-        }
-    }
-
-    private HyperLogLogPlusCounter getHllc(Integer key) {
-        if (!hllcMap.containsKey(key)) {
-            hllcMap.put(key, new HyperLogLogPlusCounter());
-        }
-        return hllcMap.get(key);
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        List<Integer> keys = new ArrayList<Integer>();
-        Iterator<Integer> it = hllcMap.keySet().iterator();
-        while (it.hasNext()) {
-            keys.add(it.next());
-        }
-        Collections.sort(keys);
-        it = keys.iterator();
-        while (it.hasNext()) {
-            int key = it.next();
-            HyperLogLogPlusCounter hllc = hllcMap.get(key);
-            ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-            buf.clear();
-            hllc.writeRegisters(buf);
-            buf.flip();
-            context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
-            // context.write(new Text("ErrorRate_" + key), new
-            // LongWritable((long)hllc.getErrorRate()));
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java
deleted file mode 100644
index b6ea002..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ /dev/null
@@ -1,254 +0,0 @@
-package com.kylinolap.job.hadoop.cardinality;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.ToolRunner;
-
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-public class HiveColumnCardinalityJob extends AbstractHadoopJob {
-    public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job";
-
-    @SuppressWarnings("static-access")
-    protected static final Option OPTION_FORMAT = OptionBuilder.withArgName("input format").hasArg().isRequired(true).withDescription("The file format").create("iformat");
-
-    @SuppressWarnings("static-access")
-    protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("input_dilim").hasArg().isRequired(false).withDescription("Input delim").create("idelim");
-
-    public static final String KEY_INPUT_DELIM = "INPUT_DELIM";
-    public static final String OUTPUT_PATH = "/tmp/cardinality";
-
-    /**
-     * This is the jar path
-     */
-    private String jarPath;
-    private Configuration conf;
-
-    /**
-     * MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY
-     */
-    private String tokenPath;
-
-    public HiveColumnCardinalityJob() {
-
-    }
-
-    public HiveColumnCardinalityJob(String path, String tokenPath) {
-        this.jarPath = path;
-        this.tokenPath = tokenPath;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.hadoop.conf.Configured#getConf()
-     */
-    @Override
-    public Configuration getConf() {
-        if (conf != null) {
-            return conf;
-        }
-        conf = new JobConf();
-        String path = "/apache/hadoop/conf/";
-        File file = new File(path);
-        if (file.isDirectory()) {
-            File[] files = file.listFiles();
-            for (int i = 0; i < files.length; i++) {
-                File tmp = files[i];
-                if (tmp.getName().endsWith(".xml")) {
-                    try {
-                        conf.addResource(new FileInputStream(tmp));
-                    } catch (FileNotFoundException e) {
-                        e.printStackTrace();
-                    }
-                }
-            }
-        }
-
-        // conf.addResource("/apache/hadoop/conf/mapred-site.xml");
-        if (tokenPath != null) {
-            conf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, tokenPath);
-            conf.set("hadoop.security.authentication", "kerberos");
-            UserGroupInformation.setConfiguration(conf);
-            try {
-                UserGroupInformation.loginUserFromKeytab("b_kylin@CORP.EBAY.COM", "~/.keytabs/b_kylin.keytab");
-                System.out.println("###" + UserGroupInformation.getLoginUser());
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-        return conf;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public int run(String[] args) throws Exception {
-
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_FORMAT);
-            options.addOption(OPTION_INPUT_DELIM);
-
-            parseOptions(options, args);
-
-            // start job
-            String jobName = JOB_TITLE + getOptionsAsString();
-            System.out.println("Starting: " + jobName);
-            Configuration conf = getConf();
-            job = Job.getInstance(conf, jobName);
-
-            // set job configuration - basic
-            if (jarPath == null || !new File(jarPath).exists()) {
-                job.setJarByClass(getClass());
-            } else {
-                job.setJar(jarPath);
-            }
-            FileInputFormat.setInputDirRecursive(job, true);
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-            job.getConfiguration().set("dfs.block.size", "67108864");
-
-            String format = getOptionValue(OPTION_FORMAT);
-            @SuppressWarnings("rawtypes")
-            Class cformat = getFormat(format);
-            String delim = getOptionValue(OPTION_INPUT_DELIM);
-            if (delim != null) {
-                if (delim.equals("t")) {
-                    delim = "\t";
-                } else if (delim.equals("177")) {
-                    delim = "\177";
-                }
-                job.getConfiguration().set(KEY_INPUT_DELIM, delim);
-            }
-
-            // Mapper
-            job.setInputFormatClass(cformat);
-            job.setMapperClass(ColumnCardinalityMapper.class);
-            job.setMapOutputKeyClass(IntWritable.class);
-            job.setMapOutputValueClass(BytesWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(ColumnCardinalityReducer.class);
-            job.setOutputFormatClass(TextOutputFormat.class);
-            job.setOutputKeyClass(IntWritable.class);
-            job.setOutputValueClass(LongWritable.class);
-            job.setNumReduceTasks(1);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            int result = waitForCompletion(job);
-            return result;
-        } catch (Exception e) {
-            printUsage(options);
-            e.printStackTrace(System.err);
-            log.error(e.getLocalizedMessage(), e);
-            return 2;
-        }
-
-    }
-
-    /**
-     * @param format
-     * @throws ClassNotFoundException
-     */
-    @SuppressWarnings("rawtypes")
-    private Class getFormat(String format) throws ClassNotFoundException {
-        if (format.endsWith(".TextInputFormat")) {
-            return Class.forName("org.apache.hadoop.mapreduce.lib.input.TextInputFormat");
-        } else if (format.endsWith(".SequenceFileInputFormat")) {
-            return Class.forName("org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat");
-        } else {
-            return Class.forName(format);
-        }
-
-    }
-
-    public static void main1(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new HiveColumnCardinalityJob(), args);
-        System.exit(exitCode);
-    }
-
-    public static void main(String[] args) {
-
-        String location = "hdfs://apollo-phx-nn.vip.ebay.com:8020/tmp/f1a98d8a-26b9-452e-ab7b-9f01e5a6459b/shipping_sisense_cube_desc_intermediate_table";
-        String tempName = "test";
-        String inputFormat = "org.apache.hadoop.mapred.SequenceFileInputFormat";
-        String delim = "177";
-        String jarPath = "/usr/lib/kylin/kylin-index-latest.jar";
-
-        args = new String[] { "-input", location, "-output", "/tmp/cardinality/" + tempName, "-iformat", inputFormat, "-idelim", delim };
-        HiveColumnCardinalityJob job = new HiveColumnCardinalityJob(jarPath, "/tmp/krb5cc_882");
-        try {
-            ToolRunner.run(job, args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public List<String> readLines(Path location, Configuration conf) throws Exception {
-        FileSystem fileSystem = FileSystem.get(location.toUri(), conf);
-        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
-        FileStatus[] items = fileSystem.listStatus(location);
-        if (items == null)
-            return new ArrayList<String>();
-        List<String> results = new ArrayList<String>();
-        for (FileStatus item : items) {
-
-            // ignoring files like _SUCCESS
-            if (item.getPath().getName().startsWith("_")) {
-                continue;
-            }
-
-            CompressionCodec codec = factory.getCodec(item.getPath());
-            InputStream stream = null;
-
-            // check if we have a compression codec we need to use
-            if (codec != null) {
-                stream = codec.createInputStream(fileSystem.open(item.getPath()));
-            } else {
-                stream = fileSystem.open(item.getPath());
-            }
-
-            StringWriter writer = new StringWriter();
-            IOUtils.copy(stream, writer, "UTF-8");
-            String raw = writer.toString();
-            for (String str : raw.split("\n")) {
-                results.add(str);
-            }
-        }
-        return results;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidJob.java
deleted file mode 100644
index cc25b13..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidJob.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * @author honma
- * 
- */
-
-public class BaseCuboidJob extends CuboidJob {
-    public BaseCuboidJob() {
-        this.setMapperClass(BaseCuboidMapper.class);
-    }
-
-    public static void main(String[] args) throws Exception {
-        CuboidJob job = new BaseCuboidJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidMapper.java
deleted file mode 100644
index 39b3918..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/BaseCuboidMapper.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-import com.kylinolap.cube.common.BytesSplitter;
-import com.kylinolap.cube.common.SplittedBytes;
-import com.kylinolap.cube.cuboid.Cuboid;
-import com.kylinolap.cube.kv.AbstractRowKeyEncoder;
-import com.kylinolap.cube.kv.RowConstants;
-import com.kylinolap.cube.measure.MeasureCodec;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.job.hadoop.hive.JoinedFlatTableDesc;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.FunctionDesc;
-import com.kylinolap.metadata.model.cube.MeasureDesc;
-import com.kylinolap.metadata.model.cube.ParameterDesc;
-
-/**
- * @author George Song (ysong1)
- */
-public class BaseCuboidMapper<KEYIN> extends Mapper<KEYIN, Text, Text, Text> {
-
-    private static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapper.class);
-
-    public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
-    public static final byte[] ONE = Bytes.toBytes("1");
-
-    private String cubeName;
-    private String segmentName;
-    private Cuboid baseCuboid;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment cubeSegment;
-    private List<byte[]> nullBytes;
-
-    private JoinedFlatTableDesc intermediateTableDesc;
-    private String intermediateTableRowDelimiter;
-    private byte byteRowDelimiter;
-
-    private int counter;
-    private int errorRecordCounter;
-    private Text outputKey = new Text();
-    private Text outputValue = new Text();
-    private Object[] measures;
-    private byte[][] keyBytesBuf;
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
-    private BytesSplitter bytesSplitter;
-    private AbstractRowKeyEncoder rowKeyEncoder;
-    private MeasureCodec measureCodec;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-        intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
-        if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
-            throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
-        }
-
-        byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
-        cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        cubeSegment = cube.getSegment(segmentName, CubeSegmentStatusEnum.NEW);
-
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
-        intermediateTableDesc = new JoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
-
-        bytesSplitter = new BytesSplitter(200, 4096);
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
-        measureCodec = new MeasureCodec(cubeDesc.getMeasures());
-        measures = new Object[cubeDesc.getMeasures().size()];
-
-        int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
-        keyBytesBuf = new byte[colCount][];
-
-        initNullBytes();
-    }
-
-    private void initNullBytes() {
-        nullBytes = Lists.newArrayList();
-        nullBytes.add(HIVE_NULL);
-        String[] nullStrings = cubeDesc.getNullStrings();
-        if (nullStrings != null) {
-            for (String s : nullStrings) {
-                nullBytes.add(Bytes.toBytes(s));
-            }
-        }
-    }
-
-    private boolean isNull(byte[] v) {
-        for (byte[] nullByte : nullBytes) {
-            if (Bytes.equals(v, nullByte))
-                return true;
-        }
-        return false;
-    }
-
-    private byte[] buildKey(SplittedBytes[] splitBuffers) {
-        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
-        for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
-            int index = rowKeyColumnIndexes[i];
-            keyBytesBuf[i] = Arrays.copyOf(splitBuffers[index].value, splitBuffers[index].length);
-            if (isNull(keyBytesBuf[i])) {
-                keyBytesBuf[i] = null;
-            }
-        }
-        return rowKeyEncoder.encode(keyBytesBuf);
-    }
-
-    private void buildValue(SplittedBytes[] splitBuffers) {
-
-        for (int i = 0; i < measures.length; i++) {
-            byte[] valueBytes = getValueBytes(splitBuffers, i);
-            measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
-        }
-
-        valueBuf.clear();
-        measureCodec.encode(measures, valueBuf);
-    }
-
-    private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
-        MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
-        FunctionDesc func = desc.getFunction();
-        ParameterDesc paramDesc = func.getParameter();
-        int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-
-        byte[] result = null;
-
-        // constant
-        if (flatTableIdx == null) {
-            result = Bytes.toBytes(paramDesc.getValue());
-        }
-        // column values
-        else {
-            // for multiple columns, their values are joined
-            for (int i = 0; i < flatTableIdx.length; i++) {
-                SplittedBytes split = splitBuffers[flatTableIdx[i]];
-                if (result == null) {
-                    result = Arrays.copyOf(split.value, split.length);
-                } else {
-                    byte[] newResult = new byte[result.length + split.length];
-                    System.arraycopy(result, 0, newResult, 0, result.length);
-                    System.arraycopy(split.value, 0, newResult, result.length, split.length);
-                    result = newResult;
-                }
-            }
-        }
-
-        if (func.isCount() || func.isHolisticCountDistinct()) {
-            // note for holistic count distinct, this value will be ignored
-            result = ONE;
-        }
-
-        if (isNull(result)) {
-            result = null;
-        }
-
-        return result;
-    }
-
-    @Override
-    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
-        counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
-
-        bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
-
-        try {
-            byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
-            outputKey.set(rowKey, 0, rowKey.length);
-
-            buildValue(bytesSplitter.getSplitBuffers());
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
-            context.write(outputKey, outputValue);
-        } catch (Exception ex) {
-            handleErrorRecord(bytesSplitter.getSplitBuffers(), ex);
-        }
-    }
-
-    private void handleErrorRecord(SplittedBytes[] splitBuffers, Exception ex) throws IOException {
-        
-        StringBuilder buf = new StringBuilder();
-        buf.append("Error record: [");
-        for (int i = 0; i < splitBuffers.length; i++) {
-            if (i > 0)
-                buf.append(", ");
-            
-            buf.append(Bytes.toString(splitBuffers[i].value, 0, splitBuffers[i].length));
-        }
-        buf.append("] -- ");
-        buf.append(ex.toString());
-        System.err.println(buf.toString());
-        ex.printStackTrace(System.err);
-        
-        errorRecordCounter++;
-        if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
-            if (ex instanceof IOException)
-                throw (IOException) ex;
-            else if (ex instanceof RuntimeException)
-                throw (RuntimeException) ex;
-            else
-                throw new RuntimeException("", ex);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileJob.java
deleted file mode 100644
index 1f939d1..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileJob.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.File;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-
-public class CubeHFileJob extends AbstractHadoopJob {
-
-    protected static final Logger log = LoggerFactory.getLogger(CubeHFileJob.class);
-
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_HTABLE_NAME);
-            parseOptions(options, args);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-
-            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-
-            File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-            if (JarFile.exists()) {
-                job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-            } else {
-                job.setJarByClass(this.getClass());
-            }
-
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-            FileOutputFormat.setOutputPath(job, output);
-
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(CubeHFileMapper.class);
-            job.setReducerClass(KeyValueSortReducer.class);
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            Configuration conf = HBaseConfiguration.create(getConf());
-            // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
-            HTable htable = new HTable(conf, tableName);
-
-            //Automatic config !
-            HFileOutputFormat.configureIncrementalLoad(job, htable);
-
-            // set block replication to 3 for hfiles
-            conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");
-
-            this.deletePath(job.getConfiguration(), output);
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            log.error(e.getLocalizedMessage(), e);
-            return 2;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new CubeHFileJob(), args);
-        System.exit(exitCode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileMapper.java
deleted file mode 100644
index 4f9b5c8..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/CubeHFileMapper.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.google.common.collect.Lists;
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.kv.RowConstants;
-import com.kylinolap.cube.measure.MeasureCodec;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.HBaseColumnDesc;
-import com.kylinolap.metadata.model.cube.HBaseColumnFamilyDesc;
-import com.kylinolap.metadata.model.cube.MeasureDesc;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public class CubeHFileMapper extends Mapper<Text, Text, ImmutableBytesWritable, KeyValue> {
-
-    ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
-
-    String cubeName;
-    CubeDesc cubeDesc;
-
-    MeasureCodec inputCodec;
-    Object[] inputMeasures;
-    List<KeyValueCreator> keyValueCreators;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
-        CubeManager cubeMgr = CubeManager.getInstance(config);
-        cubeDesc = cubeMgr.getCube(cubeName).getDescriptor();
-
-        inputCodec = new MeasureCodec(cubeDesc.getMeasures());
-        inputMeasures = new Object[cubeDesc.getMeasures().size()];
-        keyValueCreators = Lists.newArrayList();
-
-        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                keyValueCreators.add(new KeyValueCreator(cubeDesc, colDesc));
-            }
-        }
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        outputKey.set(key.getBytes(), 0, key.getLength());
-        KeyValue outputValue;
-
-        int n = keyValueCreators.size();
-        if (n == 1 && keyValueCreators.get(0).isFullCopy) { // shortcut for
-                                                            // simple full copy
-
-            outputValue = keyValueCreators.get(0).create(key, value.getBytes(), 0, value.getLength());
-            context.write(outputKey, outputValue);
-
-        } else { // normal (complex) case that distributes measures to multiple
-                 // HBase columns
-
-            inputCodec.decode(value, inputMeasures);
-
-            for (int i = 0; i < n; i++) {
-                outputValue = keyValueCreators.get(i).create(key, inputMeasures);
-                context.write(outputKey, outputValue);
-            }
-        }
-    }
-
-    class KeyValueCreator {
-        byte[] cfBytes;
-        byte[] qBytes;
-        long timestamp;
-
-        int[] refIndex;
-        MeasureDesc[] refMeasures;
-
-        MeasureCodec codec;
-        Object[] colValues;
-        ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
-        boolean isFullCopy;
-
-        public KeyValueCreator(CubeDesc cubeDesc, HBaseColumnDesc colDesc) {
-
-            cfBytes = Bytes.toBytes(colDesc.getColumnFamilyName());
-            qBytes = Bytes.toBytes(colDesc.getQualifier());
-            timestamp = System.currentTimeMillis();
-
-            List<MeasureDesc> measures = cubeDesc.getMeasures();
-            String[] measureNames = getMeasureNames(cubeDesc);
-            String[] refs = colDesc.getMeasureRefs();
-
-            refIndex = new int[refs.length];
-            refMeasures = new MeasureDesc[refs.length];
-            for (int i = 0; i < refs.length; i++) {
-                refIndex[i] = indexOf(measureNames, refs[i]);
-                refMeasures[i] = measures.get(refIndex[i]);
-            }
-
-            codec = new MeasureCodec(refMeasures);
-            colValues = new Object[refs.length];
-
-            isFullCopy = true;
-            for (int i = 0; i < measures.size(); i++) {
-                if (refIndex.length <= i || refIndex[i] != i)
-                    isFullCopy = false;
-            }
-        }
-
-        public KeyValue create(Text key, Object[] measureValues) {
-            for (int i = 0; i < colValues.length; i++) {
-                colValues[i] = measureValues[refIndex[i]];
-            }
-
-            valueBuf.clear();
-            codec.encode(colValues, valueBuf);
-
-            return create(key, valueBuf.array(), 0, valueBuf.position());
-        }
-
-        public KeyValue create(Text key, byte[] value, int voffset, int vlen) {
-            return new KeyValue(key.getBytes(), 0, key.getLength(), //
-                    cfBytes, 0, cfBytes.length, //
-                    qBytes, 0, qBytes.length, //
-                    timestamp, Type.Put, //
-                    value, voffset, vlen);
-        }
-
-        private int indexOf(String[] measureNames, String ref) {
-            for (int i = 0; i < measureNames.length; i++)
-                if (measureNames[i].equalsIgnoreCase(ref))
-                    return i;
-
-            throw new IllegalArgumentException("Measure '" + ref + "' not found in " + Arrays.toString(measureNames));
-        }
-
-        private String[] getMeasureNames(CubeDesc cubeDesc) {
-            List<MeasureDesc> measures = cubeDesc.getMeasures();
-            String[] result = new String[measures.size()];
-            for (int i = 0; i < measures.size(); i++)
-                result[i] = measures.get(i).getName();
-            return result;
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidJob.java
deleted file mode 100644
index 687fe10..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidJob.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.cuboid.CuboidCLI;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.exception.JobException;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-
-/**
- * @author ysong1
- */
-public class CuboidJob extends AbstractHadoopJob {
-
-    protected static final Logger log = LoggerFactory.getLogger(CuboidJob.class);
-    private static final String MAPRED_REDUCE_TASKS = "mapred.reduce.tasks";
-
-    @SuppressWarnings("rawtypes")
-    private Class<? extends Mapper> mapperClass;
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_NCUBOID_LEVEL);
-            options.addOption(OPTION_INPUT_FORMAT);
-            parseOptions(options, args);
-
-            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            CubeManager cubeMgr = CubeManager.getInstance(config);
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            System.out.println("Starting: " + job.getJobName());
-            FileInputFormat.setInputPaths(job, input);
-
-            File jarFile = new File(config.getKylinJobJarPath());
-            if (jarFile.exists()) {
-                job.setJar(config.getKylinJobJarPath());
-            } else {
-                job.setJarByClass(this.getClass());
-            }
-
-            // Mapper
-            if (this.mapperClass == null) {
-                throw new Exception("Mapper class is not set!");
-            }
-
-            boolean isInputTextFormat = false;
-            if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) {
-                isInputTextFormat = true;
-            }
-
-            if (isInputTextFormat) {
-                job.setInputFormatClass(TextInputFormat.class);
-
-            } else {
-                job.setInputFormatClass(SequenceFileInputFormat.class);
-            }
-            job.setMapperClass(this.mapperClass);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(Text.class);
-            job.setCombinerClass(CuboidReducer.class); // for base cuboid shuffle skew, some rowkey aggregates far more records than others
-
-            // Reducer
-            job.setReducerClass(CuboidReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
-
-            FileOutputFormat.setOutputPath(job, output);
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-            // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            setReduceTaskNum(job, config, cubeName, nCuboidLevel);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            log.error(e.getLocalizedMessage(), e);
-            return 2;
-        }
-    }
-
-    protected void setReduceTaskNum(Job job, KylinConfig config, String cubeName, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        Configuration jobConf = job.getConfiguration();
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-
-        CubeDesc cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
-
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-
-        // total map input MB
-        double totalMapInputMB = this.getTotalMapInputMB();
-
-        // output / input ratio
-        int preLevelCuboids, thisLevelCuboids;
-        if (level == 0) { // base cuboid
-            preLevelCuboids = thisLevelCuboids = 1;
-        } else { // n-cuboid
-            int[] allLevelCount = CuboidCLI.calculateAllLevelCount(cubeDesc);
-            preLevelCuboids = allLevelCount[level - 1];
-            thisLevelCuboids = allLevelCount[level];
-        }
-
-        // total reduce input MB
-        double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids;
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
-
-        // adjust reducer number for cube which has DISTINCT_COUNT measures for
-        // better performance
-        if (cubeDesc.hasHolisticCountDistinctMeasures()) {
-            numReduceTasks = numReduceTasks * 4;
-        }
-
-        // at least 1 reducer
-        numReduceTasks = Math.max(1, numReduceTasks);
-        // no more than 5000 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        jobConf.setInt(MAPRED_REDUCE_TASKS, numReduceTasks);
-
-        System.out.println("Having total map input MB " + Math.round(totalMapInputMB));
-        System.out.println("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids);
-        System.out.println("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio);
-        System.out.println("Setting " + MAPRED_REDUCE_TASKS + "=" + numReduceTasks);
-    }
-
-    /**
-     * @param mapperClass
-     *            the mapperClass to set
-     */
-    @SuppressWarnings("rawtypes")
-    public void setMapperClass(Class<? extends Mapper> mapperClass) {
-        this.mapperClass = mapperClass;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidReducer.java
deleted file mode 100644
index 50af652..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/CuboidReducer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.kv.RowConstants;
-import com.kylinolap.cube.measure.MeasureAggregators;
-import com.kylinolap.cube.measure.MeasureCodec;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.MeasureDesc;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public class CuboidReducer extends Reducer<Text, Text, Text, Text> {
-
-    private static final Logger logger = LoggerFactory.getLogger(CuboidReducer.class);
-
-    private String cubeName;
-    private CubeDesc cubeDesc;
-    private List<MeasureDesc> measuresDescs;
-
-    private MeasureCodec codec;
-    private MeasureAggregators aggs;
-
-    private int counter;
-    private Object[] input;
-    private Object[] result;
-
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private Text outputValue = new Text();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
-        cubeDesc = CubeManager.getInstance(config).getCube(cubeName).getDescriptor();
-        measuresDescs = cubeDesc.getMeasures();
-
-        codec = new MeasureCodec(measuresDescs);
-        aggs = new MeasureAggregators(measuresDescs);
-
-        input = new Object[measuresDescs.size()];
-        result = new Object[measuresDescs.size()];
-    }
-
-    @Override
-    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
-        aggs.reset();
-
-        for (Text value : values) {
-            codec.decode(value, input);
-            aggs.aggregate(input);
-        }
-        aggs.collectStates(result);
-
-        valueBuf.clear();
-        codec.encode(result, valueBuf);
-
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
-        context.write(key, outputValue);
-
-        counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsCombiner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsCombiner.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsCombiner.java
deleted file mode 100644
index 916dd9e..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsCombiner.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.HashSet;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.kylinolap.common.util.ByteArray;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsCombiner extends Reducer<ShortWritable, Text, ShortWritable, Text> {
-
-    private Text outputValue = new Text();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-    }
-
-    @Override
-    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-
-        HashSet<ByteArray> set = new HashSet<ByteArray>();
-        for (Text textValue : values) {
-            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
-            set.add(value);
-        }
-
-        for (ByteArray value : set) {
-            outputValue.set(value.data);
-            context.write(key, outputValue);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java
deleted file mode 100644
index 556f690..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsJob.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsJob extends AbstractHadoopJob {
-    protected static final Logger log = LoggerFactory.getLogger(FactDistinctColumnsJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_INPUT_FORMAT);
-            options.addOption(OPTION_OUTPUT_PATH);
-            parseOptions(options, args);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            String cubeName = getOptionValue(OPTION_CUBE_NAME);
-            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
-            String inputFormat = getOptionValue(OPTION_INPUT_FORMAT);
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
-            // ----------------------------------------------------------------------------
-
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            System.out.println("Starting: " + job.getJobName());
-
-            setupMapInput(input, inputFormat);
-            setupReduceOutput(output);
-
-            // add metadata to distributed cache
-            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
-            attachKylinPropsAndMetadata(cubeMgr.getCube(cubeName), job.getConfiguration());
-
-            return waitForCompletion(job);
-
-        } catch (Exception e) {
-            printUsage(options);
-            log.error(e.getLocalizedMessage(), e);
-            return 2;
-        }
-
-    }
-
-    private void setupMapInput(Path input, String inputFormat) throws IOException {
-        FileInputFormat.setInputPaths(job, input);
-
-        File JarFile = new File(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-        if (JarFile.exists()) {
-            job.setJar(KylinConfig.getInstanceFromEnv().getKylinJobJarPath());
-        } else {
-            job.setJarByClass(this.getClass());
-        }
-
-        if ("text".equalsIgnoreCase(inputFormat) || "textinputformat".equalsIgnoreCase(inputFormat)) {
-            job.setInputFormatClass(TextInputFormat.class);
-        } else {
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-        }
-        job.setMapperClass(FactDistinctColumnsMapper.class);
-        job.setCombinerClass(FactDistinctColumnsCombiner.class);
-        job.setMapOutputKeyClass(ShortWritable.class);
-        job.setMapOutputValueClass(Text.class);
-    }
-
-    private void setupReduceOutput(Path output) throws IOException {
-        job.setReducerClass(FactDistinctColumnsReducer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setOutputKeyClass(NullWritable.class);
-        job.setOutputValueClass(Text.class);
-
-        FileOutputFormat.setOutputPath(job, output);
-        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
-        job.setNumReduceTasks(1);
-
-        deletePath(job.getConfiguration(), output);
-    }
-
-    public static void main(String[] args) throws Exception {
-        FactDistinctColumnsJob job = new FactDistinctColumnsJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
deleted file mode 100644
index 236658d..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsMapper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.common.BytesSplitter;
-import com.kylinolap.cube.common.SplittedBytes;
-import com.kylinolap.cube.cuboid.Cuboid;
-import com.kylinolap.dict.DictionaryManager;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.job.hadoop.hive.JoinedFlatTableDesc;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.RowKeyDesc;
-import com.kylinolap.metadata.model.cube.TblColRef;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsMapper<KEYIN> extends Mapper<KEYIN, Text, ShortWritable, Text> {
-
-    private String cubeName;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private int[] factDictCols;
-
-    private JoinedFlatTableDesc intermediateTableDesc;
-    private String intermediateTableRowDelimiter;
-    private byte byteRowDelimiter;
-    private BytesSplitter bytesSplitter;
-
-    private ShortWritable outputKey = new ShortWritable();
-    private Text outputValue = new Text();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        Configuration conf = context.getConfiguration();
-        intermediateTableRowDelimiter = conf.get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
-        byteRowDelimiter = intermediateTableRowDelimiter.getBytes("UTF-8")[0];
-        bytesSplitter = new BytesSplitter(200, 4096);
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
-        cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
-        cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        intermediateTableDesc = new JoinedFlatTableDesc(cubeDesc, null);
-
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-        List<TblColRef> columns = baseCuboid.getColumns();
-
-        ArrayList<Integer> factDictCols = new ArrayList<Integer>();
-        RowKeyDesc rowkey = cubeDesc.getRowkey();
-        DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-        for (int i = 0; i < columns.size(); i++) {
-            TblColRef col = columns.get(i);
-            if (rowkey.isUseDictionary(col) == false)
-                continue;
-
-            String scanTable = (String) dictMgr.decideSourceData(cubeDesc, col, null)[0];
-            if (cubeDesc.isFactTable(scanTable)) {
-                System.out.println(col + " -- " + i);
-                factDictCols.add(i);
-            }
-        }
-        this.factDictCols = new int[factDictCols.size()];
-        for (int i = 0; i < factDictCols.size(); i++)
-            this.factDictCols[i] = factDictCols.get(i);
-    }
-
-    @Override
-    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
-
-        bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
-        SplittedBytes[] splitBuffers = bytesSplitter.getSplitBuffers();
-
-        int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
-        for (int i : factDictCols) {
-            outputKey.set((short) i);
-            SplittedBytes bytes = splitBuffers[flatTableIndexes[i]];
-            outputValue.set(bytes.value, 0, bytes.length);
-            System.out.println(i + " -- " + outputValue);
-            context.write(outputKey, outputValue);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsReducer.java
deleted file mode 100644
index e701781..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.ByteArray;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.cuboid.Cuboid;
-import com.kylinolap.job.constant.BatchConstants;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.TblColRef;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsReducer extends Reducer<ShortWritable, Text, NullWritable, Text> {
-
-    private List<TblColRef> columnList = new ArrayList<TblColRef>();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
-        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
-        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        CubeDesc cubeDesc = cube.getDescriptor();
-
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-        columnList = baseCuboid.getColumns();
-    }
-
-    @Override
-    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-        TblColRef col = columnList.get(key.get());
-
-        HashSet<ByteArray> set = new HashSet<ByteArray>();
-        for (Text textValue : values) {
-            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
-            set.add(value);
-        }
-
-        Configuration conf = context.getConfiguration();
-        FileSystem fs = FileSystem.get(conf);
-        String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
-        FSDataOutputStream out = fs.create(new Path(outputPath, col.getName()));
-
-        try {
-            for (ByteArray value : set) {
-                out.write(value.data);
-                out.write('\n');
-            }
-        } finally {
-            out.close();
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionJob.java
deleted file mode 100644
index 0c14abb..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionJob.java
+++ /dev/null
@@ -1,139 +0,0 @@
-package com.kylinolap.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package com.kylinolap.index.cube;
-//
-//import org.apache.commons.cli.Options;
-//import org.apache.hadoop.fs.FileSystem;
-//import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Job;
-//import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-//import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-//import org.apache.hadoop.util.ToolRunner;
-//
-//import com.kylinolap.cube.CubeInstance;
-//import com.kylinolap.cube.CubeManager;
-//import com.kylinolap.cube.cuboid.Cuboid;
-//import com.kylinolap.cube.kv.AbstractRowKeyEncoder;
-//import com.kylinolap.cube.kv.RowKeyEncoder;
-//import com.kylinolap.index.AbstractHadoopJob;
-//import com.kylinolap.metadata.model.cube.CubeDesc;
-//
-///**
-// * @author xjiang
-// *
-// */
-//
-//public class KeyDistributionJob extends AbstractHadoopJob {
-//
-//    public static final String JOB_TITLE = "Kylin Row Key Distribution Job";
-//    public static final String KEY_HEADER_LENGTH = "key_header_length";
-//    public static final String KEY_COLUMN_PERCENTAGE = "key_column_percentage";
-//    public static final String KEY_SPLIT_NUMBER = "key_split_number";
-//
-//    /* (non-Javadoc)
-//     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-//     */
-//    @Override
-//    public int run(String[] args) throws Exception {
-//        Options options = new Options();
-//
-//        try {
-//            options.addOption(OPTION_INPUT_PATH);
-//            options.addOption(OPTION_OUTPUT_PATH);
-//            options.addOption(OPTION_METADATA_URL);
-//            options.addOption(OPTION_CUBE_NAME);
-//            options.addOption(OPTION_KEY_COLUMN_PERCENTAGE);
-//            options.addOption(OPTION_KEY_SPLIT_NUMBER);
-//            parseOptions(options, args);
-//
-//            // start job
-//            String jobName = JOB_TITLE + getOptionsAsString();
-//            System.out.println("Starting: " + jobName);
-//            Job job = Job.getInstanceFromEnv(getConf(), jobName);
-//
-//            // set job configuration - basic 
-//            job.setJarByClass(getClass());
-//            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-//
-//            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-//            FileOutputFormat.setOutputPath(job, output);
-//            //job.getConfiguration().set("dfs.block.size", "67108864");
-//
-//            // set job configuration - key prefix size & key split number
-//            String keyColumnPercentage = getOptionValue(OPTION_KEY_COLUMN_PERCENTAGE);
-//            job.getConfiguration().set(KEY_COLUMN_PERCENTAGE, keyColumnPercentage);
-//            String metadataUrl = validateMetadataUrl(getOptionValue(OPTION_METADATA_URL));
-//            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-//            int keyHeaderLen = getKeyHeaderLength(metadataUrl, cubeName);
-//            job.getConfiguration().set(KEY_HEADER_LENGTH, String.valueOf(keyHeaderLen));
-//            job.getConfiguration().set(KEY_SPLIT_NUMBER, getOptionValue(OPTION_KEY_SPLIT_NUMBER));
-//
-//            // Mapper
-//            job.setInputFormatClass(SequenceFileInputFormat.class);
-//            job.setMapperClass(KeyDistributionMapper.class);
-//            job.setMapOutputKeyClass(Text.class);
-//            job.setMapOutputValueClass(LongWritable.class);
-//
-//            // Combiner, not needed any more as mapper now does the groping
-//            //job.setCombinerClass(KeyDistributionCombiner.class);
-//
-//            // Reducer - only one
-//            job.setReducerClass(KeyDistributionReducer.class);
-//            // use sequence file as output
-//            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-//            // key is text
-//            job.setOutputKeyClass(Text.class);
-//            // value is long
-//            job.setOutputValueClass(LongWritable.class);
-//            job.setNumReduceTasks(1);
-//
-//            FileSystem fs = FileSystem.get(job.getConfiguration());
-//            if (fs.exists(output))
-//                fs.delete(output, true);
-//
-//            return waitForCompletion(job);
-//        } catch (Exception e) {
-//            printUsage(options);
-//            e.printStackTrace(System.err);
-//            return 2;
-//        }
-//    }
-//
-//    private int getKeyHeaderLength(String metadataUrl, String cubeName) {
-//        CubeManager cubeMgr = CubeManager.getInstanceFromEnv(metadataUrl);
-//        CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
-//        CubeDesc cubeDesc = cubeInstance.getDescriptor();
-//        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-//        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-//        RowKeyEncoder rowKeyEncoder =
-//                (RowKeyEncoder) AbstractRowKeyEncoder.createInstance(cubeInstance.getTheOnlySegment(),
-//                        baseCuboid);
-//
-//        return rowKeyEncoder.getHeaderLength();
-//
-//    }
-//
-//    public static void main(String[] args) throws Exception {
-//        int exitCode = ToolRunner.run(new KeyDistributionJob(), args);
-//        System.exit(exitCode);
-//    }
-// }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionMapper.java
deleted file mode 100644
index 037c3d9..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionMapper.java
+++ /dev/null
@@ -1,104 +0,0 @@
-package com.kylinolap.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package com.kylinolap.index.cube;
-//
-//import java.io.IOException;
-//
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Mapper;
-//
-///**
-// * @author xjiang
-// *
-// */
-//public class KeyDistributionMapper extends Mapper<Text, Text, Text, LongWritable> {
-//
-//    private int headerLength;
-//
-//    private Text currentKey;
-//    private long outputLong;
-//    private Text outputKey;
-//    private LongWritable outputValue;
-//    private int columnPercentage;
-//    private int allRowCount;
-//
-//    @Override
-//    protected void setup(Context context) throws IOException {
-//        String percentStr = context.getConfiguration().get(KeyDistributionJob.KEY_COLUMN_PERCENTAGE);
-//        this.columnPercentage = Integer.valueOf(percentStr).intValue();
-//        if (this.columnPercentage <= 0 || this.columnPercentage >= 100) {
-//            this.columnPercentage = 20;
-//        }
-//        String headerLenStr = context.getConfiguration().get(KeyDistributionJob.KEY_HEADER_LENGTH);
-//        this.headerLength = Integer.valueOf(headerLenStr).intValue();
-//
-//        currentKey = new Text();
-//        outputLong = 0;
-//        outputKey = new Text();
-//        outputValue = new LongWritable(1);
-//        allRowCount = 0;
-//    }
-//
-//    @Override
-//    protected void cleanup(Context context) throws IOException, InterruptedException {
-//        emit(context); // emit the last holding record
-//
-//        byte[] zerokey = new byte[] { 0 };
-//        outputKey.set(zerokey);
-//        outputValue.set(allRowCount);
-//        context.write(outputKey, outputValue);
-//    }
-//
-//    @Override
-//    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-//        byte[] bytes = key.getBytes();
-//        int columnLength = bytes.length - this.headerLength;
-//        int columnPrefixLen = columnLength * this.columnPercentage / 100;
-//        if (columnPrefixLen == 0 && columnLength > 0) {
-//            columnPrefixLen = 1;
-//        }
-//        if (columnPrefixLen > 0) {
-//            currentKey.set(bytes, 0, this.headerLength + columnPrefixLen);
-//        } else {
-//            currentKey.set(bytes);
-//        }
-//
-//        allRowCount++;
-//
-//        if (outputKey.getLength() == 0) { // first record
-//            outputKey.set(currentKey);
-//            outputLong = 1;
-//        } else if (outputKey.equals(currentKey)) { // same key, note input is sorted
-//            outputLong++;
-//        } else { // the next key
-//            emit(context);
-//            outputKey.set(currentKey);
-//            outputLong = 1;
-//        }
-//    }
-//
-//    private void emit(Context context) throws IOException, InterruptedException {
-//        if (outputLong == 0)
-//            return;
-//
-//        outputValue.set(outputLong);
-//        context.write(outputKey, outputValue);
-//    }
-// }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionReducer.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionReducer.java
deleted file mode 100644
index e0edc20..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/KeyDistributionReducer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package com.kylinolap.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package com.kylinolap.index.cube;
-//
-//import java.io.IOException;
-//
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Reducer;
-//import org.apache.hadoop.util.StringUtils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-///**
-// * @author xjiang
-// *
-// */
-//public class KeyDistributionReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
-//
-//    private static final Logger logger = LoggerFactory.getLogger(KeyDistributionReducer.class);
-//
-//    private LongWritable outputValue;
-//    private boolean isTotalCount;
-//    private long totalCount;
-//    private int splitNumber;
-//    private long splitQuota;
-//    private long splitRemain;
-//
-//    @Override
-//    protected void setup(Context context) throws IOException, InterruptedException {
-//        String splitStr = context.getConfiguration().get(KeyDistributionJob.KEY_SPLIT_NUMBER);
-//        splitNumber = Integer.valueOf(splitStr).intValue();
-//        outputValue = new LongWritable();
-//        isTotalCount = true;
-//        totalCount = 0;
-//        splitQuota = 0;
-//        splitRemain = 0;
-//    }
-//
-//    @Override
-//    protected void cleanup(Context context) throws IOException, InterruptedException {
-//        logger.info("---------------");
-//        long splitCount = splitQuota - splitRemain;
-//        logger.info("Total Count = " + totalCount + ", Left Count = " + splitCount);
-//    }
-//
-//    @Override
-//    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
-//            InterruptedException {
-//
-//        // calculate split quota
-//        if (isTotalCount) {
-//            for (LongWritable count : values) {
-//                totalCount += count.get();
-//            }
-//            splitQuota = totalCount / splitNumber;
-//            splitRemain = splitQuota;
-//            isTotalCount = false;
-//            return;
-//        }
-//
-//        // output key when split quota is used up 
-//        for (LongWritable count : values) {
-//            splitRemain -= count.get();
-//        }
-//        if (splitRemain <= 0) {
-//            long splitCount = splitQuota - splitRemain;
-//            String hexKey = StringUtils.byteToHexString(key.getBytes());
-//            logger.info(hexKey + "\t\t" + splitCount);
-//
-//            outputValue.set(splitCount);
-//            context.write(key, outputValue);
-//            splitRemain = splitQuota;
-//        }
-//
-//    }
-// }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidJob.java b/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidJob.java
deleted file mode 100644
index d97e135..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cube/MergeCuboidJob.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cube;
-
-import java.io.File;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- */
-public class MergeCuboidJob extends CuboidJob {
-
-    private static final Logger log = LoggerFactory.getLogger(MergeCuboidJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            parseOptions(options, args);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            CubeManager cubeMgr = CubeManager.getInstance(config);
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            // CubeSegment cubeSeg = cubeMgr.findSegment(cube, segmentName);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            System.out.println("Starting: " + jobName);
-            job = Job.getInstance(getConf(), jobName);
-
-            // set job configuration - basic
-            File JarFile = new File(config.getKylinJobJarPath());
-            if (JarFile.exists()) {
-                job.setJar(config.getKylinJobJarPath());
-            } else {
-                job.setJarByClass(this.getClass());
-            }
-
-            // setJobJar(job);
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(MergeCuboidMapper.class);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(Text.class);
-
-            // Reducer - only one
-            job.setReducerClass(CuboidReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-
-            // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            setReduceTaskNum(job, config, cubeName, 0);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            log.error(e.getLocalizedMessage(), e);
-            return 2;
-        }
-    }
-
-}