You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/02/11 13:49:46 UTC

[07/51] [partial] kylin git commit: KYLIN-1416 keep only website in document branch

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
deleted file mode 100644
index b1a17e7..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexJob extends AbstractHadoopJob {
-    protected static final Logger log = LoggerFactory.getLogger(InvertedIndexJob.class);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_II_NAME);
-            options.addOption(OPTION_TABLE_NAME);
-            options.addOption(OPTION_OUTPUT_PATH);
-            parseOptions(options, args);
-
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            String iiname = getOptionValue(OPTION_II_NAME);
-            String intermediateTable = getOptionValue(OPTION_TABLE_NAME);
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
-            // ----------------------------------------------------------------------------
-
-            System.out.println("Starting: " + job.getJobName());
-
-            IIInstance ii = getII(iiname);
-            short sharding = ii.getDescriptor().getSharding();
-
-            setJobClasspath(job);
-
-            setupMapper(intermediateTable);
-            setupReducer(output, sharding);
-            attachMetadata(ii);
-
-            return waitForCompletion(job);
-
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        } finally {
-            if (job != null)
-                cleanupTempConfFile(job.getConfiguration());
-        }
-    }
-
-    private IIInstance getII(String iiName) {
-        IIManager mgr = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-        IIInstance ii = mgr.getII(iiName);
-        if (ii == null)
-            throw new IllegalArgumentException("No Inverted Index found by name " + iiName);
-        return ii;
-    }
-
-    private void attachMetadata(IIInstance ii) throws IOException {
-
-        Configuration conf = job.getConfiguration();
-        attachKylinPropsAndMetadata(ii, conf);
-
-        IISegment seg = ii.getFirstSegment();
-        conf.set(BatchConstants.CFG_II_NAME, ii.getName());
-        conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName());
-    }
-
-    private void setupMapper(String intermediateTable) throws IOException {
-
-        String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);
-        HCatInputFormat.setInput(job, dbTableNames[0], dbTableNames[1]);
-
-        job.setInputFormatClass(HCatInputFormat.class);
-
-        job.setMapperClass(InvertedIndexMapper.class);
-        job.setMapOutputKeyClass(LongWritable.class);
-        job.setMapOutputValueClass(ImmutableBytesWritable.class);
-        job.setPartitionerClass(InvertedIndexPartitioner.class);
-    }
-
-    private void setupReducer(Path output, short sharding) throws IOException {
-        job.setReducerClass(InvertedIndexReducer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(ImmutableBytesWritable.class);
-
-        job.setNumReduceTasks(sharding);
-
-        FileOutputFormat.setOutputPath(job, output);
-
-        job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString());
-
-        deletePath(job.getConfiguration(), output);
-    }
-
-    public static void main(String[] args) throws Exception {
-        InvertedIndexJob job = new InvertedIndexJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
deleted file mode 100644
index b3baafe..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hive.hcatalog.data.HCatRecord;
-import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hive.hcatalog.data.schema.HCatSchema;
-import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, LongWritable, ImmutableBytesWritable> {
-
-    private TableRecordInfo info;
-    private TableRecord rec;
-
-    private LongWritable outputKey;
-    private ImmutableBytesWritable outputValue;
-    private HCatSchema schema = null;
-    private List<HCatFieldSchema> fields;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
-        IIManager mgr = IIManager.getInstance(config);
-        IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
-        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
-        this.info = new TableRecordInfo(seg);
-        this.rec = this.info.createTableRecord();
-
-        outputKey = new LongWritable();
-        outputValue = new ImmutableBytesWritable(rec.getBytes());
-
-        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
-
-        fields = schema.getFields();
-    }
-
-    @Override
-    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-
-        rec.reset();
-        for (int i = 0; i < fields.size(); i++) {
-            Object fieldValue = record.get(i);
-            rec.setValueString(i, fieldValue == null ? null : fieldValue.toString());
-        }
-
-        outputKey.set(rec.getTimestamp());
-        // outputValue's backing bytes array is the same as rec
-
-        context.write(outputKey, outputValue);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
deleted file mode 100644
index 113d4ed..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexPartitioner extends Partitioner<LongWritable, ImmutableBytesWritable> implements Configurable {
-
-    private Configuration conf;
-    private TableRecordInfo info;
-    private TableRecord rec;
-
-    @Override
-    public int getPartition(LongWritable key, ImmutableBytesWritable value, int numPartitions) {
-        rec.setBytes(value.get(), value.getOffset(), value.getLength());
-        return rec.getShard();
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-        try {
-            KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
-            IIManager mgr = IIManager.getInstance(config);
-            IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
-            IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
-            this.info = new TableRecordInfo(seg);
-            this.rec = this.info.createTableRecord();
-        } catch (IOException e) {
-            throw new RuntimeException("", e);
-        }
-    }
-
-    @Override
-    public Configuration getConf() {
-        return conf;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
deleted file mode 100644
index 37b02b8..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.invertedindex.index.Slice;
-import org.apache.kylin.invertedindex.index.SliceBuilder;
-import org.apache.kylin.invertedindex.index.TableRecord;
-import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author yangli9
- */
-public class InvertedIndexReducer extends KylinReducer<LongWritable, ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
-
-    private TableRecordInfo info;
-    private TableRecord rec;
-    private SliceBuilder builder;
-    private IIKeyValueCodec kv;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
-        IIManager mgr = IIManager.getInstance(config);
-        IIInstance ii = mgr.getII(conf.get(BatchConstants.CFG_II_NAME));
-        IISegment seg = ii.getSegment(conf.get(BatchConstants.CFG_II_SEGMENT_NAME), SegmentStatusEnum.NEW);
-        info = new TableRecordInfo(seg);
-        rec = info.createTableRecord();
-        builder = null;
-        kv = new IIKeyValueCodec(info.getDigest());
-    }
-
-    @Override
-    public void reduce(LongWritable key, Iterable<ImmutableBytesWritable> values, Context context) //
-            throws IOException, InterruptedException {
-        for (ImmutableBytesWritable v : values) {
-            rec.setBytes(v.get(), v.getOffset(), v.getLength());
-
-            if (builder == null) {
-                builder = new SliceBuilder(info, rec.getShard());
-            }
-
-            //TODO: to delete this log
-            System.out.println(rec.getShard() + " - " + rec);
-
-            Slice slice = builder.append(rec);
-            if (slice != null) {
-                output(slice, context);
-            }
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        Slice slice = builder.close();
-        if (slice != null) {
-            output(slice, context);
-        }
-    }
-
-    private void output(Slice slice, Context context) throws IOException, InterruptedException {
-        for (Pair<ImmutableBytesWritable, ImmutableBytesWritable> pair : kv.encodeKeyValue(slice)) {
-            context.write(pair.getFirst(), pair.getSecond());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
deleted file mode 100644
index 8ac5650..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author ysong1
- * 
- */
-@SuppressWarnings("static-access")
-public class RandomKeyDistributionJob extends AbstractHadoopJob {
-
-    protected static final Logger log = LoggerFactory.getLogger(RandomKeyDistributionJob.class);
-
-    static final Option OPTION_KEY_CLASS = OptionBuilder.withArgName("keyclass").hasArg().isRequired(true).withDescription("Key Class").create("keyclass");
-    static final Option OPTION_REGION_MB = OptionBuilder.withArgName("regionmb").hasArg().isRequired(true).withDescription("MB per Region").create("regionmb");
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_KEY_CLASS);
-            options.addOption(OPTION_REGION_MB);
-
-            parseOptions(options, args);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            job = Job.getInstance(getConf(), jobName);
-
-            setJobClasspath(job);
-
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-
-            String keyClass = getOptionValue(OPTION_KEY_CLASS);
-            Class<?> keyClz = Class.forName(keyClass);
-
-            int regionMB = Integer.parseInt(getOptionValue(OPTION_REGION_MB));
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(RandomKeyDistributionMapper.class);
-            job.setMapOutputKeyClass(keyClz);
-            job.setMapOutputValueClass(NullWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(RandomKeyDistributionReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(keyClz);
-            job.setOutputValueClass(NullWritable.class);
-            job.setNumReduceTasks(1);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            // total map input MB
-            double totalMapInputMB = this.getTotalMapInputMB();
-            int regionCount = Math.max(1, (int) (totalMapInputMB / regionMB));
-            int mapSampleNumber = 1000;
-            System.out.println("Total Map Input MB: " + totalMapInputMB);
-            System.out.println("Region Count: " + regionCount);
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.MAPPER_SAMPLE_NUMBER, String.valueOf(mapSampleNumber));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER, String.valueOf(regionCount));
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new RandomKeyDistributionJob(), args);
-        System.exit(exitCode);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
deleted file mode 100644
index 3914830..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.common.util.RandomSampler;
-import org.apache.kylin.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- * 
- */
-public class RandomKeyDistributionMapper<KEY extends Writable, VALUE> extends KylinMapper<KEY, VALUE, KEY, NullWritable> {
-
-    private Configuration conf;
-    private int sampleNumber;
-    private List<KEY> allKeys;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        conf = context.getConfiguration();
-        allKeys = new ArrayList<KEY>();
-        sampleNumber = Integer.parseInt(conf.get(BatchConstants.MAPPER_SAMPLE_NUMBER));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException {
-        KEY keyCopy = (KEY) ReflectionUtils.newInstance(key.getClass(), conf);
-        ReflectionUtils.copy(conf, key, keyCopy);
-        allKeys.add(keyCopy);
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        RandomSampler<KEY> sampler = new RandomSampler<KEY>();
-        List<KEY> sampleResult = sampler.sample(allKeys, sampleNumber);
-        for (KEY k : sampleResult) {
-            context.write(k, NullWritable.get());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
deleted file mode 100644
index b96d18b..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.mr.KylinReducer;
-import org.apache.kylin.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- * 
- */
-public class RandomKeyDistributionReducer<KEY extends Writable> extends KylinReducer<KEY, NullWritable, KEY, NullWritable> {
-
-    private Configuration conf;
-    private int regionNumber;
-    private List<KEY> allSplits;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.publishConfiguration(context.getConfiguration());
-
-        conf = context.getConfiguration();
-        allSplits = new ArrayList<KEY>();
-        regionNumber = Integer.parseInt(context.getConfiguration().get(BatchConstants.REGION_NUMBER));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void reduce(KEY key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
-        KEY keyCopy = (KEY) ReflectionUtils.newInstance(key.getClass(), conf);
-        ReflectionUtils.copy(conf, key, keyCopy);
-        allSplits.add(keyCopy);
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        int stepLength = allSplits.size() / regionNumber;
-        for (int i = stepLength; i < allSplits.size(); i += stepLength) {
-            context.write(allSplits.get(i), NullWritable.get());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java
deleted file mode 100644
index 1cc20df..0000000
--- a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.impl.threadpool;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableContext;
-
-/**
- * Created by qianzhou on 12/16/14.
- */
-public class DefaultContext implements ExecutableContext {
-
-    private final ConcurrentMap<String, Executable> runningJobs;
-    private final KylinConfig kylinConfig;
-
-    public DefaultContext(ConcurrentMap<String, Executable> runningJobs, KylinConfig kylinConfig) {
-        this.runningJobs = runningJobs;
-        this.kylinConfig = kylinConfig;
-    }
-
-    @Override
-    public Object getSchedulerContext() {
-        return null;
-    }
-
-    @Override
-    public KylinConfig getConfig() {
-        return kylinConfig;
-    }
-
-    void addRunningJob(Executable executable) {
-        runningJobs.put(executable.getId(), executable);
-    }
-
-    void removeRunningJob(Executable executable) {
-        runningJobs.remove(executable.getId());
-    }
-
-    public Map<String, Executable> getRunningJobs() {
-        return Collections.unmodifiableMap(runningJobs);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
deleted file mode 100644
index 46592f7..0000000
--- a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.impl.threadpool;
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.kylin.common.lock.JobLock;
-import org.apache.kylin.job.Scheduler;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-/**
- * Thread-pool based job scheduler (a singleton; see {@link #getInstance()}).
- * <p>
- * After {@link #init(JobEngineConfig, JobLock)}, a single-threaded fetcher pool runs {@link FetcherRunner}
- * every {@link ExecutableConstants#DEFAULT_SCHEDULER_INTERVAL_SECONDS} seconds, with the first run after 10 seconds.
- * Each run submits jobs in {@link ExecutableState#READY} state to a job pool sized by
- * {@link JobEngineConfig#getMaxConcurrentJobLimit()}. The scheduler is also a Curator
- * {@link ConnectionStateListener} and shuts itself down when the connection becomes SUSPENDED or LOST.
- * <p>
- * Originally created by qianzhou on 12/15/14.
- */
-public class DefaultScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener {
-
-    // NOTE(review): ZOOKEEPER_LOCK_PATH, zkClient and sharedLock are never read in this class
-    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
-
-    private static final Logger logger = LoggerFactory.getLogger(DefaultScheduler.class);
-
-    private ExecutableManager executableManager;
-    private FetcherRunner fetcher;
-    private ScheduledExecutorService fetcherPool;
-    private ExecutorService jobPool;
-    private DefaultContext context;
-
-    private volatile boolean initialized = false;
-    private volatile boolean hasStarted = false;
-    private CuratorFramework zkClient;
-    private JobEngineConfig jobEngineConfig;
-    private InterProcessMutex sharedLock;
-
-    private static final DefaultScheduler INSTANCE = new DefaultScheduler();
-
-    private DefaultScheduler() {
-    }
-
-    /**
-     * Periodic task that submits READY jobs to the job pool while respecting the concurrent job limit.
-     * {@code run} is synchronized so the periodic run and the immediate re-runs triggered by
-     * {@link JobRunner} never overlap.
-     */
-    private class FetcherRunner implements Runnable {
-
-        @Override
-        synchronized public void run() {
-            try {
-                // logger.debug("Job Fetcher is running...");
-                final int maxConcurrentJobs = jobEngineConfig.getMaxConcurrentJobLimit();
-                // live read-only view: it reflects jobs added or removed while we iterate
-                Map<String, Executable> runningJobs = context.getRunningJobs();
-                if (runningJobs.size() >= maxConcurrentJobs) {
-                    logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
-                    return;
-                }
-
-                int nRunning = 0, nReady = 0, nOthers = 0;
-                for (final String id : executableManager.getAllJobIds()) {
-                    if (runningJobs.containsKey(id)) {
-                        // logger.debug("Job id:" + id + " is already running");
-                        nRunning++;
-                        continue;
-                    }
-                    final Output output = executableManager.getOutput(id);
-                    if ((output.getState() != ExecutableState.READY)) {
-                        // logger.debug("Job id:" + id + " not runnable");
-                        nOthers++;
-                        continue;
-                    }
-                    nReady++;
-                    if (runningJobs.size() >= maxConcurrentJobs) {
-                        // The limit was reached during this pass. The job pool has exactly maxConcurrentJobs
-                        // threads and a SynchronousQueue, so submitting now would most likely be rejected.
-                        // Leave the job READY for a later run.
-                        continue;
-                    }
-                    AbstractExecutable executable = executableManager.getJob(id);
-                    String jobDesc = executable.toString();
-                    logger.info(jobDesc + " prepare to schedule");
-                    try {
-                        context.addRunningJob(executable);
-                        jobPool.execute(new JobRunner(executable));
-                        logger.info(jobDesc + " scheduled");
-                    } catch (Exception ex) {
-                        context.removeRunningJob(executable);
-                        logger.warn(jobDesc + " fail to schedule", ex);
-                    }
-                }
-                logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + nReady + " ready, " + nOthers + " others");
-            } catch (Exception e) {
-                // keep the stack trace; the periodic task must not die, so the exception is not rethrown
-                logger.warn("Job Fetcher caught a exception " + e, e);
-            }
-        }
-    }
-
-    /** Runs one executable on the job pool and always deregisters it from the context afterwards. */
-    private class JobRunner implements Runnable {
-
-        private final AbstractExecutable executable;
-
-        public JobRunner(AbstractExecutable executable) {
-            this.executable = executable;
-        }
-
-        @Override
-        public void run() {
-            try {
-                executable.execute(context);
-                // Trigger the next step asap, unless the scheduler is shutting down. Scheduling on a shut-down
-                // pool throws RejectedExecutionException, which would be reported below as a job error.
-                if (!fetcherPool.isShutdown()) {
-                    fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
-                }
-            } catch (ExecuteException e) {
-                logger.error("ExecuteException job:" + executable.getId(), e);
-            } catch (Exception e) {
-                logger.error("unknown error execute job:" + executable.getId(), e);
-            } finally {
-                context.removeRunningJob(executable);
-            }
-        }
-    }
-
-    public static DefaultScheduler getInstance() {
-        return INSTANCE;
-    }
-
-    /** Shuts the scheduler down when the Curator connection is SUSPENDED or LOST. */
-    @Override
-    public void stateChanged(CuratorFramework client, ConnectionState newState) {
-        if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) {
-            try {
-                shutdown();
-            } catch (SchedulerException e) {
-                throw new RuntimeException("failed to shutdown scheduler", e);
-            }
-        }
-    }
-
-    /**
-     * Starts the scheduler. Only the first call has an effect; later calls return immediately.
-     * <p>
-     * Steps, in order:
-     * <ol>
-     * <li>acquire {@code jobLock};</li>
-     * <li>create the thread pools;</li>
-     * <li>move every READY job and every RUNNING job output to ERROR (presumably stale state from a
-     * previous run);</li>
-     * <li>register a JVM shutdown hook that shuts the scheduler down and releases the lock;</li>
-     * <li>start the periodic fetcher.</li>
-     * </ol>
-     */
-    @Override
-    public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
-        if (!initialized) {
-            initialized = true;
-        } else {
-            return;
-        }
-
-        this.jobEngineConfig = jobEngineConfig;
-        jobLock.lock();
-
-        executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
-        fetcherPool = Executors.newScheduledThreadPool(1);
-        int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
-        // fixed-size pool with no queueing: a submission beyond corePoolSize busy threads is rejected
-        jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
-        context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
-
-        //load all executable, set them to a consistent status
-        for (AbstractExecutable executable : executableManager.getAllExecutables()) {
-            if (executable.getStatus() == ExecutableState.READY) {
-                executableManager.updateJobOutput(executable.getId(), ExecutableState.ERROR, null, "scheduler initializing work to reset job to ERROR status");
-            }
-        }
-        executableManager.updateAllRunningJobsToError();
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            public void run() {
-                logger.debug("Closing zk connection");
-                try {
-                    shutdown();
-                } catch (SchedulerException e) {
-                    logger.error("error shutdown scheduler", e);
-                } finally {
-                    // release the job lock even if shutdown fails
-                    jobLock.unlock();
-                }
-            }
-        });
-
-        fetcher = new FetcherRunner();
-        fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
-        hasStarted = true;
-    }
-
-    /**
-     * Stops both pools from accepting new work. Tasks that were already submitted still run; this method does
-     * not wait for them. Safe to call before {@link #init} (for example from {@link #stateChanged}) and more than
-     * once.
-     */
-    @Override
-    public void shutdown() throws SchedulerException {
-        // read each field once: the pools are null until init() has run
-        final ScheduledExecutorService fetchers = fetcherPool;
-        if (fetchers != null) {
-            fetchers.shutdown();
-        }
-        final ExecutorService jobs = jobPool;
-        if (jobs != null) {
-            jobs.shutdown();
-        }
-    }
-
-    /** Stopping a single executable is not implemented yet; always reports success. */
-    @Override
-    public boolean stop(AbstractExecutable executable) throws SchedulerException {
-        //TODO should try to stop this executable
-        return true;
-    }
-
-    public boolean hasStarted() {
-        return this.hasStarted;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
deleted file mode 100644
index 1aa72f8..0000000
--- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJob.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.invertedindex;
-
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-/**
- * Chained executable representing one inverted-index (II) build job.
- * <p>
- * The II instance name and the segment id are kept as job parameters rather than fields, so they are stored
- * with the job's params.
- * <p>
- * Originally created by shaoshi on 1/15/15.
- */
-public class IIJob extends DefaultChainedExecutable {
-
-    /** Param key holding the II instance name. */
-    private static final String II_INSTANCE_NAME = "iiName";
-
-    /** Param key holding the segment id. */
-    private static final String SEGMENT_ID = "segmentId";
-
-    /** Public no-arg constructor; executables are re-instantiated reflectively via {@code getConstructor()}. */
-    public IIJob() {
-        super();
-    }
-
-    void setIIName(String name) {
-        this.setParam(II_INSTANCE_NAME, name);
-    }
-
-    public String getIIName() {
-        return this.getParam(II_INSTANCE_NAME);
-    }
-
-    void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    public String getSegmentId() {
-        return this.getParam(SEGMENT_ID);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
deleted file mode 100644
index 68ad36b..0000000
--- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.invertedindex;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.AbstractJobBuilder;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.hadoop.dict.CreateInvertedIndexDictionaryJob;
-import org.apache.kylin.job.hadoop.hive.IIJoinedFlatTableDesc;
-import org.apache.kylin.job.hadoop.invertedindex.IIBulkLoadJob;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHFileJob;
-import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
-import org.apache.kylin.job.hadoop.invertedindex.IIDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.invertedindex.InvertedIndexJob;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Builds the chain of steps that makes up an inverted-index (II) build job for one {@link IISegment}.
- * <p>
- * The steps, in order:
- * <ol>
- * <li>create the intermediate Hive table;</li>
- * <li>extract distinct column values;</li>
- * <li>build dictionaries;</li>
- * <li>build the inverted index;</li>
- * <li>create the HTable;</li>
- * <li>convert to HFiles;</li>
- * <li>bulk load.</li>
- * </ol>
- * <p>
- * Originally created by shaoshi on 1/15/15.
- */
-public final class IIJobBuilder extends AbstractJobBuilder {
-
-    public IIJobBuilder(JobEngineConfig engineConfig) {
-        super(engineConfig);
-    }
-
-    /**
-     * Creates (but does not submit) a BUILD job for {@code seg}.
-     *
-     * @param seg the segment to build; must not be null
-     * @return the job with all build steps added in execution order
-     * @throws NullPointerException if {@code seg} or the engine config is null
-     */
-    public IIJob buildJob(IISegment seg) {
-        checkPreconditions(seg);
-
-        IIJob result = initialJob(seg, "BUILD");
-        final String jobId = result.getId();
-        final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
-        final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId);
-        final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
-        final String iiRootPath = getIIWorkingDir(seg, jobId);
-        // glob over everything the inverted-index step writes under iiRootPath
-        final String iiPath = iiRootPath + "*";
-
-        // intermediate Hive table step
-        result.addTask(createIntermediateHiveTableStep(intermediateTableDesc, jobId));
-
-        // distinct column values step
-        result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId, factDistinctColumnsPath));
-
-        // dictionary step, reading the distinct values written above
-        result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
-
-        // inverted index step
-        result.addTask(createInvertedIndexStep(seg, intermediateHiveTableName, iiRootPath));
-
-        // create htable step
-        result.addTask(createCreateHTableStep(seg));
-
-        // generate hfiles step
-        result.addTask(createConvertToHfileStep(seg, iiPath, jobId));
-
-        // bulk load step
-        result.addTask(createBulkLoadStep(seg, jobId));
-
-        return result;
-    }
-
-    /**
-     * Creates an empty job for {@code seg}. The job is named
-     * "&lt;ii name&gt; - &lt;segment name&gt; - &lt;type&gt; - &lt;timestamp&gt;", with the timestamp formatted in the
-     * engine's configured time zone.
-     */
-    private IIJob initialJob(IISegment seg, String type) {
-        IIJob result = new IIJob();
-        // local instance: SimpleDateFormat is not thread-safe, so it must not be shared
-        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-        format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
-        result.setIIName(seg.getIIInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date()));
-        result.setSubmitter(this.submitter);
-        return result;
-    }
-
-    private void checkPreconditions(IISegment seg) {
-        Preconditions.checkNotNull(seg, "segment cannot be null");
-        Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
-    }
-
-    /**
-     * Appends {@code -conf <file>} for the MEDIUM-capacity Hadoop job config, if one is configured.
-     * Every II MapReduce step uses MEDIUM capacity.
-     */
-    private void appendMapReduceParameters(StringBuilder builder, JobEngineConfig engineConfig) {
-        try {
-            String jobConf = engineConfig.getHadoopJobConfFilePath(RealizationCapacity.MEDIUM);
-            if (jobConf != null && jobConf.length() > 0) {
-                builder.append(" -conf ").append(jobConf);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /** Working directory of this job for the segment's II instance: "&lt;job working dir&gt;/&lt;ii name&gt;/". */
-    private String getIIWorkingDir(IISegment seg, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
-    }
-
-    private String getIIDistinctColumnsPath(IISegment seg, String jobUuid) {
-        return getIIWorkingDir(seg, jobUuid) + "ii_distinct_columns";
-    }
-
-    private String getHFilePath(IISegment seg, String jobId) {
-        return getIIWorkingDir(seg, jobId) + "hfile/";
-    }
-
-    /** MapReduce step that extracts distinct column values from the intermediate table into {@code output}. */
-    private MapReduceExecutable createFactDistinctColumnsStep(IISegment seg, String factTableName, String jobId, String output) {
-        MapReduceExecutable result = new MapReduceExecutable();
-        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        result.setMapReduceJobClass(IIDistinctColumnsJob.class);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, engineConfig);
-        appendExecCmdParameters(cmd, "tablename", factTableName);
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "output", output);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getIIInstance().getName() + "_Step");
-
-        result.setMapReduceParams(cmd.toString());
-        return result;
-    }
-
-    /** Shell step that builds the II dictionaries from the distinct values at {@code factDistinctColumnsPath}. */
-    private HadoopShellExecutable createBuildDictionaryStep(IISegment seg, String factDistinctColumnsPath) {
-        // dictionary job
-        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-
-        buildDictionaryStep.setJobParams(cmd.toString());
-        buildDictionaryStep.setJobClass(CreateInvertedIndexDictionaryJob.class);
-        return buildDictionaryStep;
-    }
-
-    /** MapReduce step that builds the inverted index from the intermediate table into {@code iiOutputTempPath}. */
-    private MapReduceExecutable createInvertedIndexStep(IISegment seg, String intermediateHiveTable, String iiOutputTempPath) {
-        // inverted index job
-        MapReduceExecutable buildIIStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, engineConfig);
-
-        buildIIStep.setName(ExecutableConstants.STEP_NAME_BUILD_II);
-
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "tablename", intermediateHiveTable);
-        appendExecCmdParameters(cmd, "output", iiOutputTempPath);
-        appendExecCmdParameters(cmd, "jobname", ExecutableConstants.STEP_NAME_BUILD_II);
-
-        buildIIStep.setMapReduceParams(cmd.toString());
-        buildIIStep.setMapReduceJobClass(InvertedIndexJob.class);
-        return buildIIStep;
-    }
-
-    /** Shell step that creates the HTable named by the segment's storage location identifier. */
-    private HadoopShellExecutable createCreateHTableStep(IISegment seg) {
-        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
-        createHtableStep.setJobParams(cmd.toString());
-        createHtableStep.setJobClass(IICreateHTableJob.class);
-
-        return createHtableStep;
-    }
-
-    /** MapReduce step that converts the inverted index at {@code inputPath} into HFiles. */
-    private MapReduceExecutable createConvertToHfileStep(IISegment seg, String inputPath, String jobId) {
-        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_II_TO_HFILE);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, engineConfig);
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getIIInstance().getName() + "_Step");
-
-        createHFilesStep.setMapReduceParams(cmd.toString());
-        createHFilesStep.setMapReduceJobClass(IICreateHFileJob.class);
-
-        return createHFilesStep;
-    }
-
-    /** Shell step that bulk-loads the generated HFiles into the segment's HTable. */
-    private HadoopShellExecutable createBulkLoadStep(IISegment seg, String jobId) {
-        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "iiname", seg.getIIInstance().getName());
-
-        bulkLoadStep.setJobParams(cmd.toString());
-        bulkLoadStep.setJobClass(IIBulkLoadJob.class);
-
-        return bulkLoadStep;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
deleted file mode 100644
index 3c79f8c..0000000
--- a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.manager;
-
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.dao.ExecutableDao;
-import org.apache.kylin.job.dao.ExecutableOutputPO;
-import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.job.exception.IllegalStateTranferException;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ChainedExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.DefaultOutput;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.Output;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Manages executables (jobs and their sub-tasks) and their outputs on top of {@link ExecutableDao}.
- * <p>
- * Converts between runtime {@link AbstractExecutable} objects and persisted {@link ExecutablePO} /
- * {@link ExecutableOutputPO} records, and validates state transitions when a job's status is updated.
- * One instance is cached per {@link KylinConfig}; obtain it with {@link #getInstance(KylinConfig)}.
- * Persistence failures are logged and rethrown as {@link RuntimeException}.
- */
-public class ExecutableManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class);
-    private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>();
-    @SuppressWarnings("unused")
-    private final KylinConfig config;
-
-    private ExecutableDao executableDao;
-
-    /**
-     * Returns the manager cached for {@code config}, creating it on first use. Concurrent first callers all
-     * receive the same cached instance.
-     */
-    public static ExecutableManager getInstance(KylinConfig config) {
-        ExecutableManager r = CACHE.get(config);
-        if (r == null) {
-            // A plain check-then-put would let racing threads each cache and return their own instance.
-            // putIfAbsent makes every caller use the instance that was stored first.
-            final ExecutableManager created = new ExecutableManager(config);
-            final ExecutableManager existing = CACHE.putIfAbsent(config, created);
-            if (existing != null) {
-                r = existing;
-            } else {
-                r = created;
-                if (CACHE.size() > 1) {
-                    logger.warn("More than one singleton exist");
-                }
-            }
-        }
-        return r;
-    }
-
-    private ExecutableManager(KylinConfig config) {
-        logger.info("Using metadata url: " + config);
-        this.config = config;
-        this.executableDao = ExecutableDao.getInstance(config);
-    }
-
-    /** Persists {@code executable} and creates an empty output record for it and every nested sub-task. */
-    public void addJob(AbstractExecutable executable) {
-        try {
-            executableDao.addJob(parse(executable));
-            addJobOutput(executable);
-        } catch (PersistentException e) {
-            logger.error("fail to submit job:" + executable.getId(), e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    /** Recursively creates an empty output record for {@code executable} and its sub-tasks. */
-    private void addJobOutput(AbstractExecutable executable) throws PersistentException {
-        ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
-        executableOutputPO.setUuid(executable.getId());
-        executableDao.addJobOutput(executableOutputPO);
-        if (executable instanceof DefaultChainedExecutable) {
-            for (AbstractExecutable subTask : ((DefaultChainedExecutable) executable).getTasks()) {
-                addJobOutput(subTask);
-            }
-        }
-    }
-
-    //for ut
-    public void deleteJob(String jobId) {
-        try {
-            executableDao.deleteJob(jobId);
-        } catch (PersistentException e) {
-            logger.error("fail to delete job:" + jobId, e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    /** Loads and reconstructs the job with id {@code uuid}; returns {@code null} if the DAO returns no record. */
-    public AbstractExecutable getJob(String uuid) {
-        try {
-            return parseTo(executableDao.getJob(uuid));
-        } catch (PersistentException e) {
-            logger.error("fail to get job:" + uuid, e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Returns the output (state, info, content, last-modified) of {@code uuid}.
-     *
-     * @throws IllegalArgumentException if no output record exists for {@code uuid}
-     */
-    public Output getOutput(String uuid) {
-        try {
-            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid);
-            Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + uuid);
-            return parseOutput(jobOutput);
-        } catch (PersistentException e) {
-            logger.error("fail to get job output:" + uuid, e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) {
-        final DefaultOutput result = new DefaultOutput();
-        result.setExtra(jobOutput.getInfo());
-        result.setState(ExecutableState.valueOf(jobOutput.getStatus()));
-        result.setVerboseMsg(jobOutput.getContent());
-        result.setLastModified(jobOutput.getLastModified());
-        return result;
-    }
-
-    /** Returns every stored output, keyed by its record id. */
-    public Map<String, Output> getAllOutputs() {
-        try {
-            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
-            HashMap<String, Output> result = Maps.newHashMap();
-            for (ExecutableOutputPO jobOutput : jobOutputs) {
-                result.put(jobOutput.getId(), parseOutput(jobOutput));
-            }
-            return result;
-        } catch (PersistentException e) {
-            logger.error("fail to get all job output:", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    /** Returns all stored jobs. Records that cannot be reconstructed are logged and skipped. */
-    public List<AbstractExecutable> getAllExecutables() {
-        try {
-            List<AbstractExecutable> ret = Lists.newArrayList();
-            for (ExecutablePO po : executableDao.getJobs()) {
-                try {
-                    AbstractExecutable ae = parseTo(po);
-                    ret.add(ae);
-                } catch (IllegalArgumentException e) {
-                    logger.error("error parsing one executabePO: ", e);
-                }
-            }
-            return ret;
-        } catch (PersistentException e) {
-            logger.error("error get All Jobs", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public List<String> getAllJobIds() {
-        try {
-            return executableDao.getJobIds();
-        } catch (PersistentException e) {
-            logger.error("error get All Job Ids", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    /** Sets every output whose status is RUNNING to ERROR, without state-transition validation. */
-    public void updateAllRunningJobsToError() {
-        try {
-            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
-            for (ExecutableOutputPO executableOutputPO : jobOutputs) {
-                if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) {
-                    executableOutputPO.setStatus(ExecutableState.ERROR.toString());
-                    executableDao.updateJobOutput(executableOutputPO);
-                }
-            }
-        } catch (PersistentException e) {
-            logger.error("error reset job status from RUNNING to ERROR", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Sets the job to READY and, for chained jobs, also sets the first sub-task in ERROR state to READY.
-     * Does nothing if the job does not exist.
-     */
-    public void resumeJob(String jobId) {
-        AbstractExecutable job = getJob(jobId);
-        if (job == null) {
-            return;
-        }
-        updateJobOutput(jobId, ExecutableState.READY, null, null);
-        if (job instanceof DefaultChainedExecutable) {
-            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
-            for (AbstractExecutable task : tasks) {
-                if (task.getStatus() == ExecutableState.ERROR) {
-                    updateJobOutput(task.getId(), ExecutableState.READY, null, null);
-                    break;
-                }
-            }
-        }
-    }
-
-    /** Marks every sub-task that is not in a final state, and then the job itself, as DISCARDED. */
-    public void discardJob(String jobId) {
-        AbstractExecutable job = getJob(jobId);
-        if (job instanceof DefaultChainedExecutable) {
-            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
-            for (AbstractExecutable task : tasks) {
-                if (!task.getStatus().isFinalState()) {
-                    updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null);
-                }
-            }
-        }
-        updateJobOutput(jobId, ExecutableState.DISCARDED, null, null);
-    }
-
-    /**
-     * Updates a job output. Any argument passed as {@code null} leaves the corresponding field unchanged.
-     *
-     * @param newStatus target status; validated against the current one unless null or equal to it
-     * @param info      replaces the info map when non-null
-     * @param output    replaces the content when non-null
-     * @throws IllegalArgumentException   if no output record exists for {@code jobId}
-     * @throws IllegalStateTranferException if the status transition is not allowed
-     */
-    public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) {
-        try {
-            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
-            Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId);
-            ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus());
-            if (newStatus != null && oldStatus != newStatus) {
-                if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) {
-                    throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus);
-                }
-                jobOutput.setStatus(newStatus.toString());
-            }
-            if (info != null) {
-                jobOutput.setInfo(info);
-            }
-            if (output != null) {
-                jobOutput.setContent(output);
-            }
-            executableDao.updateJobOutput(jobOutput);
-            logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
-        } catch (PersistentException e) {
-            // newStatus may legitimately be null here; string concatenation avoids an NPE that would mask e
-            logger.error("error change job:" + jobId + " to " + newStatus, e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    //for migration only
-    //TODO delete when migration finished
-    /** Forces the status (and optionally the content) of a job output, without transition validation. */
-    public void resetJobOutput(String jobId, ExecutableState state, String output) {
-        try {
-            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
-            Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId);
-            jobOutput.setStatus(state.toString());
-            if (output != null) {
-                jobOutput.setContent(output);
-            }
-            executableDao.updateJobOutput(jobOutput);
-        } catch (PersistentException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /** Merges {@code info} into the job output's info map. A null {@code info} is a no-op. */
-    public void addJobInfo(String id, Map<String, String> info) {
-        if (info == null) {
-            return;
-        }
-        try {
-            ExecutableOutputPO output = executableDao.getJobOutput(id);
-            Preconditions.checkArgument(output != null, "there is no related output for job id:" + id);
-            output.getInfo().putAll(info);
-            executableDao.updateJobOutput(output);
-        } catch (PersistentException e) {
-            logger.error("error update job info, id:" + id + "  info:" + info.toString(), e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void addJobInfo(String id, String key, String value) {
-        Map<String, String> info = Maps.newHashMap();
-        info.put(key, value);
-        addJobInfo(id, info);
-    }
-
-    /** Converts a runtime executable, including nested tasks, into its persisted form. */
-    private static ExecutablePO parse(AbstractExecutable executable) {
-        ExecutablePO result = new ExecutablePO();
-        result.setName(executable.getName());
-        result.setUuid(executable.getId());
-        result.setType(executable.getClass().getName());
-        result.setParams(executable.getParams());
-        if (executable instanceof ChainedExecutable) {
-            List<ExecutablePO> tasks = Lists.newArrayList();
-            for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) {
-                tasks.add(parse(task));
-            }
-            result.setTasks(tasks);
-        }
-        return result;
-    }
-
-    /**
-     * Reconstructs a runtime executable from its persisted form by reflectively calling the public no-arg
-     * constructor of the recorded type.
-     *
-     * @return {@code null} if {@code executablePO} is null
-     * @throws IllegalArgumentException if the type cannot be instantiated, or if it has tasks but is not chained
-     */
-    private static AbstractExecutable parseTo(ExecutablePO executablePO) {
-        if (executablePO == null) {
-            return null;
-        }
-        String type = executablePO.getType();
-        try {
-            Class<? extends AbstractExecutable> clazz = ClassUtil.forName(type, AbstractExecutable.class);
-            Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor();
-            AbstractExecutable result = constructor.newInstance();
-            result.setId(executablePO.getUuid());
-            result.setName(executablePO.getName());
-            result.setParams(executablePO.getParams());
-            List<ExecutablePO> tasks = executablePO.getTasks();
-            if (tasks != null && !tasks.isEmpty()) {
-                Preconditions.checkArgument(result instanceof ChainedExecutable);
-                for (ExecutablePO subTask : tasks) {
-                    ((ChainedExecutable) result).addTask(parseTo(subTask));
-                }
-            }
-            return result;
-        } catch (ReflectiveOperationException e) {
-            throw new IllegalArgumentException("cannot parse this job:" + executablePO.getId(), e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
deleted file mode 100644
index b6e5af5..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.tools;
-
-import java.io.IOException;
-import java.util.Locale;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Command-line tool that lists the HBase tables whose names start with {@code kylin} or
- * {@code _kylin} (compared case-insensitively), printing for each one its name, its
- * {@code KYLIN_HOST} descriptor value and the descriptor itself to standard output.
- * <p>
- * Despite its name, this tool does not modify or delete any table, and it ignores its
- * command-line arguments.
- */
-public class CleanHtableCLI extends AbstractHadoopJob {
-
-    protected static final Logger log = LoggerFactory.getLogger(CleanHtableCLI.class);
-
-    // NOTE(review): not read anywhere in this class; possibly unused - confirm before removing.
-    String tableName;
-
-    /**
-     * Runs the table listing. On failure, prints usage and rethrows the original exception.
-     *
-     * @param args command-line arguments (currently ignored)
-     * @return 0 on success
-     * @throws Exception any failure raised while talking to HBase
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-        try {
-
-            clean();
-
-            return 0;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    /**
-     * Prints every HBase table whose lower-cased name starts with {@code kylin} or {@code _kylin}.
-     *
-     * @throws IOException if creating the admin, listing tables or closing the admin fails
-     */
-    private void clean() throws IOException {
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
-        try {
-            for (HTableDescriptor descriptor : hbaseAdmin.listTables()) {
-                // Locale.ROOT: default-locale lower-casing (e.g. Turkish) turns 'I' into a dotless 'ı',
-                // which would make upper-case "KYLIN..." table names miss the prefix check below.
-                String name = descriptor.getNameAsString().toLowerCase(Locale.ROOT);
-                if (name.startsWith("kylin") || name.startsWith("_kylin")) {
-                    String x = descriptor.getValue("KYLIN_HOST");
-                    System.out.println("table name " + descriptor.getNameAsString() + " host: " + x);
-                    System.out.println(descriptor);
-                    System.out.println();
-                }
-            }
-        } finally {
-            // Close the admin even when listTables() throws; previously it leaked on that path.
-            hbaseAdmin.close();
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new CleanHtableCLI(), args);
-        System.exit(exitCode);
-    }
-}