Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/16 05:05:19 UTC

[5/5] incubator-kylin git commit: KYLIN-1010 Job module with only II and tests left

KYLIN-1010 Job module with only II and tests left


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4456bb1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4456bb1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4456bb1d

Branch: refs/heads/2.x-staging
Commit: 4456bb1ddcf249221a1cb1df8186dbbe26e39b4d
Parents: d1f0d33
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Sep 15 19:08:15 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Sep 16 11:04:24 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/cmd/BaseCommandOutput.java |  29 --
 .../apache/kylin/job/cmd/ICommandOutput.java    |  44 --
 .../org/apache/kylin/job/cmd/IJobCommand.java   |  32 --
 .../java/org/apache/kylin/job/cmd/ShellCmd.java | 108 -----
 .../apache/kylin/job/cmd/ShellCmdOutput.java    |  83 ----
 .../engine/mr/common/AbstractHadoopJob.java     |   9 +-
 .../cardinality/ColumnCardinalityMapper.java    | 111 -----
 .../cardinality/ColumnCardinalityReducer.java   |  92 ----
 .../cardinality/HiveColumnCardinalityJob.java   | 107 -----
 .../HiveColumnCardinalityUpdateJob.java         | 154 -------
 .../job/hadoop/cube/NewBaseCuboidMapper.java    | 348 ---------------
 .../job/hadoop/cube/OrphanHBaseCleanJob.java    | 134 ------
 .../job/hadoop/cube/StorageCleanupJob.java      | 323 --------------
 .../apache/kylin/job/monitor/MonitorCLI.java    |  69 ---
 .../kylin/job/monitor/StreamingMonitor.java     | 154 -------
 .../kylin/job/streaming/BootstrapConfig.java    |  74 ----
 .../kylin/job/streaming/CubeStreamConsumer.java | 141 ------
 .../kylin/job/streaming/KafkaDataLoader.java    |  54 ---
 .../kylin/job/streaming/StreamingBootstrap.java | 402 -----------------
 .../kylin/job/streaming/StreamingCLI.java       | 104 -----
 .../apache/kylin/job/tools/CleanHtableCLI.java  |  73 ---
 .../kylin/job/tools/CubeMigrationCLI.java       | 439 -------------------
 .../job/tools/GridTableHBaseBenchmark.java      | 391 -----------------
 .../kylin/job/tools/HbaseStreamingInput.java    | 237 ----------
 .../kylin/job/tools/HtableAlterMetadataCLI.java |  88 ----
 .../org/apache/kylin/job/tools/KafkaVerify.java | 101 -----
 .../apache/kylin/job/tools/RowCounterCLI.java   |  71 ---
 .../kylin/job/tools/StreamingInputAnalyzer.java | 242 ----------
 .../kylin/job/tools/StreamingLogsAnalyser.java  |  96 ----
 .../org/apache/kylin/job/tools/TarGZUtil.java   |  69 ---
 .../apache/kylin/job/tools/TimeHistogram.java   |  85 ----
 .../kylin/job/BuildCubeWithStreamTest.java      |   2 +-
 .../apache/kylin/job/BuildIIWithEngineTest.java |   2 +-
 .../job/tools/ColumnCardinalityReducerTest.java |   4 +-
 .../apache/kylin/rest/service/AdminService.java |   2 +-
 .../apache/kylin/rest/service/CubeService.java  |   4 +-
 .../apache/kylin/source/hive/HiveMRInput.java   |   7 +-
 .../cardinality/ColumnCardinalityMapper.java    | 111 +++++
 .../cardinality/ColumnCardinalityReducer.java   |  92 ++++
 .../cardinality/HiveColumnCardinalityJob.java   | 107 +++++
 .../HiveColumnCardinalityUpdateJob.java         | 154 +++++++
 .../kylin/source/kafka/util/KafkaVerify.java    | 101 +++++
 .../storage/hbase/steps/DeprecatedGCStep.java   |   4 +-
 .../storage/hbase/util/CleanHtableCLI.java      |  73 +++
 .../storage/hbase/util/CubeMigrationCLI.java    | 439 +++++++++++++++++++
 .../hbase/util/GridTableHBaseBenchmark.java     | 391 +++++++++++++++++
 .../storage/hbase/util/HbaseStreamingInput.java | 237 ++++++++++
 .../hbase/util/HtableAlterMetadataCLI.java      |  88 ++++
 .../storage/hbase/util/OrphanHBaseCleanJob.java | 134 ++++++
 .../kylin/storage/hbase/util/RowCounterCLI.java |  71 +++
 .../storage/hbase/util/StorageCleanupJob.java   | 315 +++++++++++++
 .../kylin/storage/hbase/util/TarGZUtil.java     |  69 +++
 .../apache/kylin/job/monitor/MonitorCLI.java    |  69 +++
 .../kylin/job/monitor/StreamingMonitor.java     | 154 +++++++
 .../kylin/job/streaming/BootstrapConfig.java    |  74 ++++
 .../kylin/job/streaming/CubeStreamConsumer.java | 141 ++++++
 .../kylin/job/streaming/KafkaDataLoader.java    |  54 +++
 .../kylin/job/streaming/StreamingBootstrap.java | 402 +++++++++++++++++
 .../kylin/job/streaming/StreamingCLI.java       | 104 +++++
 .../streaming/util/StreamingInputAnalyzer.java  | 242 ++++++++++
 .../streaming/util/StreamingLogsAnalyser.java   |  96 ++++
 .../kylin/streaming/util/TimeHistogram.java     |  85 ++++
 62 files changed, 3816 insertions(+), 4476 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
deleted file mode 100644
index 29b5324..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-/**
- */
-public abstract class BaseCommandOutput implements ICommandOutput {
-
-    @Override
-    public void log(String message) {
-        this.appendOutput(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
deleted file mode 100644
index 6cab6a3..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-
-/**
- * @author xjiang
- * 
- */
-public interface ICommandOutput extends Logger {
-
-    public void setStatus(JobStepStatusEnum status);
-
-    public JobStepStatusEnum getStatus();
-
-    public void appendOutput(String message);
-
-    public String getOutput();
-
-    public void setExitCode(int exitCode);
-
-    public int getExitCode();
-
-    public void reset();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
deleted file mode 100644
index 5a47173..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.exception.JobException;
-
-/**
- * @author xjiang
- * 
- */
-public interface IJobCommand {
-
-    public ICommandOutput execute() throws JobException;
-
-    public void cancel() throws JobException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
deleted file mode 100644
index b93c058..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- * 
- */
-public class ShellCmd implements IJobCommand {
-
-    private static Logger logger = LoggerFactory.getLogger(ShellCmd.class);
-
-    private final String executeCommand;
-    private final ICommandOutput output;
-    private final boolean isAsync;
-    private final CliCommandExecutor cliCommandExecutor;
-
-    private FutureTask<Integer> future;
-
-    private ShellCmd(String executeCmd, ICommandOutput out, String host, int port, String user, String password, boolean async) {
-        this.executeCommand = executeCmd;
-        this.output = out;
-        this.cliCommandExecutor = new CliCommandExecutor();
-        this.cliCommandExecutor.setRunAtRemote(host, port, user, password);
-        this.isAsync = async;
-    }
-
-    public ShellCmd(String executeCmd, String host, int port, String user, String password, boolean async) {
-        this(executeCmd, new ShellCmdOutput(), host, port, user, password, async);
-    }
-
-    @Override
-    public ICommandOutput execute() throws JobException {
-
-        final ExecutorService executor = Executors.newSingleThreadExecutor();
-        future = new FutureTask<Integer>(new Callable<Integer>() {
-            public Integer call() throws JobException, IOException {
-                executor.shutdown();
-                return executeCommand(executeCommand);
-            }
-        });
-        executor.execute(future);
-
-        int exitCode = -1;
-        if (!isAsync) {
-            try {
-                exitCode = future.get();
-                logger.info("finish executing");
-            } catch (CancellationException e) {
-                logger.debug("Command is cancelled");
-                exitCode = -2;
-            } catch (Exception e) {
-                throw new JobException("Error when execute job " + executeCommand, e);
-            } finally {
-                if (exitCode == 0) {
-                    output.setStatus(JobStepStatusEnum.FINISHED);
-                } else if (exitCode == -2) {
-                    output.setStatus(JobStepStatusEnum.DISCARDED);
-                } else {
-                    output.setStatus(JobStepStatusEnum.ERROR);
-                }
-                output.setExitCode(exitCode);
-            }
-        }
-        return output;
-    }
-
-    protected int executeCommand(String command) throws JobException, IOException {
-        output.reset();
-        output.setStatus(JobStepStatusEnum.RUNNING);
-        return cliCommandExecutor.execute(command, output).getFirst();
-    }
-
-    @Override
-    public void cancel() throws JobException {
-        future.cancel(true);
-    }
-
-}
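
The removed ShellCmd wraps CliCommandExecutor in an IJobCommand: execute() runs the command on a single-thread executor and, in synchronous mode, maps the exit code to a job status (0 -> FINISHED, -2 -> DISCARDED, anything else -> ERROR). A hypothetical usage sketch of the class as it stood (host, port, and credentials below are placeholders):

    import org.apache.kylin.job.cmd.ICommandOutput;
    import org.apache.kylin.job.cmd.ShellCmd;
    import org.apache.kylin.job.exception.JobException;

    public class ShellCmdUsageSketch {
        public static void main(String[] args) throws JobException {
            // Placeholder remote endpoint; async=false blocks until the command exits.
            ShellCmd cmd = new ShellCmd("echo hello", "sandbox.example.com", 22, "user", "password", false);
            ICommandOutput out = cmd.execute();
            // Status and exit code are set by execute() before it returns.
            System.out.println(out.getStatus() + " (exit " + out.getExitCode() + ")");
            System.out.println(out.getOutput());
        }
    }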

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
deleted file mode 100644
index 44609c2..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- * 
- */
-public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
-
-    protected static final Logger logger = LoggerFactory.getLogger(ShellCmdOutput.class);
-
-    protected StringBuilder output;
-    protected int exitCode;
-    protected JobStepStatusEnum status;
-
-    public ShellCmdOutput() {
-        init();
-    }
-
-    private void init() {
-        output = new StringBuilder();
-        exitCode = -1;
-        status = JobStepStatusEnum.NEW;
-    }
-
-    @Override
-    public JobStepStatusEnum getStatus() {
-        return status;
-    }
-
-    @Override
-    public void setStatus(JobStepStatusEnum s) {
-        this.status = s;
-    }
-
-    @Override
-    public String getOutput() {
-        return output.toString();
-    }
-
-    @Override
-    public void appendOutput(String message) {
-        output.append(message).append(System.getProperty("line.separator"));
-        logger.debug(message);
-    }
-
-    @Override
-    public int getExitCode() {
-        return exitCode;
-    }
-
-    @Override
-    public void setExitCode(int code) {
-        exitCode = code;
-    }
-
-    @Override
-    public void reset() {
-        init();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index cd9393a..cf492ef 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -23,7 +23,7 @@ package org.apache.kylin.engine.mr.common;
  *
  */
 
-import static org.apache.hadoop.util.StringUtils.formatTime;
+import static org.apache.hadoop.util.StringUtils.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -58,7 +58,6 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
 import org.apache.kylin.job.common.OptionsHelper;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.MetadataManager;
@@ -189,10 +188,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         String classpath = "";
         try {
             CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
-            ShellCmdOutput output = new ShellCmdOutput();
-            executor.execute("mapred classpath", output);
-
-            classpath = output.getOutput().trim().replace(':', ',');
+            String output = executor.execute("mapred classpath").getSecond();
+            classpath = output.trim().replace(':', ',');
         } catch (IOException e) {
             logger.error("Failed to run: 'mapred classpath'.", e);
         }
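
The hunk above drops the ShellCmdOutput collector in favor of the Pair that CliCommandExecutor.execute(String) returns. A minimal sketch of the new call pattern, assuming (as the before/after lines suggest) getFirst() carries the exit code and getSecond() the captured console output:

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.common.util.CliCommandExecutor;
    import org.apache.kylin.common.util.Pair;

    public class MapredClasspathSketch {
        public static void main(String[] args) throws Exception {
            CliCommandExecutor executor = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
            // execute(String) runs the command and returns (exit code, console output).
            Pair<Integer, String> result = executor.execute("mapred classpath");
            // Hadoop job configuration wants a comma-separated list, not colon-separated.
            String classpath = result.getSecond().trim().replace(':', ',');
            System.out.println(classpath);
        }
    }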

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
deleted file mode 100644
index 9b77a47..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-
-/**
- * @author Jack
- * 
- */
-public class ColumnCardinalityMapper<T> extends KylinMapper<T, Object, IntWritable, BytesWritable> {
-
-    private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
-    public static final String DEFAULT_DELIM = ",";
-
-    private int counter = 0;
-
-    private TableDesc tableDesc;
-    private IMRTableInputFormat tableInputFormat;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        Configuration conf = context.getConfiguration();
-        bindCurrentConfiguration(conf);
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        String tableName = conf.get(BatchConstants.TABLE_NAME);
-        tableDesc = MetadataManager.getInstance(config).getTableDesc(tableName);
-        tableInputFormat = MRUtil.getTableInputFormat(tableDesc);
-    }
-
-    @Override
-    public void map(T key, Object value, Context context) throws IOException, InterruptedException {
-        ColumnDesc[] columns = tableDesc.getColumns();
-        String[] values = tableInputFormat.parseMapperInput(value);
-
-        for (int m = 0; m < columns.length; m++) {
-            String field = columns[m].getName();
-            String fieldValue = values[m];
-            if (fieldValue == null)
-                fieldValue = "NULL";
-
-            if (counter < 5 && m < 10) {
-                System.out.println("Get row " + counter + " column '" + field + "'  value: " + fieldValue);
-            }
-
-            if (fieldValue != null)
-                getHllc(m).add(Bytes.toBytes(fieldValue.toString()));
-        }
-
-        counter++;
-    }
-
-    private HyperLogLogPlusCounter getHllc(Integer key) {
-        if (!hllcMap.containsKey(key)) {
-            hllcMap.put(key, new HyperLogLogPlusCounter());
-        }
-        return hllcMap.get(key);
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        Iterator<Integer> it = hllcMap.keySet().iterator();
-        ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-        while (it.hasNext()) {
-            int key = it.next();
-            HyperLogLogPlusCounter hllc = hllcMap.get(key);
-            buf.clear();
-            hllc.writeRegisters(buf);
-            buf.flip();
-            context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
deleted file mode 100644
index 6953235..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.engine.mr.KylinReducer;
-
-/**
- * @author Jack
- * 
- */
-public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWritable, IntWritable, LongWritable> {
-
-    public static final int ONE = 1;
-    private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-    }
-
-    @Override
-    public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
-        int skey = key.get();
-        for (BytesWritable v : values) {
-            ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
-            HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter();
-            hll.readRegisters(buffer);
-            getHllc(skey).merge(hll);
-            hll.clear();
-        }
-    }
-
-    private HyperLogLogPlusCounter getHllc(Integer key) {
-        if (!hllcMap.containsKey(key)) {
-            hllcMap.put(key, new HyperLogLogPlusCounter());
-        }
-        return hllcMap.get(key);
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        List<Integer> keys = new ArrayList<Integer>();
-        Iterator<Integer> it = hllcMap.keySet().iterator();
-        while (it.hasNext()) {
-            keys.add(it.next());
-        }
-        Collections.sort(keys);
-        it = keys.iterator();
-        while (it.hasNext()) {
-            int key = it.next();
-            HyperLogLogPlusCounter hllc = hllcMap.get(key);
-            ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-            buf.clear();
-            hllc.writeRegisters(buf);
-            buf.flip();
-            context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
-            // context.write(new Text("ErrorRate_" + key), new
-            // LongWritable((long)hllc.getErrorRate()));
-        }
-
-    }
-}
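
The mapper and reducer above round-trip HyperLogLogPlusCounter registers through a ByteBuffer: the mapper serializes one counter per column, the reducer deserializes, merges, and emits the estimate. A minimal sketch of that serialize-merge-estimate flow, using only the calls shown in the deleted code:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
    import org.apache.kylin.common.util.Bytes;
    import org.apache.kylin.cube.kv.RowConstants;

    public class HllRoundTripSketch {
        public static void main(String[] args) throws IOException {
            // Mapper side: add raw column values, then write the registers out.
            HyperLogLogPlusCounter mapperSide = new HyperLogLogPlusCounter();
            mapperSide.add(Bytes.toBytes("value-1"));
            mapperSide.add(Bytes.toBytes("value-2"));

            ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
            mapperSide.writeRegisters(buf);
            buf.flip();

            // Reducer side: read the registers back and merge into the running counter.
            HyperLogLogPlusCounter incoming = new HyperLogLogPlusCounter();
            incoming.readRegisters(buf);
            HyperLogLogPlusCounter reducerSide = new HyperLogLogPlusCounter();
            reducerSide.merge(incoming);

            System.out.println("estimated cardinality: " + reducerSide.getCountEstimate());
        }
    }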

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
deleted file mode 100644
index 89d20cf..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityJob.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-
-/**
- * This Hadoop job will scan all rows of the Hive table and then calculate the cardinality of each column.
- * @author shaoshi
- *
- */
-public class HiveColumnCardinalityJob extends AbstractHadoopJob {
-    public static final String JOB_TITLE = "Kylin Hive Column Cardinality Job";
-
-    @SuppressWarnings("static-access")
-    protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
-
-    public static final String OUTPUT_PATH = BatchConstants.CFG_KYLIN_HDFS_TEMP_DIR + "cardinality";
-
-    public HiveColumnCardinalityJob() {
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_TABLE);
-            options.addOption(OPTION_OUTPUT_PATH);
-
-            parseOptions(options, args);
-
-            // start job
-            String jobName = JOB_TITLE + getOptionsAsString();
-            System.out.println("Starting: " + jobName);
-            Configuration conf = getConf();
-            job = Job.getInstance(conf, jobName);
-
-            setJobClasspath(job);
-
-            String table = getOptionValue(OPTION_TABLE);
-            job.getConfiguration().set(BatchConstants.TABLE_NAME, table);
-
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-            FileOutputFormat.setOutputPath(job, output);
-            job.getConfiguration().set("dfs.block.size", "67108864");
-
-            // Mapper
-            IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table);
-            tableInputFormat.configureJob(job);
-
-            job.setMapperClass(ColumnCardinalityMapper.class);
-            job.setMapOutputKeyClass(IntWritable.class);
-            job.setMapOutputValueClass(BytesWritable.class);
-
-            // Reducer - only one
-            job.setReducerClass(ColumnCardinalityReducer.class);
-            job.setOutputFormatClass(TextOutputFormat.class);
-            job.setOutputKeyClass(IntWritable.class);
-            job.setOutputValueClass(LongWritable.class);
-            job.setNumReduceTasks(1);
-
-            this.deletePath(job.getConfiguration(), output);
-
-            System.out.println("Going to submit HiveColumnCardinalityJob for table '" + table + "'");
-            int result = waitForCompletion(job);
-
-            return result;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
deleted file mode 100644
index 6f8959b..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/HiveColumnCardinalityUpdateJob.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.MetadataManager;
-
-/**
- * This job will save the cardinality result into the Kylin table metadata store.
- * @author shaoshi
- *
- */
-public class HiveColumnCardinalityUpdateJob extends AbstractHadoopJob {
-    public static final String JOB_TITLE = "Kylin Hive Column Cardinality Update Job";
-
-    @SuppressWarnings("static-access")
-    protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true).withDescription("The hive table name").create("table");
-
-    private String table;
-
-    public HiveColumnCardinalityUpdateJob() {
-
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_TABLE);
-            options.addOption(OPTION_OUTPUT_PATH);
-
-            parseOptions(options, args);
-
-            this.table = getOptionValue(OPTION_TABLE).toUpperCase();
-            // start job
-            String jobName = JOB_TITLE + getOptionsAsString();
-            System.out.println("Starting: " + jobName);
-            Configuration conf = getConf();
-            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-
-            updateKylinTableExd(table.toUpperCase(), output.toString(), conf);
-            return 0;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-
-    }
-
-    public void updateKylinTableExd(String tableName, String outPath, Configuration config) throws IOException {
-        List<String> columns = null;
-        try {
-            columns = readLines(new Path(outPath), config);
-        } catch (Exception e) {
-            e.printStackTrace();
-            System.out.println("Failed to resolve cardinality for " + tableName + " from " + outPath);
-            return;
-        }
-
-        StringBuffer cardi = new StringBuffer();
-        Iterator<String> it = columns.iterator();
-        while (it.hasNext()) {
-            String string = (String) it.next();
-            String[] ss = StringUtils.split(string, "\t");
-
-            if (ss.length != 2) {
-                System.out.println("The hadoop cardinality value is not valid " + string);
-                continue;
-            }
-            cardi.append(ss[1]);
-            cardi.append(",");
-        }
-        String scardi = cardi.toString();
-        scardi = scardi.substring(0, scardi.length() - 1);
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-        Map<String, String> tableExd = metaMgr.getTableDescExd(tableName);
-        tableExd.put(MetadataConstants.TABLE_EXD_CARDINALITY, scardi);
-        metaMgr.saveTableExd(tableName.toUpperCase(), tableExd);
-    }
-
-    private static List<String> readLines(Path location, Configuration conf) throws Exception {
-        FileSystem fileSystem = FileSystem.get(location.toUri(), conf);
-        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
-        FileStatus[] items = fileSystem.listStatus(location);
-        if (items == null)
-            return new ArrayList<String>();
-        List<String> results = new ArrayList<String>();
-        for (FileStatus item : items) {
-
-            // ignoring files like _SUCCESS
-            if (item.getPath().getName().startsWith("_")) {
-                continue;
-            }
-
-            CompressionCodec codec = factory.getCodec(item.getPath());
-            InputStream stream = null;
-
-            // check if we have a compression codec we need to use
-            if (codec != null) {
-                stream = codec.createInputStream(fileSystem.open(item.getPath()));
-            } else {
-                stream = fileSystem.open(item.getPath());
-            }
-
-            StringWriter writer = new StringWriter();
-            IOUtils.copy(stream, writer, "UTF-8");
-            String raw = writer.toString();
-            for (String str : raw.split("\n")) {
-                results.add(str);
-            }
-        }
-        return results;
-    }
-
-}
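
The readLines() helper above shows the standard Hadoop pattern for reading reducer output that may or may not be compressed: probe for a codec by file extension and wrap the input stream only when one is found. A condensed sketch of that pattern:

    import java.io.InputStream;
    import java.io.StringWriter;

    import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    public class CodecAwareReadSketch {
        public static String read(Path file, Configuration conf) throws Exception {
            FileSystem fs = FileSystem.get(file.toUri(), conf);
            // getCodec() keys off the file extension (e.g. ".gz"); null means plain text.
            CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
            InputStream in = (codec != null) ? codec.createInputStream(fs.open(file)) : fs.open(file);
            StringWriter writer = new StringWriter();
            IOUtils.copy(in, writer, "UTF-8");
            in.close();
            return writer.toString();
        }
    }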

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
deleted file mode 100644
index 0cc6f71..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewBaseCuboidMapper.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesSplitter;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.lookup.LookupBytesTable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.SourceFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author George Song (ysong1),honma
- */
-public class NewBaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text> {
-
-    private static final Logger logger = LoggerFactory.getLogger(NewBaseCuboidMapper.class);
-
-    private String cubeName;
-    private String segmentName;
-    private Cuboid baseCuboid;
-    private CubeInstance cube;
-    private CubeSegment cubeSegment;
-
-    private CubeDesc cubeDesc;
-    private MetadataManager metadataManager;
-    private TableDesc factTableDesc;
-
-    private boolean byteRowDelimiterInferred = false;
-    private byte byteRowDelimiter;
-
-    private int counter;
-    private Text outputKey = new Text();
-    private Text outputValue = new Text();
-    private Object[] measures;
-    private byte[][] keyBytesBuf;
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
-    private BytesSplitter bytesSplitter;
-    private AbstractRowKeyEncoder rowKeyEncoder;
-    private MeasureCodec measureCodec;
-
-    // deal with table join
-    private HashMap<String, LookupBytesTable> lookupTables;// name -> table
-    private LinkedList<TableJoin> tableJoins;
-    private LinkedList<Pair<Integer, Integer>> factTblColAsRowKey;// similar to
-    // TableJoin.dimTblColAsRowKey
-    private int[][] measureColumnIndice;
-    private byte[] nullValue;
-
-    private class TableJoin {
-        public LinkedList<Integer> fkIndice;// zero-based join columns on fact
-        // table
-        public String lookupTableName;
-        public String joinType;
-
-        // Pair.first -> zero-based column index in lookup table
-        // Pair.second -> zero based row key index
-        public LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey;
-
-        private TableJoin(String joinType, LinkedList<Integer> fkIndice, String lookupTableName, LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey) {
-            this.joinType = joinType;
-            this.fkIndice = fkIndice;
-            this.lookupTableName = lookupTableName;
-            this.dimTblColAsRowKey = dimTblColAsRowKey;
-        }
-    }
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        metadataManager = MetadataManager.getInstance(config);
-        cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        cubeDesc = cube.getDescriptor();
-        factTableDesc = cubeDesc.getFactTableDesc();
-
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
-        // intermediateTableDesc = new
-        // JoinedFlatTableDesc(cube.getDescriptor());
-
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
-        measureCodec = new MeasureCodec(cubeDesc.getMeasures());
-        measures = new Object[cubeDesc.getMeasures().size()];
-
-        int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
-        keyBytesBuf = new byte[colCount][];
-
-        bytesSplitter = new BytesSplitter(factTableDesc.getColumns().length, 4096);
-
-        nullValue = new byte[] { (byte) '\\', (byte) 'N' };// As in Hive, null
-        // value is
-        // represented by \N
-
-        prepareJoins();
-        prepareMetrics();
-    }
-
-    private void prepareJoins() throws IOException {
-        this.lookupTables = new HashMap<String, LookupBytesTable>();
-        this.tableJoins = new LinkedList<TableJoin>();
-        this.factTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>();
-
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            JoinDesc join = dim.getJoin();
-            if (join != null) {
-                String joinType = join.getType().toUpperCase();
-                String lookupTableName = dim.getTable();
-
-                // load lookup tables
-                if (!lookupTables.containsKey(lookupTableName)) {
-                    TableDesc tableDesc = metadataManager.getTableDesc(lookupTableName);
-                    ReadableTable htable = SourceFactory.createReadableTable(tableDesc);
-                    LookupBytesTable btable = new LookupBytesTable(tableDesc, join.getPrimaryKey(), htable);
-                    lookupTables.put(lookupTableName, btable);
-                }
-
-                // create join infos
-                LinkedList<Integer> fkIndice = new LinkedList<Integer>();
-                for (TblColRef colRef : join.getForeignKeyColumns()) {
-                    fkIndice.add(colRef.getColumnDesc().getZeroBasedIndex());
-                }
-                this.tableJoins.add(new TableJoin(joinType, fkIndice, lookupTableName, this.findColumnRowKeyRelationships(dim)));
-
-            } else {
-
-                this.factTblColAsRowKey.addAll(this.findColumnRowKeyRelationships(dim));
-            }
-        }
-
-        // put composite keys joins ahead of single key joins
-        Collections.sort(tableJoins, new Comparator<TableJoin>() {
-            @Override
-            public int compare(TableJoin o1, TableJoin o2) {
-                return Integer.valueOf(o2.fkIndice.size()).compareTo(Integer.valueOf(o1.fkIndice.size()));
-            }
-        });
-    }
-
-    private LinkedList<Pair<Integer, Integer>> findColumnRowKeyRelationships(DimensionDesc dim) {
-        LinkedList<Pair<Integer, Integer>> dimTblColAsRowKey = new LinkedList<Pair<Integer, Integer>>();
-        for (TblColRef colRef : dim.getColumnRefs()) {
-            int dimTableIndex = colRef.getColumnDesc().getZeroBasedIndex();
-            int rowKeyIndex = cubeDesc.getRowkey().getRowKeyIndexByColumnName(colRef.getName());
-            dimTblColAsRowKey.add(new Pair<Integer, Integer>(dimTableIndex, rowKeyIndex));
-        }
-        return dimTblColAsRowKey;
-    }
-
-    private void prepareMetrics() {
-        List<MeasureDesc> measures = cubeDesc.getMeasures();
-        int measureSize = measures.size();
-        measureColumnIndice = new int[measureSize][];
-        for (int i = 0; i < measureSize; i++) {
-            FunctionDesc func = measures.get(i).getFunction();
-            List<TblColRef> colRefs = func.getParameter().getColRefs();
-            if (colRefs == null) {
-                measureColumnIndice[i] = null;
-            } else {
-                measureColumnIndice[i] = new int[colRefs.size()];
-                for (int j = 0; j < colRefs.size(); j++) {
-                    TblColRef c = colRefs.get(j);
-                    int factTblIdx = factTableDesc.findColumnByName(c.getName()).getZeroBasedIndex();
-                    measureColumnIndice[i][j] = factTblIdx;
-                }
-            }
-        }
-    }
-
-    private byte[] trimSplitBuffer(SplittedBytes splittedBytes) {
-        return Arrays.copyOf(splittedBytes.value, splittedBytes.length);
-    }
-
-    private byte[] buildKey(SplittedBytes[] splitBuffers) {
-
-        int filledDimension = 0;// debug
-
-        // join lookup tables, and fill into RowKey the columns in lookup table
-        for (TableJoin tableJoin : this.tableJoins) {
-            String dimTblName = tableJoin.lookupTableName;
-            LookupBytesTable dimTbl = this.lookupTables.get(dimTblName);
-            ByteArray[] rawKey = new ByteArray[tableJoin.fkIndice.size()];
-            for (int i = 0; i < tableJoin.fkIndice.size(); ++i) {
-                rawKey[i] = new ByteArray(trimSplitBuffer(splitBuffers[tableJoin.fkIndice.get(i)]));
-            }
-            Array<ByteArray> key = new Array<ByteArray>(rawKey);
-            ByteArray[] dimRow = dimTbl.getRow(key);
-            if (dimRow == null) {
-                if (tableJoin.joinType.equalsIgnoreCase("INNER")) {
-                    return null;
-                } else if (tableJoin.joinType.equalsIgnoreCase("LEFT")) {
-                    for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
-                        keyBytesBuf[relation.getSecond()] = nullValue;
-                        filledDimension++;
-                    }
-                }
-            } else {
-                for (Pair<Integer, Integer> relation : tableJoin.dimTblColAsRowKey) {
-                    keyBytesBuf[relation.getSecond()] = dimRow[relation.getFirst()].array();
-                    filledDimension++;
-                }
-            }
-        }
-
-        // fill into RowKey the columns in fact table
-        for (Pair<Integer, Integer> relation : this.factTblColAsRowKey) {
-            keyBytesBuf[relation.getSecond()] = trimSplitBuffer(splitBuffers[relation.getFirst()]);
-            filledDimension++;
-        }
-
-        assert filledDimension == keyBytesBuf.length;
-
-        // all the row key slots(keyBytesBuf) should be complete now
-        return rowKeyEncoder.encode(keyBytesBuf);
-    }
-
-    private void buildValue(SplittedBytes[] splitBuffers) {
-
-        for (int i = 0; i < measures.length; i++) {
-            byte[] valueBytes = getValueBytes(splitBuffers, i);
-            measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
-        }
-
-        valueBuf.clear();
-        measureCodec.encode(measures, valueBuf);
-    }
-
-    private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
-        MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
-        ParameterDesc paramDesc = desc.getFunction().getParameter();
-        int[] flatTableIdx = this.measureColumnIndice[measureIdx];
-
-        byte[] result = null;
-
-        // constant
-        if (flatTableIdx == null) {
-            result = Bytes.toBytes(paramDesc.getValue());
-        }
-        // column values
-        else {
-            for (int i = 0; i < flatTableIdx.length; i++) {
-                SplittedBytes split = splitBuffers[flatTableIdx[i]];
-                result = Arrays.copyOf(split.value, split.length);
-            }
-        }
-
-        if (desc.getFunction().isCount()) {
-            result = Bytes.toBytes("1");
-        }
-
-        return result;
-    }
-
-    @Override
-    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
-        // combining the hive table flattening logic into base cuboid building.
-        // the input of this mapper is the fact table rows
-
-        counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
-
-        if (!byteRowDelimiterInferred)
-            byteRowDelimiter = bytesSplitter.inferByteRowDelimiter(value.getBytes(), value.getLength(), factTableDesc.getColumns().length);
-
-        bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
-
-        try {
-            byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
-            if (rowKey == null)
-                return;// skip this fact table row
-
-            outputKey.set(rowKey, 0, rowKey.length);
-
-            buildValue(bytesSplitter.getSplitBuffers());
-            outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
-            context.write(outputKey, outputValue);
-
-        } catch (Throwable t) {
-            logger.error("", t);
-            context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Error records").increment(1L);
-            return;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
deleted file mode 100644
index 5c94542..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/OrphanHBaseCleanJob.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class OrphanHBaseCleanJob extends AbstractHadoopJob {
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_WHITELIST = OptionBuilder.withArgName("whitelist").hasArg().isRequired(true).withDescription("metadata store whitelist, separated with comma").create("whitelist");
-
-    protected static final Logger logger = LoggerFactory.getLogger(OrphanHBaseCleanJob.class);
-
-    boolean delete = false;
-    Set<String> metastoreWhitelistSet = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        logger.info("jobs args: " + Arrays.toString(args));
-        try {
-            options.addOption(OPTION_DELETE);
-            options.addOption(OPTION_WHITELIST);
-            parseOptions(options, args);
-
-            logger.info("options: '" + getOptionsAsString() + "'");
-            logger.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
-            delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
-            String[] metastoreWhitelist = getOptionValue(OPTION_WHITELIST).split(",");
-
-            for (String ms : metastoreWhitelist) {
-                logger.info("metadata store in white list: " + ms);
-                metastoreWhitelistSet.add(ms);
-            }
-
-            Configuration conf = HBaseConfiguration.create(getConf());
-
-            cleanUnusedHBaseTables(conf);
-
-            return 0;
-        } catch (Exception e) {
-            e.printStackTrace(System.err);
-            throw e;
-        }
-    }
-
-    private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
-
-        // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
-        String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
-        HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
-        List<String> allTablesNeedToBeDropped = new ArrayList<String>();
-        for (HTableDescriptor desc : tableDescriptors) {
-            String host = desc.getValue(IRealizationConstants.HTableTag);
-            if (!metastoreWhitelistSet.contains(host)) {
-                logger.info("HTable {} is recognized as orphan because its tag is {}", desc.getTableName(), host);
-                //collect orphans
-                allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
-            } else {
-                logger.info("HTable {} belongs to {}", desc.getTableName(), host);
-            }
-        }
-
-        if (delete) {
-            // drop tables
-            for (String htableName : allTablesNeedToBeDropped) {
-                logger.info("Deleting HBase table " + htableName);
-                if (hbaseAdmin.tableExists(htableName)) {
-                    if (hbaseAdmin.isTableEnabled(htableName)) {
-                        hbaseAdmin.disableTable(htableName);
-                    }
-
-                    hbaseAdmin.deleteTable(htableName);
-                    logger.info("Deleted HBase table " + htableName);
-                } else {
-                    logger.info("HBase table" + htableName + " does not exist");
-                }
-            }
-        } else {
-            System.out.println("--------------- Tables To Be Dropped ---------------");
-            for (String htableName : allTablesNeedToBeDropped) {
-                System.out.println(htableName);
-            }
-            System.out.println("----------------------------------------------------");
-        }
-
-        hbaseAdmin.close();
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new OrphanHBaseCleanJob(), args);
-        System.exit(exitCode);
-    }
-}

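Before its removal, OrphanHBaseCleanJob was a standard Hadoop Tool: -whitelist is required, -delete is optional, and without "-delete true" it only prints the tables it would drop. A minimal invocation sketch (the whitelist value is a placeholder, not taken from this commit):

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.kylin.job.hadoop.cube.OrphanHBaseCleanJob;

    public class OrphanCleanDryRun {
        public static void main(String[] args) throws Exception {
            // Dry run: list orphan HTables whose metadata tag is outside the whitelist.
            // Add "-delete", "true" only after reviewing the printed list.
            int rc = ToolRunner.run(new OrphanHBaseCleanJob(),
                    new String[] { "-whitelist", "kylin_metadata" });
            System.exit(rc);
        }
    }
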
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
deleted file mode 100644
index fdde11f..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.cmd.ICommandOutput;
-import org.apache.kylin.job.cmd.ShellCmd;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.JobException;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class StorageCleanupJob extends AbstractHadoopJob {
-
-    @SuppressWarnings("static-access")
-    private static final Option OPTION_DELETE = OptionBuilder.withArgName("delete").hasArg().isRequired(false).withDescription("Delete the unused storage").create("delete");
-
-    protected static final Logger logger = LoggerFactory.getLogger(StorageCleanupJob.class);
-
-    public static final long TIME_THRESHOLD = 2 * 24 * 3600 * 1000L; // 2 days
-
-    boolean delete = false;
-
-    protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.hadoop.util.Tool#run(java.lang.String[])
-     */
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        logger.info("jobs args: " + Arrays.toString(args));
-        try {
-            options.addOption(OPTION_DELETE);
-            parseOptions(options, args);
-
-            logger.info("options: '" + getOptionsAsString() + "'");
-            logger.info("delete option value: '" + getOptionValue(OPTION_DELETE) + "'");
-            delete = Boolean.parseBoolean(getOptionValue(OPTION_DELETE));
-
-            Configuration conf = HBaseConfiguration.create(getConf());
-
-            cleanUnusedIntermediateHiveTable(conf);
-            cleanUnusedHdfsFiles(conf);
-            cleanUnusedHBaseTables(conf);
-
-            return 0;
-        } catch (Exception e) {
-            printUsage(options);
-            throw e;
-        }
-    }
-
-    private void cleanUnusedHBaseTables(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
-        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-        IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
-        String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
-        HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
-        List<String> allTablesNeedToBeDropped = new ArrayList<String>();
-        for (HTableDescriptor desc : tableDescriptors) {
-            String host = desc.getValue(IRealizationConstants.HTableTag);
-            String creationTime = desc.getValue(IRealizationConstants.HTableCreationTime);
-            if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) {
-                //only consider htables that belong to this instance and were created more than 2 days ago
-                if (StringUtils.isEmpty(creationTime) || (System.currentTimeMillis() - Long.valueOf(creationTime) > TIME_THRESHOLD)) {
-                    allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString());
-                }
-            }
-        }
-
-        // remove every segment htable from drop list
-        for (CubeInstance cube : cubeMgr.listAllCubes()) {
-            for (CubeSegment seg : cube.getSegments()) {
-                String tablename = seg.getStorageLocationIdentifier();
-                if (allTablesNeedToBeDropped.contains(tablename)) {
-                    allTablesNeedToBeDropped.remove(tablename);
-                    logger.info("Exclude table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus());
-                }
-            }
-        }
-
-        // remove every ii segment htable from drop list
-        for (IIInstance ii : iiManager.listAllIIs()) {
-            for (IISegment seg : ii.getSegments()) {
-                String tablename = seg.getStorageLocationIdentifier();
-
-                if (allTablesNeedToBeDropped.contains(tablename)) {
-                    allTablesNeedToBeDropped.remove(tablename);
-                    logger.info("Exclude table " + tablename + " from drop list, as the table belongs to ii " + ii.getName() + " with status " + ii.getStatus());
-                }
-            }
-        }
-
-        if (delete) {
-            // drop tables
-            for (String htableName : allTablesNeedToBeDropped) {
-                logger.info("Deleting HBase table " + htableName);
-                if (hbaseAdmin.tableExists(htableName)) {
-                    if (hbaseAdmin.isTableEnabled(htableName)) {
-                        hbaseAdmin.disableTable(htableName);
-                    }
-
-                    hbaseAdmin.deleteTable(htableName);
-                    logger.info("Deleted HBase table " + htableName);
-                } else {
-                    logger.info("HBase table" + htableName + " does not exist");
-                }
-            }
-        } else {
-            System.out.println("--------------- Tables To Be Dropped ---------------");
-            for (String htableName : allTablesNeedToBeDropped) {
-                System.out.println(htableName);
-            }
-            System.out.println("----------------------------------------------------");
-        }
-
-        hbaseAdmin.close();
-    }
-
-    private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
-        JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        FileSystem fs = FileSystem.get(conf);
-        List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
-        // GlobFilter filter = new
-        // GlobFilter(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
-        // + "/kylin-.*");
-        FileStatus[] fStatus = fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()));
-        for (FileStatus status : fStatus) {
-            String path = status.getPath().getName();
-            // System.out.println(path);
-            if (path.startsWith(JobInstance.JOB_WORKING_DIR_PREFIX)) {
-                String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + path;
-                allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
-            }
-        }
-
-        List<String> allJobs = executableManager.getAllJobIds();
-        for (String jobId : allJobs) {
-            // only remove FINISHED and DISCARDED job intermediate files
-            final ExecutableState state = executableManager.getOutput(jobId).getState();
-            if (!state.isFinalState()) {
-                String path = JobInstance.getJobWorkingDir(jobId, engineConfig.getHdfsWorkingDirectory());
-                allHdfsPathsNeedToBeDeleted.remove(path);
-                logger.info("Remove " + path + " from deletion list, as the path belongs to job " + jobId + " with status " + state);
-            }
-        }
-
-        // remove every segment working dir from deletion list
-        for (CubeInstance cube : cubeMgr.listAllCubes()) {
-            for (CubeSegment seg : cube.getSegments()) {
-                String jobUuid = seg.getLastBuildJobID();
-                if (jobUuid != null && !jobUuid.isEmpty()) {
-                    String path = JobInstance.getJobWorkingDir(jobUuid, engineConfig.getHdfsWorkingDirectory());
-                    allHdfsPathsNeedToBeDeleted.remove(path);
-                    logger.info("Remove " + path + " from deletion list, as the path belongs to segment " + seg + " of cube " + cube.getName());
-                }
-            }
-        }
-
-        if (delete) {
-            // remove files
-            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
-                logger.info("Deleting hdfs path " + hdfsPath);
-                Path p = new Path(hdfsPath);
-                if (fs.exists(p)) {
-                    fs.delete(p, true);
-                    logger.info("Deleted hdfs path " + hdfsPath);
-                } else {
-                    logger.info("Hdfs path " + hdfsPath + "does not exist");
-                }
-            }
-        } else {
-            System.out.println("--------------- HDFS Path To Be Deleted ---------------");
-            for (String hdfsPath : allHdfsPathsNeedToBeDeleted) {
-                System.out.println(hdfsPath);
-            }
-            System.out.println("-------------------------------------------------------");
-        }
-
-    }
-
-    private void cleanUnusedIntermediateHiveTable(Configuration conf) throws IOException {
-        int uuidLength = 36;
-        final String useDatabaseHql = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";";
-        StringBuilder buf = new StringBuilder();
-        buf.append("hive -e \"");
-        buf.append(useDatabaseHql);
-        buf.append("show tables " + "\'kylin_intermediate_*\'" + "; ");
-        buf.append("\"");
-
-        ShellCmd cmd = new ShellCmd(buf.toString(), null, 0, null, null, false);
-        ICommandOutput output = null;
-
-        try {
-            output = cmd.execute();
-        } catch (JobException e) {
-            logger.error("Failed to run hive command to list intermediate tables", e);
-        }
-
-        if (output == null)
-            return;
-        String outputStr = output.getOutput();
-        BufferedReader reader = new BufferedReader(new StringReader(outputStr));
-        String line = null;
-        List<String> allJobs = executableManager.getAllJobIds();
-        List<String> allHiveTablesNeedToBeDeleted = new ArrayList<String>();
-        List<String> workingJobList = new ArrayList<String>();
-
-        for (String jobId : allJobs) {
-            // only remove FINISHED and DISCARDED job intermediate table
-            final ExecutableState state = executableManager.getOutput(jobId).getState();
-
-            if (!state.isFinalState()) {
-                workingJobList.add(jobId);
-                logger.info("Remove intermediate hive table with job id " + jobId + " with job status " + state);
-            }
-        }
-
-        while ((line = reader.readLine()) != null) {
-            if (line.startsWith("kylin_intermediate_")) {
-                boolean isNeedDel = false;
-                String uuid = line.substring(line.length() - uuidLength, line.length());
-                uuid = uuid.replace("_", "-");
-                //Check whether it's a hive table in use
-                if (allJobs.contains(uuid) && !workingJobList.contains(uuid)) {
-                    isNeedDel = true;
-                }
-
-                if (isNeedDel) {
-                    allHiveTablesNeedToBeDeleted.add(line);
-                }
-            }
-        }
-
-        if (delete) {
-            buf.delete(0, buf.length());
-            buf.append("hive -e \"");
-            buf.append(useDatabaseHql);
-            for (String delHive : allHiveTablesNeedToBeDeleted) {
-                buf.append("drop table if exists " + delHive + "; ");
-                logger.info("Remove " + delHive + " from hive tables.");
-            }
-            buf.append("\"");
-            cmd = new ShellCmd(buf.toString(), null, 0, null, null, false);
-
-            try {
-                cmd.execute();
-            } catch (JobException e) {
-            logger.error("Failed to run hive command to drop intermediate tables", e);
-            }
-        } else {
-            System.out.println("------ Intermediate Hive Tables To Be Dropped ------");
-            for (String hiveTable : allHiveTablesNeedToBeDeleted) {
-                System.out.println(hiveTable);
-            }
-            System.out.println("----------------------------------------------------");
-        }
-
-        if (reader != null)
-            reader.close();
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new StorageCleanupJob(), args);
-        System.exit(exitCode);
-    }
-}

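The subtlest step in the removed cleanUnusedIntermediateHiveTable() above is mapping a Hive table back to its job: the last 36 characters of a kylin_intermediate_* table name are the job UUID with '-' flattened to '_'. A self-contained sketch of that extraction (the sample table name is fabricated):

    // Sketch of the UUID recovery used by the removed cleanup logic.
    public class IntermediateTableName {
        private static final int UUID_LENGTH = 36;

        static String jobIdOf(String tableName) {
            // take the trailing 36 characters and restore the dashes
            String uuid = tableName.substring(tableName.length() - UUID_LENGTH);
            return uuid.replace('_', '-');
        }

        public static void main(String[] args) {
            String table = "kylin_intermediate_sample_cube_01234567_89ab_cdef_0123_456789abcdef";
            System.out.println(jobIdOf(table)); // 01234567-89ab-cdef-0123-456789abcdef
        }
    }
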
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java b/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
deleted file mode 100644
index 7b9831a..0000000
--- a/job/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class MonitorCLI {
-
-    private static final Logger logger = LoggerFactory.getLogger(MonitorCLI.class);
-
-    public static void main(String[] args) {
-        Preconditions.checkArgument(args[0].equals("monitor"));
-
-        int i = 1;
-        List<String> receivers = null;
-        String host = null;
-        String tableName = null;
-        String authorization = null;
-        String cubeName = null;
-        String projectName = "default";
-        while (i < args.length) {
-            String argName = args[i];
-            switch (argName) {
-            case "-receivers":
-                receivers = Lists.newArrayList(StringUtils.split(args[++i], ";"));
-                break;
-            case "-host":
-                host = args[++i];
-                break;
-            case "-tableName":
-                tableName = args[++i];
-                break;
-            case "-authorization":
-                authorization = args[++i];
-                break;
-            case "-cubeName":
-                cubeName = args[++i];
-                break;
-            case "-projectName":
-                projectName = args[++i];
-                break;
-            default:
-                throw new RuntimeException("invalid argName:" + argName);
-            }
-            i++;
-        }
-        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
-        final StreamingMonitor streamingMonitor = new StreamingMonitor();
-        if (tableName != null) {
-            logger.info(String.format("check query tableName:%s host:%s receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
-            Preconditions.checkNotNull(host);
-            Preconditions.checkNotNull(authorization);
-            Preconditions.checkNotNull(tableName);
-            streamingMonitor.checkCountAll(receivers, host, authorization, projectName, tableName);
-        }
-        if (cubeName != null) {
-            logger.info(String.format("check cube cubeName:%s receivers:%s", cubeName, StringUtils.join(receivers, ";")));
-            streamingMonitor.checkCube(receivers, cubeName, host);
-        }
-        System.exit(0);
-    }
-}

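The removed MonitorCLI expects a literal "monitor" verb followed by flag/value pairs, with receivers separated by ';'; -tableName triggers the count(*) probe (which also requires -host and -authorization), while -cubeName triggers the gap/overlap check. An invocation sketch with placeholder values:

    // All values below are placeholders for illustration.
    public class MonitorCubeDemo {
        public static void main(String[] args) {
            org.apache.kylin.job.monitor.MonitorCLI.main(new String[] {
                    "monitor",
                    "-receivers", "alice@example.com;bob@example.com",
                    "-host", "http://kylin-host:7070",
                    "-cubeName", "sample_cube" });
        }
    }
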
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java b/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
deleted file mode 100644
index e23f065..0000000
--- a/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingMonitor {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamingMonitor.class);
-
-    public void checkCountAll(List<String> receivers, String host, String authorization, String projectName, String tableName) {
-        String title = "checkCountAll job(host:" + host + " tableName:" + tableName + ") ";
-        StringBuilder stringBuilder = new StringBuilder();
-        String url = host + "/kylin/api/query";
-        PostMethod request = new PostMethod(url);
-        try {
-
-            request.addRequestHeader("Authorization", "Basic " + authorization);
-            request.addRequestHeader("Content-Type", "application/json");
-            String query = String.format("{\"sql\":\"select count(*) from %s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", tableName, projectName);
-            request.setRequestEntity(new ByteArrayRequestEntity(query.getBytes()));
-
-            int statusCode = new HttpClient().executeMethod(request);
-            String msg = Bytes.toString(request.getResponseBody());
-            stringBuilder.append("host:").append(host).append("\n");
-            stringBuilder.append("query:").append(query).append("\n");
-            stringBuilder.append("statusCode:").append(statusCode).append("\n");
-            if (statusCode == 200) {
-                title += "succeed";
-                final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
-                stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
-                stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
-            } else {
-                title += "failed";
-                stringBuilder.append("response:").append(msg).append("\n");
-            }
-        } catch (Exception e) {
-            final StringWriter out = new StringWriter();
-            e.printStackTrace(new PrintWriter(out));
-            title += "failed";
-            stringBuilder.append(out.toString());
-        } finally {
-            request.releaseConnection();
-        }
-        logger.info("title:" + title);
-        logger.info("content:" + stringBuilder.toString());
-        sendMail(receivers, title, stringBuilder.toString());
-    }
-
-    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<Long, Long>> gaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
-                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
-            }
-        }
-        return gaps;
-    }
-
-    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        Preconditions.checkNotNull(cube);
-        final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
-        logger.info("totally " + segments.size() + " cubeSegments");
-        Collections.sort(segments);
-        return segments;
-    }
-
-    public static final List<Pair<String, String>> findOverlaps(String cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else {
-                overlaps.add(Pair.newPair(first.getName(), second.getName()));
-            }
-        }
-        return overlaps;
-    }
-
-    public void checkCube(List<String> receivers, String cubeName, String host) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        if (cube == null) {
-            logger.info("cube:" + cubeName + " does not exist");
-            return;
-        }
-        List<Pair<Long, Long>> gaps = findGaps(cubeName);
-        List<Pair<String, String>> overlaps = findOverlaps(cubeName);
-        StringBuilder content = new StringBuilder();
-        if (!gaps.isEmpty()) {
-            content.append("all gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new Function<Pair<Long, Long>, String>() {
-                @Nullable
-                @Override
-                public String apply(Pair<Long, Long> input) {
-                    return parseInterval(input);
-                }
-            }), "\n")).append("\n");
-        }
-        if (!overlaps.isEmpty()) {
-            content.append("all overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
-        }
-        if (content.length() > 0) {
-            logger.info(content.toString());
-            sendMail(receivers, String.format("%s has gaps or overlaps on host %s", cubeName, host), content.toString());
-        } else {
-            logger.info("no gaps or overlaps");
-        }
-    }
-
-    private String parseInterval(Pair<Long, Long> interval) {
-        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new Date(interval.getFirst()).toString(), interval.getSecond(), new Date(interval.getSecond()).toString());
-    }
-
-    private void sendMail(List<String> receivers, String title, String content) {
-        final MailService mailService = new MailService(KylinConfig.getInstanceFromEnv());
-        mailService.sendMail(receivers, title, content, false);
-    }
-
-}

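findGaps() and findOverlaps() above share one invariant: after sorting the READY segments, consecutive segments should tile exactly, i.e. first.getDateRangeEnd() == second.getDateRangeStart(); anything smaller is a gap, anything larger an overlap. The same scan over plain millisecond ranges (sample data fabricated):

    import java.util.ArrayList;
    import java.util.List;

    public class GapScan {
        public static void main(String[] args) {
            // sorted [start, end) ranges standing in for segment date ranges
            long[][] segments = { { 0, 100 }, { 100, 200 }, { 250, 300 } };
            List<long[]> gaps = new ArrayList<>();
            for (int i = 0; i < segments.length - 1; i++) {
                long firstEnd = segments[i][1];
                long secondStart = segments[i + 1][0];
                if (firstEnd < secondStart) {
                    gaps.add(new long[] { firstEnd, secondStart });
                }
            }
            for (long[] g : gaps) {
                System.out.println("gap: {" + g[0] + ", " + g[1] + "}"); // prints: gap: {200, 250}
            }
        }
    }
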
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4456bb1d/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
deleted file mode 100644
index 029d4d2..0000000
--- a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-/**
- */
-public class BootstrapConfig {
-
-    private String streaming;
-    private int partitionId = -1;
-
-    //one off default value set to true
-    private boolean oneOff = true;
-    private long start = 0L;
-    private long end = 0L;
-    private long margin = 0L;
-
-
-    private boolean fillGap;
-
-    public long getMargin() {
-        return margin;
-    }
-
-    public void setMargin(long margin) {
-        this.margin = margin;
-    }
-
-    public boolean isOneOff() {
-        return oneOff;
-    }
-
-    public void setOneOff(boolean oneOff) {
-        this.oneOff = oneOff;
-    }
-
-    public long getStart() {
-        return start;
-    }
-
-    public void setStart(long start) {
-        this.start = start;
-    }
-
-    public long getEnd() {
-        return end;
-    }
-
-    public void setEnd(long end) {
-        this.end = end;
-    }
-
-    public String getStreaming() {
-        return streaming;
-    }
-
-    public void setStreaming(String streaming) {
-        this.streaming = streaming;
-    }
-
-    public int getPartitionId() {
-        return partitionId;
-    }
-
-    public void setPartitionId(int partitionId) {
-        this.partitionId = partitionId;
-    }
-
-    public boolean isFillGap() {
-        return fillGap;
-    }
-
-    public void setFillGap(boolean fillGap) {
-        this.fillGap = fillGap;
-    }
-}
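BootstrapConfig above is a plain value holder whose defaults (oneOff = true, partitionId = -1, start/end/margin = 0) describe a one-off build over an explicit window. A usage sketch with placeholder values:

    import org.apache.kylin.job.streaming.BootstrapConfig;

    public class BootstrapConfigDemo {
        public static void main(String[] args) {
            // Placeholder values for illustration only.
            BootstrapConfig config = new BootstrapConfig();
            config.setStreaming("sample_streaming_table");
            config.setOneOff(true);
            config.setStart(1420070400000L);  // 2015-01-01 00:00:00 UTC
            config.setEnd(1420156800000L);    // 2015-01-02 00:00:00 UTC
            config.setMargin(5 * 60 * 1000L); // tolerate 5 minutes of late events
            System.out.println(config.isOneOff() + " " + config.getStart());
        }
    }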