Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:31 UTC

[29/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.
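
The recurring change across the files below is a package relocation: helper classes that lived in the old monolithic 'job' module move into 'engine-mr'. In import terms (taken directly from the hunks that follow):

    // before: helpers lived in the old 'job' module
    import org.apache.kylin.job.constant.BatchConstants;
    import org.apache.kylin.job.hadoop.AbstractHadoopJob;

    // after: the same helpers, relocated into 'engine-mr'
    import org.apache.kylin.engine.mr.common.BatchConstants;
    import org.apache.kylin.engine.mr.common.AbstractHadoopJob;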

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
index 1676429..2fb28c0 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IIDistinctColumnsReducer.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
index 16b8ca1..741dd62 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexJob.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.job.hadoop.invertedindex;
 
 import java.io.IOException;
+import java.util.ArrayList;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
@@ -32,11 +33,13 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,6 +105,26 @@ public class InvertedIndexJob extends AbstractHadoopJob {
         conf.set(BatchConstants.CFG_II_SEGMENT_NAME, seg.getName());
     }
 
+    protected void attachKylinPropsAndMetadata(IIInstance ii, Configuration conf) throws IOException {
+        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        
+        // write II / model_desc / II_desc / dict / table
+        ArrayList<String> dumpList = new ArrayList<String>();
+        dumpList.add(ii.getResourcePath());
+        dumpList.add(ii.getDescriptor().getModel().getResourcePath());
+        dumpList.add(ii.getDescriptor().getResourcePath());
+
+        for (String tableName : ii.getDescriptor().getModel().getAllTables()) {
+            TableDesc table = metaMgr.getTableDesc(tableName);
+            dumpList.add(table.getResourcePath());
+        }
+        for (IISegment segment : ii.getSegments()) {
+            dumpList.addAll(segment.getDictionaryPaths());
+        }
+
+        attachKylinPropsAndMetadata(dumpList, conf);
+    }
+
     private void setupMapper(String intermediateTable) throws IOException {
 
         String[] dbTableNames = HadoopUtil.parseHiveTableName(intermediateTable);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
index 1d30ee7..0efd585 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexMapper.java
@@ -30,13 +30,13 @@ import org.apache.hive.hcatalog.data.schema.HCatSchema;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
index fa4dccf..584c96b 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexPartitioner.java
@@ -27,13 +27,13 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
index d9b5aee..9476428 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/InvertedIndexReducer.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
@@ -32,8 +34,6 @@ import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIRow;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
deleted file mode 100644
index 0f94d32..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionJob.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- * 
- */
-@SuppressWarnings("static-access")
-public class RandomKeyDistributionJob extends AbstractHadoopJob {
-
-    protected static final Logger log = LoggerFactory.getLogger(RandomKeyDistributionJob.class);
-
-    static final Option OPTION_KEY_CLASS = OptionBuilder.withArgName("keyclass").hasArg().isRequired(true).withDescription("Key Class").create("keyclass");
-    static final Option OPTION_REGION_MB = OptionBuilder.withArgName("regionmb").hasArg().isRequired(true).withDescription("MB per Region").create("regionmb");
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_INPUT_PATH);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_KEY_CLASS);
-            options.addOption(OPTION_REGION_MB);
-
-            parseOptions(options, args);
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            job = Job.getInstance(getConf(), jobName);
-
-            setJobClasspath(job);
-            
-            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-
-            String keyClass = getOptionValue(OPTION_KEY_CLASS);
-            Class<?> keyClz = Class.forName(keyClass);
-
-            int regionMB = Integer.parseInt(getOptionValue(OPTION_REGION_MB));
-
-            // Mapper
-            job.setInputFormatClass(SequenceFileInputFormat.class);
-            job.setMapperClass(RandomKeyDistributionMapper.class);
-            job.setMapOutputKeyClass(keyClz);
-            job.setMapOutputValueClass(NullWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(RandomKeyDistributionReducer.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            job.setOutputKeyClass(keyClz);
-            job.setOutputValueClass(NullWritable.class);
-            job.setNumReduceTasks(1);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            // total map input MB
-            double totalMapInputMB = this.getTotalMapInputMB();
-            int regionCount = Math.max(1, (int) (totalMapInputMB / regionMB));
-            int mapSampleNumber = 1000;
-            System.out.println("Total Map Input MB: " + totalMapInputMB);
-            System.out.println("Region Count: " + regionCount);
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.MAPPER_SAMPLE_NUMBER, String.valueOf(mapSampleNumber));
-            job.getConfiguration().set(BatchConstants.REGION_NUMBER, String.valueOf(regionCount));
-
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new RandomKeyDistributionJob(), args);
-        System.exit(exitCode);
-    }
-
-}
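
The region sizing in run() above reduces to a one-line calculation; a standalone sketch of the same arithmetic (the class name and sample figures are illustrative, not from the commit):

    /**
     * Sketch of the region-count arithmetic from RandomKeyDistributionJob.run().
     */
    public class RegionCountDemo {
        static int regionCount(double totalMapInputMB, int regionMB) {
            // at least one region, otherwise proportional to total input size
            return Math.max(1, (int) (totalMapInputMB / regionMB));
        }

        public static void main(String[] args) {
            System.out.println(regionCount(5000.0, 256)); // ~5 GB at 256 MB/region -> 19
        }
    }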

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
deleted file mode 100644
index e3e743e..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionMapper.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.util.RandomSampler;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- * 
- */
-public class RandomKeyDistributionMapper<KEY extends Writable, VALUE> extends KylinMapper<KEY, VALUE, KEY, NullWritable> {
-
-    private Configuration conf;
-    private int sampleNumber;
-    private List<KEY> allKeys;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        conf = context.getConfiguration();
-        allKeys = new ArrayList<KEY>();
-        sampleNumber = Integer.parseInt(conf.get(BatchConstants.MAPPER_SAMPLE_NUMBER));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void map(KEY key, VALUE value, Context context) throws IOException, InterruptedException {
-        KEY keyCopy = (KEY) ReflectionUtils.newInstance(key.getClass(), conf);
-        ReflectionUtils.copy(conf, key, keyCopy);
-        allKeys.add(keyCopy);
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        RandomSampler<KEY> sampler = new RandomSampler<KEY>();
-        List<KEY> sampleResult = sampler.sample(allKeys, sampleNumber);
-        for (KEY k : sampleResult) {
-            context.write(k, NullWritable.get());
-        }
-    }
-
-}
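
Note the mapper buffers every key in allKeys and only samples in cleanup(). RandomSampler itself is not part of this diff; a streaming alternative that yields the same uniform fixed-size sample without buffering all keys is reservoir sampling (Algorithm R). A standalone sketch, not Kylin code:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    /**
     * Minimal reservoir-sampling sketch (Algorithm R). Assumes a uniform
     * fixed-size sample is wanted; the actual RandomSampler may differ.
     */
    public class ReservoirSampleDemo {
        static <T> List<T> sample(Iterable<T> input, int k, Random rnd) {
            List<T> reservoir = new ArrayList<T>(k);
            int seen = 0;
            for (T item : input) {
                seen++;
                if (reservoir.size() < k) {
                    reservoir.add(item);       // fill the reservoir first
                } else {
                    int j = rnd.nextInt(seen); // keep item with probability k/seen
                    if (j < k)
                        reservoir.set(j, item);
                }
            }
            return reservoir;
        }

        public static void main(String[] args) {
            List<Integer> data = new ArrayList<Integer>();
            for (int i = 0; i < 1000; i++)
                data.add(i);
            System.out.println(sample(data, 10, new Random(42)));
        }
    }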

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
deleted file mode 100644
index bc6d379..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/RandomKeyDistributionReducer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.invertedindex;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.job.constant.BatchConstants;
-
-/**
- * @author ysong1
- * 
- */
-public class RandomKeyDistributionReducer<KEY extends Writable> extends KylinReducer<KEY, NullWritable, KEY, NullWritable> {
-
-
-    private Configuration conf;
-    private int regionNumber;
-    private List<KEY> allSplits;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        conf = context.getConfiguration();
-        allSplits = new ArrayList<KEY>();
-        regionNumber = Integer.parseInt(context.getConfiguration().get(BatchConstants.REGION_NUMBER));
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void reduce(KEY key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
-        KEY keyCopy = (KEY) ReflectionUtils.newInstance(key.getClass(), conf);
-        ReflectionUtils.copy(conf, key, keyCopy);
-        allSplits.add(keyCopy);
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        int stepLength = allSplits.size() / regionNumber;
-        for (int i = stepLength; i < allSplits.size(); i += stepLength) {
-            context.write(allSplits.get(i), NullWritable.get());
-        }
-    }
-}
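
The cleanup() above turns the single reducer's globally sorted sample into region split keys by emitting every stepLength-th element. The same selection in isolation, with a lower bound of 1 on the step that the original loop does not guard against (names are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    /**
     * Sketch of the split-key selection from RandomKeyDistributionReducer:
     * pick evenly spaced keys from a sorted sample, one fewer than regions.
     */
    public class SplitPointDemo {
        static <T> List<T> splitPoints(List<T> sortedSample, int regionNumber) {
            List<T> splits = new ArrayList<T>();
            int step = Math.max(1, sortedSample.size() / regionNumber);
            for (int i = step; i < sortedSample.size(); i += step)
                splits.add(sortedSample.get(i));
            return splits;
        }

        public static void main(String[] args) {
            List<Integer> sample = new ArrayList<Integer>();
            for (int i = 0; i < 100; i++)
                sample.add(i);
            System.out.println(splitPoints(sample, 5)); // [20, 40, 60, 80]
        }
    }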

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java
deleted file mode 100644
index e959ae2..0000000
--- a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultContext.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableContext;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- */
-public class DefaultContext implements ExecutableContext {
-
-    private final ConcurrentMap<String, Executable> runningJobs;
-    private final KylinConfig kylinConfig;
-
-    public DefaultContext(ConcurrentMap<String, Executable> runningJobs, KylinConfig kylinConfig) {
-        this.runningJobs = runningJobs;
-        this.kylinConfig = kylinConfig;
-    }
-    @Override
-    public Object getSchedulerContext() {
-        return null;
-    }
-
-    @Override
-    public KylinConfig getConfig() {
-        return kylinConfig;
-    }
-
-    void addRunningJob(Executable executable) {
-        runningJobs.put(executable.getId(), executable);
-    }
-
-    void removeRunningJob(Executable executable) {
-        runningJobs.remove(executable.getId());
-    }
-
-    public Map<String, Executable> getRunningJobs() {
-        return Collections.unmodifiableMap(runningJobs);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
deleted file mode 100644
index 8a83870..0000000
--- a/job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import com.google.common.collect.Maps;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.kylin.job.Scheduler;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.JobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.concurrent.*;
-
-/**
- */
-public class DefaultScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener {
-
-
-    private ExecutableManager executableManager;
-    private FetcherRunner fetcher;
-    private ScheduledExecutorService fetcherPool;
-    private ExecutorService jobPool;
-    private DefaultContext context;
-
-    private Logger logger = LoggerFactory.getLogger(DefaultScheduler.class);
-    private volatile boolean initialized = false;
-    private volatile boolean hasStarted = false;
-    private JobEngineConfig jobEngineConfig;
-
-    private static final DefaultScheduler INSTANCE = new DefaultScheduler();
-
-    private DefaultScheduler() {
-    }
-
-    private class FetcherRunner implements Runnable {
-
-        @Override
-        synchronized public void run() {
-            // logger.debug("Job Fetcher is running...");
-            Map<String, Executable> runningJobs = context.getRunningJobs();
-            if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
-                logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
-                return;
-            }
-
-            int nRunning = 0, nReady = 0, nOthers = 0;
-            for (final String id : executableManager.getAllJobIds()) {
-                if (runningJobs.containsKey(id)) {
-                    // logger.debug("Job id:" + id + " is already running");
-                    nRunning++;
-                    continue;
-                }
-                final Output output = executableManager.getOutput(id);
-                if ((output.getState() != ExecutableState.READY)) {
-                    // logger.debug("Job id:" + id + " not runnable");
-                    nOthers++;
-                    continue;
-                }
-                nReady++;
-                AbstractExecutable executable = executableManager.getJob(id);
-                String jobDesc = executable.toString();
-                logger.info(jobDesc + " prepare to schedule");
-                try {
-                    context.addRunningJob(executable);
-                    jobPool.execute(new JobRunner(executable));
-                    logger.info(jobDesc + " scheduled");
-                } catch (Exception ex) {
-                    context.removeRunningJob(executable);
-                    logger.warn(jobDesc + " fail to schedule", ex);
-                }
-            }
-            logger.info("Job Fetcher: " + nRunning + " running, " + runningJobs.size() + " actual running, " + nReady + " ready, " + nOthers + " others");
-        }
-    }
-
-    private class JobRunner implements Runnable {
-
-        private final AbstractExecutable executable;
-
-        public JobRunner(AbstractExecutable executable) {
-            this.executable = executable;
-        }
-
-        @Override
-        public void run() {
-            try {
-                executable.execute(context);
-                // trigger the next step asap
-                fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
-            } catch (ExecuteException e) {
-                logger.error("ExecuteException job:" + executable.getId(), e);
-            } catch (Exception e) {
-                logger.error("unknown error execute job:" + executable.getId(), e);
-            } finally {
-                context.removeRunningJob(executable);
-            }
-        }
-    }
-
-    public static DefaultScheduler getInstance() {
-        return INSTANCE;
-    }
-
-    @Override
-    public void stateChanged(CuratorFramework client, ConnectionState newState) {
-        if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) {
-            try {
-                shutdown();
-            } catch (SchedulerException e) {
-                throw new RuntimeException("failed to shutdown scheduler", e);
-            }
-        }
-    }
-
-    @Override
-    public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
-        if (!initialized) {
-            initialized = true;
-        } else {
-            return;
-        }
-
-        this.jobEngineConfig = jobEngineConfig;
-
-        if (jobLock.lock() == false) {
-            throw new IllegalStateException("Cannot start job scheduler due to lack of job lock");
-        }
-
-        executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
-        //load all executable, set them to a consistent status
-        fetcherPool = Executors.newScheduledThreadPool(1);
-        int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
-        jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
-        context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
-
-        for (AbstractExecutable executable : executableManager.getAllExecutables()) {
-            if (executable.getStatus() == ExecutableState.READY) {
-                executableManager.updateJobOutput(executable.getId(), ExecutableState.ERROR, null, "scheduler initializing work to reset job to ERROR status");
-            }
-        }
-        executableManager.updateAllRunningJobsToError();
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            public void run() {
-                logger.debug("Closing zk connection");
-                try {
-                    shutdown();
-                    jobLock.unlock();
-                } catch (SchedulerException e) {
-                    logger.error("error shutdown scheduler", e);
-                }
-            }
-        });
-
-        fetcher = new FetcherRunner();
-        fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
-        hasStarted = true;
-    }
-
-    @Override
-    public void shutdown() throws SchedulerException {
-        fetcherPool.shutdown();
-        jobPool.shutdown();
-    }
-
-    @Override
-    public boolean stop(AbstractExecutable executable) throws SchedulerException {
-        if (hasStarted) {
-            return true;
-        } else {
-            //TODO should try to stop this executable
-            return true;
-        }
-    }
-
-    public boolean hasStarted() {
-        return this.hasStarted;
-    }
-
-}
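
The scheduling structure being removed here is a periodic fetcher feeding a worker pool, with a shared map tracking in-flight jobs. Stripped of Kylin's ExecutableManager, job states, and ZooKeeper locking, the skeleton looks roughly like this (a sketch, all names illustrative):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    /**
     * Minimal sketch of the fetcher/worker-pool shape used by DefaultScheduler:
     * one scheduled thread polls for ready jobs, a bounded pool runs them.
     */
    public class FetcherPoolSketch {
        private final ConcurrentMap<String, Runnable> running = new ConcurrentHashMap<String, Runnable>();
        private final BlockingQueue<Runnable> ready = new LinkedBlockingQueue<Runnable>();
        private final ExecutorService jobPool = Executors.newFixedThreadPool(4);
        private final ScheduledExecutorService fetcherPool = Executors.newScheduledThreadPool(1);

        public void start() {
            fetcherPool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    Runnable job;
                    while ((job = ready.poll()) != null) {
                        final Runnable j = job;
                        final String id = Integer.toHexString(System.identityHashCode(j));
                        running.put(id, j);
                        jobPool.execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    j.run();
                                } finally {
                                    running.remove(id); // mirror of removeRunningJob()
                                }
                            }
                        });
                    }
                }
            }, 0, 10, TimeUnit.SECONDS);
        }

        public void submit(Runnable job) {
            ready.add(job);
        }

        public void shutdown() {
            fetcherPool.shutdown();
            jobPool.shutdown();
        }
    }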

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
deleted file mode 100644
index 2a72064..0000000
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/AbstractInMemCubeBuilder.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.kylin.job.inmemcubing;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.GridTable;
-import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An interface alike abstract class. Hold common tunable parameters and nothing more.
- */
-abstract public class AbstractInMemCubeBuilder {
-
-    private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class);
-
-    final protected CubeDesc cubeDesc;
-    final protected Map<TblColRef, Dictionary<?>> dictionaryMap;
-    
-    protected int taskThreadCount = 4;
-    protected int reserveMemoryMB = 100;
-
-    public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
-        if(cubeDesc == null)
-            throw new NullPointerException();
-        if (dictionaryMap == null)
-            throw new IllegalArgumentException("dictionary cannot be null");
-        
-        this.cubeDesc = cubeDesc;
-        this.dictionaryMap = dictionaryMap;
-    }
-    
-    public void setConcurrentThreads(int n) {
-        this.taskThreadCount = n;
-    }
-
-    public void setReserveMemoryMB(int mb) {
-        this.reserveMemoryMB = mb;
-    }
-
-    public Runnable buildAsRunnable(final BlockingQueue<List<String>> input, final ICuboidWriter output) {
-        return new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    build(input, output);
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        };
-    }
-    
-    abstract public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException;
-
-    protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
-        long startTime = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
-        IGTScanner scanner = gridTable.scan(req);
-        for (GTRecord record : scanner) {
-            output.write(cuboidId, record);
-        }
-        scanner.close();
-        logger.info("Cuboid " + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
-    }
-    
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
deleted file mode 100644
index 1ba7e4d..0000000
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/ConcurrentDiskStore.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.HashSet;
-import java.util.NoSuchElementException;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTRowBlock;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.IGTStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A disk store that allows concurrent read and exclusive write.
- */
-public class ConcurrentDiskStore implements IGTStore, Closeable {
-
-    private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class);
-    private static final boolean debug = true;
-
-    private static final int STREAM_BUFFER_SIZE = 8192;
-
-    final private GTInfo info;
-    final private Object lock;
-
-    final private File diskFile;
-    final private boolean delOnClose;
-
-    private Writer activeWriter;
-    private HashSet<Reader> activeReaders = new HashSet<Reader>();
-    private FileChannel writeChannel;
-    private FileChannel readChannel; // sharable across multi-threads
-
-    public ConcurrentDiskStore(GTInfo info) throws IOException {
-        this(info, File.createTempFile("ConcurrentDiskStore", ""), true);
-    }
-
-    public ConcurrentDiskStore(GTInfo info, File diskFile) throws IOException {
-        this(info, diskFile, false);
-    }
-
-    private ConcurrentDiskStore(GTInfo info, File diskFile, boolean delOnClose) throws IOException {
-        this.info = info;
-        this.lock = this;
-        this.diskFile = diskFile;
-        this.delOnClose = delOnClose;
-
-        // in case user forget to call close()
-        if (delOnClose)
-            diskFile.deleteOnExit();
-
-        if (debug)
-            logger.debug(this + " disk file " + diskFile.getAbsolutePath());
-    }
-
-    @Override
-    public GTInfo getInfo() {
-        return info;
-    }
-
-    @Override
-    public IGTStoreWriter rebuild(int shard) throws IOException {
-        return newWriter(0);
-    }
-
-    @Override
-    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
-        throw new IllegalStateException("does not support append yet");
-        //return newWriter(diskFile.length());
-    }
-
-    private IGTStoreWriter newWriter(long startOffset) throws IOException {
-        synchronized (lock) {
-            if (activeWriter != null || !activeReaders.isEmpty())
-                throw new IllegalStateException();
-
-            openWriteChannel(startOffset);
-            activeWriter = new Writer(startOffset);
-            return activeWriter;
-        }
-    }
-
-    private void closeWriter(Writer w) {
-        synchronized (lock) {
-            if (activeWriter != w)
-                throw new IllegalStateException();
-
-            activeWriter = null;
-            closeWriteChannel();
-        }
-    }
-
-    @Override
-    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
-        return newReader();
-    }
-
-    private IGTStoreScanner newReader() throws IOException {
-        synchronized (lock) {
-            if (activeWriter != null)
-                throw new IllegalStateException();
-
-            openReadChannel();
-            Reader r = new Reader(0);
-            activeReaders.add(r);
-            return r;
-        }
-    }
-
-    private void closeReader(Reader r) throws IOException {
-        synchronized (lock) {
-            if (activeReaders.contains(r) == false)
-                throw new IllegalStateException();
-
-            activeReaders.remove(r);
-            if (activeReaders.isEmpty())
-                closeReadChannel();
-        }
-    }
-
-    private class Reader implements IGTStoreScanner {
-        final DataInputStream din;
-        long fileLen;
-        long readOffset;
-
-        GTRowBlock block = GTRowBlock.allocate(info);
-        GTRowBlock next = null;
-
-        Reader(long startOffset) throws IOException {
-            this.fileLen = diskFile.length();
-            this.readOffset = startOffset;
-
-            if (debug)
-                logger.debug(ConcurrentDiskStore.this + " read start @ " + readOffset);
-
-            InputStream in = new InputStream() {
-                byte[] tmp = new byte[1];
-
-                @Override
-                public int read() throws IOException {
-                    int n = read(tmp, 0, 1);
-                    if (n <= 0)
-                        return -1;
-                    else
-                        return (int) tmp[0];
-                }
-
-                @Override
-                public int read(byte[] b, int off, int len) throws IOException {
-                    if (available() <= 0)
-                        return -1;
-
-                    int lenToGo = Math.min(available(), len);
-                    int nRead = 0;
-                    while (lenToGo > 0) {
-                        int n = readChannel.read(ByteBuffer.wrap(b, off, lenToGo), readOffset);
-
-                        lenToGo -= n;
-                        nRead += n;
-                        off += n;
-                        readOffset += n;
-                    }
-                    return nRead;
-                }
-
-                @Override
-                public int available() throws IOException {
-                    return (int) (fileLen - readOffset);
-                }
-            };
-            din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (next != null)
-                return true;
-
-            try {
-                if (din.available() > 0) {
-                    block.importFrom(din);
-                    next = block;
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-
-            return next != null;
-        }
-
-        @Override
-        public GTRowBlock next() {
-            if (next == null) {
-                hasNext();
-                if (next == null)
-                    throw new NoSuchElementException();
-            }
-            GTRowBlock r = next;
-            next = null;
-            return r;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void close() throws IOException {
-            din.close();
-            closeReader(this);
-
-            if (debug)
-                logger.debug(ConcurrentDiskStore.this + " read end @ " + readOffset);
-        }
-    }
-    
-    private class Writer implements IGTStoreWriter {
-        final DataOutputStream dout;
-        long writeOffset;
-
-        Writer(long startOffset) {
-            this.writeOffset = startOffset;
-            
-            if (debug)
-                logger.debug(ConcurrentDiskStore.this + " write start @ " + writeOffset);
-
-            OutputStream out = new OutputStream() {
-                byte[] tmp = new byte[1];
-
-                @Override
-                public void write(int b) throws IOException {
-                    tmp[0] = (byte) b;
-                    write(tmp, 0, 1);
-                }
-
-                @Override
-                public void write(byte[] bytes, int offset, int length) throws IOException {
-                    while (length > 0) {
-                        int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), writeOffset);
-                        offset += n;
-                        length -= n;
-                        writeOffset += n;
-                    }
-                }
-            };
-            dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
-        }
-        
-        @Override
-        public void write(GTRowBlock block) throws IOException {
-            block.export(dout);
-        }
-        
-        @Override
-        public void close() throws IOException {
-            dout.close();
-            closeWriter(this);
-
-            if (debug)
-                logger.debug(ConcurrentDiskStore.this + " write end @ " + writeOffset);
-        }
-    }
-
-    private void openWriteChannel(long startOffset) throws IOException {
-        if (startOffset > 0) { // TODO does not support append yet
-            writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
-        } else {
-            diskFile.delete();
-            writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
-        }
-    }
-
-    private void closeWriteChannel() {
-        IOUtils.closeQuietly(writeChannel);
-        writeChannel = null;
-    }
-
-    private void openReadChannel() throws IOException {
-        if (readChannel == null) {
-            readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
-        }
-    }
-
-    private void closeReadChannel() throws IOException {
-        IOUtils.closeQuietly(readChannel);
-        readChannel = null;
-    }
-
-    @Override
-    public void close() throws IOException {
-        synchronized (lock) {
-            if (activeWriter != null || !activeReaders.isEmpty())
-                throw new IllegalStateException();
-
-            if (delOnClose) {
-                diskFile.delete();
-            }
-
-            if (debug)
-                logger.debug(this + " closed");
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "ConcurrentDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
-    }
-
-}
\ No newline at end of file
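
The key trick in the ConcurrentDiskStore being deleted is that a single read FileChannel is shared by all readers: positional read(buffer, offset) calls do not move the channel's own position, so concurrent threads do not race on a shared cursor. The same idea in isolation (a standalone sketch, names illustrative):

    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.Charset;
    import java.nio.file.StandardOpenOption;

    /**
     * Sketch of the sharable read channel: FileChannel.read(ByteBuffer, position)
     * leaves the channel's position untouched, so one READ channel serves many threads.
     */
    public class PositionalReadSketch {
        public static void main(String[] args) throws Exception {
            final File f = File.createTempFile("PositionalReadSketch", "");
            f.deleteOnExit();

            FileChannel w = FileChannel.open(f.toPath(), StandardOpenOption.WRITE);
            w.write(ByteBuffer.wrap("hello world".getBytes(Charset.forName("UTF-8"))), 0);
            w.close();

            final FileChannel r = FileChannel.open(f.toPath(), StandardOpenOption.READ);
            Runnable reader = new Runnable() {
                @Override
                public void run() {
                    try {
                        ByteBuffer buf = ByteBuffer.allocate(5);
                        r.read(buf, 6); // positional read of "world"; no shared cursor
                        System.out.println(Thread.currentThread().getName() + ": " + new String(buf.array(), Charset.forName("UTF-8")));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
            Thread t1 = new Thread(reader), t2 = new Thread(reader);
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            r.close();
        }
    }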

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
deleted file mode 100644
index ce6541f..0000000
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/DoggedCubeBuilder.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.kylin.job.inmemcubing;
-
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.job.inmemcubing.InMemCubeBuilder.CuboidResult;
-import org.apache.kylin.metadata.measure.MeasureAggregators;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.TimeUnit;
-
-/**
- * When base cuboid does not fit in memory, cut the input into multiple splits and merge the split outputs at last.
- */
-public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
-
-    private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class);
-
-    private int splitRowThreshold = Integer.MAX_VALUE;
-    private int unitRows = 1000;
-
-    public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
-        super(cubeDesc, dictionaryMap);
-    }
-
-    public void setSplitRowThreshold(int rowThreshold) {
-        this.splitRowThreshold = rowThreshold;
-        this.unitRows = Math.min(unitRows, rowThreshold);
-    }
-
-    @Override
-    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
-        new BuildOnce().build(input, output);
-    }
-
-    private class BuildOnce {
-
-        public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
-            final List<SplitThread> splits = new ArrayList<SplitThread>();
-            final Merger merger = new Merger();
-
-            long start = System.currentTimeMillis();
-            logger.info("Dogged Cube Build start");
-
-            try {
-                SplitThread last = null;
-                boolean eof = false;
-
-                while (!eof) {
-
-                    if (last != null && shouldCutSplit(splits)) {
-                        cutSplit(last);
-                        last = null;
-                    }
-
-                    checkException(splits);
-
-                    if (last == null) {
-                        last = new SplitThread();
-                        splits.add(last);
-                        last.start();
-                        logger.info("Split #" + splits.size() + " kickoff");
-                    }
-
-                    eof = feedSomeInput(input, last, unitRows);
-                }
-
-                for (SplitThread split : splits) {
-                    split.join();
-                }
-                checkException(splits);
-                logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms");
-
-                merger.mergeAndOutput(splits, output);
-
-            } catch (Throwable e) {
-                logger.error("Dogged Cube Build error", e);
-                if (e instanceof Error)
-                    throw (Error) e;
-                else if (e instanceof RuntimeException)
-                    throw (RuntimeException) e;
-                else
-                    throw new IOException(e);
-            } finally {
-                closeGirdTables(splits);
-                logger.info("Dogged Cube Build end, totally took " + (System.currentTimeMillis() - start) + " ms");
-                ensureExit(splits);
-                logger.info("Dogged Cube Build return");
-            }
-        }
-
-        private void closeGirdTables(List<SplitThread> splits) {
-            for (SplitThread split : splits) {
-                if (split.buildResult != null) {
-                    for (CuboidResult r : split.buildResult.values()) {
-                        try {
-                            r.table.close();
-                        } catch (Throwable e) {
-                            logger.error("Error closing grid table " + r.table, e);
-                        }
-                    }
-                }
-            }
-        }
-
-        private void ensureExit(List<SplitThread> splits) throws IOException {
-            try {
-                for (int i = 0; i < splits.size(); i++) {
-                    SplitThread split = splits.get(i);
-                    if (split.isAlive()) {
-                        abort(splits);
-                    }
-                }
-            } catch (Throwable e) {
-                logger.error("Dogged Cube Build error", e);
-            }
-        }
-
-        private void checkException(List<SplitThread> splits) throws IOException {
-            for (int i = 0; i < splits.size(); i++) {
-                SplitThread split = splits.get(i);
-                if (split.exception != null)
-                    abort(splits);
-            }
-        }
-
-        private void abort(List<SplitThread> splits) throws IOException {
-            for (SplitThread split : splits) {
-                split.builder.abort();
-            }
-
-            ArrayList<Throwable> errors = new ArrayList<Throwable>();
-            for (SplitThread split : splits) {
-                try {
-                    split.join();
-                } catch (InterruptedException e) {
-                    errors.add(e);
-                }
-                if (split.exception != null)
-                    errors.add(split.exception);
-            }
-
-            if (errors.isEmpty()) {
-                return;
-            } else if (errors.size() == 1) {
-                Throwable t = errors.get(0);
-                if (t instanceof IOException)
-                    throw (IOException) t;
-                else
-                    throw new IOException(t);
-            } else {
-                for (Throwable t : errors)
-                    logger.error("Exception during in-mem cube build", t);
-                throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
-            }
-        }
-
-        private boolean feedSomeInput(BlockingQueue<List<String>> input, SplitThread split, int n) {
-            try {
-                int i = 0;
-                while (i < n) {
-                    List<String> record = input.take();
-                    i++;
-
-                    while (split.inputQueue.offer(record, 1, TimeUnit.SECONDS) == false) {
-                        if (split.exception != null)
-                            return true; // got some error
-                    }
-                    split.inputRowCount++;
-
-                    if (record == null || record.isEmpty()) {
-                        return true;
-                    }
-                }
-                return false;
-
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        private void cutSplit(SplitThread last) {
-            try {
-                // signal the end of input
-                while (last.isAlive()) {
-                    if (last.inputQueue.offer(Collections.<String> emptyList())) {
-                        break;
-                    }
-                    Thread.sleep(1000);
-                }
-
-                // wait cuboid build done
-                while (last.isAlive()) {
-                    if (last.builder.isAllCuboidDone()) {
-                        break;
-                    }
-                    Thread.sleep(1000);
-                }
-            } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        private boolean shouldCutSplit(List<SplitThread> splits) {
-            int systemAvailMB = MemoryBudgetController.getSystemAvailMB();
-            int nSplit = splits.size();
-            long splitRowCount = nSplit == 0 ? 0 : splits.get(nSplit - 1).inputRowCount;
-
-            logger.debug(splitRowCount + " records went into split #" + nSplit + "; " + systemAvailMB + " MB left, " + reserveMemoryMB + " MB threshold");
-
-            return splitRowCount >= splitRowThreshold || systemAvailMB <= reserveMemoryMB;
-        }
-    }
-
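-    /**
-     * A worker that runs its own InMemCubeBuilder over the records queued to
-     * it, keeping either the build result or the first exception raised.
-     */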
-    private class SplitThread extends Thread {
-        final BlockingQueue<List<String>> inputQueue = new ArrayBlockingQueue<List<String>>(16);
-        final InMemCubeBuilder builder;
-
-        ConcurrentNavigableMap<Long, CuboidResult> buildResult;
-        long inputRowCount = 0;
-        RuntimeException exception;
-
-        public SplitThread() {
-            this.builder = new InMemCubeBuilder(cubeDesc, dictionaryMap);
-            this.builder.setConcurrentThreads(taskThreadCount);
-            this.builder.setReserveMemoryMB(reserveMemoryMB);
-        }
-
-        @Override
-        public void run() {
-            try {
-                buildResult = builder.build(inputQueue);
-            } catch (Exception e) {
-                if (e instanceof RuntimeException)
-                    this.exception = (RuntimeException) e;
-                else
-                    this.exception = new RuntimeException(e);
-            }
-        }
-    }
-
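-    /**
-     * K-way merges the sorted build results of all splits, re-aggregating
-     * measures whenever records from different splits share the same key.
-     */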
-    private class Merger {
-
-        MeasureAggregators reuseAggrs;
-        Object[] reuseMetricsArray;
-        ByteArray reuseMetricsSpace;
-
-        long lastCuboidColumnCount;
-        ImmutableBitSet lastMetricsColumns;
-
-        Merger() {
-            MeasureDesc[] measures = CuboidToGridTableMapping.getMeasureSequenceOnGridTable(cubeDesc);
-            reuseAggrs = new MeasureAggregators(measures);
-            reuseMetricsArray = new Object[measures.length];
-        }
-
-        public void mergeAndOutput(List<SplitThread> splits, ICuboidWriter output) throws IOException {
-            if (splits.size() == 1) {
-                for (CuboidResult cuboidResult : splits.get(0).buildResult.values()) {
-                    outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
-                    cuboidResult.table.close();
-                }
-                return;
-            }
-
-            LinkedList<MergeSlot> open = Lists.newLinkedList();
-            for (SplitThread split : splits) {
-                open.add(new MergeSlot(split));
-            }
-
-            PriorityQueue<MergeSlot> heap = new PriorityQueue<MergeSlot>();
-
-            while (true) {
-                // ready records in open slots and add to heap
-                while (!open.isEmpty()) {
-                    MergeSlot slot = open.removeFirst();
-                    if (slot.fetchNext()) {
-                        heap.add(slot);
-                    }
-                }
-
-                // find the smallest on heap
-                MergeSlot smallest = heap.poll();
-                if (smallest == null)
-                    break;
-                open.add(smallest);
-
-                // merge with slots having the same key
-                if (smallest.isSameKey(heap.peek())) {
-                    Object[] metrics = getMetricsValues(smallest.currentRecord);
-                    reuseAggrs.reset();
-                    reuseAggrs.aggregate(metrics);
-                    do {
-                        MergeSlot slot = heap.poll();
-                        open.add(slot);
-                        metrics = getMetricsValues(slot.currentRecord);
-                        reuseAggrs.aggregate(metrics);
-                    } while (smallest.isSameKey(heap.peek()));
-
-                    reuseAggrs.collectStates(metrics);
-                    setMetricsValues(smallest.currentRecord, metrics);
-                }
-
-                output.write(smallest.currentCuboidId, smallest.currentRecord);
-            }
-        }
-
-        private void setMetricsValues(GTRecord record, Object[] metricsValues) {
-            ImmutableBitSet metrics = getMetricsColumns(record);
-
-            if (reuseMetricsSpace == null) {
-                reuseMetricsSpace = new ByteArray(record.getInfo().getMaxColumnLength(metrics));
-            }
-
-            record.setValues(metrics, reuseMetricsSpace, metricsValues);
-        }
-
-        private Object[] getMetricsValues(GTRecord record) {
-            ImmutableBitSet metrics = getMetricsColumns(record);
-            return record.getValues(metrics, reuseMetricsArray);
-        }
-
-        private ImmutableBitSet getMetricsColumns(GTRecord record) {
-            // metrics columns always come after dimension columns
-            if (lastCuboidColumnCount == record.getInfo().getColumnCount())
-                return lastMetricsColumns;
-
-            int to = record.getInfo().getColumnCount();
-            int from = to - reuseMetricsArray.length;
-            lastCuboidColumnCount = record.getInfo().getColumnCount();
-            lastMetricsColumns = new ImmutableBitSet(from, to);
-            return lastMetricsColumns;
-        }
-    }
-
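-    /**
-     * A cursor over one split's build result: iterates its cuboids in order
-     * and, within each cuboid, the grid table records. Ordered by cuboid id,
-     * then by the record's primary key.
-     */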
-    private static class MergeSlot implements Comparable<MergeSlot> {
-
-        final Iterator<CuboidResult> cuboidIterator;
-        IGTScanner scanner;
-        Iterator<GTRecord> recordIterator;
-
-        long currentCuboidId;
-        GTRecord currentRecord;
-
-        public MergeSlot(SplitThread split) {
-            cuboidIterator = split.buildResult.values().iterator();
-        }
-
-        public boolean fetchNext() throws IOException {
-            if (recordIterator == null) {
-                if (cuboidIterator.hasNext()) {
-                    CuboidResult cuboid = cuboidIterator.next();
-                    currentCuboidId = cuboid.cuboidId;
-                    scanner = cuboid.table.scan(new GTScanRequest(cuboid.table.getInfo(), null, null, null));
-                    recordIterator = scanner.iterator();
-                } else {
-                    return false;
-                }
-            }
-
-            if (recordIterator.hasNext()) {
-                currentRecord = recordIterator.next();
-                return true;
-            } else {
-                scanner.close();
-                recordIterator = null;
-                return fetchNext();
-            }
-        }
-
-        @Override
-        public int compareTo(MergeSlot o) {
-            long cuboidComp = this.currentCuboidId - o.currentCuboidId;
-            if (cuboidComp != 0)
-                return cuboidComp < 0 ? -1 : 1;
-
-            // note GTRecord.equals() doesn't work because the two GTRecords come from different GridTables
-            ImmutableBitSet pk = this.currentRecord.getInfo().getPrimaryKey();
-            for (int i = 0; i < pk.trueBitCount(); i++) {
-                int c = pk.trueBitAt(i);
-                int comp = this.currentRecord.get(c).compareTo(o.currentRecord.get(c));
-                if (comp != 0)
-                    return comp;
-            }
-            return 0;
-        }
-
-        public boolean isSameKey(MergeSlot o) {
-            if (o == null)
-                return false;
-            else
-                return this.compareTo(o) == 0;
-        }
-
-    }
-}
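
The Merger/MergeSlot pair above is a standard k-way merge of pre-sorted
streams through a PriorityQueue: keep one cursor per stream on a min-heap,
repeatedly pop the smallest, and advance every cursor sitting on the same
key. Below is a minimal, self-contained sketch of that pattern for reference.
It is illustrative only; the names (KWayMergeSketch, Slot) are hypothetical,
plain integers stand in for Kylin's GTRecord keys, and each input is assumed
strictly increasing, as each split's aggregated output is.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {

    // One cursor per sorted input stream, ordered by its current element.
    static class Slot implements Comparable<Slot> {
        final Iterator<Integer> it;
        int current;

        Slot(Iterator<Integer> it) {
            this.it = it;
        }

        boolean fetchNext() {
            if (!it.hasNext())
                return false;
            current = it.next();
            return true;
        }

        @Override
        public int compareTo(Slot o) {
            return Integer.compare(current, o.current);
        }
    }

    // Merge k strictly-increasing iterators into one sorted sequence,
    // emitting each key once (the analogue of re-aggregating measures for
    // records that share the same dimension key).
    static List<Integer> merge(List<Iterator<Integer>> inputs) {
        PriorityQueue<Slot> heap = new PriorityQueue<Slot>();
        for (Iterator<Integer> it : inputs) {
            Slot slot = new Slot(it);
            if (slot.fetchNext())
                heap.add(slot);
        }

        List<Integer> out = new ArrayList<Integer>();
        while (!heap.isEmpty()) {
            Slot smallest = heap.poll();
            int key = smallest.current;

            // advance every other slot sitting on the same key, like the
            // do-while in Merger.mergeAndOutput()
            while (heap.peek() != null && heap.peek().current == key) {
                Slot same = heap.poll();
                if (same.fetchNext())
                    heap.add(same);
            }
            if (smallest.fetchNext())
                heap.add(smallest);

            out.add(key); // Kylin writes the aggregated record at this point
        }
        return out;
    }

    public static void main(String[] args) {
        List<Iterator<Integer>> inputs = Arrays.asList(
                Arrays.asList(1, 3, 5).iterator(),
                Arrays.asList(2, 3, 6).iterator());
        System.out.println(merge(inputs)); // prints [1, 2, 3, 5, 6]
    }
}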

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
deleted file mode 100644
index 91d4a2a..0000000
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/ICuboidWriter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import org.apache.kylin.storage.gridtable.GTRecord;
-
-import java.io.IOException;
-
-/**
- */
-public interface ICuboidWriter {
-    void write(long cuboidId, GTRecord record) throws IOException;
-}
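
The interface removed here is the single sink the in-mem builders write
merged records into, one call per record in cuboid order. A hypothetical
implementation for illustration only (CountingCuboidWriter is not part of
Kylin), just to show how small the contract is:

package org.apache.kylin.job.inmemcubing;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.kylin.storage.gridtable.GTRecord;

// Counts how many records each cuboid produced; a stand-in for production
// writers that actually persist the records.
public class CountingCuboidWriter implements ICuboidWriter {

    private final Map<Long, Long> counts = new HashMap<Long, Long>();

    @Override
    public void write(long cuboidId, GTRecord record) throws IOException {
        Long c = counts.get(cuboidId);
        counts.put(cuboidId, c == null ? 1L : c + 1);
    }

    public Map<Long, Long> getCounts() {
        return counts;
    }
}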