You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by lu...@apache.org on 2015/01/14 15:15:56 UTC

[21/51] [partial] incubator-kylin git commit: cleanup for migration from github.com

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobEngine.java b/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
deleted file mode 100644
index b86fe36..0000000
--- a/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.engine;
-
-/** 
- * @author George Song (ysong1), xduo
- * 
- */
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.kylinolap.common.KylinConfig;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.math.stat.descriptive.rank.Percentile;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.framework.recipes.locks.InterProcessMutex;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.quartz.Scheduler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.exception.JobException;
-
-public class JobEngine implements ConnectionStateListener {
-
-    private static Logger log = LoggerFactory.getLogger(JobEngine.class);
-
-    private final String engineID;
-
-    private final JobEngineConfig engineConfig;
-    private final QuatzScheduler scheduler;
-    private InterProcessMutex sharedLock;
-    private CuratorFramework zkClient;
-
-    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
-    private static final ConcurrentHashMap<JobEngineConfig, JobEngine> CACHE = new ConcurrentHashMap<JobEngineConfig, JobEngine>();
-    private int daemonJobIntervalInSeconds;
-
-    public static JobEngine getInstance(String engineID, JobEngineConfig engineCfg) throws JobException {
-        JobEngine r = CACHE.get(engineCfg);
-        if (r == null) {
-            r = new JobEngine(engineID, engineCfg);
-            CACHE.putIfAbsent(engineCfg, r);
-        }
-        return r;
-    }
-
-    private JobEngine(String engineID, JobEngineConfig context) throws JobException {
-        if (context.getZookeeperString() == null || context.getZookeeperString().equals("")) {
-            throw new IllegalArgumentException("Zookeeper connection string is null or empty");
-        }
-        log.info("Using metadata url: " + context.getConfig());
-
-        this.engineID = engineID;
-        this.engineConfig = context;
-        this.scheduler = new QuatzScheduler();
-
-        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
-        this.zkClient = CuratorFrameworkFactory.newClient(context.getZookeeperString(), retryPolicy);
-        this.zkClient.start();
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            public void run() {
-                log.debug("Closing HBASE connection");
-                releaseLock();
-            }
-        });
-    }
-
-    private void releaseLock() {
-        try {
-            if (sharedLock != null && sharedLock.isAcquiredInThisProcess()) {
-                sharedLock.release();
-            }
-            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
-                // client.setData().forPath(ZOOKEEPER_LOCK_PATH, null);
-                if (zkClient.checkExists().forPath(ZOOKEEPER_LOCK_PATH + "/" + this.engineID) != null) {
-                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(ZOOKEEPER_LOCK_PATH + "/" + this.engineID);
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void start(int daemonJobIntervalInSeconds) throws Exception {
-        this.daemonJobIntervalInSeconds = daemonJobIntervalInSeconds;
-
-        sharedLock = new InterProcessMutex(zkClient, ZOOKEEPER_LOCK_PATH + "/" + this.engineID);
-        log.info("Trying to obtain the shared lock...");
-        // current thread will be blocked until the lock is got
-        sharedLock.acquire();
-
-        log.info("Obtained the shared lock. Starting job scheduler...");
-        zkClient.setData().forPath(ZOOKEEPER_LOCK_PATH + "/" + this.engineID, Bytes.toBytes(this.engineID));
-        startScheduler();
-    }
-
-    private void startScheduler() throws JobException, IOException {
-        String logDir = KylinConfig.getInstanceFromEnv().getKylinJobLogDir();
-        new File(logDir).mkdirs();
-
-        log.info("Starting scheduler.");
-        this.scheduler.start();
-        this.scheduler.scheduleFetcher(this.daemonJobIntervalInSeconds, this.engineConfig);
-    }
-
-    public void start() throws Exception {
-        start(JobConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS);
-    }
-
-    public void stop() throws JobException {
-        releaseLock();
-        this.scheduler.stop();
-    }
-
-    @Override
-    public void stateChanged(CuratorFramework client, ConnectionState newState) {
-        if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) {
-            releaseLock();
-        }
-    }
-
-    public void interruptJob(JobInstance jobInstance, JobStep jobStep) throws IOException, JobException {
-        // kill the running step
-        this.scheduler.interrupt(jobInstance, jobStep);
-    }
-
-    public Scheduler getScheduler() {
-        return this.scheduler.getScheduler();
-    }
-
-    // Job engine metrics related methods
-
-    // <StepID, Duration Seconds>
-    public static ConcurrentHashMap<String, Double> JOB_DURATION = new ConcurrentHashMap<String, Double>();
-
-    public int getNumberOfJobStepsExecuted() {
-        return JOB_DURATION.values().size();
-    }
-
-    public String getPrimaryEngineID() throws Exception {
-        byte[] data = zkClient.getData().forPath(ZOOKEEPER_LOCK_PATH + "/" + this.engineID);
-        if (data == null) {
-            return "";
-        } else {
-            return Bytes.toString(data);
-        }
-    }
-
-    public double getMinJobStepDuration() {
-        double[] all = getJobStepDuration();
-        Arrays.sort(all);
-
-        if (all.length > 0) {
-            return all[0];
-        } else {
-            return 0;
-        }
-    }
-
-    private double[] getJobStepDuration() {
-        Collection<Double> values = JOB_DURATION.values();
-        Double[] all = (Double[]) values.toArray(new Double[values.size()]);
-        return ArrayUtils.toPrimitive(all);
-    }
-
-    public double getMaxJobStepDuration() {
-        double[] all = getJobStepDuration();
-        Arrays.sort(all);
-
-        if (all.length > 1) {
-            return all[all.length - 1];
-        } else {
-            return 0;
-        }
-    }
-
-    public double getPercentileJobStepDuration(double percentile) {
-        Collection<Double> values = JOB_DURATION.values();
-        Double[] all = (Double[]) values.toArray(new Double[values.size()]);
-        Percentile p = new Percentile(percentile);
-        return p.evaluate(ArrayUtils.toPrimitive(all));
-    }
-
-    public Integer getScheduledJobsSzie() {
-        return scheduler.getScheduledJobs();
-    }
-
-    public int getEngineThreadPoolSize() {
-        return scheduler.getThreadPoolSize();
-    }
-
-    public int getNumberOfIdleSlots() {
-        return scheduler.getIdleSlots();
-    }
-
-    public int getNumberOfJobStepsRunning() {
-        return scheduler.getRunningJobs();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java b/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
deleted file mode 100644
index 5527cf6..0000000
--- a/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.engine;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.job.tools.OptionsHelper;
-import com.kylinolap.metadata.model.cube.CubeDesc.CubeCapacity;
-
-/**
- * @author ysong1
- */
-public class JobEngineConfig {
-    private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class);
-    public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
-
-    private String getHadoopJobConfFilePath(CubeCapacity capaticy, boolean appendSuffix) throws IOException {
-        String hadoopJobConfFile;
-        if (appendSuffix)
-            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + capaticy.toString().toLowerCase() + ".xml");
-        else
-            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml");
-
-        String path = System.getProperty(KylinConfig.KYLIN_CONF);
-
-        if (path == null) {
-            path = System.getenv(KylinConfig.KYLIN_CONF);
-        }
-
-        if (path != null) {
-            path = path + File.separator + hadoopJobConfFile;
-        }
-
-        if (null == path || !new File(path).exists()) {
-            File defaultFilePath = new File("/etc/kylin/" + hadoopJobConfFile);
-
-            if (defaultFilePath.exists()) {
-                path = defaultFilePath.getAbsolutePath();
-            } else {
-                logger.debug("Search conf file " + hadoopJobConfFile + "  from classpath ...");
-                InputStream is = JobEngineConfig.class.getClassLoader().getResourceAsStream(hadoopJobConfFile);
-                if (is == null) {
-                    logger.debug("Can't get " + hadoopJobConfFile + " from classpath");
-                    logger.debug("No " + hadoopJobConfFile + " file were found");
-                } else {
-                    File tmp = File.createTempFile(HADOOP_JOB_CONF_FILENAME, ".xml");
-                    inputStreamToFile(is, tmp);
-                    path = tmp.getAbsolutePath();
-                }
-            }
-        }
-
-        if (null == path || !new File(path).exists()) {
-            return "";
-        }
-
-        return OptionsHelper.convertToFileURL(path);
-    }
-
-    public String getHadoopJobConfFilePath(CubeCapacity capaticy) throws IOException {
-        String path = getHadoopJobConfFilePath(capaticy, true);
-        if (!StringUtils.isEmpty(path)) {
-            logger.info("Chosen job conf is : " + path);
-            return path;
-        } else {
-            path = getHadoopJobConfFilePath(capaticy, false);
-            if (!StringUtils.isEmpty(path)) {
-                logger.info("Chosen job conf is : " + path);
-                return path;
-            }
-        }
-        return "";
-    }
-
-    private void inputStreamToFile(InputStream ins, File file) throws IOException {
-        OutputStream os = new FileOutputStream(file);
-        int bytesRead = 0;
-        byte[] buffer = new byte[8192];
-        while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
-            os.write(buffer, 0, bytesRead);
-        }
-        os.close();
-        ins.close();
-    }
-
-    // there should be no setters
-    private final KylinConfig config;
-
-    public JobEngineConfig(KylinConfig config) {
-        this.config = config;
-    }
-
-    public KylinConfig getConfig() {
-        return config;
-    }
-
-    public String getHdfsWorkingDirectory() {
-        return config.getHdfsWorkingDirectory();
-    }
-
-    /**
-     * @return the kylinJobJarPath
-     */
-    public String getKylinJobJarPath() {
-        return config.getKylinJobJarPath();
-    }
-
-    /**
-     * @return the runAsRemoteCommand
-     */
-    public boolean isRunAsRemoteCommand() {
-        return config.getRunAsRemoteCommand();
-    }
-
-    /**
-     * @return the zookeeperString
-     */
-    public String getZookeeperString() {
-        return config.getZookeeperString();
-    }
-
-    /**
-     * @return the remoteHadoopCliHostname
-     */
-    public String getRemoteHadoopCliHostname() {
-        return config.getRemoteHadoopCliHostname();
-    }
-
-    /**
-     * @return the remoteHadoopCliUsername
-     */
-    public String getRemoteHadoopCliUsername() {
-        return config.getRemoteHadoopCliUsername();
-    }
-
-    /**
-     * @return the remoteHadoopCliPassword
-     */
-    public String getRemoteHadoopCliPassword() {
-        return config.getRemoteHadoopCliPassword();
-    }
-    
-    public String getMapReduceCmdExtraArgs() {
-        return config.getMapReduceCmdExtraArgs();
-    }
-
-    /**
-     * @return the yarnStatusServiceUrl
-     */
-    public String getYarnStatusServiceUrl() {
-        return config.getYarnStatusServiceUrl();
-    }
-
-    /**
-     * @return the maxConcurrentJobLimit
-     */
-    public int getMaxConcurrentJobLimit() {
-        return config.getMaxConcurrentJobLimit();
-    }
-
-    /**
-     * @return the timeZone
-     */
-    public String getTimeZone() {
-        return config.getTimeZone();
-    }
-
-    /**
-     * @return the adminDls
-     */
-    public String getAdminDls() {
-        return config.getAdminDls();
-    }
-
-    /**
-     * @return the jobStepTimeout
-     */
-    public long getJobStepTimeout() {
-        return config.getJobStepTimeout();
-    }
-
-    /**
-     * @return the asyncJobCheckInterval
-     */
-    public int getAsyncJobCheckInterval() {
-        return config.getYarnStatusCheckIntervalSeconds();
-    }
-
-    /**
-     * @return the flatTableByHive
-     */
-    public boolean isFlatTableByHive() {
-        return config.getFlatTableByHive();
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.lang.Object#hashCode()
-     */
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((config == null) ? 0 : config.hashCode());
-        return result;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see java.lang.Object#equals(java.lang.Object)
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        JobEngineConfig other = (JobEngineConfig) obj;
-        if (config == null) {
-            if (other.config != null)
-                return false;
-        } else if (!config.equals(other.config))
-            return false;
-        return true;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java b/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
deleted file mode 100644
index 2389f27..0000000
--- a/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.engine;
-
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.quartz.Job;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.Trigger;
-import org.quartz.TriggerBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.util.StringSplitter;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStatusEnum;
-import com.kylinolap.job.flow.JobFlow;
-
-/**
- * @author ysong1, xduo
- * 
- */
-public class JobFetcher implements Job {
-
-    private static final Logger log = LoggerFactory.getLogger(JobFetcher.class);
-
-    public static final int JOB_THRESHOLD = 10;
-
-    @Override
-    public void execute(JobExecutionContext context) throws JobExecutionException {
-
-        JobEngineConfig engineConfig = (JobEngineConfig) context.getJobDetail().getJobDataMap().get(JobConstants.PROP_ENGINE_CONTEXT);
-
-        JobDAO jobDAO = JobDAO.getInstance(engineConfig.getConfig());
-
-        try {
-            // get all pending jobs
-            List<JobInstance> pendingJobList = jobDAO.listAllJobs(JobStatusEnum.PENDING);
-
-            log.debug(pendingJobList.size() + " pending jobs");
-            int leftJobs = JOB_THRESHOLD;
-            Random rand = new Random();
-            int maxConcurrentJobCount = engineConfig.getMaxConcurrentJobLimit();
-
-            for (JobInstance jobInstance : pendingJobList) {
-                @SuppressWarnings("unchecked")
-                ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) context.getScheduler().getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
-
-                if (jobFlows.size() >= maxConcurrentJobCount) {
-                    // If too many job instances in current job context, just
-                    // wait.
-                    break;
-                }
-
-                try {
-                    // there should be only 1 job for a certain job running
-                    boolean cubeHasRunningJob = false;
-                    for (String s : jobFlows.keySet()) {
-                        String[] tmp = StringSplitter.split(s, ".");
-                        String cubename = tmp[0];
-                        String jobid = tmp[1];
-                        if (cubename.equals(jobInstance.getRelatedCube())) {
-                            log.info("There is already a job of cube " + jobInstance.getRelatedCube() + " running, job uuid is " + jobid);
-                            cubeHasRunningJob = true;
-                            break;
-                        }
-                    }
-
-                    if (cubeHasRunningJob == false && jobFlows.containsKey(JobInstance.getJobIdentity(jobInstance)) == false) {
-                        // create job flow
-                        JobFlow jobFlow = new JobFlow(jobInstance, engineConfig);
-                        jobFlows.put(JobInstance.getJobIdentity(jobInstance), jobFlow);
-
-                        // schedule the 1st step
-                        Trigger trigger = TriggerBuilder.newTrigger().startNow().build();
-                        JobDetail firstStep = jobFlow.getFirst();
-                        context.getScheduler().scheduleJob(firstStep, trigger);
-
-                        log.info("Job " + jobInstance.getUuid() + " has been scheduled with the first step " + firstStep.getKey().toString());
-                    }
-                } catch (Exception e) {
-                    log.error("Failed to trigger the job detail", e);
-                }
-
-                if (--leftJobs < 0) {
-                    log.info("Too many pending jobs!");
-                    break;
-                }
-                long ms = Math.abs(rand.nextLong() % 10L);
-                Thread.sleep(ms * 1000L);
-            }
-        } catch (Throwable t) {
-            log.error(t.getMessage());
-            throw new JobExecutionException(t);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java b/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
deleted file mode 100644
index f080c37..0000000
--- a/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.engine;
-
-import java.io.IOException;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.quartz.JobBuilder;
-import org.quartz.JobDetail;
-import org.quartz.JobKey;
-import org.quartz.Scheduler;
-import org.quartz.SchedulerException;
-import org.quartz.SimpleScheduleBuilder;
-import org.quartz.Trigger;
-import org.quartz.TriggerBuilder;
-import org.quartz.UnableToInterruptJobException;
-import org.quartz.impl.StdSchedulerFactory;
-import org.quartz.impl.matchers.GroupMatcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.cmd.IJobCommand;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.exception.JobException;
-import com.kylinolap.job.flow.JobFlow;
-import com.kylinolap.job.flow.JobFlowListener;
-
-/**
- * @author xduo
- * 
- */
-public class QuatzScheduler {
-
-    private static Logger log = LoggerFactory.getLogger(QuatzScheduler.class);
-
-    private Scheduler scheduler;
-    private JobFlowListener globalJobListener;
-
-    // public static void scheduleJobFlow(Scheduler scheduler, JobFlow jobFlow)
-    // throws JobException {
-    // // schedule the 1st step
-    // Trigger trigger = TriggerBuilder.newTrigger().startNow().build();
-    // JobDetail firstStep = jobFlow.getFirst();
-    // try {
-    // scheduler.scheduleJob(firstStep, trigger);
-    // } catch (SchedulerException e) {
-    // throw new JobException(e);
-    // }
-    // }
-
-    public QuatzScheduler() throws JobException {
-        this.globalJobListener = new JobFlowListener(JobConstants.GLOBAL_LISTENER_NAME);
-        StdSchedulerFactory sf = new StdSchedulerFactory();
-        Properties schedulerProperties = new Properties();
-        int numberOfProcessors = Runtime.getRuntime().availableProcessors();
-        schedulerProperties.setProperty("org.quartz.threadPool.threadCount", String.valueOf(numberOfProcessors));
-        schedulerProperties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
-
-        try {
-            sf.initialize(schedulerProperties);
-            this.scheduler = sf.getScheduler();
-            this.scheduler.getListenerManager().addJobListener(this.globalJobListener, GroupMatcher.jobGroupEquals(JobConstants.CUBE_JOB_GROUP_NAME));
-
-            // cubename.jobUuid -> job flow
-            this.scheduler.getContext().put(JobConstants.PROP_JOB_RUNTIME_FLOWS, new ConcurrentHashMap<String, JobFlow>());
-
-            // put the scheduler in standby mode first
-            this.scheduler.standby();
-        } catch (SchedulerException e) {
-            throw new JobException(e);
-        }
-    }
-
-    public void start() throws JobException {
-        try {
-            this.scheduler.start();
-        } catch (SchedulerException e) {
-            throw new JobException(e);
-        }
-    }
-
-    public void scheduleFetcher(int intervalInSeconds, JobEngineConfig engineConfig) throws JobException {
-        JobDetail job = JobBuilder.newJob(JobFetcher.class).withIdentity(JobFetcher.class.getCanonicalName(), JobConstants.DAEMON_JOB_GROUP_NAME).build();
-        job.getJobDataMap().put(JobConstants.PROP_ENGINE_CONTEXT, engineConfig);
-
-        Trigger trigger = TriggerBuilder.newTrigger().startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(intervalInSeconds).repeatForever()).build();
-
-        try {
-            this.scheduler.scheduleJob(job, trigger);
-        } catch (SchedulerException e) {
-            throw new JobException(e);
-        }
-    }
-
-    public boolean interrupt(JobInstance jobInstance, JobStep jobStep) throws JobException, IOException {
-        JobKey jobKey = new JobKey(JobInstance.getStepIdentity(jobInstance, jobStep), JobConstants.CUBE_JOB_GROUP_NAME);
-
-        boolean res = false;
-        try {
-            JobDetail jobDetail = this.scheduler.getJobDetail(jobKey);
-
-            IJobCommand iJobStepCmd = (IJobCommand) jobDetail.getJobDataMap().get(JobConstants.PROP_JOB_CMD_EXECUTOR);
-            if (null != iJobStepCmd) {
-                iJobStepCmd.cancel();
-            }
-
-            jobDetail.getJobDataMap().put(JobConstants.PROP_JOB_KILLED, true);
-            this.scheduler.addJob(jobDetail, true, true);
-
-            @SuppressWarnings("unchecked")
-            ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) this.scheduler.getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
-            jobFlows.remove(JobInstance.getJobIdentity(jobInstance));
-        } catch (UnableToInterruptJobException e) {
-            log.error(e.getLocalizedMessage(), e);
-            throw new JobException(e);
-        } catch (SchedulerException e) {
-            log.error(e.getLocalizedMessage(), e);
-            throw new JobException(e);
-        }
-        return res;
-    }
-
-    public void stop() throws JobException {
-        try {
-            this.scheduler.standby();
-        } catch (SchedulerException e) {
-            throw new JobException(e);
-        }
-    }
-
-    public Scheduler getScheduler() {
-        return this.scheduler;
-    }
-
-    // // metrics
-
-    public int getThreadPoolSize() {
-        try {
-            return scheduler.getMetaData().getThreadPoolSize();
-        } catch (SchedulerException e) {
-            log.error("Can't get scheduler metadata!", e);
-            return 0;
-        }
-    }
-
-    public int getRunningJobs() {
-        try {
-            return this.scheduler.getCurrentlyExecutingJobs().size();
-        } catch (SchedulerException e) {
-            log.error("Can't get scheduler metadata!", e);
-            return 0;
-        }
-    }
-
-    public int getIdleSlots() {
-        try {
-            return this.scheduler.getMetaData().getThreadPoolSize() - this.scheduler.getCurrentlyExecutingJobs().size();
-        } catch (SchedulerException e) {
-            log.error("Can't get scheduler metadata!", e);
-            return 0;
-        }
-    }
-
-    public int getScheduledJobs() {
-        int allTriggersCount = 0;
-        try {
-            for (String groupName : scheduler.getJobGroupNames()) {
-                allTriggersCount += scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName)).size();
-            }
-        } catch (SchedulerException e) {
-            log.error("Can't get scheduler metadata!", e);
-        }
-        return allTriggersCount;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java b/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
deleted file mode 100644
index 1b404cb..0000000
--- a/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.exception;
-
-/**
- * @author ysong1
- * 
- */
-public class InvalidJobInstanceException extends Exception {
-
-    private static final long serialVersionUID = 2045169570038227895L;
-
-    public InvalidJobInstanceException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java b/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
deleted file mode 100644
index d1050d0..0000000
--- a/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.exception;
-
-/**
- * Checked exception (it extends {@link Exception}) used to report an invalid
- * job status.
- *
- * @author xduo
- */
-public class InvalidJobStatusException extends Exception {
-
-    private static final long serialVersionUID = -8549756520626114000L;
-
-    /**
-     * Creates the exception with a detail message and no cause.
-     *
-     * @param string detail message; it is forwarded to {@link Exception} so
-     *            that {@link #getMessage()} returns it (previously the
-     *            argument was ignored and the message was always null)
-     */
-    public InvalidJobStatusException(String string) {
-        super(string);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/exception/JobException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/JobException.java b/job/src/main/java/com/kylinolap/job/exception/JobException.java
deleted file mode 100644
index 6c4bc6a..0000000
--- a/job/src/main/java/com/kylinolap/job/exception/JobException.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.exception;
-
-/**
- * Checked exception of the job package. It only mirrors the four standard
- * {@link Exception} constructors and adds no state of its own.
- *
- * @author xduo
- */
-public class JobException extends Exception {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * Creates an exception with neither a detail message nor a cause.
-     */
-    public JobException() {
-        super();
-    }
-
-    /**
-     * Creates an exception with a detail message only.
-     *
-     * @param message description of the failure
-     */
-    public JobException(final String message) {
-        super(message);
-    }
-
-    /**
-     * Creates an exception that wraps another throwable.
-     *
-     * @param cause the underlying failure
-     */
-    public JobException(final Throwable cause) {
-        super(cause);
-    }
-
-    /**
-     * Creates an exception with both a detail message and a cause.
-     *
-     * @param message description of the failure
-     * @param cause the underlying failure
-     */
-    public JobException(final String message, final Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java b/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
deleted file mode 100644
index 1b8777e..0000000
--- a/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.flow;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.quartz.DateBuilder;
-import org.quartz.DateBuilder.IntervalUnit;
-import org.quartz.JobDataMap;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.TriggerBuilder;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.cmd.ICommandOutput;
-import com.kylinolap.job.cmd.JobCommandFactory;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.exception.JobException;
-
-/**
- * Job flow node for steps that run asynchronously. The first execution of a
- * node launches the step command; later executions of the same job detail
- * only poll the status of the command output stored in the job data map.
- *
- * @author xduo
- */
-public class AsyncJobFlowNode extends JobFlowNode {
-
-    /**
-     * Launches the step command on the first run, or polls its status on a
-     * later run. The branch is chosen by whether a command output object is
-     * already stored in the job data map under PROP_JOB_CMD_OUTPUT.
-     *
-     * Every Throwable raised inside the try block is handed to
-     * handleException, which is not defined in this class (presumably
-     * inherited from JobFlowNode -- confirm there).
-     *
-     * @param context Quartz execution context providing the job detail, its
-     *            data map and the scheduler
-     * @throws JobExecutionException declared by the overridden method; this
-     *             body does not throw it explicitly
-     */
-    @Override
-    public void execute(JobExecutionContext context) throws JobExecutionException {
-        this.currentJobDetail = context.getJobDetail();
-        JobDataMap data = this.currentJobDetail.getJobDataMap();
-        JobFlow jobFlow = (JobFlow) data.get(JobConstants.PROP_JOB_FLOW);
-        JobEngineConfig engineConfig = jobFlow.getJobengineConfig();
-        KylinConfig config = engineConfig.getConfig();
-        String jobInstanceID = data.getString(JobConstants.PROP_JOBINSTANCE_UUID);
-        int jobStepID = data.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
-        // null on the first run; set by a previous run of this node otherwise
-        ICommandOutput output = (ICommandOutput) data.get(JobConstants.PROP_JOB_CMD_OUTPUT);
-
-        try {
-            // a kill flag in the data map stops the node before any work is done
-            if (data.getBoolean(JobConstants.PROP_JOB_KILLED)) {
-                log.info(this.currentJobDetail.getKey() + " is killed");
-                return;
-            }
-
-            if (output == null) {
-                // first run: mark the step RUNNING with the current time as start time
-                JobInstance jobInstance = updateJobStep(jobInstanceID, jobStepID, config, JobStepStatusEnum.RUNNING, System.currentTimeMillis(), null, null);
-
-                String command = data.getString(JobConstants.PROP_COMMAND);
-                jobCmd = JobCommandFactory.getJobCommand(command, jobInstance, jobStepID, engineConfig);
-                output = jobCmd.execute();
-                // keep output and command in the data map so later runs can poll them
-                data.put(JobConstants.PROP_JOB_CMD_OUTPUT, output);
-                data.put(JobConstants.PROP_JOB_CMD_EXECUTOR, jobCmd);
-                // re-register the job detail (second argument is Quartz's replace flag);
-                // presumably this makes the updated data map visible to the next
-                // trigger -- TODO confirm
-                context.getScheduler().addJob(this.currentJobDetail, true, true);
-
-                // record the status right after launch; the end time is set only
-                // if the status already counts as complete
-                JobStepStatusEnum stepStatus = output.getStatus();
-                updateJobStep(jobInstanceID, jobStepID, config, stepStatus, null, stepStatus.isComplete() ? System.currentTimeMillis() : null, output.getOutput());
-
-                context.setResult(output.getExitCode());
-                // a status check is scheduled unconditionally after the launch
-                scheduleStatusChecker(context);
-                log.debug("Start async job " + currentJobDetail.getKey());
-            } else {
-                // later run: only poll the status of the already launched command
-                JobInstance jobInstance = JobDAO.getInstance(engineConfig.getConfig()).getJob(jobInstanceID);
-                JobStep jobStep = jobInstance.getSteps().get(jobStepID);
-
-                log.debug("Start to check hadoop job status of " + currentJobDetail.getKey());
-                JobStepStatusEnum stepStatus = output.getStatus();
-
-                // elapsed time since the recorded step start is converted from
-                // milliseconds to seconds before the comparison with the configured
-                // timeout (assumes getExecStartTime() is epoch millis)
-                if ((System.currentTimeMillis() - jobStep.getExecStartTime()) / 1000 >= engineConfig.getJobStepTimeout()) {
-                    throw new JobException("Job step " + jobStep.getName() + " timeout.");
-                }
-
-                updateJobStep(jobInstance.getUuid(), jobStepID, config, stepStatus, null, stepStatus.isComplete() ? System.currentTimeMillis() : null, output.getOutput());
-
-                // keep polling until the step reports a complete status
-                if (!stepStatus.isComplete()) {
-                    scheduleStatusChecker(context);
-                }
-
-                context.setResult(0);
-                log.debug("Status of async job " + currentJobDetail.getKey() + ":" + stepStatus);
-            }
-        } catch (Throwable t) {
-            handleException(jobInstanceID, jobStepID, config, t);
-        }
-
-    }
-
-    /**
-     * Schedules one more execution of the current job detail, to start after
-     * the async check interval read from the engine configuration. The
-     * interval is interpreted as seconds.
-     *
-     * @param context execution context whose scheduler receives the trigger
-     * @throws SchedulerException if the scheduler rejects the job or trigger
-     */
-    private void scheduleStatusChecker(JobExecutionContext context) throws SchedulerException {
-        JobDataMap jobDataMap = this.currentJobDetail.getJobDataMap();
-        JobFlow jobFlow = (JobFlow) jobDataMap.get(JobConstants.PROP_JOB_FLOW);
-        JobEngineConfig engineConfig = jobFlow.getJobengineConfig();
-        int interval = engineConfig.getAsyncJobCheckInterval();
-        log.debug("Trigger a status check job in " + interval + " seconds for job " + currentJobDetail.getKey());
-
-        // single trigger that starts `interval` seconds from now
-        Trigger trigger = TriggerBuilder.newTrigger().startAt(DateBuilder.futureDate(interval, IntervalUnit.SECOND)).build();
-        Set<Trigger> triggers = new HashSet<Trigger>();
-        triggers.add(trigger);
-        // the last argument is Quartz's replace flag
-        context.getScheduler().scheduleJob(currentJobDetail, triggers, true);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlow.java b/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
deleted file mode 100644
index 9a58107..0000000
--- a/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.flow;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.quartz.JobBuilder;
-import org.quartz.JobDetail;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStepCmdTypeEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-
-/**
- * Ordered chain of Quartz job details built from the steps of one job
- * instance. One node is created per step, starting at the first step whose
- * status is runnable.
- *
- * @author xduo
- */
-public class JobFlow {
-
-    private static final Logger log = LoggerFactory.getLogger(JobFlow.class);
-
-    private final List<JobDetail> flowNodes;
-    private final JobInstance jobInstance;
-    private final JobEngineConfig engineConfig;
-
-    /**
-     * Builds the flow nodes for the given job.
-     *
-     * Note: the list returned by {@code job.getSteps()} is sorted in place
-     * (natural order of JobStep; by step sequence id according to the
-     * original author's comment).
-     *
-     * @param job job instance whose steps become flow nodes
-     * @param context engine configuration handed out by {@link #getJobengineConfig()}
-     * @throws IllegalStateException if the job has no steps
-     * @throws IllegalArgumentException if the job name or a step name is null
-     */
-    public JobFlow(JobInstance job, JobEngineConfig context) {
-        this.engineConfig = context;
-        this.jobInstance = job;
-
-        // validate job instance
-        List<JobStep> sortedSteps = jobInstance.getSteps();
-        if (sortedSteps == null || sortedSteps.isEmpty()) {
-            throw new IllegalStateException("Steps of job " + jobInstance.getUuid() + " is null or empty!");
-        }
-
-        Collections.sort(sortedSteps);
-        // index of the first runnable step; earlier steps get no node
-        int firstStepIndex = findFirstStep(sortedSteps);
-
-        log.info("Job " + jobInstance.getUuid() + " will be started at step " + firstStepIndex + " (sequence number)");
-
-        flowNodes = new LinkedList<JobDetail>();
-        for (int i = firstStepIndex; i < sortedSteps.size(); i++) {
-            flowNodes.add(createJobFlowNode(jobInstance, i));
-        }
-    }
-
-    /**
-     * @return the job instance this flow was built from
-     */
-    public JobInstance getJobInstance() {
-        return jobInstance;
-    }
-
-    /**
-     * @return the engine configuration passed to the constructor
-     */
-    public JobEngineConfig getJobengineConfig() {
-        return engineConfig;
-    }
-
-    /**
-     * @return the first node of the flow, or null if the flow has no nodes
-     */
-    public JobDetail getFirst() {
-        if (flowNodes.isEmpty()) {
-            return null;
-        }
-        return flowNodes.get(0);
-    }
-
-    /**
-     * Finds the node that follows the given one.
-     *
-     * @param jobFlowNode node to look up (compared with equals)
-     * @return the node after the last occurrence of jobFlowNode, or null if
-     *         jobFlowNode is not part of the flow or is its last node
-     */
-    public JobDetail getNext(JobDetail jobFlowNode) {
-        // lastIndexOf walks the linked list once; the former index-based loop
-        // called get(index) per element, which is quadratic on a LinkedList.
-        // The last match is used, exactly as the old loop did.
-        int targetIndex = flowNodes.lastIndexOf(jobFlowNode);
-        if (targetIndex < 0 || targetIndex + 1 >= flowNodes.size()) {
-            return null;
-        }
-        return flowNodes.get(targetIndex + 1);
-    }
-
-    /**
-     * @param stepList steps in execution order
-     * @return index of the first step whose status is runnable; 0 when no
-     *         step is runnable
-     */
-    private int findFirstStep(List<JobStep> stepList) {
-        int index = 0;
-        for (JobStep step : stepList) {
-            if (step.getStatus().isRunable()) {
-                return index;
-            }
-            index++;
-        }
-        return 0;
-    }
-
-    /**
-     * Creates the durable Quartz job detail for one step and fills its data
-     * map with the flow, the command, the job uuid, the step sequence id and
-     * the async flag.
-     *
-     * @param jobInstance job owning the step
-     * @param stepSeqId index of the step in {@code jobInstance.getSteps()}
-     * @return the job detail representing the step
-     * @throws IllegalArgumentException if the job name or the step name is null
-     */
-    private JobDetail createJobFlowNode(final JobInstance jobInstance, final int stepSeqId) {
-        JobStep step = jobInstance.getSteps().get(stepSeqId);
-
-        if (jobInstance.getName() == null || step.getName() == null) {
-            throw new IllegalArgumentException("JobInstance name or JobStep name cannot be null!");
-        }
-
-        // the node class depends on the isRunAsync property; all nodes share
-        // the same job group
-        JobDetail jobFlowNode = JobBuilder.newJob(step.isRunAsync() ? AsyncJobFlowNode.class : JobFlowNode.class).withIdentity(JobInstance.getStepIdentity(jobInstance, step), JobConstants.CUBE_JOB_GROUP_NAME).storeDurably().build();
-
-        // add job flow to node
-        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOB_FLOW, this);
-
-        // add command to flow node; only shell commands are wrapped with logging
-        boolean shellCommand = step.getCmdType() == JobStepCmdTypeEnum.SHELL_CMD || step.getCmdType() == JobStepCmdTypeEnum.SHELL_CMD_HADOOP;
-        String execCmd = shellCommand ? wrapExecCmd(jobInstance, step.getExecCmd(), String.valueOf(step.getSequenceID())) : step.getExecCmd();
-        jobFlowNode.getJobDataMap().put(JobConstants.PROP_COMMAND, execCmd);
-
-        // add job instance and step sequenceID to flow node
-        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOBINSTANCE_UUID, jobInstance.getUuid());
-        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOBSTEP_SEQ_ID, step.getSequenceID());
-
-        // add async flag to flow node
-        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOB_ASYNC, step.isRunAsync());
-
-        return jobFlowNode;
-    }
-
-    /**
-     * Wraps a shell command so that its stdout and stderr are also written
-     * to a per-step log file below the Kylin job log directory. The log
-     * directory is created locally right away and again by the returned
-     * command line itself (mkdir -p).
-     *
-     * @param job job the command belongs to; its uuid is part of the log file name
-     * @param cmd command to wrap; a blank command is returned unchanged
-     * @param suffix second part of the log file name (the step sequence id)
-     * @return the wrapped command line
-     * @throws RuntimeException if the log directory cannot be created locally
-     */
-    private String wrapExecCmd(JobInstance job, String cmd, String suffix) {
-        if (StringUtils.isBlank(cmd)) {
-            return cmd;
-        }
-
-        String logDir = KylinConfig.getInstanceFromEnv().getKylinJobLogDir();
-        try {
-            FileUtils.forceMkdir(new File(logDir));
-        } catch (IOException e) {
-            throw new RuntimeException("Create log dir " + logDir + " failed.", e);
-        }
-        // named logFile so that it no longer shadows the class logger
-        String logFile = logDir + "/" + job.getUuid() + "_" + suffix + ".log";
-
-        String mkLogDir = "mkdir -p " + logDir;
-        // pipefail keeps the exit code of cmd although its output is piped to tee
-        return mkLogDir + ";" + "set -o pipefail; " + cmd + " 2>&1 | tee " + logFile;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
deleted file mode 100644
index 8274d44..0000000
--- a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.flow;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.JobListener;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.TriggerBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.MailService;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.exception.CubeIntegrityException;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStatusEnum;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-
-/**
- * Quartz {@link JobListener} that post-processes every executed job flow
- * node. Depending on the status of the executed step it updates the stored
- * job instance and the related cube segment, mails a notification and
- * schedules the next node of the flow.
- *
- * @author George Song (ysong1), xduo
- */
-public class JobFlowListener implements JobListener {
-
-    private static final Logger log = LoggerFactory.getLogger(JobFlowListener.class);
-
-    private final String name;
-
-    /**
-     * @param name listener name returned by {@link #getName()}
-     * @throws IllegalArgumentException if name is null
-     */
-    public JobFlowListener(String name) {
-        if (name == null) {
-            throw new IllegalArgumentException("Listener name cannot be null!");
-        }
-        this.name = name;
-    }
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * Reloads the job instance of the executed node and reacts to the status
-     * of the executed step:
-     * FINISHED updates job and cube segment, notifies and chains to the next
-     * node; ERROR updates the job and notifies; DISCARDED updates the cube
-     * segment and notifies. Any exception marks the step as ERROR through
-     * handleException. When the job as a whole is complete, the job detail
-     * and the runtime flow entry are removed from the scheduler.
-     *
-     * @param context execution context of the node that just ran
-     * @param jobException exception reported by Quartz; not used here
-     */
-    @Override
-    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
-        log.info(context.getJobDetail().getKey() + " was executed.");
-        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
-        JobFlow jobFlow = (JobFlow) jobDataMap.get(JobConstants.PROP_JOB_FLOW);
-        JobEngineConfig engineConfig = jobFlow.getJobengineConfig();
-        String jobUuid = jobDataMap.getString(JobConstants.PROP_JOBINSTANCE_UUID);
-        int stepSeqID = jobDataMap.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
-        KylinConfig config = engineConfig.getConfig();
-
-        JobInstance jobInstance = null;
-        try {
-            jobInstance = JobDAO.getInstance(config).getJob(jobUuid);
-            JobStep jobStep = jobInstance.getSteps().get(stepSeqID);
-            CubeInstance cube = CubeManager.getInstance(config).getCube(jobInstance.getRelatedCube());
-
-            log.info(context.getJobDetail().getKey() + " status: " + jobStep.getStatus());
-            switch (jobStep.getStatus()) {
-            case FINISHED:
-                // Ensure we are using the latest metadata
-                CubeManager.getInstance(config).loadCubeCache(cube);
-                updateKylinJobOnSuccess(jobInstance, stepSeqID, engineConfig);
-                updateCubeSegmentInfoOnSucceed(jobInstance, engineConfig);
-                notifyUsers(jobInstance, engineConfig);
-                scheduleNextJob(context, jobInstance);
-                break;
-            case ERROR:
-                updateKylinJobStatus(jobInstance, stepSeqID, engineConfig);
-                notifyUsers(jobInstance, engineConfig);
-                break;
-            case DISCARDED:
-                // Ensure we are using the latest metadata
-                CubeManager.getInstance(config).loadCubeCache(cube);
-                updateCubeSegmentInfoOnDiscard(jobInstance, engineConfig);
-                notifyUsers(jobInstance, engineConfig);
-                break;
-            default:
-                break;
-            }
-        } catch (Exception e) {
-            // handleException logs the throwable itself, so it is not logged twice here
-            handleException(jobUuid, stepSeqID, config, e);
-        } finally {
-            if (null != jobInstance && jobInstance.getStatus().isComplete()) {
-                try {
-                    context.getScheduler().deleteJob(context.getJobDetail().getKey());
-                    @SuppressWarnings("unchecked")
-                    ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) context.getScheduler().getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
-                    jobFlows.remove(JobInstance.getJobIdentity(jobInstance));
-                } catch (SchedulerException e) {
-                    log.error(e.getMessage(), e);
-                }
-            }
-        }
-    }
-
-    /**
-     * No-op; nothing is done before a node executes.
-     *
-     * @see org.quartz.JobListener#jobToBeExecuted(org.quartz.JobExecutionContext)
-     */
-    @Override
-    public void jobToBeExecuted(JobExecutionContext context) {
-    }
-
-    /**
-     * No-op; vetoed executions are ignored.
-     *
-     * @see org.quartz.JobListener#jobExecutionVetoed(org.quartz.JobExecutionContext)
-     */
-    @Override
-    public void jobExecutionVetoed(JobExecutionContext context) {
-    }
-
-    /**
-     * Schedules the node that follows the executed one (if any) to start
-     * immediately, then deletes the executed job detail from the scheduler.
-     * A failure to schedule the next node is only logged.
-     *
-     * @param context execution context of the node that just ran
-     * @param jobInstance job the node belongs to; not used by this implementation
-     * @throws RuntimeException wrapping the SchedulerException if the executed
-     *             job detail cannot be deleted
-     */
-    protected void scheduleNextJob(JobExecutionContext context, JobInstance jobInstance) {
-        try {
-            JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
-            JobFlow jobFlow = (JobFlow) jobDataMap.get(JobConstants.PROP_JOB_FLOW);
-            JobDetail nextJob = jobFlow.getNext(context.getJobDetail());
-            if (nextJob != null) {
-                try {
-                    Trigger trigger = TriggerBuilder.newTrigger().startNow().build();
-                    log.debug("Job " + context.getJobDetail().getKey() + " will now chain to Job " + nextJob.getKey());
-
-                    context.getScheduler().scheduleJob(nextJob, trigger);
-                } catch (SchedulerException se) {
-                    log.error("Error encountered during chaining to Job " + nextJob.getKey(), se);
-                }
-            }
-
-            context.getScheduler().deleteJob(context.getJobDetail().getKey());
-        } catch (SchedulerException e) {
-            log.error(e.getLocalizedMessage(), e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Adds the timings of the given step to the job totals and stores the job.
-     *
-     * @param jobInstance job to update
-     * @param stepId index of the executed step in the sorted step list
-     * @param engineConfig source of the Kylin configuration used for storing
-     */
-    private void updateKylinJobStatus(JobInstance jobInstance, int stepId, JobEngineConfig engineConfig) {
-        validate(jobInstance);
-        List<JobStep> steps = jobInstance.getSteps();
-        Collections.sort(steps);
-
-        addStepTimings(jobInstance, jobInstance.getSteps().get(stepId));
-        saveJobInstance(jobInstance, engineConfig);
-    }
-
-    /**
-     * Like updateKylinJobStatus, and additionally sets the job start time to
-     * the start of the first step and, once the job is FINISHED, the job end
-     * time to the end of the last step.
-     *
-     * @param jobInstance job to update
-     * @param stepId index of the executed step in the sorted step list
-     * @param engineConfig source of the Kylin configuration used for storing
-     */
-    private void updateKylinJobOnSuccess(JobInstance jobInstance, int stepId, JobEngineConfig engineConfig) {
-        validate(jobInstance);
-        List<JobStep> steps = jobInstance.getSteps();
-        Collections.sort(steps);
-
-        JobStep jobStep = jobInstance.getSteps().get(stepId);
-        jobInstance.setExecStartTime(steps.get(0).getExecStartTime());
-        addStepTimings(jobInstance, jobStep);
-        if (jobInstance.getStatus() == JobStatusEnum.FINISHED) {
-            jobInstance.setExecEndTime(steps.get(steps.size() - 1).getExecEndTime());
-        }
-
-        saveJobInstance(jobInstance, engineConfig);
-    }
-
-    /**
-     * Adds the step's run time (end minus start, divided by 1000; negative
-     * values count as 0) to the job duration and the step's wait time to the
-     * job's MR waiting time.
-     */
-    private static void addStepTimings(JobInstance jobInstance, JobStep jobStep) {
-        long duration = jobStep.getExecEndTime() - jobStep.getExecStartTime();
-        jobInstance.setDuration(jobInstance.getDuration() + (duration > 0 ? duration / 1000 : 0));
-        jobInstance.setMrWaiting(jobInstance.getMrWaiting() + jobStep.getExecWaitTime());
-    }
-
-    /**
-     * Stores the job instance; an IOException is logged, not propagated.
-     */
-    private static void saveJobInstance(JobInstance jobInstance, JobEngineConfig engineConfig) {
-        try {
-            JobDAO.getInstance(engineConfig.getConfig()).updateJobInstance(jobInstance);
-        } catch (IOException e) {
-            log.error(e.getLocalizedMessage(), e);
-        }
-    }
-
-    /**
-     * Tells the cube manager that the job of the related segment was discarded.
-     */
-    private void updateCubeSegmentInfoOnDiscard(JobInstance jobInstance, JobEngineConfig engineConfig) throws IOException, CubeIntegrityException {
-        CubeManager cubeMgr = CubeManager.getInstance(engineConfig.getConfig());
-        CubeInstance cubeInstance = cubeMgr.getCube(jobInstance.getRelatedCube());
-        cubeMgr.updateSegmentOnJobDiscard(cubeInstance, jobInstance.getRelatedSegment());
-    }
-
-    /**
-     * Once the whole job is FINISHED, collects cube size, source record count
-     * and source record size from the step infos and reports them to the cube
-     * manager. Does nothing while the job is in any other status.
-     *
-     * @throws RuntimeException if an expected step exists but lacks its info value
-     */
-    private void updateCubeSegmentInfoOnSucceed(JobInstance jobInstance, JobEngineConfig engineConfig) throws CubeIntegrityException, IOException {
-        if (jobInstance.getStatus() != JobStatusEnum.FINISHED) {
-            return;
-        }
-        validate(jobInstance);
-
-        log.info("Updating cube segment " + jobInstance.getRelatedSegment() + " for cube " + jobInstance.getRelatedCube());
-
-        long cubeSize = 0;
-        JobStep convertToHFileStep = jobInstance.findStep(JobConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
-        if (null != convertToHFileStep) {
-            // the step info holds bytes written; the value is divided by 1024 before it is stored
-            cubeSize = requiredLongInfo(convertToHFileStep, JobInstance.HDFS_BYTES_WRITTEN, "Can't get cube segment size.") / 1024;
-        } else {
-            log.info("No step with name '" + JobConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE + "' is found");
-        }
-
-        CubeManager cubeMgr = CubeManager.getInstance(engineConfig.getConfig());
-        CubeInstance cubeInstance = cubeMgr.getCube(jobInstance.getRelatedCube());
-        CubeSegment newSegment = cubeInstance.getSegmentById(jobInstance.getUuid());
-
-        long sourceCount = 0;
-        long sourceSize = 0;
-        switch (jobInstance.getType()) {
-        case BUILD:
-            JobStep baseCuboidStep = jobInstance.findStep(JobConstants.STEP_NAME_BUILD_BASE_CUBOID);
-            if (null != baseCuboidStep) {
-                sourceCount = requiredLongInfo(baseCuboidStep, JobInstance.SOURCE_RECORDS_COUNT, "Can't get cube source record count.");
-            } else {
-                log.info("No step with name '" + JobConstants.STEP_NAME_BUILD_BASE_CUBOID + "' is found");
-            }
-
-            JobStep createFlatTableStep = jobInstance.findStep(JobConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-            if (null != createFlatTableStep) {
-                sourceSize = requiredLongInfo(createFlatTableStep, JobInstance.SOURCE_RECORDS_SIZE, "Can't get cube source record size.");
-            } else {
-                log.info("No step with name '" + JobConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE + "' is found");
-            }
-
-            // the figures of all READY segments are added when the cube reports
-            // that the new segment needs an immediate merge
-            if (cubeInstance.needMergeImmediatelyAfterBuild(newSegment)) {
-                for (CubeSegment seg : cubeInstance.getSegment(CubeSegmentStatusEnum.READY)) {
-                    sourceCount += seg.getSourceRecords();
-                    sourceSize += seg.getSourceRecordsSize();
-                }
-            }
-            break;
-        case MERGE:
-            for (CubeSegment seg : cubeInstance.getMergingSegments()) {
-                sourceCount += seg.getSourceRecords();
-                sourceSize += seg.getSourceRecordsSize();
-            }
-            break;
-        }
-
-        cubeMgr.updateSegmentOnJobSucceed(cubeInstance, jobInstance.getType(), jobInstance.getRelatedSegment(), jobInstance.getUuid(), jobInstance.getExecEndTime(), cubeSize, sourceCount, sourceSize);
-        log.info("Update cube segment succeed " + jobInstance.getRelatedSegment() + " for cube " + jobInstance.getRelatedCube());
-    }
-
-    /**
-     * Reads a step info value that must be present and parses it as a long.
-     *
-     * @throws RuntimeException with the given message if the value is null or empty
-     * @throws NumberFormatException if the value is not a valid long
-     */
-    private static long requiredLongInfo(JobStep step, String infoKey, String missingMessage) {
-        String value = step.getInfo(infoKey);
-        if (value == null || value.equals("")) {
-            throw new RuntimeException(missingMessage);
-        }
-        return Long.parseLong(value);
-    }
-
-    /**
-     * @throws RuntimeException if the job has no steps
-     */
-    private void validate(JobInstance jobInstance) {
-        List<JobStep> steps = jobInstance.getSteps();
-        if (steps == null || steps.size() == 0) {
-            throw new RuntimeException("Steps of job " + jobInstance.getUuid() + " is null or empty!");
-        }
-    }
-
-    /**
-     * Logs the throwable, marks the step as ERROR with the current time as
-     * end time, stores the job and appends the full stack trace to the stored
-     * step output. An IOException during these updates is only logged.
-     */
-    private void handleException(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, Throwable t) {
-        log.error(t.getLocalizedMessage(), t);
-        String exceptionMsg = "Failed with Exception:" + ExceptionUtils.getFullStackTrace(t);
-        try {
-            JobInstance jobInstance = JobDAO.getInstance(config).getJob(jobInstanceUuid);
-            jobInstance.getSteps().get(jobInstanceStepSeqId).setStatus(JobStepStatusEnum.ERROR);
-            jobInstance.getSteps().get(jobInstanceStepSeqId).setExecEndTime(System.currentTimeMillis());
-            JobDAO.getInstance(config).updateJobInstance(jobInstance);
-
-            String output = JobDAO.getInstance(config).getJobOutput(jobInstanceUuid, jobInstanceStepSeqId).getOutput();
-            output = output + "\n" + exceptionMsg;
-            JobDAO.getInstance(config).saveJobOutput(jobInstanceUuid, jobInstanceStepSeqId, output);
-        } catch (IOException e1) {
-            log.error(e1.getLocalizedMessage(), e1);
-        }
-    }
-
-    /**
-     * Mails the final result of a job to the cube's notify list and the
-     * configured admin lists. Nothing is sent unless the job status is
-     * FINISHED, ERROR or DISCARDED. For ERROR the output of a failed step is
-     * included (the last one in step order wins).
-     *
-     * Placeholders are substituted literally, so '$' and '\' inside values
-     * such as a stack trace cannot break the substitution.
-     *
-     * @param jobInstance job whose result is reported
-     * @param engineConfig source of the Kylin configuration and the admin lists
-     */
-    protected void notifyUsers(JobInstance jobInstance, JobEngineConfig engineConfig) {
-        KylinConfig config = engineConfig.getConfig();
-        String cubeName = jobInstance.getRelatedCube();
-        CubeInstance cubeInstance = CubeManager.getInstance(config).getCube(cubeName);
-        String finalStatus = null;
-        String content = JobConstants.NOTIFY_EMAIL_TEMPLATE;
-        String logMsg = "";
-
-        switch (jobInstance.getStatus()) {
-        case FINISHED:
-            finalStatus = "SUCCESS";
-            break;
-        case ERROR:
-            for (JobStep step : jobInstance.getSteps()) {
-                if (step.getStatus() == JobStepStatusEnum.ERROR) {
-                    try {
-                        logMsg = JobDAO.getInstance(config).getJobOutput(step).getOutput();
-                    } catch (IOException e) {
-                        log.error(e.getLocalizedMessage(), e);
-                    }
-                }
-            }
-            finalStatus = "FAILED";
-            break;
-        case DISCARDED:
-            finalStatus = "DISCARDED";
-            break;
-        default:
-            break;
-        }
-
-        if (null == finalStatus) {
-            return;
-        }
-
-        try {
-            InetAddress inetAddress = InetAddress.getLocalHost();
-            content = fillPlaceholder(content, "job_engine", inetAddress.getCanonicalHostName());
-        } catch (UnknownHostException e) {
-            // the ${job_engine} placeholder is left untouched in this case
-            log.error(e.getLocalizedMessage(), e);
-        }
-
-        content = fillPlaceholder(content, "job_name", jobInstance.getName());
-        content = fillPlaceholder(content, "result", finalStatus);
-        content = fillPlaceholder(content, "cube_name", cubeName);
-        content = fillPlaceholder(content, "start_time", new Date(jobInstance.getExecStartTime()).toString());
-        content = fillPlaceholder(content, "duration", jobInstance.getDuration() / 60 + "mins");
-        content = fillPlaceholder(content, "mr_waiting", jobInstance.getMrWaiting() / 60 + "mins");
-        content = fillPlaceholder(content, "last_update_time", new Date(jobInstance.getLastModified()).toString());
-        content = fillPlaceholder(content, "submitter", jobInstance.getSubmitter());
-        content = fillPlaceholder(content, "error_log", logMsg);
-
-        MailService mailService = new MailService();
-        try {
-            List<String> users = new ArrayList<String>();
-
-            if (null != cubeInstance.getDescriptor().getNotifyList()) {
-                users.addAll(cubeInstance.getDescriptor().getNotifyList());
-            }
-
-            if (null != engineConfig.getAdminDls()) {
-                // comma separated list of admin addresses
-                Collections.addAll(users, engineConfig.getAdminDls().split(","));
-            }
-
-            log.info("prepare to send email to:" + users);
-            log.info("job name:" + jobInstance.getName());
-            log.info("submitter:" + jobInstance.getSubmitter());
-
-            if (users.size() > 0) {
-                log.info("notify list:" + users);
-                mailService.sendMail(users, "[" + finalStatus + "] - [Kylin Cube Build Job]-" + cubeName, content);
-                log.info("notified users:" + users);
-            }
-        } catch (IOException e) {
-            log.error(e.getLocalizedMessage(), e);
-        }
-    }
-
-    /**
-     * Replaces every occurrence of ${key} in the template with the value.
-     * String.replace treats both arguments literally, unlike replaceAll,
-     * which interprets '$' and '\' in the replacement. A null value is
-     * rendered as an empty string.
-     */
-    private static String fillPlaceholder(String template, String key, String value) {
-        return template.replace("${" + key + "}", value == null ? "" : value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
deleted file mode 100644
index e539121..0000000
--- a/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.flow;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.cmd.ICommandOutput;
-import com.kylinolap.job.cmd.IJobCommand;
-import com.kylinolap.job.cmd.JobCommandFactory;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.engine.JobEngine;
-import com.kylinolap.job.engine.JobEngineConfig;
-
-/**
- * Quartz job that runs one step of a job flow: it flags the step as running,
- * executes the step's command and stores the resulting status and output
- * through {@link JobDAO}.
- * 
- * @author xduo
- */
-public class JobFlowNode implements InterruptableJob {
-
-    protected static final Logger log = LoggerFactory.getLogger(JobFlowNode.class);
-
-    protected JobDetail currentJobDetail;
-    protected IJobCommand jobCmd;
-
-    /**
-     * Runs the step described by the job data map of the given context. Any
-     * {@link Throwable} raised on the way is handed to
-     * {@link #handleException(String, int, KylinConfig, Throwable)} instead of
-     * being rethrown.
-     * 
-     * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
-     */
-    @Override
-    public void execute(JobExecutionContext context) throws JobExecutionException {
-        this.currentJobDetail = context.getJobDetail();
-        final JobDataMap dataMap = this.currentJobDetail.getJobDataMap();
-        final JobFlow flow = (JobFlow) dataMap.get(JobConstants.PROP_JOB_FLOW);
-        final JobEngineConfig engineConfig = flow.getJobengineConfig();
-        final String jobUuid = dataMap.getString(JobConstants.PROP_JOBINSTANCE_UUID);
-        final int stepSeqId = dataMap.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
-        final String command = dataMap.getString(JobConstants.PROP_COMMAND);
-        final KylinConfig config = engineConfig.getConfig();
-
-        try {
-            // Flag the step as running and stamp its start time before the command is built.
-            final JobInstance jobInstance = updateJobStep(jobUuid, stepSeqId, config, JobStepStatusEnum.RUNNING, System.currentTimeMillis(), null, null);
-
-            jobCmd = JobCommandFactory.getJobCommand(command, jobInstance, stepSeqId, engineConfig);
-            // Store the command in the data map and re-register the job detail with the scheduler.
-            dataMap.put(JobConstants.PROP_JOB_CMD_EXECUTOR, jobCmd);
-            context.getScheduler().addJob(this.currentJobDetail, true, true);
-
-            final ICommandOutput result = jobCmd.execute();
-
-            // When the kill flag is set in the data map, the outcome is neither stored nor logged.
-            if (!dataMap.getBoolean(JobConstants.PROP_JOB_KILLED)) {
-                final int exitCode = result.getExitCode();
-                updateJobStep(jobUuid, stepSeqId, config, result.getStatus(), null, System.currentTimeMillis(), result.getOutput());
-                context.setResult(exitCode);
-
-                log.info("Job status for " + context.getJobDetail().getKey() + " has been updated.");
-                log.info("cmd:" + command);
-                log.info("output:" + result.getOutput());
-                log.info("exitCode:" + exitCode);
-            }
-        } catch (Throwable t) {
-            handleException(jobUuid, stepSeqId, config, t);
-        }
-    }
-
-    /**
-     * Intentionally does nothing.
-     */
-    @Override
-    public void interrupt() throws UnableToInterruptJobException {
-    }
-
-    /**
-     * Loads the job instance, applies the given changes to one of its steps
-     * and saves the instance when at least one change was applied. An
-     * {@link IOException} raised while applying or saving the changes is
-     * logged and not rethrown; the initial job lookup is not covered by that
-     * handler.
-     * 
-     * @param jobInstanceUuid uuid of the job instance to load
-     * @param jobInstanceStepSeqId index of the step within the job's step list
-     * @param config configuration used to obtain the {@link JobDAO}
-     * @param newStatus status to set on the step, or null to keep the current one
-     * @param execStartTime start time to set on the step, or null to keep the current one
-     * @param execEndTime end time to set on the step, or null to keep the current one;
-     *            when non-null the step duration is also put into {@link JobEngine#JOB_DURATION}
-     * @param output step output to save, or null to save none
-     * @return the loaded job instance
-     */
-    protected JobInstance updateJobStep(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, JobStepStatusEnum newStatus, Long execStartTime, Long execEndTime, String output) throws IOException {
-        final JobInstance jobInstance = JobDAO.getInstance(config).getJob(jobInstanceUuid);
-        JobStep step = null;
-
-        try {
-            step = jobInstance.getSteps().get(jobInstanceStepSeqId);
-            final JobStepStatusEnum previousStatus = step.getStatus();
-            boolean dirty = false;
-
-            if (execStartTime != null) {
-                step.setExecStartTime(execStartTime);
-                dirty = true;
-            }
-            if (execEndTime != null) {
-                step.setExecEndTime(execEndTime);
-                dirty = true;
-            }
-            if (output != null) {
-                JobDAO.getInstance(config).saveJobOutput(step, output);
-                dirty = true;
-            }
-
-            // A step leaving WAITING for RUNNING or FINISHED gets its wait time recorded,
-            // in seconds, measured from the step's exec start time to now.
-            final boolean wasWaiting = JobStepStatusEnum.WAITING == previousStatus;
-            final boolean startsOrFinishes = JobStepStatusEnum.RUNNING == newStatus || JobStepStatusEnum.FINISHED == newStatus;
-            if (wasWaiting && startsOrFinishes) {
-                step.setExecWaitTime((System.currentTimeMillis() - step.getExecStartTime()) / 1000);
-                dirty = true;
-            }
-            if (newStatus != null) {
-                step.setStatus(newStatus);
-                dirty = true;
-            }
-
-            if (dirty) {
-                JobDAO.getInstance(config).updateJobInstance(jobInstance);
-            }
-        } catch (IOException e) {
-            log.error(e.getLocalizedMessage(), e);
-        }
-
-        if (execEndTime != null) {
-            // Key is the step identity plus its start time; value is the duration in seconds.
-            final String durationKey = JobInstance.getStepIdentity(jobInstance, step) + " - " + String.valueOf(step.getExecStartTime());
-            final double durationSeconds = (double) (step.getExecEndTime() - step.getExecStartTime()) / 1000;
-            JobEngine.JOB_DURATION.put(durationKey, durationSeconds);
-        }
-
-        return jobInstance;
-    }
-
-    /**
-     * Logs the failure, sets the step to {@link JobStepStatusEnum#ERROR} with
-     * the current time as its end time, and appends the full stack trace to
-     * the step's stored output. An {@link IOException} raised while doing so
-     * is logged and not rethrown.
-     * 
-     * @param jobInstanceUuid uuid of the job instance the failed step belongs to
-     * @param jobInstanceStepSeqId index of the failed step within the job's step list
-     * @param config configuration used to obtain the {@link JobDAO}
-     * @param t the failure to record
-     */
-    protected void handleException(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, Throwable t) {
-        log.error(t.getLocalizedMessage(), t);
-        final String exceptionMsg = "Failed with Exception:" + ExceptionUtils.getFullStackTrace(t);
-        try {
-            final JobDAO dao = JobDAO.getInstance(config);
-            final JobInstance failedJob = dao.getJob(jobInstanceUuid);
-            final JobStep failedStep = failedJob.getSteps().get(jobInstanceStepSeqId);
-            failedStep.setStatus(JobStepStatusEnum.ERROR);
-            failedStep.setExecEndTime(System.currentTimeMillis());
-            dao.updateJobInstance(failedJob);
-
-            // Keep whatever output was stored so far and add the stack trace after it.
-            final String previousOutput = dao.getJobOutput(jobInstanceUuid, jobInstanceStepSeqId).getOutput();
-            dao.saveJobOutput(jobInstanceUuid, jobInstanceStepSeqId, previousOutput + "\n" + exceptionMsg);
-        } catch (IOException e1) {
-            log.error(e1.getLocalizedMessage(), e1);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
deleted file mode 100644
index 4cd59a9..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.hadoop;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.persistence.ResourceStore;
-import com.kylinolap.common.util.StringSplitter;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.exception.JobException;
-import com.kylinolap.job.tools.OptionsHelper;
-import com.kylinolap.metadata.model.schema.TableDesc;
-
-/**
- * Base class for Hadoop {@link Tool} jobs: declares the shared command line
- * options and offers helpers for option parsing, job submission, input path
- * setup, metadata shipping and job inspection.
- */
-@SuppressWarnings("static-access")
-public abstract class AbstractHadoopJob extends Configured implements Tool {
-    protected static final Logger log = LoggerFactory.getLogger(AbstractHadoopJob.class);
-
-    protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
-    protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
-    protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
-    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
-    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
-    protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
-    protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim");
-    protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
-    protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
-    // NOTE: uses the same option name ("input") as OPTION_INPUT_PATH.
-    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
-    protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
-    protected static final Option OPTION_KEY_COLUMN_PERCENTAGE = OptionBuilder.withArgName("rowkey column percentage").hasArg().isRequired(true).withDescription("Percentage of row key columns").create("columnpercentage");
-    protected static final Option OPTION_KEY_SPLIT_NUMBER = OptionBuilder.withArgName("key split number").hasArg().isRequired(true).withDescription("Number of key split range").create("splitnumber");
-
-    protected String name;
-    protected String description;
-    // When true, waitForCompletion(Job) only submits the job and returns immediately.
-    protected boolean isAsync = false;
-    protected OptionsHelper optionsHelper = new OptionsHelper();
-
-    protected Job job;
-
-    /**
-     * Parses the given arguments against the given options via the options helper.
-     */
-    protected void parseOptions(Options options, String[] args) throws ParseException {
-        optionsHelper.parseOptions(options, args);
-    }
-
-    /**
-     * Prints the usage of the given options, labelled with this class' simple name.
-     */
-    public void printUsage(Options options) {
-        optionsHelper.printUsage(getClass().getSimpleName(), options);
-    }
-
-    /**
-     * @return the options held by the options helper
-     */
-    public Option[] getOptions() {
-        return optionsHelper.getOptions();
-    }
-
-    /**
-     * @return the string form of the options as produced by the options helper
-     */
-    public String getOptionsAsString() {
-        return optionsHelper.getOptionsAsString();
-    }
-
-    /**
-     * @return the value the options helper holds for the given option
-     */
-    protected String getOptionValue(Option option) {
-        return optionsHelper.getOptionValue(option);
-    }
-
-    /**
-     * @return whether the options helper reports the given option as present
-     */
-    protected boolean hasOption(Option option) {
-        return optionsHelper.hasOption(option);
-    }
-
-    /**
-     * Submits the given job. In async mode the job is only submitted; otherwise
-     * this call blocks until the job completes and logs its outcome and
-     * duration.
-     * 
-     * @param job the job to run
-     * @return 0 when the job was only submitted (async mode) or completed
-     *         successfully, 1 when it completed unsuccessfully
-     */
-    protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
-        if (isAsync) {
-            // The job has only been submitted, so there is no outcome or duration to report yet.
-            job.submit();
-            log.debug("Job '" + job.getJobName() + "' submitted asynchronously.");
-            return 0;
-        }
-
-        long start = System.nanoTime();
-        job.waitForCompletion(true);
-        boolean successful = job.isSuccessful();
-
-        log.debug("Job '" + job.getJobName() + "' finished " + (successful ? "successfully in " : "with failures.  Time taken ") + StringUtils.formatTime((System.nanoTime() - start) / 1000000L));
-
-        return successful ? 0 : 1;
-    }
-
-    /**
-     * Runs the given tool through {@link ToolRunner} and terminates the JVM
-     * with the tool's exit code, or with exit code 5 when the tool throws.
-     */
-    protected static void runJob(Tool job, String[] args) {
-        try {
-            int exitCode = ToolRunner.run(job, args);
-            System.exit(exitCode);
-        } catch (Exception e) {
-            e.printStackTrace(System.err);
-            System.exit(5);
-        }
-    }
-
-    /**
-     * Adds the comma separated input paths to the given job. An entry ending
-     * in "/*" is expanded: every sub-directory of the stripped path is added
-     * recursively, and when the path has entries but no sub-directory the path
-     * itself is added. Any other entry is added as is.
-     * 
-     * @param input comma separated list of input paths
-     * @param job the job to add the input paths to
-     */
-    public void addInputDirs(String input, Job job) throws IOException {
-        for (String inp : StringSplitter.split(input, ",")) {
-            inp = inp.trim();
-            if (inp.endsWith("/*")) {
-                inp = inp.substring(0, inp.length() - 2);
-                FileSystem fs = FileSystem.get(job.getConfiguration());
-                Path path = new Path(inp);
-                FileStatus[] fileStatuses = fs.listStatus(path);
-                boolean hasDir = false;
-                for (FileStatus stat : fileStatuses) {
-                    if (stat.isDirectory()) {
-                        hasDir = true;
-                        addInputDirs(stat.getPath().toString(), job);
-                    }
-                }
-                // Only files below the path: use the path itself as input.
-                if (fileStatuses.length > 0 && !hasDir) {
-                    addInputDirs(path.toString(), job);
-                }
-            } else {
-                System.out.println("Add input " + inp);
-                FileInputFormat.addInputPath(job, new Path(inp));
-            }
-        }
-    }
-
-    /**
-     * Writes kylin.properties and the metadata resources of the given cube
-     * (cube, cube descriptor, inverted index descriptor if any, tables and
-     * segment dictionaries) into a local "meta" directory and registers that
-     * directory under the "tmpfiles" key of the given configuration.
-     * 
-     * @param cube the cube whose metadata is dumped
-     * @param conf the configuration that receives the "tmpfiles" entry
-     * @throws IOException when the directory cannot be created or a resource cannot be written
-     */
-    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
-        File tmp = File.createTempFile("kylin_job_meta", "");
-        tmp.delete(); // we need a directory, so delete the file first
-
-        File metaDir = new File(tmp, "meta");
-        // Fail with a clear message instead of a later, less obvious write failure.
-        if (!metaDir.mkdirs() && !metaDir.isDirectory()) {
-            throw new IOException("Failed to create metadata directory " + metaDir.getAbsolutePath());
-        }
-        metaDir.getParentFile().deleteOnExit();
-
-        // write kylin.properties
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        File kylinPropsFile = new File(metaDir, "kylin.properties");
-        kylinConfig.writeProperties(kylinPropsFile);
-
-        // write cube / cube_desc / dict / table
-        ArrayList<String> dumpList = new ArrayList<String>();
-        dumpList.add(cube.getResourcePath());
-        dumpList.add(cube.getDescriptor().getResourcePath());
-        if (cube.isInvertedIndex()) {
-            dumpList.add(cube.getInvertedIndexDesc().getResourcePath());
-        }
-        for (TableDesc table : cube.getDescriptor().listTables()) {
-            dumpList.add(table.getResourcePath());
-        }
-
-        for (CubeSegment segment : cube.getSegments()) {
-            dumpList.addAll(segment.getDictionaryPaths());
-        }
-
-        dumpResources(kylinConfig, metaDir, dumpList);
-
-        // hadoop distributed cache
-        conf.set("tmpfiles", "file:///" + OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()));
-    }
-
-    /**
-     * Copies the listed resources, with their timestamps, from the resource
-     * store of the given config into a resource store rooted at the given
-     * directory.
-     * 
-     * @throws IllegalStateException when a listed resource does not exist in the source store
-     */
-    private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
-        ResourceStore from = ResourceStore.getStore(kylinConfig);
-        KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
-        ResourceStore to = ResourceStore.getStore(localConfig);
-        for (String path : dumpList) {
-            InputStream in = from.getResource(path);
-            if (in == null)
-                throw new IllegalStateException("No resource found at -- " + path);
-            try {
-                long ts = from.getResourceTimestamp(path);
-                to.putResource(path, in, ts);
-            } finally {
-                // Always release the source stream, also when the copy fails.
-                in.close();
-            }
-            log.info("Dumped resource " + path + " to " + metaDir.getAbsolutePath());
-        }
-    }
-
-    /**
-     * Recursively deletes the given path if it exists on the file system of
-     * the given configuration.
-     */
-    protected void deletePath(Configuration conf, Path path) throws IOException {
-        FileSystem fs = FileSystem.get(conf);
-        if (fs.exists(path)) {
-            fs.delete(path, true);
-        }
-    }
-
-    /**
-     * Sums the lengths of all input splits of the current job.
-     * 
-     * @return the total split length in megabytes
-     * @throws JobException when no job has been set
-     * @throws IllegalArgumentException when the splits add up to 0 bytes
-     */
-    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        if (job == null) {
-            throw new JobException("Job is null");
-        }
-
-        long mapInputBytes = 0;
-        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
-        for (InputSplit split : input.getSplits(job)) {
-            mapInputBytes += split.getLength();
-        }
-        if (mapInputBytes == 0) {
-            throw new IllegalArgumentException("Map input splits are 0 bytes, something is wrong!");
-        }
-        double totalMapInputMB = (double) mapInputBytes / 1024 / 1024;
-        return totalMapInputMB;
-    }
-
-    /**
-     * @return the number of input splits the job's input format computes for the current job
-     * @throws JobException when no job has been set
-     */
-    protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
-        if (job == null) {
-            throw new JobException("Job is null");
-        }
-        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
-        return input.getSplits(job).size();
-    }
-
-    /**
-     * Points the {@link KylinConfig#KYLIN_CONF} system property at the
-     * relative "meta" directory, loads the config from the environment and
-     * sets its metadata url to that directory.
-     * 
-     * @param conf not used by this method
-     * @return the loaded config
-     */
-    public static KylinConfig loadKylinPropsAndMetadata(Configuration conf) throws IOException {
-        File metaDir = new File("meta");
-        System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
-        System.out.println("The absolute path for meta dir is " + metaDir.getAbsolutePath());
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
-        return kylinConfig;
-    }
-
-    /**
-     * Kills the current job; does nothing when no job has been set.
-     * 
-     * @throws JobException wrapping the {@link IOException} raised by the kill request
-     */
-    public void kill() throws JobException {
-        if (job != null) {
-            try {
-                job.killJob();
-            } catch (IOException e) {
-                throw new JobException(e);
-            }
-        }
-    }
-
-    /**
-     * @return a map holding the job id under {@link JobInstance#MR_JOB_ID} and
-     *         the tracking url under {@link JobInstance#YARN_APP_URL}, each
-     *         only when available
-     * @throws JobException when no job has been set
-     */
-    public Map<String, String> getInfo() throws JobException {
-        if (job != null) {
-            Map<String, String> status = new HashMap<String, String>();
-            if (null != job.getJobID()) {
-                status.put(JobInstance.MR_JOB_ID, job.getJobID().toString());
-            }
-            if (null != job.getTrackingURL()) {
-                status.put(JobInstance.YARN_APP_URL, job.getTrackingURL().toString());
-            }
-
-            return status;
-        } else {
-            throw new JobException("Job is null");
-        }
-    }
-
-    /**
-     * @return the counters of the current job
-     * @throws JobException when no job has been set, or wrapping the
-     *             {@link IOException} raised while fetching the counters
-     */
-    public Counters getCounters() throws JobException {
-        if (job != null) {
-            try {
-                return job.getCounters();
-            } catch (IOException e) {
-                throw new JobException(e);
-            }
-        } else {
-            throw new JobException("Job is null");
-        }
-    }
-
-    /**
-     * @param isAsync true to make {@link #waitForCompletion(Job)} only submit the job
-     */
-    public void setAsync(boolean isAsync) {
-        this.isAsync = isAsync;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
deleted file mode 100644
index e25ec36..0000000
--- a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.hadoop.cardinality;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.kylinolap.common.hll.HyperLogLogPlusCounter;
-import com.kylinolap.cube.kv.RowConstants;
-
-/**
- * Mapper that feeds every token of an input line into a per-position
- * {@link HyperLogLogPlusCounter} and, on cleanup, emits one serialized
- * counter per token position.
- * 
- * @author Jack
- */
-public class ColumnCardinalityMapper<T> extends Mapper<T, Text, IntWritable, BytesWritable> {
-
-    // One counter per token position; positions start at 1.
-    private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
-    public static final String DEFAULT_DELIM = ",";
-
-    /**
-     * Tokenizes the line with the delimiter configured under
-     * {@link HiveColumnCardinalityJob#KEY_INPUT_DELIM} (falling back to
-     * {@link #DEFAULT_DELIM}) and adds each token to the counter of its
-     * 1-based position.
-     */
-    @Override
-    public void map(T key, Text value, Context context) throws IOException, InterruptedException {
-        String delim = context.getConfiguration().get(HiveColumnCardinalityJob.KEY_INPUT_DELIM);
-        if (delim == null) {
-            delim = DEFAULT_DELIM;
-        }
-        String line = value.toString();
-        // NOTE(review): StringTokenizer does not return empty tokens, so a line with
-        // empty fields shifts the positions of the fields after them - confirm this is intended.
-        StringTokenizer tokenizer = new StringTokenizer(line, delim);
-        int i = 1;
-        while (tokenizer.hasMoreTokens()) {
-            String temp = tokenizer.nextToken();
-            getHllc(i).add(Bytes.toBytes(temp));
-            i++;
-        }
-    }
-
-    /**
-     * Returns the counter registered for the given position, creating and
-     * registering a new one on first use.
-     */
-    private HyperLogLogPlusCounter getHllc(Integer key) {
-        HyperLogLogPlusCounter hllc = hllcMap.get(key);
-        if (hllc == null) {
-            hllc = new HyperLogLogPlusCounter();
-            hllcMap.put(key, hllc);
-        }
-        return hllc;
-    }
-
-    /**
-     * Emits one record per token position: the position as key and the
-     * registers written by its counter as value.
-     */
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        for (Map.Entry<Integer, HyperLogLogPlusCounter> entry : hllcMap.entrySet()) {
-            ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-            entry.getValue().writeRegisters(buf);
-            buf.flip();
-
-            // Emit only the bytes actually written (0..limit), not the whole backing array.
-            BytesWritable serialized = new BytesWritable();
-            serialized.set(buf.array(), 0, buf.limit());
-            context.write(new IntWritable(entry.getKey()), serialized);
-        }
-    }
-
-}