Posted to commits@kylin.apache.org by li...@apache.org on 2016/02/11 13:49:48 UTC

[09/51] [partial] kylin git commit: KYLIN-1416 keep only website in document branch

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
deleted file mode 100644
index 56c9659..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsJob.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsJob extends AbstractHadoopJob {
-    protected static final Logger log = LoggerFactory.getLogger(FactDistinctColumnsJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_TABLE_NAME);
-            parseOptions(options, args);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            Configuration jobConf = job.getConfiguration();
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME);
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
-
-            // ----------------------------------------------------------------------------
-            // add metadata to distributed cache
-            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
-
-            jobConf.set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            System.out.println("Starting: " + job.getJobName());
-
-            setJobClasspath(job);
-
-            setupMapper(intermediateTable);
-            setupReducer(output);
-
-            // CubeSegment seg = cubeMgr.getCube(cubeName).getTheOnlySegment();
-            attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration());
-
-            return waitForCompletion(job);
-
-        } catch (Exception e) {
-            logger.error("error in FactDistinctColumnsJob", e);
-            printUsage(options);
-            throw e;
-        } finally {
-            if (job != null) {
-                cleanupTempConfFile(job.getConfiguration());
-            }
-        }
-
-    }
-
-    private void setupMapper(String intermediateTable) throws IOException {
-        //        FileInputFormat.setInputPaths(job, input);
-
-        String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
-        HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-
-        job.setInputFormatClass(HCatInputFormat.class);
-        job.setMapperClass(FactDistinctColumnsMapper.class);
-        job.setCombinerClass(FactDistinctColumnsCombiner.class);
-        job.setMapOutputKeyClass(ShortWritable.class);
-        job.setMapOutputValueClass(Text.class);
-    }
-
-    private void setupReducer(Path output) throws IOException {
-        job.setReducerClass(FactDistinctColumnsReducer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setOutputKeyClass(NullWritable.class);
-        job.setOutputValueClass(Text.class);
-
-        FileOutputFormat.setOutputPath(job, output);
-        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
-        job.setNumReduceTasks(1);
-
-        deletePath(job.getConfiguration(), output);
-    }
-
-    public static void main(String[] args) throws Exception {
-        FactDistinctColumnsJob job = new FactDistinctColumnsJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}
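
For orientation: this deleted driver wires an HCatalog-backed mapper to a single reducer and is launched through ToolRunner. Below is a minimal invocation sketch, assuming the conventional flag strings behind the OPTION_* constants ("jobname", "cubename", "output", "tablename") defined in AbstractHadoopJob, which this diff does not show; every value is a hypothetical placeholder.

    // Hypothetical driver call; flag names and values are assumptions, not from the commit.
    String[] args = new String[] {
            "-jobname", "Kylin_Fact_Distinct_Columns_my_cube",
            "-cubename", "my_cube",
            "-output", "/tmp/kylin/my_cube/fact_distinct_columns",
            "-tablename", "default.kylin_intermediate_my_cube"
    };
    System.exit(ToolRunner.run(new FactDistinctColumnsJob(), args));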

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
deleted file mode 100644
index 72802aa..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.RowKeyDesc;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, ShortWritable, Text> {
-
-    private String cubeName;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private int[] factDictCols;
-
-    private CubeJoinedFlatTableDesc intermediateTableDesc;
-
-    private ShortWritable outputKey = new ShortWritable();
-    private Text outputValue = new Text();
-    private int errorRecordCounter;
-
-    private HCatSchema schema = null;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
-        cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
-        cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-        List<TblColRef> columns = baseCuboid.getColumns();
-
-        ArrayList<Integer> factDictCols = new ArrayList<Integer>();
-        RowKeyDesc rowkey = cubeDesc.getRowkey();
-        DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-        for (int i = 0; i < columns.size(); i++) {
-            TblColRef col = columns.get(i);
-            if (!rowkey.isUseDictionary(col))
-                continue;
-
-            String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-            if (cubeDesc.getModel().isFactTable(scanTable)) {
-                factDictCols.add(i);
-            }
-        }
-        this.factDictCols = new int[factDictCols.size()];
-        for (int i = 0; i < factDictCols.size(); i++)
-            this.factDictCols[i] = factDictCols.get(i);
-
-        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
-    }
-
-    @Override
-    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-
-        try {
-
-            int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
-            HCatFieldSchema fieldSchema = null;
-            for (int i : factDictCols) {
-                outputKey.set((short) i);
-                fieldSchema = schema.get(flatTableIndexes[i]);
-                Object fieldValue = record.get(fieldSchema.getName(), schema);
-                if (fieldValue == null)
-                    continue;
-                byte[] bytes = Bytes.toBytes(fieldValue.toString());
-                outputValue.set(bytes, 0, bytes.length);
-                context.write(outputKey, outputValue);
-            }
-        } catch (Exception ex) {
-            handleErrorRecord(record, ex);
-        }
-
-    }
-
-    private void handleErrorRecord(HCatRecord record, Exception ex) throws IOException {
-
-        System.err.println("Insane record: " + record.getAll());
-        ex.printStackTrace(System.err);
-
-        errorRecordCounter++;
-        if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
-            if (ex instanceof IOException)
-                throw (IOException) ex;
-            else if (ex instanceof RuntimeException)
-                throw (RuntimeException) ex;
-            else
-                throw new RuntimeException("", ex);
-        }
-    }
-}
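
The job driver above registers FactDistinctColumnsCombiner, which does not appear in this part of the commit. A combiner in that slot would plausibly deduplicate values map-side before the shuffle, cutting shuffle volume for low-cardinality columns; the following is a hedged, self-contained sketch of that idea, not the actual Kylin class.

    // Hypothetical map-side dedup combiner; the real FactDistinctColumnsCombiner is not shown here.
    import java.io.IOException;
    import java.util.HashSet;

    import org.apache.hadoop.io.ShortWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class DistinctValuesCombiner extends Reducer<ShortWritable, Text, ShortWritable, Text> {
        @Override
        protected void reduce(ShortWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            HashSet<String> seen = new HashSet<String>();
            for (Text value : values) {
                if (seen.add(value.toString())) {
                    context.write(key, value); // forward each distinct value once per mapper
                }
            }
        }
    }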

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
deleted file mode 100644
index 89f90ba..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-/**
- * @author yangli9
- */
-public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
-
-    private List<TblColRef> columnList = new ArrayList<TblColRef>();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
-        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
-        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        CubeDesc cubeDesc = cube.getDescriptor();
-
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-        columnList = baseCuboid.getColumns();
-    }
-
-    @Override
-    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-        TblColRef col = columnList.get(key.get());
-
-        HashSet<ByteArray> set = new HashSet<ByteArray>();
-        for (Text textValue : values) {
-            ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength()));
-            set.add(value);
-        }
-
-        Configuration conf = context.getConfiguration();
-        FileSystem fs = FileSystem.get(conf);
-        String outputPath = conf.get(BatchConstants.OUTPUT_PATH);
-        FSDataOutputStream out = fs.create(new Path(outputPath, col.getName()));
-
-        try {
-            for (ByteArray value : set) {
-                out.write(value.data);
-                out.write('\n');
-            }
-        } finally {
-            out.close();
-        }
-
-    }
-
-}
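
Each reduce call materializes one column's distinct values in memory and writes them to a plain-text file named after the column under BatchConstants.OUTPUT_PATH, one value per line (the driver's single reducer keeps all column files in one place). A minimal consumer sketch under that assumption; the path and column name are illustrative only.

    // Hypothetical reader of the reducer output; one distinct value per line.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path colFile = new Path("/tmp/kylin/my_cube/fact_distinct_columns/MY_COLUMN");
    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(colFile), "UTF-8"));
    try {
        String line;
        while ((line = reader.readLine()) != null) {
            // each line is one distinct value, ready for dictionary building
        }
    } finally {
        reader.close();
    }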

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
deleted file mode 100644
index befa16f..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionJob.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.kylin.index.cube;
-//
-//import org.apache.commons.cli.Options;
-//import org.apache.hadoop.fs.FileSystem;
-//import org.apache.hadoop.fs.Path;
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Job;
-//import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-//import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-//import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-//import org.apache.hadoop.util.ToolRunner;
-//
-//import org.apache.kylin.cube.CubeInstance;
-//import org.apache.kylin.cube.CubeManager;
-//import org.apache.kylin.cube.cuboid.Cuboid;
-//import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-//import org.apache.kylin.cube.kv.RowKeyEncoder;
-//import org.apache.kylin.index.AbstractHadoopJob;
-//import org.apache.kylin.metadata.model.cube.CubeDesc;
-//
-///**
-// * @author xjiang
-// *
-// */
-//
-//public class KeyDistributionJob extends AbstractHadoopJob {
-//
-//    public static final String JOB_TITLE = "Kylin Row Key Distribution Job";
-//    public static final String KEY_HEADER_LENGTH = "key_header_length";
-//    public static final String KEY_COLUMN_PERCENTAGE = "key_column_percentage";
-//    public static final String KEY_SPLIT_NUMBER = "key_split_number";
-//
-//    /* (non-Javadoc)
-//     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-//     */
-//    @Override
-//    public int run(String[] args) throws Exception {
-//        Options options = new Options();
-//
-//        try {
-//            options.addOption(OPTION_INPUT_PATH);
-//            options.addOption(OPTION_OUTPUT_PATH);
-//            options.addOption(OPTION_METADATA_URL);
-//            options.addOption(OPTION_CUBE_NAME);
-//            options.addOption(OPTION_KEY_COLUMN_PERCENTAGE);
-//            options.addOption(OPTION_KEY_SPLIT_NUMBER);
-//            parseOptions(options, args);
-//
-//            // start job
-//            String jobName = JOB_TITLE + getOptionsAsString();
-//            System.out.println("Starting: " + jobName);
-//            Job job = Job.getInstanceFromEnv(getConf(), jobName);
-//
-//            // set job configuration - basic 
-//            setJobClasspath(job);
-//            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-//
-//            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-//            FileOutputFormat.setOutputPath(job, output);
-//            //job.getConfiguration().set("dfs.block.size", "67108864");
-//
-//            // set job configuration - key prefix size & key split number
-//            String keyColumnPercentage = getOptionValue(OPTION_KEY_COLUMN_PERCENTAGE);
-//            job.getConfiguration().set(KEY_COLUMN_PERCENTAGE, keyColumnPercentage);
-//            String metadataUrl = validateMetadataUrl(getOptionValue(OPTION_METADATA_URL));
-//            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-//            int keyHeaderLen = getKeyHeaderLength(metadataUrl, cubeName);
-//            job.getConfiguration().set(KEY_HEADER_LENGTH, String.valueOf(keyHeaderLen));
-//            job.getConfiguration().set(KEY_SPLIT_NUMBER, getOptionValue(OPTION_KEY_SPLIT_NUMBER));
-//
-//            // Mapper
-//            job.setInputFormatClass(SequenceFileInputFormat.class);
-//            job.setMapperClass(KeyDistributionMapper.class);
-//            job.setMapOutputKeyClass(Text.class);
-//            job.setMapOutputValueClass(LongWritable.class);
-//
-//            // Combiner, not needed any more as the mapper now does the grouping
-//            //job.setCombinerClass(KeyDistributionCombiner.class);
-//
-//            // Reducer - only one
-//            job.setReducerClass(KeyDistributionReducer.class);
-//            // use sequence file as output
-//            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-//            // key is text
-//            job.setOutputKeyClass(Text.class);
-//            // value is long
-//            job.setOutputValueClass(LongWritable.class);
-//            job.setNumReduceTasks(1);
-//
-//            FileSystem fs = FileSystem.get(job.getConfiguration());
-//            if (fs.exists(output))
-//                fs.delete(output, true);
-//
-//            return waitForCompletion(job);
-//        } catch (Exception e) {
-//            printUsage(options);
-//            e.printStackTrace(System.err);
-//            return 2;
-//        }
-//    }
-//
-//    private int getKeyHeaderLength(String metadataUrl, String cubeName) {
-//        CubeManager cubeMgr = CubeManager.getInstanceFromEnv(metadataUrl);
-//        CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
-//        CubeDesc cubeDesc = cubeInstance.getDescriptor();
-//        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-//        Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-//        RowKeyEncoder rowKeyEncoder =
-//                (RowKeyEncoder) AbstractRowKeyEncoder.createInstance(cubeInstance.getTheOnlySegment(),
-//                        baseCuboid);
-//
-//        return rowKeyEncoder.getHeaderLength();
-//
-//    }
-//
-//    public static void main(String[] args) throws Exception {
-//        int exitCode = ToolRunner.run(new KeyDistributionJob(), args);
-//        System.exit(exitCode);
-//    }
-// }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java
deleted file mode 100644
index f145bde..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionMapper.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.kylin.index.cube;
-//
-//import java.io.IOException;
-//
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Mapper;
-//
-///**
-// * @author xjiang
-// *
-// */
-//public class KeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-//
-//    private int headerLength;
-//
-//    private Text currentKey;
-//    private long outputLong;
-//    private Text outputKey;
-//    private LongWritable outputValue;
-//    private int columnPercentage;
-//    private int allRowCount;
-//
-//    @Override
-//    protected void setup(Context context) throws IOException {
-//        super.publishConfiguration(context.getConfiguration());
-//
-//        String percentStr = context.getConfiguration().get(KeyDistributionJob.KEY_COLUMN_PERCENTAGE);
-//        this.columnPercentage = Integer.valueOf(percentStr).intValue();
-//        if (this.columnPercentage <= 0 || this.columnPercentage >= 100) {
-//            this.columnPercentage = 20;
-//        }
-//        String headerLenStr = context.getConfiguration().get(KeyDistributionJob.KEY_HEADER_LENGTH);
-//        this.headerLength = Integer.valueOf(headerLenStr).intValue();
-//
-//        currentKey = new Text();
-//        outputLong = 0;
-//        outputKey = new Text();
-//        outputValue = new LongWritable(1);
-//        allRowCount = 0;
-//    }
-//
-//    @Override
-//    protected void cleanup(Context context) throws IOException, InterruptedException {
-//        emit(context); // emit the last holding record
-//
-//        byte[] zerokey = new byte[] { 0 };
-//        outputKey.set(zerokey);
-//        outputValue.set(allRowCount);
-//        context.write(outputKey, outputValue);
-//    }
-//
-//    @Override
-//    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-//        byte[] bytes = key.getBytes();
-//        int columnLength = bytes.length - this.headerLength;
-//        int columnPrefixLen = columnLength * this.columnPercentage / 100;
-//        if (columnPrefixLen == 0 && columnLength > 0) {
-//            columnPrefixLen = 1;
-//        }
-//        if (columnPrefixLen > 0) {
-//            currentKey.set(bytes, 0, this.headerLength + columnPrefixLen);
-//        } else {
-//            currentKey.set(bytes);
-//        }
-//
-//        allRowCount++;
-//
-//        if (outputKey.getLength() == 0) { // first record
-//            outputKey.set(currentKey);
-//            outputLong = 1;
-//        } else if (outputKey.equals(currentKey)) { // same key, note input is sorted
-//            outputLong++;
-//        } else { // the next key
-//            emit(context);
-//            outputKey.set(currentKey);
-//            outputLong = 1;
-//        }
-//    }
-//
-//    private void emit(Context context) throws IOException, InterruptedException {
-//        if (outputLong == 0)
-//            return;
-//
-//        outputValue.set(outputLong);
-//        context.write(outputKey, outputValue);
-//    }
-// }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java
deleted file mode 100644
index dd02910..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/KeyDistributionReducer.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-///*
-// * Copyright 2013-2014 eBay Software Foundation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// *   http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.kylin.index.cube;
-//
-//import java.io.IOException;
-//
-//import org.apache.hadoop.io.LongWritable;
-//import org.apache.hadoop.io.Text;
-//import org.apache.hadoop.mapreduce.Reducer;
-//import org.apache.hadoop.util.StringUtils;
-//import org.slf4j.Logger;
-//import org.slf4j.LoggerFactory;
-//
-///**
-// * @author xjiang
-// *
-// */
-//public class KeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-//
-//    private static final Logger logger = LoggerFactory.getLogger(KeyDistributionReducer.class);
-//
-//    private LongWritable outputValue;
-//    private boolean isTotalCount;
-//    private long totalCount;
-//    private int splitNumber;
-//    private long splitQuota;
-//    private long splitRemain;
-//
-//    @Override
-//    protected void setup(Context context) throws IOException, InterruptedException {
-//        super.publishConfiguration(context.getConfiguration());
-//
-//        String splitStr = context.getConfiguration().get(KeyDistributionJob.KEY_SPLIT_NUMBER);
-//        splitNumber = Integer.valueOf(splitStr).intValue();
-//        outputValue = new LongWritable();
-//        isTotalCount = true;
-//        totalCount = 0;
-//        splitQuota = 0;
-//        splitRemain = 0;
-//    }
-//
-//    @Override
-//    protected void cleanup(Context context) throws IOException, InterruptedException {
-//        logger.info("---------------");
-//        long splitCount = splitQuota - splitRemain;
-//        logger.info("Total Count = " + totalCount + ", Left Count = " + splitCount);
-//    }
-//
-//    @Override
-//    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
-//            InterruptedException {
-//
-//        // calculate split quota
-//        if (isTotalCount) {
-//            for (LongWritable count : values) {
-//                totalCount += count.get();
-//            }
-//            splitQuota = totalCount / splitNumber;
-//            splitRemain = splitQuota;
-//            isTotalCount = false;
-//            return;
-//        }
-//
-//        // output key when split quota is used up 
-//        for (LongWritable count : values) {
-//            splitRemain -= count.get();
-//        }
-//        if (splitRemain <= 0) {
-//            long splitCount = splitQuota - splitRemain;
-//            String hexKey = StringUtils.byteToHexString(key.getBytes());
-//            logger.info(hexKey + "\t\t" + splitCount);
-//
-//            outputValue.set(splitCount);
-//            context.write(key, outputValue);
-//            splitRemain = splitQuota;
-//        }
-//
-//    }
-// }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
deleted file mode 100644
index 73faa6c..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidJob.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- */
-public class MergeCuboidJob extends CuboidJob {
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            parseOptions(options, args);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            CubeManager cubeMgr = CubeManager.getInstance(config);
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            System.out.println("Starting: " + jobName);
-            job = Job.getInstance(getConf(), jobName);
-
-            setJobClasspath(job);
-
-            // set inputs
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(MergeCuboidMapper.class);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(Text.class);
-
-            // Reducer - only one
-            job.setReducerClass(CuboidReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(Text.class);
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-
-            // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            setReduceTaskNum(job, config, cubeName, 0);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            logger.error("error in MergeCuboidJob", e);
-            printUsage(options);
-            throw e;
-        } finally {
-            if (job != null)
-                cleanupTempConfFile(job.getConfiguration());
-        }
-    }
-
-}
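
Like the other drivers in this package, MergeCuboidJob would be launched through ToolRunner with one input directory per source segment. An invocation sketch, again assuming the conventional flag strings ("jobname", "cubename", "segmentname", "input", "output"); every value is a hypothetical placeholder.

    // Hypothetical driver call for merging two segments' cuboid files.
    String[] args = new String[] {
            "-jobname", "Kylin_Merge_Cuboid_my_cube",
            "-cubename", "my_cube",
            "-segmentname", "my_cube_merged_segment",
            "-input", "/kylin/cuboid/seg_1,/kylin/cuboid/seg_2",
            "-output", "/kylin/cuboid/merged"
    };
    System.exit(ToolRunner.run(new MergeCuboidJob(), args));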

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
deleted file mode 100644
index 2528e07..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapper.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.measure.MeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- * @author ysong1, honma
- */
-public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
-
-    private KylinConfig config;
-    private String cubeName;
-    private String segmentName;
-    private CubeManager cubeManager;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment mergedCubeSegment;
-    // must be unique during a mapper's life cycle
-    private CubeSegment sourceCubeSegment;
-
-
-    // for re-encode measures that use dictionary
-    private List<Pair<Integer, MeasureIngester>> dictMeasures;
-    private Map<TblColRef, Dictionary<String>> oldDicts;
-    private Map<TblColRef, Dictionary<String>> newDicts;
-    private List<MeasureDesc> measureDescs;
-    private MeasureCodec codec;
-    private Object[] measureObjs;
-    private ByteBuffer valueBuf;
-    private Text outputValue;
-    
-    private Text outputKey = new Text();
-
-    private byte[] newKeyBuf;
-    private RowKeySplitter rowKeySplitter;
-
-    private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
-
-    private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
-
-    private Boolean checkNeedMerging(TblColRef col) throws IOException {
-        Boolean ret = dictsNeedMerging.get(col);
-        if (ret != null)
-            return ret;
-        else {
-            ret = cubeDesc.getRowkey().isUseDictionary(col);
-            if (ret) {
-                String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
-            }
-            dictsNeedMerging.put(col, ret);
-            return ret;
-        }
-    }
-
-    private String extractJobIDFromPath(String path) {
-        Matcher matcher = JOB_NAME_PATTERN.matcher(path);
-        // check the first occurrence
-        if (matcher.find()) {
-            return matcher.group(1);
-        } else {
-            throw new IllegalStateException("Can not extract job ID from file path : " + path);
-        }
-    }
-
-    private CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
-        for (CubeSegment segment : cubeInstance.getSegments()) {
-            String lastBuildJobID = segment.getLastBuildJobID();
-            if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
-                return segment;
-            }
-        }
-
-        throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
-
-    }
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        super.publishConfiguration(context.getConfiguration());
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
-        config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
-        cubeManager = CubeManager.getInstance(config);
-        cube = cubeManager.getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
-        // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
-        newKeyBuf = new byte[256];// size will auto-grow
-
-        // decide which source segment
-        InputSplit inputSplit = context.getInputSplit();
-        String filePath = ((FileSplit) inputSplit).getPath().toString();
-        System.out.println("filePath:" + filePath);
-        String jobID = extractJobIDFromPath(filePath);
-        System.out.println("jobID:" + jobID);
-        sourceCubeSegment = findSegmentWithUuid(jobID, cube);
-        System.out.println(sourceCubeSegment);
-
-        this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
-
-
-        measureDescs = cubeDesc.getMeasures();
-        codec = new MeasureCodec(measureDescs);
-        measureObjs = new Object[measureDescs.size()];
-        valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-        outputValue = new Text();
-
-        dictMeasures = Lists.newArrayList();
-        for (int i = 0; i < measureDescs.size(); i++) {
-            MeasureDesc measureDesc = measureDescs.get(i);
-            MeasureType measureType = measureDesc.getFunction().getMeasureType();
-            if (!measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty()) {
-                dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
-            }
-        }
-        if (dictMeasures.size() > 0) {
-            oldDicts = sourceCubeSegment.buildDictionaryMap();
-            newDicts = mergedCubeSegment.buildDictionaryMap();
-        }
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidID = rowKeySplitter.split(key.getBytes(), key.getBytes().length);
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
-
-        SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
-        int bufOffset = 0;
-        BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
-        bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
-        for (int i = 0; i < cuboid.getColumns().size(); ++i) {
-            TblColRef col = cuboid.getColumns().get(i);
-
-            if (this.checkNeedMerging(col)) {
-                // if dictionary on fact table column, needs rewrite
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
-                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
-                while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
-                }
-
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
-                int idInMergedDict;
-
-                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
-                if (size < 0) {
-                    idInMergedDict = mergedDict.nullId();
-                } else {
-                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
-                }
-
-                BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
-                bufOffset += mergedDict.getSizeOfId();
-            } else {
-                // keep as it is
-                while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
-                }
-
-                System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
-                bufOffset += splittedByteses[i + 1].length;
-            }
-        }
-        byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
-        outputKey.set(newKey, 0, newKey.length);
-
-
-        // re-encode measures if dictionary is used
-        if (dictMeasures.size() > 0) {
-            codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
-            for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
-                int i = pair.getFirst();
-                MeasureIngester ingester = pair.getSecond();
-                measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
-            }
-            valueBuf.clear();
-            codec.encode(measureObjs, valueBuf);
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-            value = outputValue;
-        }
-        
-        context.write(outputKey, value);
-    }
-}
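
The rowkey rewrite in map() above repeatedly doubles newKeyBuf whenever the next dictionary value or passthrough field might not fit. The same idiom could be factored into one small helper; this is a sketch of the pattern only, not code from the commit.

    // Hypothetical helper equivalent to the inline buffer-doubling loops in map().
    private static byte[] ensureCapacity(byte[] buf, int offset, int needed) {
        while (needed > buf.length - offset) {
            buf = java.util.Arrays.copyOf(buf, buf.length * 2); // grow; existing bytes preserved
        }
        return buf;
    }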

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
deleted file mode 100644
index e46093c..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.JobStatusEnum;
-import org.apache.kylin.job.dao.ExecutableDao;
-import org.apache.kylin.job.dao.ExecutableOutputPO;
-import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
-public class MetadataCleanupJob extends AbstractHadoopJob {
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused metadata").create("delete");
-
-    protected static final Logger log = LoggerFactory.getLogger(MetadataCleanupJob.class);
-
-    boolean delete = false;
-
-    private KylinConfig config = null;
-
-    public static final long TIME_THRESHOLD = 2 * 24 * 3600 * 1000L; // 2 days
-    public static final long TIME_THRESHOLD_FOR_JOB = 30 * 24 * 3600 * 1000L; // 30 days
-
-    /*
-     * (non-Javadoc)
-     *
-     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        log.info("----- jobs args: " + Arrays.toString(args));
-        try {
-            options.addOption(OPTION_DELETE);
-            parseOptions(options, args);
-
-            log.info("options: '" + getOptionsAsString() + "'");
-            log.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
-            delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
-
-            config = KylinConfig.getInstanceFromEnv();
-
-            cleanup();
-
-            return 0;
-        } catch (Exception e) {
-            e.printStackTrace(System.err);
-            throw e;
-        }
-    }
-
-    private ResourceStore getStore() {
-        return ResourceStore.getStore(config);
-    }
-
-    private boolean isOlderThanThreshold(long resourceTime) {
-        long currentTime = System.currentTimeMillis();
-
-        if (currentTime - resourceTime > TIME_THRESHOLD)
-            return true;
-        return false;
-    }
-
-    public void cleanup() throws Exception {
-        CubeManager cubeManager = CubeManager.getInstance(config);
-
-        Set<String> activeResourceList = Sets.newHashSet();
-        for (org.apache.kylin.cube.CubeInstance cube : cubeManager.listAllCubes()) {
-            for (org.apache.kylin.cube.CubeSegment segment : cube.getSegments()) {
-                activeResourceList.addAll(segment.getSnapshotPaths());
-                activeResourceList.addAll(segment.getDictionaryPaths());
-            }
-        }
-
-        List<String> toDeleteResource = Lists.newArrayList();
-
-        // two level resources, snapshot tables and cube statistics
-        for (String resourceRoot : new String[] { ResourceStore.SNAPSHOT_RESOURCE_ROOT }) {
-            ArrayList<String> snapshotTables = getStore().listResources(resourceRoot);
-
-            if (snapshotTables != null) {
-                for (String snapshotTable : snapshotTables) {
-                    ArrayList<String> snapshotNames = getStore().listResources(snapshotTable);
-                    if (snapshotNames != null)
-                        for (String snapshot : snapshotNames) {
-                            if (!activeResourceList.contains(snapshot)) {
-                                long ts = getStore().getResourceTimestamp(snapshot);
-                                if (isOlderThanThreshold(ts))
-                                    toDeleteResource.add(snapshot);
-                            }
-                        }
-                }
-            }
-        }
-
-        // three level resources, only dictionaries
-        ArrayList<String> dictTables = getStore().listResources(ResourceStore.DICT_RESOURCE_ROOT);
-
-        if (dictTables != null) {
-            for (String table : dictTables) {
-                ArrayList<String> tableColNames = getStore().listResources(table);
-                if (tableColNames != null)
-                    for (String tableCol : tableColNames) {
-                        ArrayList<String> dictionaries = getStore().listResources(tableCol);
-                        if (dictionaries != null)
-                            for (String dict : dictionaries)
-                                if (!activeResourceList.contains(dict)) {
-                                    long ts = getStore().getResourceTimestamp(dict);
-                                    if (isOlderThanThreshold(ts))
-                                        toDeleteResource.add(dict);
-                                }
-                    }
-            }
-        }
-
-        // delete old and completed jobs
-        ExecutableDao executableDao = ExecutableDao.getInstance(KylinConfig.getInstanceFromEnv());
-        List<ExecutablePO> allExecutable = executableDao.getJobs();
-        for (ExecutablePO executable : allExecutable) {
-            long lastModified = executable.getLastModified();
-            ExecutableOutputPO output = executableDao.getJobOutput(executable.getUuid());
-            if (System.currentTimeMillis() - lastModified > TIME_THRESHOLD_FOR_JOB && (output.getStatus().equals(JobStatusEnum.FINISHED.toString()) || output.getStatus().equals(JobStatusEnum.DISCARDED.toString()))) {
-                toDeleteResource.add(ResourceStore.EXECUTE_PATH_ROOT + "/" + executable.getUuid());
-                toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + executable.getUuid());
-
-                for (ExecutablePO task : executable.getTasks()) {
-                    toDeleteResource.add(ResourceStore.EXECUTE_PATH_ROOT + "/" + task.getUuid());
-                    toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + task.getUuid());
-                }
-            }
-        }
-
-        if (!toDeleteResource.isEmpty()) {
-            logger.info("The following resources are unreferenced or too old, and will be cleaned from the metadata store:");
-
-            for (String s : toDeleteResource) {
-                logger.info(s);
-                if (delete) {
-                    getStore().deleteResource(s);
-                }
-            }
-        } else {
-            logger.info("No resource to be cleaned up from the metadata store.");
-        }
-
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new MetadataCleanupJob(), args);
-        System.exit(exitCode);
-    }
-}
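
Aside on the retention rule in the deleted MetadataCleanupJob above: a resource is
only a deletion candidate when it is both unreferenced by any cube segment and older
than the configured threshold. A minimal standalone sketch of that predicate (the
THRESHOLD_MS constant and isCandidate helper are illustrative names, not part of the
original class):

    import java.util.Set;

    class RetentionRule {
        // illustrative threshold; the original class uses its own TIME_THREADSHOLD constant
        static final long THRESHOLD_MS = 12 * 3600 * 1000L;

        static boolean isCandidate(String path, long lastModified, Set<String> activeResources) {
            boolean unreferenced = !activeResources.contains(path);
            boolean stale = System.currentTimeMillis() - lastModified > THRESHOLD_MS;
            return unreferenced && stale;
        }
    }

Completed or discarded jobs get the same age test, but against a separate,
job-specific threshold.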

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java
deleted file mode 100644
index feb4dc4..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidJob.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.hadoop.util.ToolRunner;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-
-public class NDCuboidJob extends CuboidJob {
-
-    public NDCuboidJob() {
-        this.setMapperClass(NDCuboidMapper.class);
-    }
-
-    public static void main(String[] args) throws Exception {
-        CuboidJob job = new NDCuboidJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
deleted file mode 100644
index 8725ed8..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NDCuboidMapper.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
-
-    private static final Logger logger = LoggerFactory.getLogger(NDCuboidMapper.class);
-
-    private Text outputKey = new Text();
-    private String cubeName;
-    private String segmentName;
-    private CubeDesc cubeDesc;
-    private CuboidScheduler cuboidScheduler;
-
-    private int handleCounter;
-    private int skipCounter;
-
-    private byte[] keyBuf = new byte[4096];
-    private RowKeySplitter rowKeySplitter;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration());
-
-        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        cubeDesc = cube.getDescriptor();
-
-        // initialize CuboidScheduler
-        cuboidScheduler = new CuboidScheduler(cubeDesc);
-
-        rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
-    }
-
-    private int buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
-        int offset = 0;
-
-        // cuboid id
-        System.arraycopy(childCuboid.getBytes(), 0, keyBuf, offset, childCuboid.getBytes().length);
-        offset += childCuboid.getBytes().length;
-
-        // rowkey columns
-        long mask = Long.highestOneBit(parentCuboid.getId());
-        long parentCuboidId = parentCuboid.getId();
-        long childCuboidId = childCuboid.getId();
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
-        int index = 1; // skip cuboidId
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            // if this bit position is set in the parent cuboid
-            if ((mask & parentCuboidId) > 0) {
-                // and the child cuboid also keeps this column
-                if ((mask & childCuboidId) > 0) {
-                    System.arraycopy(splitBuffers[index].value, 0, keyBuf, offset, splitBuffers[index].length);
-                    offset += splitBuffers[index].length;
-                }
-                index++;
-            }
-            mask = mask >> 1;
-        }
-
-        return offset;
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        long cuboidId = rowKeySplitter.split(key.getBytes(), key.getLength());
-        Cuboid parentCuboid = Cuboid.findById(cubeDesc, cuboidId);
-
-        Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
-
-        // no child cuboids spawn from this parent
-        if (myChildren == null || myChildren.isEmpty()) {
-            context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);
-            skipCounter++;
-            if (skipCounter % BatchConstants.COUNTER_MAX == 0) {
-                logger.info("Skipped " + skipCounter + " records!");
-            }
-            return;
-        }
-
-        context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);
-
-        handleCounter++;
-        if (handleCounter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + handleCounter + " records!");
-        }
-
-        for (Long child : myChildren) {
-            Cuboid childCuboid = Cuboid.findById(cubeDesc, child);
-            int keyLength = buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
-            outputKey.set(keyBuf, 0, keyLength);
-            context.write(outputKey, value);
-        }
-
-    }
-}
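
The bit-mask walk in buildKey above is the heart of N-dimensional cuboid spawning:
every set bit in the parent cuboid id is a row-key column, and a column is copied
into the child key only when the child id keeps that bit. A self-contained sketch of
the same projection using strings instead of byte buffers (class and method names are
illustrative):

    public class CuboidProjection {

        // walk the parent id from its highest set bit down; each set bit is a
        // column of the parent, copied only if the child id also has that bit
        static String project(long parentId, long childId, String[] parentColumns) {
            StringBuilder childKey = new StringBuilder();
            long mask = Long.highestOneBit(parentId);
            int index = 0;
            while (mask > 0) {
                if ((mask & parentId) > 0) {
                    if ((mask & childId) > 0) {
                        childKey.append(parentColumns[index]).append('|');
                    }
                    index++;
                }
                mask >>= 1;
            }
            return childKey.toString();
        }

        public static void main(String[] args) {
            // parent keeps columns A,B,C (bits 111); child keeps A and C (bits 101)
            System.out.println(project(0b111L, 0b101L, new String[] { "A", "B", "C" })); // prints A|C|
        }
    }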

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
deleted file mode 100644
index 9c50122..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionJob.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang, ysong1
- * 
- */
-
-public class RangeKeyDistributionJob extends AbstractHadoopJob {
-    protected static final Logger log = LoggerFactory.getLogger(RangeKeyDistributionJob.class);
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_CUBE_NAME);
-
-            parseOptions(options, args);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            job = Job.getInstance(getConf(), jobName);
-
-            setJobClasspath(job);
-
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-            // job.getConfiguration().set("dfs.block.size", "67108864");
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(RangeKeyDistributionMapper.class);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(LongWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(RangeKeyDistributionReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(LongWritable.class);
-            job.setNumReduceTasks(1);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            DataModelDesc.RealizationCapacity cubeCapacity = cube.getDescriptor().getModel().getCapacity();
-            int regionSplitSize = KylinConfig.getInstanceFromEnv().getHBaseRegionCut(cubeCapacity.toString());
-            int maxRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMax();
-            int minRegionCount = KylinConfig.getInstanceFromEnv().getHBaseRegionCountMin();
-            job.getConfiguration().set(BatchConstants.REGION_SPLIT_SIZE, String.valueOf(regionSplitSize));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MAX, String.valueOf(maxRegionCount));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER_MIN, String.valueOf(minRegionCount));
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new RangeKeyDistributionJob(), args);
-        System.exit(exitCode);
-    }
-
-}
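
One detail worth noting in the job above: the region split size and the min/max
region counts are resolved from KylinConfig on the client and handed to the single
reducer through the job Configuration as strings, which is why
RangeKeyDistributionReducer (below) re-parses them in setup() and falls back to
built-in defaults when a key is absent. A generic sketch of that handoff pattern
(the property names here are illustrative, not Kylin's):

    import org.apache.hadoop.conf.Configuration;

    class ConfHandoff {
        static void push(Configuration conf, int splitSizeGB, int minRegions, int maxRegions) {
            // Configuration carries strings only; the reducer parses them back in setup()
            conf.set("example.region.split.size", String.valueOf(splitSizeGB));
            conf.set("example.region.number.min", String.valueOf(minRegions));
            conf.set("example.region.number.max", String.valueOf(maxRegions));
        }

        static int pull(Configuration conf, String key, int fallback) {
            String raw = conf.get(key);
            return raw == null ? fallback : Integer.parseInt(raw);
        }
    }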

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
deleted file mode 100644
index 33baf45..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionMapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.mr.KylinMapper;
-
-/**
- * @author ysong1
- * 
- */
-public class RangeKeyDistributionMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
-    private static final long ONE_MEGA_BYTES = 1L * 1024L * 1024L;
-
-    private LongWritable outputValue = new LongWritable(0);
-
-    private long bytesRead = 0;
-
-    private Text lastKey;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        lastKey = key;
-
-        int bytesLength = key.getLength() + value.getLength();
-        bytesRead += bytesLength;
-
-        if (bytesRead >= ONE_MEGA_BYTES) {
-            outputValue.set(bytesRead);
-            context.write(key, outputValue);
-
-            // reset bytesRead
-            bytesRead = 0;
-        }
-
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        if (lastKey != null) {
-            outputValue.set(bytesRead);
-            context.write(lastKey, outputValue);
-        }
-    }
-
-}
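
The mapper above is a size sampler: it forwards one (key, bytesAccumulated) pair
roughly every megabyte of raw input, and flushes whatever remains under the last key
seen in cleanup(). The same accumulate-and-flush pattern, stripped of Hadoop types
(a sketch; SizeSampler is an illustrative name):

    import java.util.LinkedHashMap;
    import java.util.Map;

    class SizeSampler {
        static final long FLUSH_AT = 1024L * 1024L; // 1 MB, matching ONE_MEGA_BYTES above

        private long bytes = 0;
        private String lastKey;
        final Map<String, Long> samples = new LinkedHashMap<String, Long>();

        void accept(String key, int recordSize) {
            lastKey = key;
            bytes += recordSize;
            if (bytes >= FLUSH_AT) { // emit a sample point and reset the counter
                samples.put(key, bytes);
                bytes = 0;
            }
        }

        void close() { // mirrors cleanup(): the tail goes out under the last key
            if (lastKey != null) {
                samples.put(lastKey, bytes);
            }
        }
    }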

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
deleted file mode 100644
index b3ab4db..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RangeKeyDistributionReducer.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.cube.model.v1.CubeDesc.CubeCapacity;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- * 
- */
-public class RangeKeyDistributionReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
-    public static final long ONE_GIGA_BYTES = 1024L * 1024L * 1024L;
-
-    private static final Logger logger = LoggerFactory.getLogger(RangeKeyDistributionReducer.class);
-
-    private LongWritable outputValue = new LongWritable(0);
-
-    private int minRegionCount = 1;
-    private int maxRegionCount = 500;
-    private int cut = 10;
-    private long bytesRead = 0;
-    private List<Text> gbPoints = new ArrayList<Text>();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        if (context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE) != null) {
-            cut = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_SPLIT_SIZE));
-        }
-
-        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN) != null) {
-            minRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MIN));
-        }
-        
-        if (context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX) != null) {
-            maxRegionCount = Integer.valueOf(context.getConfiguration().get(BatchConstants.REGION_NUMBER_MAX));
-        }
-
-        logger.info("Chosen cut for htable is " + cut + ", max region count=" + maxRegionCount + ", min region count =" + minRegionCount);
-    }
-
-    @Override
-    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-        for (LongWritable v : values) {
-            bytesRead += v.get();
-        }
-
-        if (bytesRead >= ONE_GIGA_BYTES) {
-            gbPoints.add(new Text(key));
-            bytesRead = 0; // reset bytesRead
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        int nRegion = Math.round((float) gbPoints.size() / (float) cut);
-        nRegion = Math.max(minRegionCount, nRegion);
-        nRegion = Math.min(maxRegionCount, nRegion);
-
-        int gbPerRegion = gbPoints.size() / nRegion;
-        gbPerRegion = Math.max(1, gbPerRegion);
-
-        System.out.println(nRegion + " regions");
-        System.out.println(gbPerRegion + " GB per region");
-
-        for (int i = gbPerRegion; i < gbPoints.size(); i += gbPerRegion) {
-            Text key = gbPoints.get(i);
-            outputValue.set(i);
-            System.out.println(StringUtils.byteToHexString(key.getBytes()) + "\t" + outputValue.get());
-            context.write(key, outputValue);
-        }
-    }
-}
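
The region math in cleanup() above is compact enough to mis-read, so a worked example
helps: with 95 gigabyte sample points and cut=10, nRegion = round(95/10) = 10, which
survives the [1, 500] clamp, and gbPerRegion = 95/10 = 9, so every 9th gigabyte point
becomes a split key, i.e. roughly 9 GB per region. The same arithmetic isolated
(class and method names are illustrative):

    class RegionPlan {
        // returns { regionCount, gbPerRegion } under the same clamping as the reducer
        static int[] plan(int gbPointCount, int cut, int minRegions, int maxRegions) {
            int nRegion = Math.round((float) gbPointCount / (float) cut);
            nRegion = Math.max(minRegions, nRegion);
            nRegion = Math.min(maxRegions, nRegion);
            int gbPerRegion = Math.max(1, gbPointCount / nRegion);
            return new int[] { nRegion, gbPerRegion };
        }
        // plan(95, 10, 1, 500) -> { 10, 9 }
    }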

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
deleted file mode 100644
index 9372f18..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerJob.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author ysong1
- * 
- */
-public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
-
-    @SuppressWarnings("static-access")
-    protected static final Option rowKeyStatsFilePath = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath");
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(rowKeyStatsFilePath);
-
-            parseOptions(options, args);
-
-            String statsFilePath = getOptionValue(rowKeyStatsFilePath);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            job = Job.getInstance(getConf(), jobName);
-
-            setJobClasspath(job);
-
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(RowKeyDistributionCheckerMapper.class);
-            job.setMapOutputKeyClass(Text.class);
-            job.setMapOutputValueClass(LongWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(RowKeyDistributionCheckerReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(Text.class);
-            job.setOutputValueClass(LongWritable.class);
-            job.setNumReduceTasks(1);
-
-            job.getConfiguration().set("rowKeyStatsFilePath", statsFilePath);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new RowKeyDistributionCheckerJob(), args);
-        System.exit(exitCode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
deleted file mode 100644
index 50b0499..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerMapper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.mr.KylinMapper;
-
-/**
- * @author ysong1
- * 
- */
-public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
-    String rowKeyStatsFilePath;
-    byte[][] splitKeys;
-    Map<Text, Long> resultMap;
-    List<Text> keyList;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        rowKeyStatsFilePath = context.getConfiguration().get("rowKeyStatsFilePath");
-        splitKeys = this.getSplits(context.getConfiguration(), new Path(rowKeyStatsFilePath));
-
-        resultMap = new HashMap<Text, Long>();
-        keyList = new ArrayList<Text>();
-        for (int i = 0; i < splitKeys.length; i++) {
-            Text key = new Text(splitKeys[i]);
-            resultMap.put(key, 0L);
-            keyList.add(new Text(splitKeys[i]));
-        }
-    }
-
-    @Override
-    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
-        for (Text t : keyList) {
-            if (key.compareTo(t) < 0) {
-                Long v = resultMap.get(t);
-                long length = key.getLength() + value.getLength();
-                v += length;
-                resultMap.put(t, v);
-                break;
-            }
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        LongWritable outputValue = new LongWritable();
-        for (Entry<Text, Long> kv : resultMap.entrySet()) {
-            outputValue.set(kv.getValue());
-            context.write(kv.getKey(), outputValue);
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    public byte[][] getSplits(Configuration conf, Path path) {
-        List<byte[]> rowkeyList = new ArrayList<byte[]>();
-        SequenceFile.Reader reader = null;
-        try {
-            reader = new SequenceFile.Reader(path.getFileSystem(conf), path, conf);
-            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-            while (reader.next(key, value)) {
-                byte[] tmp = ((Text) key).copyBytes();
-                // List.contains on byte[] compares references, not content, so scan explicitly
-                boolean seen = false;
-                for (byte[] existing : rowkeyList) {
-                    seen = seen || Arrays.equals(existing, tmp);
-                }
-                if (!seen) {
-                    rowkeyList.add(tmp);
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            IOUtils.closeStream(reader);
-        }
-
-        byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
-
-        return retValue;
-    }
-}
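
The getSplits() helper above has to de-duplicate byte[] row keys, and raw byte arrays
are a classic trap: equals() and contains() compare references, not content. Where
the linear content scan becomes a bottleneck, wrapping keys in ByteBuffer gives
content-based equals/hashCode for free. A sketch (the class name is illustrative):

    import java.nio.ByteBuffer;
    import java.util.LinkedHashSet;
    import java.util.Set;

    class ByteArrayDedup {
        // ByteBuffer.wrap provides content-based equals/hashCode, unlike raw byte[]
        private final Set<ByteBuffer> seen = new LinkedHashSet<ByteBuffer>();

        boolean add(byte[] key) {
            return seen.add(ByteBuffer.wrap(key.clone())); // clone: caller may reuse the array
        }
    }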

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
deleted file mode 100644
index 83e503e..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/RowKeyDistributionCheckerReducer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.mr.KylinReducer;
-
-/**
- * @author ysong1
- * 
- */
-public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
-    LongWritable outputKey = new LongWritable(0L);
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-    }
-
-    @Override
-    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-
-        long length = 0;
-        for (LongWritable v : values) {
-            length += v.get();
-        }
-
-        outputKey.set(length);
-        context.write(key, outputKey);
-    }
-}