Posted to commits@kylin.apache.org by lu...@apache.org on 2015/01/07 15:46:42 UTC

[21/51] [partial] incubator-kylin git commit: migrate repo from github.com to apache git

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobEngine.java b/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
new file mode 100644
index 0000000..b86fe36
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/engine/JobEngine.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.engine;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.math.stat.descriptive.rank.Percentile;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.quartz.Scheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.exception.JobException;
+
+/**
+ * @author George Song (ysong1), xduo
+ */
+public class JobEngine implements ConnectionStateListener {
+
+    private static final Logger log = LoggerFactory.getLogger(JobEngine.class);
+
+    private final String engineID;
+
+    private final JobEngineConfig engineConfig;
+    private final QuatzScheduler scheduler;
+    private InterProcessMutex sharedLock;
+    private CuratorFramework zkClient;
+
+    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+    private static final ConcurrentHashMap<JobEngineConfig, JobEngine> CACHE = new ConcurrentHashMap<JobEngineConfig, JobEngine>();
+    private int daemonJobIntervalInSeconds;
+
+    public static JobEngine getInstance(String engineID, JobEngineConfig engineCfg) throws JobException {
+        JobEngine r = CACHE.get(engineCfg);
+        if (r == null) {
+            r = new JobEngine(engineID, engineCfg);
+            JobEngine prev = CACHE.putIfAbsent(engineCfg, r);
+            if (prev != null) {
+                // another thread registered an engine first; reuse it
+                r = prev;
+            }
+        }
+        return r;
+    }
+
+    private JobEngine(String engineID, JobEngineConfig context) throws JobException {
+        if (context.getZookeeperString() == null || context.getZookeeperString().equals("")) {
+            throw new IllegalArgumentException("Zookeeper connection string is null or empty");
+        }
+        log.info("Using metadata url: " + context.getConfig());
+
+        this.engineID = engineID;
+        this.engineConfig = context;
+        this.scheduler = new QuatzScheduler();
+
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        this.zkClient = CuratorFrameworkFactory.newClient(context.getZookeeperString(), retryPolicy);
+        this.zkClient.start();
+        // register so stateChanged() fires on SUSPENDED/LOST and releases the lock
+        this.zkClient.getConnectionStateListenable().addListener(this);
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                log.debug("Releasing ZooKeeper lock on shutdown");
+                releaseLock();
+            }
+        });
+    }
+
+    private void releaseLock() {
+        try {
+            if (sharedLock != null && sharedLock.isAcquiredInThisProcess()) {
+                sharedLock.release();
+            }
+            if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+                // client.setData().forPath(ZOOKEEPER_LOCK_PATH, null);
+                if (zkClient.checkExists().forPath(ZOOKEEPER_LOCK_PATH + "/" + this.engineID) != null) {
+                    zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(ZOOKEEPER_LOCK_PATH + "/" + this.engineID);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void start(int daemonJobIntervalInSeconds) throws Exception {
+        this.daemonJobIntervalInSeconds = daemonJobIntervalInSeconds;
+
+        sharedLock = new InterProcessMutex(zkClient, ZOOKEEPER_LOCK_PATH + "/" + this.engineID);
+        log.info("Trying to obtain the shared lock...");
+        // the current thread blocks until the lock is acquired
+        sharedLock.acquire();
+
+        log.info("Obtained the shared lock. Starting job scheduler...");
+        zkClient.setData().forPath(ZOOKEEPER_LOCK_PATH + "/" + this.engineID, Bytes.toBytes(this.engineID));
+        startScheduler();
+    }
+
+    private void startScheduler() throws JobException, IOException {
+        String logDir = KylinConfig.getInstanceFromEnv().getKylinJobLogDir();
+        new File(logDir).mkdirs();
+
+        log.info("Starting scheduler.");
+        this.scheduler.start();
+        this.scheduler.scheduleFetcher(this.daemonJobIntervalInSeconds, this.engineConfig);
+    }
+
+    public void start() throws Exception {
+        start(JobConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS);
+    }
+
+    public void stop() throws JobException {
+        releaseLock();
+        this.scheduler.stop();
+    }
+
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState) {
+        if ((newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST)) {
+            releaseLock();
+        }
+    }
+
+    public void interruptJob(JobInstance jobInstance, JobStep jobStep) throws IOException, JobException {
+        // kill the running step
+        this.scheduler.interrupt(jobInstance, jobStep);
+    }
+
+    public Scheduler getScheduler() {
+        return this.scheduler.getScheduler();
+    }
+
+    // Job engine metrics related methods
+
+    // <StepID, Duration Seconds>
+    public static ConcurrentHashMap<String, Double> JOB_DURATION = new ConcurrentHashMap<String, Double>();
+
+    public int getNumberOfJobStepsExecuted() {
+        return JOB_DURATION.size();
+    }
+
+    public String getPrimaryEngineID() throws Exception {
+        byte[] data = zkClient.getData().forPath(ZOOKEEPER_LOCK_PATH + "/" + this.engineID);
+        if (data == null) {
+            return "";
+        } else {
+            return Bytes.toString(data);
+        }
+    }
+
+    public double getMinJobStepDuration() {
+        double[] all = getJobStepDuration();
+        Arrays.sort(all);
+
+        if (all.length > 0) {
+            return all[0];
+        } else {
+            return 0;
+        }
+    }
+
+    private double[] getJobStepDuration() {
+        Collection<Double> values = JOB_DURATION.values();
+        Double[] all = values.toArray(new Double[values.size()]);
+        return ArrayUtils.toPrimitive(all);
+    }
+
+    public double getMaxJobStepDuration() {
+        double[] all = getJobStepDuration();
+        Arrays.sort(all);
+
+        if (all.length > 0) {
+            return all[all.length - 1];
+        } else {
+            return 0;
+        }
+    }
+
+    public double getPercentileJobStepDuration(double percentile) {
+        Percentile p = new Percentile(percentile);
+        return p.evaluate(getJobStepDuration());
+    }
+
+    public Integer getScheduledJobsSize() {
+        return scheduler.getScheduledJobs();
+    }
+
+    public int getEngineThreadPoolSize() {
+        return scheduler.getThreadPoolSize();
+    }
+
+    public int getNumberOfIdleSlots() {
+        return scheduler.getIdleSlots();
+    }
+
+    public int getNumberOfJobStepsRunning() {
+        return scheduler.getRunningJobs();
+    }
+
+}
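
For reference, a minimal sketch of how a caller might drive the engine above, assuming a KylinConfig is available from the environment; the engine ID string is illustrative and checked-exception handling is omitted:

    JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
    JobEngine engine = JobEngine.getInstance("engine-1", engineConfig);
    engine.start();   // blocks until the ZooKeeper shared lock is acquired,
                      // then starts the Quartz scheduler and the job fetcher
    // ... on shutdown:
    engine.stop();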

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java b/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
new file mode 100644
index 0000000..5527cf6
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/engine/JobEngineConfig.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.engine;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.job.tools.OptionsHelper;
+import com.kylinolap.metadata.model.cube.CubeDesc.CubeCapacity;
+
+/**
+ * @author ysong1
+ */
+public class JobEngineConfig {
+    private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class);
+    public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
+
+    private String getHadoopJobConfFilePath(CubeCapacity capacity, boolean appendSuffix) throws IOException {
+        String hadoopJobConfFile;
+        if (appendSuffix)
+            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + capacity.toString().toLowerCase() + ".xml");
+        else
+            hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml");
+
+        String path = System.getProperty(KylinConfig.KYLIN_CONF);
+
+        if (path == null) {
+            path = System.getenv(KylinConfig.KYLIN_CONF);
+        }
+
+        if (path != null) {
+            path = path + File.separator + hadoopJobConfFile;
+        }
+
+        if (null == path || !new File(path).exists()) {
+            File defaultFilePath = new File("/etc/kylin/" + hadoopJobConfFile);
+
+            if (defaultFilePath.exists()) {
+                path = defaultFilePath.getAbsolutePath();
+            } else {
+                logger.debug("Searching for conf file " + hadoopJobConfFile + " on the classpath ...");
+                InputStream is = JobEngineConfig.class.getClassLoader().getResourceAsStream(hadoopJobConfFile);
+                if (is == null) {
+                    logger.debug("Can't get " + hadoopJobConfFile + " from classpath");
+                    logger.debug("No " + hadoopJobConfFile + " file was found");
+                } else {
+                    File tmp = File.createTempFile(HADOOP_JOB_CONF_FILENAME, ".xml");
+                    inputStreamToFile(is, tmp);
+                    path = tmp.getAbsolutePath();
+                }
+            }
+        }
+
+        if (null == path || !new File(path).exists()) {
+            return "";
+        }
+
+        return OptionsHelper.convertToFileURL(path);
+    }
+
+    public String getHadoopJobConfFilePath(CubeCapacity capacity) throws IOException {
+        String path = getHadoopJobConfFilePath(capacity, true);
+        if (!StringUtils.isEmpty(path)) {
+            logger.info("Chosen job conf is: " + path);
+            return path;
+        } else {
+            path = getHadoopJobConfFilePath(capacity, false);
+            if (!StringUtils.isEmpty(path)) {
+                logger.info("Chosen job conf is: " + path);
+                return path;
+            }
+        }
+        return "";
+    }
+
+    private void inputStreamToFile(InputStream ins, File file) throws IOException {
+        OutputStream os = new FileOutputStream(file);
+        try {
+            byte[] buffer = new byte[8192];
+            int bytesRead;
+            while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
+                os.write(buffer, 0, bytesRead);
+            }
+        } finally {
+            // close both streams even if the copy fails midway
+            os.close();
+            ins.close();
+        }
+    }
+
+    // there should be no setters
+    private final KylinConfig config;
+
+    public JobEngineConfig(KylinConfig config) {
+        this.config = config;
+    }
+
+    public KylinConfig getConfig() {
+        return config;
+    }
+
+    public String getHdfsWorkingDirectory() {
+        return config.getHdfsWorkingDirectory();
+    }
+
+    /**
+     * @return the kylinJobJarPath
+     */
+    public String getKylinJobJarPath() {
+        return config.getKylinJobJarPath();
+    }
+
+    /**
+     * @return the runAsRemoteCommand
+     */
+    public boolean isRunAsRemoteCommand() {
+        return config.getRunAsRemoteCommand();
+    }
+
+    /**
+     * @return the zookeeperString
+     */
+    public String getZookeeperString() {
+        return config.getZookeeperString();
+    }
+
+    /**
+     * @return the remoteHadoopCliHostname
+     */
+    public String getRemoteHadoopCliHostname() {
+        return config.getRemoteHadoopCliHostname();
+    }
+
+    /**
+     * @return the remoteHadoopCliUsername
+     */
+    public String getRemoteHadoopCliUsername() {
+        return config.getRemoteHadoopCliUsername();
+    }
+
+    /**
+     * @return the remoteHadoopCliPassword
+     */
+    public String getRemoteHadoopCliPassword() {
+        return config.getRemoteHadoopCliPassword();
+    }
+    
+    public String getMapReduceCmdExtraArgs() {
+        return config.getMapReduceCmdExtraArgs();
+    }
+
+    /**
+     * @return the yarnStatusServiceUrl
+     */
+    public String getYarnStatusServiceUrl() {
+        return config.getYarnStatusServiceUrl();
+    }
+
+    /**
+     * @return the maxConcurrentJobLimit
+     */
+    public int getMaxConcurrentJobLimit() {
+        return config.getMaxConcurrentJobLimit();
+    }
+
+    /**
+     * @return the timeZone
+     */
+    public String getTimeZone() {
+        return config.getTimeZone();
+    }
+
+    /**
+     * @return the adminDls
+     */
+    public String getAdminDls() {
+        return config.getAdminDls();
+    }
+
+    /**
+     * @return the jobStepTimeout
+     */
+    public long getJobStepTimeout() {
+        return config.getJobStepTimeout();
+    }
+
+    /**
+     * @return the asyncJobCheckInterval
+     */
+    public int getAsyncJobCheckInterval() {
+        return config.getYarnStatusCheckIntervalSeconds();
+    }
+
+    /**
+     * @return the flatTableByHive
+     */
+    public boolean isFlatTableByHive() {
+        return config.getFlatTableByHive();
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((config == null) ? 0 : config.hashCode());
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        JobEngineConfig other = (JobEngineConfig) obj;
+        if (config == null) {
+            if (other.config != null)
+                return false;
+        } else if (!config.equals(other.config))
+            return false;
+        return true;
+    }
+
+}
\ No newline at end of file
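
The lookup above resolves the job conf file in a fixed order: the KYLIN_CONF system property, the KYLIN_CONF environment variable, /etc/kylin, and finally the classpath (copied out to a temp file). A minimal usage sketch, assuming MEDIUM is one of the CubeCapacity constants and omitting IOException handling:

    JobEngineConfig cfg = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
    // tries kylin_job_conf_medium.xml first, then falls back to
    // kylin_job_conf.xml; returns "" when neither can be located
    String jobConfPath = cfg.getHadoopJobConfFilePath(CubeCapacity.MEDIUM);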

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java b/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
new file mode 100644
index 0000000..2389f27
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/engine/JobFetcher.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.engine;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.quartz.Job;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.util.StringSplitter;
+import com.kylinolap.job.JobDAO;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.constant.JobStatusEnum;
+import com.kylinolap.job.flow.JobFlow;
+
+/**
+ * @author ysong1, xduo
+ * 
+ */
+public class JobFetcher implements Job {
+
+    private static final Logger log = LoggerFactory.getLogger(JobFetcher.class);
+
+    public static final int JOB_THRESHOLD = 10;
+
+    @Override
+    public void execute(JobExecutionContext context) throws JobExecutionException {
+
+        JobEngineConfig engineConfig = (JobEngineConfig) context.getJobDetail().getJobDataMap().get(JobConstants.PROP_ENGINE_CONTEXT);
+
+        JobDAO jobDAO = JobDAO.getInstance(engineConfig.getConfig());
+
+        try {
+            // get all pending jobs
+            List<JobInstance> pendingJobList = jobDAO.listAllJobs(JobStatusEnum.PENDING);
+
+            log.debug(pendingJobList.size() + " pending jobs");
+            int leftJobs = JOB_THRESHOLD;
+            Random rand = new Random();
+            int maxConcurrentJobCount = engineConfig.getMaxConcurrentJobLimit();
+
+            for (JobInstance jobInstance : pendingJobList) {
+                @SuppressWarnings("unchecked")
+                ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) context.getScheduler().getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
+
+                if (jobFlows.size() >= maxConcurrentJobCount) {
+                    // too many job instances in the current context; wait for
+                    // the next fetcher run
+                    break;
+                }
+
+                try {
+                    // only one job per cube may run at a time
+                    boolean cubeHasRunningJob = false;
+                    for (String s : jobFlows.keySet()) {
+                        String[] tmp = StringSplitter.split(s, ".");
+                        String cubename = tmp[0];
+                        String jobid = tmp[1];
+                        if (cubename.equals(jobInstance.getRelatedCube())) {
+                            log.info("There is already a job of cube " + jobInstance.getRelatedCube() + " running, job uuid is " + jobid);
+                            cubeHasRunningJob = true;
+                            break;
+                        }
+                    }
+
+                    if (!cubeHasRunningJob && !jobFlows.containsKey(JobInstance.getJobIdentity(jobInstance))) {
+                        // create job flow
+                        JobFlow jobFlow = new JobFlow(jobInstance, engineConfig);
+                        jobFlows.put(JobInstance.getJobIdentity(jobInstance), jobFlow);
+
+                        // schedule the 1st step
+                        Trigger trigger = TriggerBuilder.newTrigger().startNow().build();
+                        JobDetail firstStep = jobFlow.getFirst();
+                        context.getScheduler().scheduleJob(firstStep, trigger);
+
+                        log.info("Job " + jobInstance.getUuid() + " has been scheduled with the first step " + firstStep.getKey().toString());
+                    }
+                } catch (Exception e) {
+                    log.error("Failed to trigger the job detail", e);
+                }
+
+                if (--leftJobs < 0) {
+                    log.info("Reached the job threshold for this round; deferring the remaining pending jobs");
+                    break;
+                }
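+                // pace submissions: sleep a random 0-9 seconds before the next job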
+                long ms = Math.abs(rand.nextLong() % 10L);
+                Thread.sleep(ms * 1000L);
+            }
+        } catch (Throwable t) {
+            log.error(t.getMessage(), t);
+            throw new JobExecutionException(t);
+        }
+    }
+}
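
The runtime flow map that JobFetcher consults is keyed by "<cubeName>.<jobUuid>" (see the comment in QuatzScheduler below), which is what the split on "." relies on; a small sketch of that convention with hypothetical values:

    String jobUuid = "c9032f1e-0001-4c52-8f4e-1a2b3c4d5e6f";    // hypothetical
    String key = "sales_cube" + "." + jobUuid;                   // "<cubeName>.<jobUuid>"
    String[] parts = StringSplitter.split(key, ".");
    // parts[0] -> "sales_cube", parts[1] -> the job uuid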

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java b/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
new file mode 100644
index 0000000..f080c37
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/engine/QuatzScheduler.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.engine;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.SimpleScheduleBuilder;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.quartz.UnableToInterruptJobException;
+import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.matchers.GroupMatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.cmd.IJobCommand;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.exception.JobException;
+import com.kylinolap.job.flow.JobFlow;
+import com.kylinolap.job.flow.JobFlowListener;
+
+/**
+ * @author xduo
+ * 
+ */
+public class QuatzScheduler {
+
+    private static final Logger log = LoggerFactory.getLogger(QuatzScheduler.class);
+
+    private Scheduler scheduler;
+    private JobFlowListener globalJobListener;
+
+    // public static void scheduleJobFlow(Scheduler scheduler, JobFlow jobFlow)
+    // throws JobException {
+    // // schedule the 1st step
+    // Trigger trigger = TriggerBuilder.newTrigger().startNow().build();
+    // JobDetail firstStep = jobFlow.getFirst();
+    // try {
+    // scheduler.scheduleJob(firstStep, trigger);
+    // } catch (SchedulerException e) {
+    // throw new JobException(e);
+    // }
+    // }
+
+    public QuatzScheduler() throws JobException {
+        this.globalJobListener = new JobFlowListener(JobConstants.GLOBAL_LISTENER_NAME);
+        StdSchedulerFactory sf = new StdSchedulerFactory();
+        Properties schedulerProperties = new Properties();
+        int numberOfProcessors = Runtime.getRuntime().availableProcessors();
+        schedulerProperties.setProperty("org.quartz.threadPool.threadCount", String.valueOf(numberOfProcessors));
+        schedulerProperties.setProperty("org.quartz.scheduler.skipUpdateCheck", "true");
+
+        try {
+            sf.initialize(schedulerProperties);
+            this.scheduler = sf.getScheduler();
+            this.scheduler.getListenerManager().addJobListener(this.globalJobListener, GroupMatcher.jobGroupEquals(JobConstants.CUBE_JOB_GROUP_NAME));
+
+            // cubename.jobUuid -> job flow
+            this.scheduler.getContext().put(JobConstants.PROP_JOB_RUNTIME_FLOWS, new ConcurrentHashMap<String, JobFlow>());
+
+            // put the scheduler in standby mode first
+            this.scheduler.standby();
+        } catch (SchedulerException e) {
+            throw new JobException(e);
+        }
+    }
+
+    public void start() throws JobException {
+        try {
+            this.scheduler.start();
+        } catch (SchedulerException e) {
+            throw new JobException(e);
+        }
+    }
+
+    public void scheduleFetcher(int intervalInSeconds, JobEngineConfig engineConfig) throws JobException {
+        JobDetail job = JobBuilder.newJob(JobFetcher.class).withIdentity(JobFetcher.class.getCanonicalName(), JobConstants.DAEMON_JOB_GROUP_NAME).build();
+        job.getJobDataMap().put(JobConstants.PROP_ENGINE_CONTEXT, engineConfig);
+
+        Trigger trigger = TriggerBuilder.newTrigger().startNow().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(intervalInSeconds).repeatForever()).build();
+
+        try {
+            this.scheduler.scheduleJob(job, trigger);
+        } catch (SchedulerException e) {
+            throw new JobException(e);
+        }
+    }
+
+    public boolean interrupt(JobInstance jobInstance, JobStep jobStep) throws JobException, IOException {
+        JobKey jobKey = new JobKey(JobInstance.getStepIdentity(jobInstance, jobStep), JobConstants.CUBE_JOB_GROUP_NAME);
+
+        boolean res = false;
+        try {
+            JobDetail jobDetail = this.scheduler.getJobDetail(jobKey);
+
+            IJobCommand iJobStepCmd = (IJobCommand) jobDetail.getJobDataMap().get(JobConstants.PROP_JOB_CMD_EXECUTOR);
+            if (null != iJobStepCmd) {
+                iJobStepCmd.cancel();
+            }
+
+            jobDetail.getJobDataMap().put(JobConstants.PROP_JOB_KILLED, true);
+            this.scheduler.addJob(jobDetail, true, true);
+
+            @SuppressWarnings("unchecked")
+            ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) this.scheduler.getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
+            jobFlows.remove(JobInstance.getJobIdentity(jobInstance));
+            res = true;
+        } catch (UnableToInterruptJobException e) {
+            log.error(e.getLocalizedMessage(), e);
+            throw new JobException(e);
+        } catch (SchedulerException e) {
+            log.error(e.getLocalizedMessage(), e);
+            throw new JobException(e);
+        }
+        return res;
+    }
+
+    public void stop() throws JobException {
+        try {
+            this.scheduler.standby();
+        } catch (SchedulerException e) {
+            throw new JobException(e);
+        }
+    }
+
+    public Scheduler getScheduler() {
+        return this.scheduler;
+    }
+
+    // metrics
+
+    public int getThreadPoolSize() {
+        try {
+            return scheduler.getMetaData().getThreadPoolSize();
+        } catch (SchedulerException e) {
+            log.error("Can't get scheduler metadata!", e);
+            return 0;
+        }
+    }
+
+    public int getRunningJobs() {
+        try {
+            return this.scheduler.getCurrentlyExecutingJobs().size();
+        } catch (SchedulerException e) {
+            log.error("Can't get scheduler metadata!", e);
+            return 0;
+        }
+    }
+
+    public int getIdleSlots() {
+        try {
+            return this.scheduler.getMetaData().getThreadPoolSize() - this.scheduler.getCurrentlyExecutingJobs().size();
+        } catch (SchedulerException e) {
+            log.error("Can't get scheduler metadata!", e);
+            return 0;
+        }
+    }
+
+    public int getScheduledJobs() {
+        int scheduledJobCount = 0;
+        try {
+            for (String groupName : scheduler.getJobGroupNames()) {
+                scheduledJobCount += scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName)).size();
+            }
+        } catch (SchedulerException e) {
+            log.error("Can't get scheduler metadata!", e);
+        }
+        return scheduledJobCount;
+    }
+
+}
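
A sketch of how the engine drives this scheduler, assuming a JobEngineConfig is at hand; the 60-second interval is illustrative (JobEngine normally passes JobConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS) and JobException handling is omitted:

    JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
    QuatzScheduler scheduler = new QuatzScheduler();   // constructed in standby mode
    scheduler.start();
    // fire JobFetcher now, then every 60 seconds, forever
    scheduler.scheduleFetcher(60, engineConfig);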

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java b/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
new file mode 100644
index 0000000..1b404cb
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/exception/InvalidJobInstanceException.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.exception;
+
+/**
+ * @author ysong1
+ * 
+ */
+public class InvalidJobInstanceException extends Exception {
+
+    private static final long serialVersionUID = 2045169570038227895L;
+
+    public InvalidJobInstanceException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java b/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
new file mode 100644
index 0000000..d1050d0
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/exception/InvalidJobStatusException.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.exception;
+
+/**
+ * @author xduo
+ * 
+ */
+public class InvalidJobStatusException extends Exception {
+
+    /**
+     * @param string
+     */
+    public InvalidJobStatusException(String string) {
+        super(string);
+    }
+
+    private static final long serialVersionUID = -8549756520626114000L;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/exception/JobException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/exception/JobException.java b/job/src/main/java/com/kylinolap/job/exception/JobException.java
new file mode 100644
index 0000000..6c4bc6a
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/exception/JobException.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.exception;
+
+/**
+ * @author xduo
+ * 
+ */
+public class JobException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 
+     */
+    public JobException() {
+        super();
+    }
+
+    /**
+     * @param message
+     * @param cause
+     */
+    public JobException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message
+     */
+    public JobException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause
+     */
+    public JobException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java b/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
new file mode 100644
index 0000000..1b8777e
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/flow/AsyncJobFlowNode.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.flow;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.quartz.DateBuilder;
+import org.quartz.DateBuilder.IntervalUnit;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.job.JobDAO;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.cmd.ICommandOutput;
+import com.kylinolap.job.cmd.JobCommandFactory;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.constant.JobStepStatusEnum;
+import com.kylinolap.job.engine.JobEngineConfig;
+import com.kylinolap.job.exception.JobException;
+
+/**
+ * @author xduo
+ * 
+ */
+public class AsyncJobFlowNode extends JobFlowNode {
+
+    @Override
+    public void execute(JobExecutionContext context) throws JobExecutionException {
+        this.currentJobDetail = context.getJobDetail();
+        JobDataMap data = this.currentJobDetail.getJobDataMap();
+        JobFlow jobFlow = (JobFlow) data.get(JobConstants.PROP_JOB_FLOW);
+        JobEngineConfig engineConfig = jobFlow.getJobengineConfig();
+        KylinConfig config = engineConfig.getConfig();
+        String jobInstanceID = data.getString(JobConstants.PROP_JOBINSTANCE_UUID);
+        int jobStepID = data.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
+        ICommandOutput output = (ICommandOutput) data.get(JobConstants.PROP_JOB_CMD_OUTPUT);
+
+        try {
+            if (data.getBoolean(JobConstants.PROP_JOB_KILLED)) {
+                log.info(this.currentJobDetail.getKey() + " is killed");
+                return;
+            }
+
+            if (output == null) {
+                JobInstance jobInstance = updateJobStep(jobInstanceID, jobStepID, config, JobStepStatusEnum.RUNNING, System.currentTimeMillis(), null, null);
+
+                String command = data.getString(JobConstants.PROP_COMMAND);
+                jobCmd = JobCommandFactory.getJobCommand(command, jobInstance, jobStepID, engineConfig);
+                output = jobCmd.execute();
+                data.put(JobConstants.PROP_JOB_CMD_OUTPUT, output);
+                data.put(JobConstants.PROP_JOB_CMD_EXECUTOR, jobCmd);
+                context.getScheduler().addJob(this.currentJobDetail, true, true);
+
+                JobStepStatusEnum stepStatus = output.getStatus();
+                updateJobStep(jobInstanceID, jobStepID, config, stepStatus, null, stepStatus.isComplete() ? System.currentTimeMillis() : null, output.getOutput());
+
+                context.setResult(output.getExitCode());
+                scheduleStatusChecker(context);
+                log.debug("Start async job " + currentJobDetail.getKey());
+            } else {
+                JobInstance jobInstance = JobDAO.getInstance(engineConfig.getConfig()).getJob(jobInstanceID);
+                JobStep jobStep = jobInstance.getSteps().get(jobStepID);
+
+                log.debug("Checking Hadoop job status of " + currentJobDetail.getKey());
+                JobStepStatusEnum stepStatus = output.getStatus();
+
+                if ((System.currentTimeMillis() - jobStep.getExecStartTime()) / 1000 >= engineConfig.getJobStepTimeout()) {
+                    throw new JobException("Job step " + jobStep.getName() + " timeout.");
+                }
+
+                updateJobStep(jobInstance.getUuid(), jobStepID, config, stepStatus, null, stepStatus.isComplete() ? System.currentTimeMillis() : null, output.getOutput());
+
+                if (!stepStatus.isComplete()) {
+                    scheduleStatusChecker(context);
+                }
+
+                context.setResult(0);
+                log.debug("Status of async job " + currentJobDetail.getKey() + ": " + stepStatus);
+            }
+        } catch (Throwable t) {
+            handleException(jobInstanceID, jobStepID, config, t);
+        }
+
+    }
+
+    private void scheduleStatusChecker(JobExecutionContext context) throws SchedulerException {
+        JobDataMap jobDataMap = this.currentJobDetail.getJobDataMap();
+        JobFlow jobFlow = (JobFlow) jobDataMap.get(JobConstants.PROP_JOB_FLOW);
+        JobEngineConfig engineConfig = jobFlow.getJobengineConfig();
+        int interval = engineConfig.getAsyncJobCheckInterval();
+        log.debug("Trigger a status check job in " + interval + " seconds for job " + currentJobDetail.getKey());
+
+        Trigger trigger = TriggerBuilder.newTrigger().startAt(DateBuilder.futureDate(interval, IntervalUnit.SECOND)).build();
+        Set<Trigger> triggers = new HashSet<Trigger>();
+        triggers.add(trigger);
+        context.getScheduler().scheduleJob(currentJobDetail, triggers, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlow.java b/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
new file mode 100644
index 0000000..9a58107
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/flow/JobFlow.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.flow;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.quartz.JobBuilder;
+import org.quartz.JobDetail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.constant.JobStepCmdTypeEnum;
+import com.kylinolap.job.engine.JobEngineConfig;
+
+/**
+ * @author xduo
+ */
+public class JobFlow {
+
+    private static final Logger log = LoggerFactory.getLogger(JobFlow.class);
+
+    private final List<JobDetail> flowNodes;
+    private final JobInstance jobInstance;
+    private final JobEngineConfig engineConfig;
+
+    public JobFlow(JobInstance job, JobEngineConfig context) {
+        this.engineConfig = context;
+        this.jobInstance = job;
+        // validate job instance
+        List<JobStep> sortedSteps = jobInstance.getSteps();
+
+        if (sortedSteps == null || sortedSteps.size() == 0) {
+            throw new IllegalStateException("Steps of job " + jobInstance.getUuid() + " are null or empty!");
+        }
+
+        // sort the steps by step_sequenceID
+        Collections.sort(sortedSteps);
+        // find the 1st runnable job
+        int firstStepIndex = findFirstStep(sortedSteps);
+
+        log.info("Job " + jobInstance.getUuid() + " will be started at step index " + firstStepIndex);
+
+        flowNodes = new LinkedList<JobDetail>();
+        for (int i = firstStepIndex; i < sortedSteps.size(); i++) {
+            JobDetail node = createJobFlowNode(jobInstance, i);
+            flowNodes.add(node);
+        }
+    }
+
+    public JobInstance getJobInstance() {
+        return jobInstance;
+    }
+
+    public JobEngineConfig getJobengineConfig() {
+        return engineConfig;
+    }
+
+    public JobDetail getFirst() {
+        if (flowNodes.isEmpty()) {
+            return null;
+        }
+        return flowNodes.get(0);
+    }
+
+    public JobDetail getNext(JobDetail jobFlowNode) {
+        int targetIndex = -1;
+        for (int index = 0; index < flowNodes.size(); index++) {
+            if (flowNodes.get(index).equals(jobFlowNode)) {
+                targetIndex = index;
+            }
+        }
+
+        if (targetIndex != -1 && flowNodes.size() > targetIndex + 1) {
+            return flowNodes.get(targetIndex + 1);
+        }
+        return null;
+    }
+
+    private int findFirstStep(List<JobStep> stepList) {
+        int firstJobIndex = 0;
+        for (int i = 0; i < stepList.size(); i++) {
+            if (stepList.get(i).getStatus().isRunable()) {
+                firstJobIndex = i;
+                break;
+            }
+        }
+        return firstJobIndex;
+    }
+
+    private JobDetail createJobFlowNode(final JobInstance jobInstance, final int stepSeqId) {
+        JobStep step = jobInstance.getSteps().get(stepSeqId);
+
+        if (jobInstance.getName() == null || step.getName() == null) {
+            throw new IllegalArgumentException("JobInstance name or JobStep name cannot be null!");
+        }
+
+        // submit the job to different groups based on the isRunAsync property
+        JobDetail jobFlowNode = JobBuilder.newJob(step.isRunAsync() ? AsyncJobFlowNode.class : JobFlowNode.class).withIdentity(JobInstance.getStepIdentity(jobInstance, step), JobConstants.CUBE_JOB_GROUP_NAME).storeDurably().build();
+
+        // add job flow to node
+        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOB_FLOW, this);
+
+        // add command to flow node
+        String execCmd = (step.getCmdType() == JobStepCmdTypeEnum.SHELL_CMD || step.getCmdType() == JobStepCmdTypeEnum.SHELL_CMD_HADOOP) ? wrapExecCmd(jobInstance, step.getExecCmd(), String.valueOf(step.getSequenceID())) : step.getExecCmd();
+        jobFlowNode.getJobDataMap().put(JobConstants.PROP_COMMAND, execCmd);
+
+        // add job instance and step sequenceID to flow node
+        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOBINSTANCE_UUID, jobInstance.getUuid());
+        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOBSTEP_SEQ_ID, step.getSequenceID());
+
+        // add async flag to flow node
+        jobFlowNode.getJobDataMap().put(JobConstants.PROP_JOB_ASYNC, step.isRunAsync());
+
+        return jobFlowNode;
+    }
+
+    private String wrapExecCmd(JobInstance job, String cmd, String suffix) {
+        if (StringUtils.isBlank(cmd))
+            return cmd;
+
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        try {
+            FileUtils.forceMkdir(new File(config.getKylinJobLogDir()));
+        } catch (IOException e) {
+            throw new RuntimeException("Create log dir " + config.getKylinJobLogDir() + " failed.", e);
+        }
+        String logFile = config.getKylinJobLogDir() + "/" + job.getUuid() + "_" + suffix + ".log";
+
+        String mkLogDir = "mkdir -p " + config.getKylinJobLogDir();
+        return mkLogDir + ";" + "set -o pipefail; " + cmd + " 2>&1 | tee " + logFile;
+    }
+}
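
For a shell step, wrapExecCmd above produces a command of the following shape (paths and ids illustrative):

    mkdir -p <kylinJobLogDir>;set -o pipefail; <execCmd> 2>&1 | tee <kylinJobLogDir>/<jobUuid>_<stepSeq>.log

so the step's stdout and stderr stream both to the console and to a per-step log file, while set -o pipefail makes the pipeline's exit status reflect the wrapped command rather than tee.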

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
new file mode 100644
index 0000000..8274d44
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/flow/JobFlowListener.java
@@ -0,0 +1,419 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.flow;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.JobListener;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.MailService;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeManager;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.cube.CubeSegmentStatusEnum;
+import com.kylinolap.cube.exception.CubeIntegrityException;
+import com.kylinolap.job.JobDAO;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.constant.JobStatusEnum;
+import com.kylinolap.job.constant.JobStepStatusEnum;
+import com.kylinolap.job.engine.JobEngineConfig;
+
+/**
+ * Handles Kylin job lifecycle events and the corresponding cube segment updates.
+ * 
+ * @author George Song (ysong1), xduo
+ * 
+ */
+public class JobFlowListener implements JobListener {
+
+    private static final Logger log = LoggerFactory.getLogger(JobFlowListener.class);
+
+    private String name;
+
+    public JobFlowListener(String name) {
+        if (name == null) {
+            throw new IllegalArgumentException("Listener name cannot be null!");
+        }
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
+        log.info(context.getJobDetail().getKey() + " was executed.");
+        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
+        JobFlow jobFlow = (JobFlow) jobDataMap.get(JobConstants.PROP_JOB_FLOW);
+        JobEngineConfig engineConfig = jobFlow.getJobengineConfig();
+        String jobUuid = jobDataMap.getString(JobConstants.PROP_JOBINSTANCE_UUID);
+        int stepSeqID = jobDataMap.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
+        KylinConfig config = engineConfig.getConfig();
+
+        JobInstance jobInstance = null;
+        JobStep jobStep = null;
+        try {
+            jobInstance = JobDAO.getInstance(config).getJob(jobUuid);
+            jobStep = jobInstance.getSteps().get(stepSeqID);
+            CubeInstance cube = CubeManager.getInstance(config).getCube(jobInstance.getRelatedCube());
+
+            log.info(context.getJobDetail().getKey() + " status: " + jobStep.getStatus());
+            switch (jobStep.getStatus()) {
+            case FINISHED:
+                // Ensure we are using the latest metadata
+                CubeManager.getInstance(config).loadCubeCache(cube);
+                updateKylinJobOnSuccess(jobInstance, stepSeqID, engineConfig);
+                updateCubeSegmentInfoOnSucceed(jobInstance, engineConfig);
+                notifyUsers(jobInstance, engineConfig);
+                scheduleNextJob(context, jobInstance);
+                break;
+            case ERROR:
+                updateKylinJobStatus(jobInstance, stepSeqID, engineConfig);
+                notifyUsers(jobInstance, engineConfig);
+                break;
+            case DISCARDED:
+                // Ensure we are using the latest metadata
+                CubeManager.getInstance(config).loadCubeCache(cube);
+                updateCubeSegmentInfoOnDiscard(jobInstance, engineConfig);
+                notifyUsers(jobInstance, engineConfig);
+                break;
+            default:
+                break;
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            handleException(jobUuid, stepSeqID, config, e);
+        } finally {
+            if (null != jobInstance && jobInstance.getStatus().isComplete()) {
+                try {
+                    context.getScheduler().deleteJob(context.getJobDetail().getKey());
+                    @SuppressWarnings("unchecked")
+                    ConcurrentHashMap<String, JobFlow> jobFlows = (ConcurrentHashMap<String, JobFlow>) context.getScheduler().getContext().get(JobConstants.PROP_JOB_RUNTIME_FLOWS);
+                    jobFlows.remove(JobInstance.getJobIdentity(jobInstance));
+                } catch (SchedulerException e) {
+                    log.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.quartz.JobListener#jobToBeExecuted(org.quartz.JobExecutionContext)
+     */
+    @Override
+    public void jobToBeExecuted(JobExecutionContext context) {
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.quartz.JobListener#jobExecutionVetoed(org.quartz.JobExecutionContext)
+     */
+    @Override
+    public void jobExecutionVetoed(JobExecutionContext context) {
+    }
+
+    /**
+     * @param context
+     * @param jobInstance
+     */
+    protected void scheduleNextJob(JobExecutionContext context, JobInstance jobInstance) {
+        try {
+            // schedule next job
+            JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
+            JobFlow jobFlow = (JobFlow) jobDataMap.get(JobConstants.PROP_JOB_FLOW);
+            JobDetail nextJob = jobFlow.getNext(context.getJobDetail());
+            if (nextJob != null) {
+                try {
+                    Trigger trigger = TriggerBuilder.newTrigger().startNow().build();
+                    log.debug("Job " + context.getJobDetail().getKey() + " will now chain to Job " + nextJob.getKey());
+
+                    context.getScheduler().scheduleJob(nextJob, trigger);
+
+                } catch (SchedulerException se) {
+                    log.error("Error encountered during chaining to Job " + nextJob.getKey(), se);
+                }
+            }
+
+            context.getScheduler().deleteJob(context.getJobDetail().getKey());
+        } catch (SchedulerException e) {
+            log.error(e.getLocalizedMessage(), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @param jobInstance
+     * @param stepId
+     */
+    private void updateKylinJobStatus(JobInstance jobInstance, int stepId, JobEngineConfig engineConfig) {
+        validate(jobInstance);
+        List<JobStep> steps = jobInstance.getSteps();
+        Collections.sort(steps);
+
+        JobStep jobStep = jobInstance.getSteps().get(stepId);
+
+        long duration = jobStep.getExecEndTime() - jobStep.getExecStartTime();
+        jobInstance.setDuration(jobInstance.getDuration() + (duration > 0 ? duration : 0) / 1000);
+        jobInstance.setMrWaiting(jobInstance.getMrWaiting() + jobStep.getExecWaitTime());
+
+        try {
+            JobDAO.getInstance(engineConfig.getConfig()).updateJobInstance(jobInstance);
+        } catch (IOException e) {
+            log.error(e.getLocalizedMessage(), e);
+        }
+    }
+
+    private void updateKylinJobOnSuccess(JobInstance jobInstance, int stepId, JobEngineConfig engineConfig) {
+        validate(jobInstance);
+        List<JobStep> steps = jobInstance.getSteps();
+        Collections.sort(steps);
+
+        JobStep jobStep = jobInstance.getSteps().get(stepId);
+        jobInstance.setExecStartTime(steps.get(0).getExecStartTime());
+
+        long duration = jobStep.getExecEndTime() - jobStep.getExecStartTime();
+        jobInstance.setDuration(jobInstance.getDuration() + (duration > 0 ? duration / 1000 : 0));
+        jobInstance.setMrWaiting(jobInstance.getMrWaiting() + jobStep.getExecWaitTime());
+        if (jobInstance.getStatus().equals(JobStatusEnum.FINISHED)) {
+            jobInstance.setExecEndTime(steps.get(steps.size() - 1).getExecEndTime());
+        }
+
+        try {
+            JobDAO.getInstance(engineConfig.getConfig()).updateJobInstance(jobInstance);
+        } catch (IOException e) {
+            log.error(e.getLocalizedMessage(), e);
+        }
+    }
+
+    private void updateCubeSegmentInfoOnDiscard(JobInstance jobInstance, JobEngineConfig engineConfig) throws IOException, CubeIntegrityException {
+        CubeManager cubeMgr = CubeManager.getInstance(engineConfig.getConfig());
+        CubeInstance cubeInstance = cubeMgr.getCube(jobInstance.getRelatedCube());
+        cubeMgr.updateSegmentOnJobDiscard(cubeInstance, jobInstance.getRelatedSegment());
+    }
+
+    private void updateCubeSegmentInfoOnSucceed(JobInstance jobInstance, JobEngineConfig engineConfig) throws CubeIntegrityException, IOException {
+        if (jobInstance.getStatus().equals(JobStatusEnum.FINISHED)) {
+            validate(jobInstance);
+
+            log.info("Updating cube segment " + jobInstance.getRelatedSegment() + " for cube " + jobInstance.getRelatedCube());
+
+            long cubeSize = 0;
+            JobStep convertToHFileStep = jobInstance.findStep(JobConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
+            if (null != convertToHFileStep) {
+                String cubeSizeString = convertToHFileStep.getInfo(JobInstance.HDFS_BYTES_WRITTEN);
+                if (cubeSizeString == null || cubeSizeString.equals("")) {
+                    throw new RuntimeException("Can't get cube segment size.");
+                }
+                cubeSize = Long.parseLong(cubeSizeString) / 1024;
+            } else {
+                log.info("No step with name '" + JobConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE + "' is found");
+            }
+
+            CubeManager cubeMgr = CubeManager.getInstance(engineConfig.getConfig());
+            CubeInstance cubeInstance = cubeMgr.getCube(jobInstance.getRelatedCube());
+            CubeSegment newSegment = cubeInstance.getSegmentById(jobInstance.getUuid());
+
+            long sourceCount = 0;
+            long sourceSize = 0;
+            switch (jobInstance.getType()) {
+            case BUILD:
+                JobStep baseCuboidStep = jobInstance.findStep(JobConstants.STEP_NAME_BUILD_BASE_CUBOID);
+                if (null != baseCuboidStep) {
+                    String sourceRecordsCount = baseCuboidStep.getInfo(JobInstance.SOURCE_RECORDS_COUNT);
+                    if (sourceRecordsCount == null || sourceRecordsCount.equals("")) {
+                        throw new RuntimeException("Can't get cube source record count.");
+                    }
+                    sourceCount = Long.parseLong(sourceRecordsCount);
+                } else {
+                    log.info("No step with name '" + JobConstants.STEP_NAME_BUILD_BASE_CUBOID + "' is found");
+                }
+
+                JobStep createFlatTableStep = jobInstance.findStep(JobConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+                if (null != createFlatTableStep) {
+                    String sourceRecordsSize = createFlatTableStep.getInfo(JobInstance.SOURCE_RECORDS_SIZE);
+                    if (sourceRecordsSize == null || sourceRecordsSize.equals("")) {
+                        throw new RuntimeException("Can't get cube source record size.");
+                    }
+                    sourceSize = Long.parseLong(sourceRecordsSize);
+                } else {
+                    log.info("No step with name '" + JobConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE + "' is found");
+                }
+
+                if (cubeInstance.needMergeImmediatelyAfterBuild(newSegment)) {
+                    for (CubeSegment seg : cubeInstance.getSegment(CubeSegmentStatusEnum.READY)) {
+                        sourceCount += seg.getSourceRecords();
+                        sourceSize += seg.getSourceRecordsSize();
+                    }
+                }
+                break;
+            case MERGE:
+                for (CubeSegment seg : cubeInstance.getMergingSegments()) {
+                    sourceCount += seg.getSourceRecords();
+                    sourceSize += seg.getSourceRecordsSize();
+                }
+                break;
+            }
+
+            cubeMgr.updateSegmentOnJobSucceed(cubeInstance, jobInstance.getType(), jobInstance.getRelatedSegment(), jobInstance.getUuid(), jobInstance.getExecEndTime(), cubeSize, sourceCount, sourceSize);
+            log.info("Update cube segment succeed" + jobInstance.getRelatedSegment() + " for cube " + jobInstance.getRelatedCube());
+        }
+    }
+
+    private void validate(JobInstance jobInstance) {
+        List<JobStep> steps = jobInstance.getSteps();
+        if (steps == null || steps.size() == 0) {
+            throw new RuntimeException("Steps of job " + jobInstance.getUuid() + " is null or empty!");
+        }
+    }
+
+    private void handleException(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, Throwable t) {
+        log.error(t.getLocalizedMessage(), t);
+        String exceptionMsg = "Failed with Exception:" + ExceptionUtils.getFullStackTrace(t);
+        try {
+            JobDAO dao = JobDAO.getInstance(config);
+            JobInstance jobInstance = dao.getJob(jobInstanceUuid);
+            JobStep jobStep = jobInstance.getSteps().get(jobInstanceStepSeqId);
+            jobStep.setStatus(JobStepStatusEnum.ERROR);
+            jobStep.setExecEndTime(System.currentTimeMillis());
+            dao.updateJobInstance(jobInstance);
+
+            String output = dao.getJobOutput(jobInstanceUuid, jobInstanceStepSeqId).getOutput();
+            output = output + "\n" + exceptionMsg;
+            dao.saveJobOutput(jobInstanceUuid, jobInstanceStepSeqId, output);
+        } catch (IOException e1) {
+            log.error(e1.getLocalizedMessage(), e1);
+        }
+    }
+
+    /**
+     * Sends a completion notification for the given job to the cube's notify
+     * list and any configured admin DLs.
+     *
+     * @param jobInstance the finished, failed, or discarded job
+     * @param engineConfig supplies the Kylin config and admin DL list
+     */
+    protected void notifyUsers(JobInstance jobInstance, JobEngineConfig engineConfig) {
+        KylinConfig config = engineConfig.getConfig();
+        String cubeName = jobInstance.getRelatedCube();
+        CubeInstance cubeInstance = CubeManager.getInstance(config).getCube(cubeName);
+        String finalStatus = null;
+        String content = JobConstants.NOTIFY_EMAIL_TEMPLATE;
+        String logMsg = "";
+
+        switch (jobInstance.getStatus()) {
+        case FINISHED:
+            finalStatus = "SUCCESS";
+            break;
+        case ERROR:
+            for (JobStep step : jobInstance.getSteps()) {
+                if (step.getStatus() == JobStepStatusEnum.ERROR) {
+                    try {
+                        logMsg = JobDAO.getInstance(config).getJobOutput(step).getOutput();
+                    } catch (IOException e) {
+                        log.error(e.getLocalizedMessage(), e);
+                    }
+                }
+            }
+            finalStatus = "FAILED";
+            break;
+        case DISCARDED:
+            finalStatus = "DISCARDED";
+        default:
+            break;
+        }
+
+        if (null == finalStatus) {
+            return;
+        }
+
+        try {
+            InetAddress inetAddress = InetAddress.getLocalHost();
+            content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName());
+        } catch (UnknownHostException e) {
+            log.error(e.getLocalizedMessage(), e);
+        }
+
+        content = content.replaceAll("\\$\\{job_name\\}", jobInstance.getName());
+        content = content.replaceAll("\\$\\{result\\}", finalStatus);
+        content = content.replaceAll("\\$\\{cube_name\\}", cubeName);
+        content = content.replaceAll("\\$\\{start_time\\}", new Date(jobInstance.getExecStartTime()).toString());
+        content = content.replaceAll("\\$\\{duration\\}", jobInstance.getDuration() / 60 + "mins");
+        content = content.replaceAll("\\$\\{mr_waiting\\}", jobInstance.getMrWaiting() / 60 + "mins");
+        content = content.replaceAll("\\$\\{last_update_time\\}", new Date(jobInstance.getLastModified()).toString());
+        content = content.replaceAll("\\$\\{submitter\\}", jobInstance.getSubmitter());
+        content = content.replaceAll("\\$\\{error_log\\}", logMsg);
+
+
+        MailService mailService = new MailService();
+        try {
+            List<String> users = new ArrayList<String>();
+
+            if (null != cubeInstance.getDescriptor().getNotifyList()) {
+                users.addAll(cubeInstance.getDescriptor().getNotifyList());
+            }
+
+            if (null != engineConfig.getAdminDls()) {
+                String[] adminDls = engineConfig.getAdminDls().split(",");
+
+                for (String adminDl : adminDls) {
+                    users.add(adminDl);
+                }
+            }
+
+            log.info("prepare to send email to:"+users);
+            
+            log.info("job name:"+jobInstance.getName());
+            
+            log.info("submitter:"+jobInstance.getSubmitter());
+            
+            if (users.size() > 0) {
+                log.info("notify list:"+users);
+                mailService.sendMail(users, "["+ finalStatus + "] - [Kylin Cube Build Job]-" + cubeName, content);
+                log.info("notified users:"+users);
+            }
+        } catch (IOException e) {
+            log.error(e.getLocalizedMessage(), e);
+        }
+
+    }
+}
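
The ${...} tokens in NOTIFY_EMAIL_TEMPLATE are substituted with String.replaceAll,
which is why both the dollar sign and the braces are regex-escaped above. A minimal
sketch of the same substitution, using only java.lang and java.util.regex; note that
a replacement value containing '$' or '\' must itself go through Matcher.quoteReplacement:

    String template = "Job ${job_name} finished with ${result}";
    String content = template
            .replaceAll("\\$\\{job_name\\}", "Sample_Cube_Build")                 // literal value, safe as-is
            .replaceAll("\\$\\{result\\}", Matcher.quoteReplacement("SUCCESS"));  // defensive quoting
    // content == "Job Sample_Cube_Build finished with SUCCESS"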

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java b/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
new file mode 100644
index 0000000..e539121
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/flow/JobFlowNode.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.flow;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.job.JobDAO;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.JobInstance.JobStep;
+import com.kylinolap.job.cmd.ICommandOutput;
+import com.kylinolap.job.cmd.IJobCommand;
+import com.kylinolap.job.cmd.JobCommandFactory;
+import com.kylinolap.job.constant.JobConstants;
+import com.kylinolap.job.constant.JobStepStatusEnum;
+import com.kylinolap.job.engine.JobEngine;
+import com.kylinolap.job.engine.JobEngineConfig;
+
+/**
+ * @author xduo
+ * 
+ */
+public class JobFlowNode implements InterruptableJob {
+
+    protected static final Logger log = LoggerFactory.getLogger(JobFlowNode.class);
+
+    protected JobDetail currentJobDetail;
+    protected IJobCommand jobCmd;
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.quartz.Job#execute(org.quartz.JobExecutionContext)
+     */
+    @Override
+    public void execute(JobExecutionContext context) throws JobExecutionException {
+        this.currentJobDetail = context.getJobDetail();
+        JobDataMap data = this.currentJobDetail.getJobDataMap();
+        JobFlow jobFlow = (JobFlow) data.get(JobConstants.PROP_JOB_FLOW);
+        JobEngineConfig engineConfig = jobFlow.getJobengineConfig();
+        String jobInstanceID = data.getString(JobConstants.PROP_JOBINSTANCE_UUID);
+        int jobStepID = data.getInt(JobConstants.PROP_JOBSTEP_SEQ_ID);
+        String command = data.getString(JobConstants.PROP_COMMAND);
+        KylinConfig config = engineConfig.getConfig();
+
+        try {
+            JobInstance jobInstance = updateJobStep(jobInstanceID, jobStepID, config, JobStepStatusEnum.RUNNING, System.currentTimeMillis(), null, null);
+
+            jobCmd = JobCommandFactory.getJobCommand(command, jobInstance, jobStepID, engineConfig);
+            data.put(JobConstants.PROP_JOB_CMD_EXECUTOR, jobCmd);
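+            // re-store the JobDetail (replace = true) so the refreshed data map,
+            // including the live command executor, is visible outside this run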
+            context.getScheduler().addJob(this.currentJobDetail, true, true);
+
+            ICommandOutput output = jobCmd.execute();
+
+            if (data.getBoolean(JobConstants.PROP_JOB_KILLED)) {
+                return;
+            }
+
+            int exitCode = output.getExitCode();
+            updateJobStep(jobInstanceID, jobStepID, config, output.getStatus(), null, System.currentTimeMillis(), output.getOutput());
+            context.setResult(exitCode);
+
+            log.info("Job status for " + context.getJobDetail().getKey() + " has been updated.");
+            log.info("cmd:" + command);
+            log.info("output:" + output.getOutput());
+            log.info("exitCode:" + exitCode);
+
+        } catch (Throwable t) {
+            handleException(jobInstanceID, jobStepID, config, t);
+        }
+    }
+
+    @Override
+    public void interrupt() throws UnableToInterruptJobException {
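+        // no-op: cancellation is signalled cooperatively through the
+        // JobConstants.PROP_JOB_KILLED flag that execute() consults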
+    }
+
+    protected JobInstance updateJobStep(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, JobStepStatusEnum newStatus, Long execStartTime, Long execEndTime, String output) throws IOException {
+        // update the step's status, timing and output; persist only when something changed
+        JobInstance jobInstance = JobDAO.getInstance(config).getJob(jobInstanceUuid);
+        JobStep currentStep = null;
+
+        try {
+            currentStep = jobInstance.getSteps().get(jobInstanceStepSeqId);
+            JobStepStatusEnum currentStatus = currentStep.getStatus();
+            boolean hasChange = false;
+
+            if (null != execStartTime) {
+                hasChange = true;
+                currentStep.setExecStartTime(execStartTime);
+            }
+            if (null != execEndTime) {
+                hasChange = true;
+                currentStep.setExecEndTime(execEndTime);
+            }
+            if (null != output) {
+                hasChange = true;
+                JobDAO.getInstance(config).saveJobOutput(currentStep, output);
+            }
+            if (JobStepStatusEnum.WAITING == currentStatus && (JobStepStatusEnum.RUNNING == newStatus || JobStepStatusEnum.FINISHED == newStatus)) {
+                hasChange = true;
+                currentStep.setExecWaitTime((System.currentTimeMillis() - currentStep.getExecStartTime()) / 1000);
+            }
+            if (null != newStatus) {
+                hasChange = true;
+                currentStep.setStatus(newStatus);
+            }
+
+            if (hasChange) {
+                JobDAO.getInstance(config).updateJobInstance(jobInstance);
+            }
+        } catch (IOException e) {
+            log.error(e.getLocalizedMessage(), e);
+        }
+
+        if (null != execEndTime) {
+            JobEngine.JOB_DURATION.put(JobInstance.getStepIdentity(jobInstance, currentStep) + " - " + String.valueOf(currentStep.getExecStartTime()), (double) (currentStep.getExecEndTime() - currentStep.getExecStartTime()) / 1000);
+        }
+
+        return jobInstance;
+    }
+
+    protected void handleException(String jobInstanceUuid, int jobInstanceStepSeqId, KylinConfig config, Throwable t) {
+        log.error(t.getLocalizedMessage(), t);
+        String exceptionMsg = "Failed with Exception:" + ExceptionUtils.getFullStackTrace(t);
+        try {
+            JobDAO dao = JobDAO.getInstance(config);
+            JobInstance jobInstance = dao.getJob(jobInstanceUuid);
+            JobStep jobStep = jobInstance.getSteps().get(jobInstanceStepSeqId);
+            jobStep.setStatus(JobStepStatusEnum.ERROR);
+            jobStep.setExecEndTime(System.currentTimeMillis());
+            dao.updateJobInstance(jobInstance);
+
+            String output = dao.getJobOutput(jobInstanceUuid, jobInstanceStepSeqId).getOutput();
+            output = output + "\n" + exceptionMsg;
+            dao.saveJobOutput(jobInstanceUuid, jobInstanceStepSeqId, output);
+        } catch (IOException e1) {
+            log.error(e1.getLocalizedMessage(), e1);
+        }
+    }
+}
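
Both JobFlowNode and JobEngine.scheduleNextJob lean on the same Quartz pattern: a
durable JobDetail is stored first, then fired with an immediate trigger. A minimal
standalone sketch with plain Quartz 2.x follows; the job key is hypothetical, and a
real JobFlowNode would additionally need its JobDataMap entries (PROP_JOB_FLOW,
PROP_JOBINSTANCE_UUID, etc.) populated before it could run:

    Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
    scheduler.start();

    JobDetail step = JobBuilder.newJob(JobFlowNode.class)
            .withIdentity("step-1", "example-flow") // hypothetical key
            .storeDurably()
            .build();
    scheduler.addJob(step, true); // store without a trigger first

    Trigger fireNow = TriggerBuilder.newTrigger().forJob(step).startNow().build();
    scheduler.scheduleJob(fireNow); // same immediate-trigger chaining as scheduleNextJob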

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
new file mode 100644
index 0000000..4cd59a9
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/AbstractHadoopJob.java
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.job.hadoop;
+
+/**
+ * @author George Song (ysong1)
+ * 
+ */
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.persistence.ResourceStore;
+import com.kylinolap.common.util.StringSplitter;
+import com.kylinolap.cube.CubeInstance;
+import com.kylinolap.cube.CubeSegment;
+import com.kylinolap.job.JobInstance;
+import com.kylinolap.job.exception.JobException;
+import com.kylinolap.job.tools.OptionsHelper;
+import com.kylinolap.metadata.model.schema.TableDesc;
+
+@SuppressWarnings("static-access")
+public abstract class AbstractHadoopJob extends Configured implements Tool {
+    protected static final Logger log = LoggerFactory.getLogger(AbstractHadoopJob.class);
+
+    protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For example, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D").create("jobname");
+    protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For example, flat_item_cube").create("cubename");
+    protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name").create("segmentname");
+    protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
+    protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
+    protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat");
+    protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimiter").create("inputdelim");
+    protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output");
+    protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName("level").hasArg().isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create("level");
+    protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Partition file path.").create("input");
+    protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName("htable name").hasArg().isRequired(true).withDescription("HTable name").create("htablename");
+    protected static final Option OPTION_KEY_COLUMN_PERCENTAGE = OptionBuilder.withArgName("rowkey column percentage").hasArg().isRequired(true).withDescription("Percentage of row key columns").create("columnpercentage");
+    protected static final Option OPTION_KEY_SPLIT_NUMBER = OptionBuilder.withArgName("key split number").hasArg().isRequired(true).withDescription("Number of key split range").create("splitnumber");
+
+    protected String name;
+    protected String description;
+    protected boolean isAsync = false;
+    protected OptionsHelper optionsHelper = new OptionsHelper();
+
+    protected Job job;
+
+    protected void parseOptions(Options options, String[] args) throws ParseException {
+        optionsHelper.parseOptions(options, args);
+    }
+
+    public void printUsage(Options options) {
+        optionsHelper.printUsage(getClass().getSimpleName(), options);
+    }
+
+    public Option[] getOptions() {
+        return optionsHelper.getOptions();
+    }
+
+    public String getOptionsAsString() {
+        return optionsHelper.getOptionsAsString();
+    }
+
+    protected String getOptionValue(Option option) {
+        return optionsHelper.getOptionValue(option);
+    }
+
+    protected boolean hasOption(Option option) {
+        return optionsHelper.hasOption(option);
+    }
+
+    protected int waitForCompletion(Job job) throws IOException, InterruptedException, ClassNotFoundException {
+        int retVal = 0;
+        long start = System.nanoTime();
+
+        if (isAsync) {
+            job.submit();
+        } else {
+            job.waitForCompletion(true);
+            retVal = job.isSuccessful() ? 0 : 1;
+        }
+
+        log.debug("Job '" + job.getJobName() + "' finished " + (job.isSuccessful() ? "successfully in " : "with failures.  Time taken ") + StringUtils.formatTime((System.nanoTime() - start) / 1000000L));
+
+        return retVal;
+    }
+
+    protected static void runJob(Tool job, String[] args) {
+        try {
+            int exitCode = ToolRunner.run(job, args);
+            System.exit(exitCode);
+        } catch (Exception e) {
+            e.printStackTrace(System.err);
+            System.exit(5);
+        }
+    }
+
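+    /**
+     * Adds a comma-separated list of input paths to the job. A path ending in
+     * "/*" is expanded one level: every sub-directory found is added
+     * recursively, and if the listing contains only files the directory
+     * itself is added.
+     */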
+    public void addInputDirs(String input, Job job) throws IOException {
+        for (String inp : StringSplitter.split(input, ",")) {
+            inp = inp.trim();
+            if (inp.endsWith("/*")) {
+                inp = inp.substring(0, inp.length() - 2);
+                FileSystem fs = FileSystem.get(job.getConfiguration());
+                Path path = new Path(inp);
+                FileStatus[] fileStatuses = fs.listStatus(path);
+                boolean hasDir = false;
+                for (FileStatus stat : fileStatuses) {
+                    if (stat.isDirectory()) {
+                        hasDir = true;
+                        addInputDirs(stat.getPath().toString(), job);
+                    }
+                }
+                if (fileStatuses.length > 0 && !hasDir) {
+                    addInputDirs(path.toString(), job);
+                }
+            } else {
+                System.out.println("Add input " + inp);
+                FileInputFormat.addInputPath(job, new Path(inp));
+            }
+        }
+    }
+
+    protected void attachKylinPropsAndMetadata(CubeInstance cube, Configuration conf) throws IOException {
+        File tmp = File.createTempFile("kylin_job_meta", "");
+        tmp.delete(); // we need a directory, so delete the file first
+
+        File metaDir = new File(tmp, "meta");
+        metaDir.mkdirs();
+        metaDir.getParentFile().deleteOnExit();
+
+        // write kylin.properties
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        File kylinPropsFile = new File(metaDir, "kylin.properties");
+        kylinConfig.writeProperties(kylinPropsFile);
+
+        // write cube / cube_desc / dict / table
+        ArrayList<String> dumpList = new ArrayList<String>();
+        dumpList.add(cube.getResourcePath());
+        dumpList.add(cube.getDescriptor().getResourcePath());
+        if (cube.isInvertedIndex()) {
+            dumpList.add(cube.getInvertedIndexDesc().getResourcePath());
+        }
+        for (TableDesc table : cube.getDescriptor().listTables()) {
+            dumpList.add(table.getResourcePath());
+        }
+
+        for (CubeSegment segment : cube.getSegments()) {
+            dumpList.addAll(segment.getDictionaryPaths());
+        }
+
+        dumpResources(kylinConfig, metaDir, dumpList);
+
+        // hadoop distributed cache
+        conf.set("tmpfiles", "file:///" + OptionsHelper.convertToFileURL(metaDir.getAbsolutePath()));
+    }
+
+    private void dumpResources(KylinConfig kylinConfig, File metaDir, ArrayList<String> dumpList) throws IOException {
+        ResourceStore from = ResourceStore.getStore(kylinConfig);
+        KylinConfig localConfig = KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath());
+        ResourceStore to = ResourceStore.getStore(localConfig);
+        for (String path : dumpList) {
+            InputStream in = from.getResource(path);
+            if (in == null)
+                throw new IllegalStateException("No resource found at -- " + path);
+            long ts = from.getResourceTimestamp(path);
+            to.putResource(path, in, ts);
+            log.info("Dumped resource " + path + " to " + metaDir.getAbsolutePath());
+        }
+    }
+
+    protected void deletePath(Configuration conf, Path path) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        if (fs.exists(path)) {
+            fs.delete(path, true);
+        }
+    }
+
+    protected double getTotalMapInputMB() throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        if (job == null) {
+            throw new JobException("Job is null");
+        }
+
+        long mapInputBytes = 0;
+        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+        for (InputSplit split : input.getSplits(job)) {
+            mapInputBytes += split.getLength();
+        }
+        if (mapInputBytes == 0) {
+            throw new IllegalArgumentException("Map input splits are 0 bytes, something is wrong!");
+        }
+        double totalMapInputMB = (double) mapInputBytes / 1024 / 1024;
+        return totalMapInputMB;
+    }
+
+    protected int getMapInputSplitCount() throws ClassNotFoundException, JobException, IOException, InterruptedException {
+        if (job == null) {
+            throw new JobException("Job is null");
+        }
+        InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), job.getConfiguration());
+        return input.getSplits(job).size();
+    }
+
+    public static KylinConfig loadKylinPropsAndMetadata(Configuration conf) throws IOException {
+        File metaDir = new File("meta");
+        System.setProperty(KylinConfig.KYLIN_CONF, metaDir.getAbsolutePath());
+        System.out.println("The absolute path for meta dir is " + metaDir.getAbsolutePath());
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        kylinConfig.setMetadataUrl(metaDir.getCanonicalPath());
+        return kylinConfig;
+    }
+
+    public void kill() throws JobException {
+        if (job != null) {
+            try {
+                job.killJob();
+            } catch (IOException e) {
+                throw new JobException(e);
+            }
+        }
+    }
+
+    public Map<String, String> getInfo() throws JobException {
+        if (job != null) {
+            Map<String, String> status = new HashMap<String, String>();
+            if (null != job.getJobID()) {
+                status.put(JobInstance.MR_JOB_ID, job.getJobID().toString());
+            }
+            if (null != job.getTrackingURL()) {
+                status.put(JobInstance.YARN_APP_URL, job.getTrackingURL());
+            }
+
+            return status;
+        } else {
+            throw new JobException("Job is null");
+        }
+    }
+
+    public Counters getCounters() throws JobException {
+        if (job != null) {
+            try {
+                return job.getCounters();
+            } catch (IOException e) {
+                throw new JobException(e);
+            }
+        } else {
+            throw new JobException("Job is null");
+        }
+    }
+
+    public void setAsync(boolean isAsync) {
+        this.isAsync = isAsync;
+    }
+
+}
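
A minimal concrete job wired through these helpers might look like the sketch below.
ExampleHadoopJob is hypothetical (imports omitted); only the parseOptions,
addInputDirs, deletePath, waitForCompletion and runJob plumbing comes from the
class above:

    public class ExampleHadoopJob extends AbstractHadoopJob {
        @Override
        public int run(String[] args) throws Exception {
            Options options = new Options();
            options.addOption(OPTION_JOB_NAME);
            options.addOption(OPTION_INPUT_PATH);
            options.addOption(OPTION_OUTPUT_PATH);
            parseOptions(options, args);

            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
            addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
            FileOutputFormat.setOutputPath(job, output);
            deletePath(job.getConfiguration(), output); // allow idempotent re-runs
            return waitForCompletion(job);
        }

        public static void main(String[] args) {
            runJob(new ExampleHadoopJob(), args);
        }
    }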

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
new file mode 100644
index 0000000..e25ec36
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.job.hadoop.cardinality;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.kylinolap.common.hll.HyperLogLogPlusCounter;
+import com.kylinolap.cube.kv.RowConstants;
+
+/**
+ * @author Jack
+ * 
+ */
+public class ColumnCardinalityMapper<T> extends Mapper<T, Text, IntWritable, BytesWritable> {
+
+    private Map<Integer, HyperLogLogPlusCounter> hllcMap = new HashMap<Integer, HyperLogLogPlusCounter>();
+    public static final String DEFAULT_DELIM = ",";
+
+    @Override
+    public void map(T key, Text value, Context context) throws IOException, InterruptedException {
+        String delim = context.getConfiguration().get(HiveColumnCardinalityJob.KEY_INPUT_DELIM);
+        if (delim == null) {
+            delim = DEFAULT_DELIM;
+        }
+        String line = value.toString();
+        StringTokenizer tokenizer = new StringTokenizer(line, delim);
+        int i = 1; // column indices are 1-based
+        while (tokenizer.hasMoreTokens()) {
+            String temp = tokenizer.nextToken();
+            getHllc(i).add(Bytes.toBytes(temp));
+            i++;
+        }
+    }
+
+    private HyperLogLogPlusCounter getHllc(Integer key) {
+        if (!hllcMap.containsKey(key)) {
+            hllcMap.put(key, new HyperLogLogPlusCounter());
+        }
+        return hllcMap.get(key);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
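+        // flush each per-column counter exactly once per mapper; map() itself
+        // emits nothing, which keeps the per-record cost to an HLL add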
+        Iterator<Integer> it = hllcMap.keySet().iterator();
+        while (it.hasNext()) {
+            int key = it.next();
+            HyperLogLogPlusCounter hllc = hllcMap.get(key);
+            ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+            buf.clear();
+            hllc.writeRegisters(buf);
+            buf.flip();
+            context.write(new IntWritable(key), new BytesWritable(buf.array()));
+        }
+    }
+
+}
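
The mapper emits one record per column: a 1-based column index paired with that
column's serialized HLL registers. A matching reduce side would merge the registers
per column and emit a cardinality estimate. This is only a rough sketch: it assumes
HyperLogLogPlusCounter exposes readRegisters and merge counterparts to the
writeRegisters call above, plus a getCountEstimate accessor (hypothetical if the
actual names differ):

    public class ColumnCardinalityReducerSketch
            extends Reducer<IntWritable, BytesWritable, IntWritable, LongWritable> {
        @Override
        public void reduce(IntWritable key, Iterable<BytesWritable> values, Context context)
                throws IOException, InterruptedException {
            HyperLogLogPlusCounter merged = new HyperLogLogPlusCounter();
            for (BytesWritable v : values) {
                HyperLogLogPlusCounter one = new HyperLogLogPlusCounter();
                one.readRegisters(ByteBuffer.wrap(v.getBytes(), 0, v.getLength()));
                merged.merge(one); // register-wise union of the two counters
            }
            context.write(key, new LongWritable(merged.getCountEstimate()));
        }
    }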