You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/06/28 11:00:24 UTC

[2/2] incubator-griffin git commit: refactor job structure to remove redundant class and api

refactor job structure to remove redundant class and api

1. Refactor the job structure.
2. Remove the JobSchedule class to make the job structure clearer.
3. Remove JobDataBean to make the response clearer.
4. Remove the "/jobs/config" API.

Author: ahutsunshine <ah...@gmail.com>

Closes #320 from ahutsunshine/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/e1e7e3a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/e1e7e3a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/e1e7e3a9

Branch: refs/heads/master
Commit: e1e7e3a9704176dfd12ebc4d7f1f23f5837925eb
Parents: 81c3a3c
Author: ahutsunshine <ah...@gmail.com>
Authored: Thu Jun 28 19:00:16 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Thu Jun 28 19:00:16 2018 +0800

----------------------------------------------------------------------
 .../griffin/core/job/BatchJobOperatorImpl.java  |  52 ++-
 .../apache/griffin/core/job/JobController.java  |  24 +-
 .../apache/griffin/core/job/JobInstance.java    |  41 +-
 .../apache/griffin/core/job/JobOperator.java    |   8 +-
 .../org/apache/griffin/core/job/JobService.java |  10 +-
 .../apache/griffin/core/job/JobServiceImpl.java | 188 +++-----
 .../core/job/StreamingJobOperatorImpl.java      |  36 +-
 .../griffin/core/job/entity/AbstractJob.java    | 187 ++++++--
 .../griffin/core/job/entity/BatchJob.java       |  10 +
 .../griffin/core/job/entity/JobDataBean.java    | 110 -----
 .../griffin/core/job/entity/JobSchedule.java    | 188 --------
 .../griffin/core/job/entity/JobState.java       |  24 ++
 .../griffin/core/job/entity/StreamingJob.java   |   5 +
 .../griffin/core/job/entity/VirtualJob.java     |   8 +-
 .../griffin/core/job/repo/JobScheduleRepo.java  |  28 --
 .../src/main/resources/application.properties   |  11 +-
 .../griffin/core/job/JobControllerTest.java     | 391 ++++++++---------
 .../griffin/core/job/JobInstanceTest.java       | 429 +++++++++----------
 .../griffin/core/job/SparkSubmitJobTest.java    |   6 +-
 .../apache/griffin/core/util/EntityHelper.java  |  59 +--
 20 files changed, 794 insertions(+), 1021 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
index 4d3b74e..7a6cf86 100644
--- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -41,18 +41,23 @@ public class BatchJobOperatorImpl implements JobOperator {
 
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public JobSchedule add(JobSchedule js, GriffinMeasure measure) throws Exception {
-        validateParams(js, measure);
-        String qName = jobService.getQuartzName(js);
+    public AbstractJob add(AbstractJob job, GriffinMeasure measure) throws Exception {
+        validateParams(job, measure);
+        String qName = jobService.getQuartzName(job);
         String qGroup = jobService.getQuartzGroup();
         TriggerKey triggerKey = jobService.getTriggerKeyIfValid(qName, qGroup);
-        BatchJob batchJob = new BatchJob(js.getMeasureId(), js.getJobName(), qName, qGroup, false);
-        batchJob.setJobSchedule(js);
+        BatchJob batchJob = genBatchJobBean(job, qName, qGroup);
         batchJob = batchJobRepo.save(batchJob);
-        jobService.addJob(triggerKey, js, batchJob, BATCH);
-        JobSchedule jobSchedule = batchJob.getJobSchedule();
-        jobSchedule.setId(batchJob.getId());
-        return jobSchedule;
+        jobService.addJob(triggerKey, batchJob, BATCH);
+        return job;
+    }
+
+    private BatchJob genBatchJobBean(AbstractJob job, String qName, String qGroup) {
+        BatchJob batchJob = (BatchJob)job;
+        batchJob.setMetricName(job.getJobName());
+        batchJob.setGroup(qGroup);
+        batchJob.setName(qName);
+        return batchJob;
     }
 
     /**
@@ -107,17 +112,34 @@ public class BatchJobOperatorImpl implements JobOperator {
     }
 
     @Override
-    public JobState getState(AbstractJob job, JobDataBean bean, String action) throws SchedulerException {
+    public JobState getState(AbstractJob job, String action) throws SchedulerException {
         JobState jobState = new JobState();
         Scheduler scheduler = factory.getScheduler();
+        if (job.getGroup() == null || job.getName() == null) {
+            return null;
+        }
         TriggerKey triggerKey = triggerKey(job.getName(), job.getGroup());
         TriggerState triggerState = scheduler.getTriggerState(triggerKey);
         jobState.setState(triggerState.toString());
         jobState.setToStart(getStartStatus(triggerState));
         jobState.setToStop(getStopStatus(triggerState));
+        setTriggerTime(job, jobState);
         return jobState;
     }
 
+    private void setTriggerTime(AbstractJob job, JobState jobState) throws SchedulerException {
+        List<? extends Trigger> triggers = jobService.getTriggers(job.getName(), job.getGroup());
+        // If triggers are empty, in Griffin it means job is completed whose trigger state is NONE or not scheduled.
+        if (CollectionUtils.isEmpty(triggers)) {
+            return;
+        }
+        Trigger trigger = triggers.get(0);
+        Date nextFireTime = trigger.getNextFireTime();
+        Date previousFireTime = trigger.getPreviousFireTime();
+        jobState.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() : -1);
+        jobState.setPreviousFireTime(previousFireTime != null ? previousFireTime.getTime() : -1);
+    }
+
     /**
      * only PAUSED state of job can be started
      * @param state job state
@@ -237,18 +259,18 @@ public class BatchJobOperatorImpl implements JobOperator {
         return status;
     }
 
-    private void validateParams(JobSchedule js, GriffinMeasure measure) {
-        if (!jobService.isValidJobName(js.getJobName())) {
+    private void validateParams(AbstractJob job, GriffinMeasure measure) {
+        if (!jobService.isValidJobName(job.getJobName())) {
             throw new GriffinException.BadRequestException(INVALID_JOB_NAME);
         }
-        if (!isValidCronExpression(js.getCronExpression())) {
+        if (!isValidCronExpression(job.getCronExpression())) {
             throw new GriffinException.BadRequestException(INVALID_CRON_EXPRESSION);
         }
-        if (!isValidBaseLine(js.getSegments())) {
+        if (!isValidBaseLine(job.getSegments())) {
             throw new GriffinException.BadRequestException(MISSING_BASELINE_CONFIG);
         }
         List<String> names = getConnectorNames(measure);
-        if (!isValidConnectorNames(js.getSegments(), names)) {
+        if (!isValidConnectorNames(job.getSegments(), names)) {
             throw new GriffinException.BadRequestException(INVALID_CONNECTOR_NAME);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/JobController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index 5f9b319..4dbaf9a 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -19,10 +19,9 @@ under the License.
 
 package org.apache.griffin.core.job;
 
-import org.apache.griffin.core.job.entity.JobDataBean;
+import org.apache.griffin.core.job.entity.AbstractJob;
 import org.apache.griffin.core.job.entity.JobHealth;
 import org.apache.griffin.core.job.entity.JobInstanceBean;
-import org.apache.griffin.core.job.entity.JobSchedule;
 import org.apache.griffin.core.util.FSUtil;
 import org.quartz.SchedulerException;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -44,29 +43,24 @@ public class JobController {
     private JobService jobService;
 
     @RequestMapping(value = "/jobs", method = RequestMethod.GET)
-    public List<JobDataBean> getJobs(@RequestParam(value = "type", defaultValue = "") String type) {
+    public List<AbstractJob> getJobs(@RequestParam(value = "type", defaultValue = "") String type) {
         return jobService.getAliveJobs(type);
     }
 
-    @RequestMapping(value = "/jobs/config", method = RequestMethod.GET)
-    public JobSchedule getJobSchedule(@RequestParam("jobName") String jobName) {
-        return jobService.getJobSchedule(jobName);
+    @RequestMapping(value = "/jobs", method = RequestMethod.POST)
+    @ResponseStatus(HttpStatus.CREATED)
+    public AbstractJob addJob(@RequestBody AbstractJob job) throws Exception {
+        return jobService.addJob(job);
     }
 
     @RequestMapping(value = "/jobs/config/{jobId}")
-    public JobSchedule getJobSchedule(@PathVariable("jobId") Long jobId) {
-        return jobService.getJobSchedule(jobId);
-    }
-
-    @RequestMapping(value = "/jobs", method = RequestMethod.POST)
-    @ResponseStatus(HttpStatus.CREATED)
-    public JobSchedule addJob(@RequestBody JobSchedule jobSchedule) throws Exception {
-        return jobService.addJob(jobSchedule);
+    public AbstractJob getJobConfig(@PathVariable("jobId") Long jobId) {
+        return jobService.getJobConfig(jobId);
     }
 
     @RequestMapping(value = "/jobs/{id}", method = RequestMethod.PUT)
     @ResponseStatus(HttpStatus.OK)
-    public JobDataBean onActions(@PathVariable("id") Long jobId, @RequestParam String action) throws Exception {
+    public AbstractJob onActions(@PathVariable("id") Long jobId, @RequestParam String action) throws Exception {
         return jobService.onAction(jobId,action);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index eacd5a2..23b69f3 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -63,6 +63,9 @@ public class JobInstance implements Job {
     public static final String PREDICATE_JOB_NAME = "predicateJobName";
     static final String JOB_NAME = "jobName";
     static final String PATH_CONNECTOR_CHARACTER = ",";
+    public static final String INTERVAL = "interval";
+    public static final String REPEAT = "repeat";
+    public static final String CHECK_DONEFILE_SCHEDULE = "checkdonefile.schedule";
 
     @Autowired
     private SchedulerFactoryBean factory;
@@ -75,7 +78,6 @@ public class JobInstance implements Job {
     @Autowired
     private Environment env;
 
-    private JobSchedule jobSchedule;
     private GriffinMeasure measure;
     private AbstractJob job;
     private List<SegmentPredicate> mPredicates;
@@ -88,7 +90,7 @@ public class JobInstance implements Job {
         try {
             initParam(context);
             setSourcesPartitionsAndPredicates(measure.getDataSources());
-            createJobInstance(jobSchedule.getConfigMap());
+            createJobInstance(job.getConfigMap());
         } catch (Exception e) {
             LOGGER.error("Create predicate job failure.", e);
         }
@@ -99,8 +101,7 @@ public class JobInstance implements Job {
         JobDetail jobDetail = context.getJobDetail();
         Long jobId = jobDetail.getJobDataMap().getLong(GRIFFIN_JOB_ID);
         job = jobRepo.findOne(jobId);
-        jobSchedule = job.getJobSchedule();
-        Long measureId = jobSchedule.getMeasureId();
+        Long measureId = job.getMeasureId();
         measure = measureRepo.findOne(measureId);
         setJobStartTime(jobDetail);
     }
@@ -117,7 +118,7 @@ public class JobInstance implements Job {
 
     private void setSourcesPartitionsAndPredicates(List<DataSource> sources) throws Exception {
         boolean isFirstBaseline = true;
-        for (JobDataSegment jds : jobSchedule.getSegments()) {
+        for (JobDataSegment jds : job.getSegments()) {
             if (jds.isBaseline() && isFirstBaseline) {
                 Long tsOffset = TimeUtil.str2Long(jds.getSegmentRange().getBegin());
                 measure.setTimestamp(jobStartTime + tsOffset);
@@ -234,9 +235,10 @@ public class JobInstance implements Job {
 
     @SuppressWarnings("unchecked")
     private void createJobInstance(Map<String, Object> confMap) throws Exception {
-        Map<String, Object> config = (Map<String, Object>) confMap.get("checkdonefile.schedule");
-        Long interval = TimeUtil.str2Long((String) config.get("interval"));
-        Integer repeat = Integer.valueOf(config.get("repeat").toString());
+        confMap = checkConfMap(confMap != null ? confMap : new HashMap<>());
+        Map<String, Object> config = (Map<String, Object>) confMap.get(CHECK_DONEFILE_SCHEDULE);
+        Long interval = TimeUtil.str2Long((String) config.get(INTERVAL));
+        Integer repeat = Integer.valueOf(config.get(REPEAT).toString());
         String groupName = "PG";
         String jobName = job.getJobName() + "_predicate_" + System.currentTimeMillis();
         TriggerKey tk = triggerKey(jobName, groupName);
@@ -247,6 +249,29 @@ public class JobInstance implements Job {
         createJobInstance(tk, interval, repeat, jobName);
     }
 
+    @SuppressWarnings("unchecked")
+    Map<String, Object> checkConfMap(Map<String, Object> confMap) {
+        Map<String, Object> config = (Map<String, Object>) confMap.get(CHECK_DONEFILE_SCHEDULE);
+        String interval = env.getProperty("predicate.job.interval");
+        interval = interval != null ? interval : "5m";
+        String repeat = env.getProperty("predicate.job.repeat.count");
+        repeat = repeat != null ? repeat : "12";
+        if (config == null) {
+            Map<String, Object> map = new HashMap<>();
+            map.put(INTERVAL, interval);
+            map.put(REPEAT, repeat);
+            confMap.put(CHECK_DONEFILE_SCHEDULE, map);
+        } else { // replace if interval or repeat is not null
+            String confRepeat = config.get(REPEAT).toString();
+            String confInterval = config.get(INTERVAL).toString();
+            interval = confInterval != null ? confInterval : interval;
+            repeat = confRepeat != null ? confRepeat : repeat;
+            config.put(INTERVAL, interval);
+            config.put(REPEAT, repeat);
+        }
+        return confMap;
+    }
+
     private void saveJobInstance(String pName, String pGroup) {
         ProcessType type = measure.getProcessType() == BATCH ? BATCH : STREAMING;
         Long tms = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
index 40f1c9e..7fdbe7d 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobOperator.java
@@ -19,12 +19,14 @@ under the License.
 
 package org.apache.griffin.core.job;
 
-import org.apache.griffin.core.job.entity.*;
+import org.apache.griffin.core.job.entity.AbstractJob;
+import org.apache.griffin.core.job.entity.JobHealth;
+import org.apache.griffin.core.job.entity.JobState;
 import org.apache.griffin.core.measure.entity.GriffinMeasure;
 import org.quartz.SchedulerException;
 
 public interface JobOperator {
-    JobSchedule add(JobSchedule js, GriffinMeasure measure) throws Exception;
+    AbstractJob add(AbstractJob job, GriffinMeasure measure) throws Exception;
 
     void start(AbstractJob job) throws Exception;
 
@@ -34,5 +36,5 @@ public interface JobOperator {
 
     JobHealth getHealth(JobHealth jobHealth, AbstractJob job) throws SchedulerException;
 
-    JobState getState(AbstractJob job, JobDataBean bean, String action) throws SchedulerException;
+    JobState getState(AbstractJob job, String action) throws SchedulerException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/JobService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java b/service/src/main/java/org/apache/griffin/core/job/JobService.java
index a2c4762..7539cea 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobService.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java
@@ -27,15 +27,13 @@ import java.util.List;
 
 public interface JobService {
 
-    List<JobDataBean> getAliveJobs(String type);
+    List<AbstractJob> getAliveJobs(String type);
 
-    JobSchedule getJobSchedule(String jobName);
+    AbstractJob addJob(AbstractJob js) throws Exception;
 
-    JobSchedule getJobSchedule(Long jobId);
+    AbstractJob getJobConfig(Long jobId);
 
-    JobSchedule addJob(JobSchedule js) throws Exception;
-
-    JobDataBean onAction(Long jobId,String action) throws Exception;
+    AbstractJob onAction(Long jobId,String action) throws Exception;
 
     void deleteJob(Long jobId) throws SchedulerException;
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index 2fbc25b..6230d2a 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -19,50 +19,11 @@ under the License.
 
 package org.apache.griffin.core.job;
 
-import static java.util.TimeZone.getTimeZone;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_MEASURE_ID;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_ID_DOES_NOT_EXIST;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_NAME_DOES_NOT_EXIST;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_TYPE_DOES_NOT_SUPPORT;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.MEASURE_TYPE_DOES_NOT_SUPPORT;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.NO_SUCH_JOB_ACTION;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST;
-import static org.apache.griffin.core.job.entity.LivySessionStates.isActive;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.BUSY;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.DEAD;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.IDLE;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.NOT_STARTED;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.RECOVERING;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.RUNNING;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.STARTING;
-import static org.apache.griffin.core.job.entity.LivySessionStates.State.UNKNOWN;
-import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
-import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING;
-import static org.quartz.CronScheduleBuilder.cronSchedule;
-import static org.quartz.JobBuilder.newJob;
-import static org.quartz.JobKey.jobKey;
-import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
-import static org.quartz.TriggerBuilder.newTrigger;
-import static org.quartz.TriggerKey.triggerKey;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TimeZone;
-
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.commons.lang.StringUtils;
 import org.apache.griffin.core.exception.GriffinException;
-import org.apache.griffin.core.job.entity.AbstractJob;
-import org.apache.griffin.core.job.entity.BatchJob;
-import org.apache.griffin.core.job.entity.JobDataBean;
-import org.apache.griffin.core.job.entity.JobHealth;
-import org.apache.griffin.core.job.entity.JobInstanceBean;
-import org.apache.griffin.core.job.entity.JobSchedule;
-import org.apache.griffin.core.job.entity.JobState;
-import org.apache.griffin.core.job.entity.LivySessionStates;
+import org.apache.griffin.core.job.entity.*;
 import org.apache.griffin.core.job.entity.LivySessionStates.State;
-import org.apache.griffin.core.job.entity.StreamingJob;
 import org.apache.griffin.core.job.repo.BatchJobRepo;
 import org.apache.griffin.core.job.repo.JobInstanceRepo;
 import org.apache.griffin.core.job.repo.JobRepo;
@@ -72,15 +33,7 @@ import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType;
 import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
 import org.apache.griffin.core.util.JsonUtil;
 import org.apache.griffin.core.util.YarnNetUtil;
-import org.quartz.CronTrigger;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobKey;
-import org.quartz.Scheduler;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.TriggerBuilder;
-import org.quartz.TriggerKey;
+import org.quartz.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -96,7 +49,23 @@ import org.springframework.web.client.HttpClientErrorException;
 import org.springframework.web.client.ResourceAccessException;
 import org.springframework.web.client.RestTemplate;
 
-import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+
+import static java.util.TimeZone.getTimeZone;
+import static org.apache.griffin.core.exception.GriffinExceptionMessage.*;
+import static org.apache.griffin.core.job.entity.LivySessionStates.State.*;
+import static org.apache.griffin.core.job.entity.LivySessionStates.isActive;
+import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
+import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.STREAMING;
+import static org.quartz.CronScheduleBuilder.cronSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.JobKey.jobKey;
+import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
+import static org.quartz.TriggerBuilder.newTrigger;
+import static org.quartz.TriggerKey.triggerKey;
 
 @Service
 public class JobServiceImpl implements JobService {
@@ -135,7 +104,7 @@ public class JobServiceImpl implements JobService {
     }
 
     @Override
-    public List<JobDataBean> getAliveJobs(String type) {
+    public List<AbstractJob> getAliveJobs(String type) {
         List<? extends AbstractJob> jobs;
         if (BATCH_TYPE.equals(type)) {
             jobs = batchJobRepo.findByDeleted(false);
@@ -147,14 +116,13 @@ public class JobServiceImpl implements JobService {
         return getJobDataBeans(jobs);
     }
 
-    private List<JobDataBean> getJobDataBeans(List<? extends AbstractJob> jobs) {
-        List<JobDataBean> dataList = new ArrayList<>();
+    private List<AbstractJob> getJobDataBeans(List<? extends AbstractJob> jobs) {
+        List<AbstractJob> dataList = new ArrayList<>();
         try {
             for (AbstractJob job : jobs) {
-                JobDataBean jobData = genJobDataBean(job);
-                if (jobData != null) {
-                    dataList.add(jobData);
-                }
+                JobState jobState = genJobState(job);
+                job.setJobState(jobState);
+                dataList.add(job);
             }
         } catch (SchedulerException e) {
             LOGGER.error("Failed to get RUNNING jobs.", e);
@@ -164,38 +132,21 @@ public class JobServiceImpl implements JobService {
     }
 
     @Override
-    public JobSchedule getJobSchedule(String jobName) {
-        List<AbstractJob> jobs = jobRepo.findByJobNameAndDeleted(jobName, false);
-        if (jobs.size() == 0) {
-            LOGGER.warn("Job name {} does not exist.", jobName);
-            throw new GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST);
-        }
-        AbstractJob job = jobs.get(0);
-        return getJobSchedule(job);
+    public AbstractJob addJob(AbstractJob job) throws Exception {
+        Long measureId = job.getMeasureId();
+        GriffinMeasure measure = getMeasureIfValid(measureId);
+        JobOperator op = getJobOperator(measure.getProcessType());
+        return op.add(job, measure);
     }
 
     @Override
-    public JobSchedule getJobSchedule(Long jobId) {
+    public AbstractJob getJobConfig(Long jobId) {
         AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
         if (job == null) {
             LOGGER.warn("Job id {} does not exist.", jobId);
             throw new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST);
         }
-        return getJobSchedule(job);
-    }
-
-    private JobSchedule getJobSchedule(AbstractJob job) {
-        JobSchedule jobSchedule = job.getJobSchedule();
-        jobSchedule.setId(job.getId());
-        return jobSchedule;
-    }
-
-    @Override
-    public JobSchedule addJob(JobSchedule js) throws Exception {
-        Long measureId = js.getMeasureId();
-        GriffinMeasure measure = getMeasureIfValid(measureId);
-        JobOperator op = getJobOperator(measure.getProcessType());
-        return op.add(js, measure);
+        return job;
     }
 
     /**
@@ -203,12 +154,14 @@ public class JobServiceImpl implements JobService {
      * @param action job operation: start job, stop job
      */
     @Override
-    public JobDataBean onAction(Long jobId, String action) throws Exception {
+    public AbstractJob onAction(Long jobId, String action) throws Exception {
         AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
         validateJobExist(job);
         JobOperator op = getJobOperator(job);
         doAction(action, job, op);
-        return genJobDataBean(job, action);
+        JobState jobState = genJobState(job, action);
+        job.setJobState(jobState);
+        return job;
     }
 
     private void doAction(String action, AbstractJob job, JobOperator op) throws Exception {
@@ -350,65 +303,33 @@ public class JobServiceImpl implements JobService {
     }
 
     List<? extends Trigger> getTriggers(String name, String group) throws SchedulerException {
+        if (name == null || group == null) {
+            return null;
+        }
         JobKey jobKey = new JobKey(name, group);
         Scheduler scheduler = factory.getScheduler();
         return scheduler.getTriggersOfJob(jobKey);
     }
 
-    private JobDataBean genJobDataBean(AbstractJob job, String action) throws SchedulerException {
-        if (job.getName() == null || job.getGroup() == null) {
-            return null;
-        }
-        JobDataBean jobData = new JobDataBean();
-        List<? extends Trigger> triggers = getTriggers(job.getName(), job.getGroup());
-        /* If triggers are empty, in Griffin it means job is not scheduled or completed whose trigger state is NONE. */
-        if (CollectionUtils.isEmpty(triggers) && job instanceof BatchJob) {
-            return null;
-        }
-        setTriggerTime(triggers, jobData);
+    private JobState genJobState(AbstractJob job, String action) throws SchedulerException {
         JobOperator op = getJobOperator(job);
-        JobState state = op.getState(job, jobData, action);
-        jobData.setJobState(state);
-        jobData.setJobId(job.getId());
-        jobData.setJobName(job.getJobName());
-        jobData.setMeasureId(job.getMeasureId());
-        jobData.setCronExpression(getCronExpression(triggers));
-        jobData.setProcessType(job instanceof BatchJob ? BATCH : STREAMING);
-        return jobData;
-    }
-
-    private JobDataBean genJobDataBean(AbstractJob job) throws SchedulerException {
-        return genJobDataBean(job, null);
+        JobState state = op.getState(job, action);
+        job.setJobState(state);
+        return state;
     }
 
-    private void setTriggerTime(List<? extends Trigger> triggers, JobDataBean jobBean) {
-        if (CollectionUtils.isEmpty(triggers)) {
-            return;
-        }
-        Trigger trigger = triggers.get(0);
-        Date nextFireTime = trigger.getNextFireTime();
-        Date previousFireTime = trigger.getPreviousFireTime();
-        jobBean.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() : -1);
-        jobBean.setPreviousFireTime(previousFireTime != null ? previousFireTime.getTime() : -1);
+    private JobState genJobState(AbstractJob job) throws SchedulerException {
+        return genJobState(job, null);
     }
 
-    private String getCronExpression(List<? extends Trigger> triggers) {
-        for (Trigger trigger : triggers) {
-            if (trigger instanceof CronTrigger) {
-                return ((CronTrigger) trigger).getCronExpression();
-            }
-        }
-        return null;
-    }
-
-    void addJob(TriggerKey tk, JobSchedule js, AbstractJob job, ProcessType type) throws Exception {
+    void addJob(TriggerKey tk, AbstractJob job, ProcessType type) throws Exception {
         JobDetail jobDetail = addJobDetail(tk, job);
-        Trigger trigger = genTriggerInstance(tk, jobDetail, js, type);
+        Trigger trigger = genTriggerInstance(tk, jobDetail, job, type);
         factory.getScheduler().scheduleJob(trigger);
     }
 
-    String getQuartzName(JobSchedule js) {
-        return js.getJobName() + "_" + System.currentTimeMillis();
+    String getQuartzName(AbstractJob job) {
+        return job.getJobName() + "_" + System.currentTimeMillis();
     }
 
     String getQuartzGroup() {
@@ -438,11 +359,11 @@ public class JobServiceImpl implements JobService {
         return measure;
     }
 
-    private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, JobSchedule js, ProcessType type) {
+    private Trigger genTriggerInstance(TriggerKey tk, JobDetail jd, AbstractJob job, ProcessType type) {
         TriggerBuilder builder = newTrigger().withIdentity(tk).forJob(jd);
         if (type == BATCH) {
-            TimeZone timeZone = getTimeZone(js.getTimeZone());
-            return builder.withSchedule(cronSchedule(js.getCronExpression()).inTimeZone(timeZone)).build();
+            TimeZone timeZone = getTimeZone(job.getTimeZone());
+            return builder.withSchedule(cronSchedule(job.getCronExpression()).inTimeZone(timeZone)).build();
         } else if (type == STREAMING) {
             return builder.startNow().withSchedule(simpleSchedule().withRepeatCount(0)).build();
         }
@@ -540,8 +461,9 @@ public class JobServiceImpl implements JobService {
 
     /**
      * Check instance status in case that session id is overdue and app id is null and so we cannot update instance state.
+     *
      * @param instance job instance bean
-     * @param e HttpClientErrorException
+     * @param e        HttpClientErrorException
      * @return boolean
      */
     private boolean checkStatus(JobInstanceBean instance, HttpClientErrorException e) {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
index b4991e3..1ced023 100644
--- a/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/StreamingJobOperatorImpl.java
@@ -81,18 +81,23 @@ public class StreamingJobOperatorImpl implements JobOperator {
 
     @Override
     @Transactional(rollbackFor = Exception.class)
-    public JobSchedule add(JobSchedule js, GriffinMeasure measure) throws Exception {
-        validateParams(js);
-        String qName = jobService.getQuartzName(js);
+    public AbstractJob add(AbstractJob job, GriffinMeasure measure) throws Exception {
+        validateParams(job);
+        String qName = jobService.getQuartzName(job);
         String qGroup = jobService.getQuartzGroup();
         TriggerKey triggerKey = jobService.getTriggerKeyIfValid(qName, qGroup);
-        StreamingJob streamingJob = new StreamingJob(js.getMeasureId(), js.getJobName(), qName, qGroup, false);
-        streamingJob.setJobSchedule(js);
+        StreamingJob streamingJob = genStreamingJobBean(job, qName, qGroup);
         streamingJob = streamingJobRepo.save(streamingJob);
-        jobService.addJob(triggerKey, js, streamingJob, STREAMING);
-        JobSchedule jobSchedule = streamingJob.getJobSchedule();
-        jobSchedule.setId(streamingJob.getId());
-        return jobSchedule;
+        jobService.addJob(triggerKey,streamingJob, STREAMING);
+        return streamingJob;
+    }
+
+    private StreamingJob genStreamingJobBean(AbstractJob job, String qName, String qGroup) {
+        StreamingJob streamingJob = (StreamingJob)job;
+        streamingJob.setMetricName(job.getJobName());
+        streamingJob.setGroup(qGroup);
+        streamingJob.setName(qName);
+        return streamingJob;
     }
 
     /**
@@ -107,14 +112,13 @@ public class StreamingJobOperatorImpl implements JobOperator {
         StreamingJob streamingJob = (StreamingJob) job;
         verifyJobState(streamingJob);
         streamingJob = streamingJobRepo.save(streamingJob);
-        JobSchedule js = streamingJob.getJobSchedule();
-        String qName = jobService.getQuartzName(js);
+        String qName = jobService.getQuartzName(job);
         String qGroup = jobService.getQuartzGroup();
         TriggerKey triggerKey = triggerKey(qName, qGroup);
-        jobService.addJob(triggerKey, js, streamingJob, STREAMING);
+        jobService.addJob(triggerKey,streamingJob, STREAMING);
     }
 
-    private void verifyJobState(StreamingJob job) throws SchedulerException {
+    private void verifyJobState(AbstractJob job) throws SchedulerException {
         /* Firstly you should check whether job is scheduled. If it is scheduled, triggers are empty. */
         List<? extends Trigger> triggers = jobService.getTriggers(job.getName(), job.getGroup());
         if (!CollectionUtils.isEmpty(triggers)) {
@@ -157,7 +161,7 @@ public class StreamingJobOperatorImpl implements JobOperator {
     }
 
     @Override
-    public JobState getState(AbstractJob job, JobDataBean bean, String action) {
+    public JobState getState(AbstractJob job, String action) {
         JobState jobState = new JobState();
         List<JobInstanceBean> instances = instanceRepo.findByJobId(job.getId());
         for (JobInstanceBean instance : instances) {
@@ -256,8 +260,8 @@ public class StreamingJobOperatorImpl implements JobOperator {
     }
 
 
-    private void validateParams(JobSchedule js) {
-        if (!jobService.isValidJobName(js.getJobName())) {
+    private void validateParams(AbstractJob job) {
+        if (!jobService.isValidJobName(job.getJobName())) {
             throw new GriffinException.BadRequestException(INVALID_JOB_NAME);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
index 0a7388b..df5bb06 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
@@ -19,19 +19,33 @@ under the License.
 
 package org.apache.griffin.core.job.entity;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.*;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang.StringUtils;
 import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+import org.apache.griffin.core.util.JsonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.persistence.*;
+import javax.validation.constraints.NotNull;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 @Entity
 @Table(name = "job")
 @Inheritance(strategy = InheritanceType.SINGLE_TABLE)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "job.type")
+@JsonSubTypes({@JsonSubTypes.Type(value = BatchJob.class, name = "batch"), @JsonSubTypes.Type(value = StreamingJob.class, name = "streaming"), @JsonSubTypes.Type(value = VirtualJob.class, name = "virtual")})
 @DiscriminatorColumn(name = "type")
 public abstract class AbstractJob extends AbstractAuditableEntity {
     private static final long serialVersionUID = 7569493377868453677L;
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJob.class);
+
     protected Long measureId;
 
     protected String jobName;
@@ -39,33 +53,45 @@ public abstract class AbstractJob extends AbstractAuditableEntity {
     protected String metricName;
 
     @Column(name = "quartz_job_name")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private String name;
 
     @Column(name = "quartz_group_name")
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private String group;
 
     @JsonIgnore
     protected boolean deleted = false;
 
-    @OneToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
-    @JoinColumn(name = "job_schedule_id")
-    private JobSchedule jobSchedule;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private String cronExpression;
 
-    AbstractJob() {
-    }
+    @Transient
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private JobState jobState;
 
-    AbstractJob(Long measureId, String jobName, String name, String group, boolean deleted) {
-        this.measureId = measureId;
-        this.jobName = jobName;
-        this.name = name;
-        this.group = group;
-        this.deleted = deleted;
+    @NotNull
+    private String timeZone;
+
+    @JsonIgnore
+    private String predicateConfig;
+
+    @Transient
+    private Map<String, Object> configMap;
+
+    @NotNull
+    @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
+    @JoinColumn(name = "job_id")
+    private List<JobDataSegment> segments = new ArrayList<>();
+
+    @JsonProperty("measure.id")
+    public Long getMeasureId() {
+        return measureId;
     }
 
-    AbstractJob(String jobName, Long measureId, String metricName) {
-        this.jobName = jobName;
+    @JsonProperty("measure.id")
+    public void setMeasureId(Long measureId) {
         this.measureId = measureId;
-        this.metricName = metricName;
     }
 
     @JsonProperty("job.name")
@@ -75,9 +101,71 @@ public abstract class AbstractJob extends AbstractAuditableEntity {
 
     @JsonProperty("job.name")
     public void setJobName(String jobName) {
+        if (StringUtils.isEmpty(jobName)) {
+            LOGGER.warn("Job name cannot be empty.");
+            throw new NullPointerException();
+        }
         this.jobName = jobName;
     }
 
+    @JsonProperty("cron.expression")
+    public String getCronExpression() {
+        return cronExpression;
+    }
+
+    @JsonProperty("cron.expression")
+    public void setCronExpression(String cronExpression) {
+        this.cronExpression = cronExpression;
+    }
+
+    @JsonProperty("job.state")
+    public JobState getJobState() {
+        return jobState;
+    }
+
+    @JsonProperty("job.state")
+    public void setJobState(JobState jobState) {
+        this.jobState = jobState;
+    }
+
+    @JsonProperty("cron.time.zone")
+    public String getTimeZone() {
+        return timeZone;
+    }
+
+    @JsonProperty("cron.time.zone")
+    public void setTimeZone(String timeZone) {
+        this.timeZone = timeZone;
+    }
+
+    @JsonProperty("data.segments")
+    public List<JobDataSegment> getSegments() {
+        return segments;
+    }
+
+    @JsonProperty("data.segments")
+    public void setSegments(List<JobDataSegment> segments) {
+        this.segments = segments;
+    }
+
+    @JsonProperty("predicate.config")
+    public Map<String, Object> getConfigMap() {
+        return configMap;
+    }
+
+    @JsonProperty("predicate.config")
+    public void setConfigMap(Map<String, Object> configMap) {
+        this.configMap = configMap;
+    }
+
+    private String getPredicateConfig() {
+        return predicateConfig;
+    }
+
+    private void setPredicateConfig(String config) {
+        this.predicateConfig = config;
+    }
+
     @JsonProperty("metric.name")
     public String getMetricName() {
         return metricName;
@@ -88,16 +176,6 @@ public abstract class AbstractJob extends AbstractAuditableEntity {
         this.metricName = metricName;
     }
 
-    @JsonProperty("measure.id")
-    public Long getMeasureId() {
-        return measureId;
-    }
-
-    @JsonProperty("measure.id")
-    public void setMeasureId(Long measureId) {
-        this.measureId = measureId;
-    }
-
     public boolean isDeleted() {
         return deleted;
     }
@@ -106,16 +184,6 @@ public abstract class AbstractJob extends AbstractAuditableEntity {
         this.deleted = deleted;
     }
 
-    @JsonProperty("job.config")
-    public JobSchedule getJobSchedule() {
-        return jobSchedule;
-    }
-
-    @JsonProperty("job.config")
-    public void setJobSchedule(JobSchedule jobSchedule) {
-        this.jobSchedule = jobSchedule;
-    }
-
     @JsonProperty("quartz.name")
     public String getName() {
         return name;
@@ -135,4 +203,51 @@ public abstract class AbstractJob extends AbstractAuditableEntity {
     public void setGroup(String quartzGroup) {
         this.group = quartzGroup;
     }
+
+    @JsonProperty("job.type")
+    public abstract String getType();
+
+    @PrePersist
+    @PreUpdate
+    public void save() throws JsonProcessingException {
+        if (configMap != null) {
+            this.predicateConfig = JsonUtil.toJson(configMap);
+        }
+    }
+
+    @PostLoad
+    public void load() throws IOException {
+        if (!StringUtils.isEmpty(predicateConfig)) {
+            this.configMap = JsonUtil.toEntity(predicateConfig, new TypeReference<Map<String, Object>>() {
+            });
+        }
+    }
+
+    AbstractJob() {
+    }
+
+    AbstractJob(Long measureId, String jobName, String name, String group, boolean deleted) {
+        this.measureId = measureId;
+        this.jobName = jobName;
+        this.name = name;
+        this.group = group;
+        this.deleted = deleted;
+    }
+
+    AbstractJob(Long measureId, String jobName, String cronExpression, String timeZone,List<JobDataSegment> segments, boolean deleted) {
+        this.measureId = measureId;
+        this.jobName = jobName;
+        this.metricName = jobName;
+        this.cronExpression = cronExpression;
+        this.timeZone = timeZone;
+        this.segments = segments;
+        this.deleted = deleted;
+    }
+
+
+    AbstractJob(String jobName, Long measureId, String metricName) {
+        this.jobName = jobName;
+        this.measureId = measureId;
+        this.metricName = metricName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java
index 54f938d..bdaebb1 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/BatchJob.java
@@ -21,10 +21,17 @@ package org.apache.griffin.core.job.entity;
 
 import javax.persistence.DiscriminatorValue;
 import javax.persistence.Entity;
+import java.util.List;
 
 @Entity
 @DiscriminatorValue("griffinBatchJob")
 public class BatchJob extends AbstractJob {
+
+    @Override
+    public String getType() {
+        return "batch";
+    }
+
     public BatchJob() {
         super();
     }
@@ -39,5 +46,8 @@ public class BatchJob extends AbstractJob {
         setId(jobId);
     }
 
+    public BatchJob(Long measureId, String jobName, String cronExpression, String timeZone, List<JobDataSegment> segments, boolean deleted) {
+        super(measureId, jobName, cronExpression, timeZone,segments, deleted);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
deleted file mode 100644
index d97dc52..0000000
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.job.entity;
-
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType;
-
-public class JobDataBean {
-
-    private Long jobId;
-
-    private String jobName;
-
-    private Long measureId;
-
-    private JobState jobState;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    private Long nextFireTime;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    private Long previousFireTime;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    private String cronExpression;
-
-    private ProcessType processType;
-
-    public Long getJobId() {
-        return jobId;
-    }
-
-    public void setJobId(Long jobId) {
-        this.jobId = jobId;
-    }
-
-    public String getJobName() {
-        return jobName;
-    }
-
-    public void setJobName(String jobName) {
-        this.jobName = jobName;
-    }
-
-    public Long getMeasureId() {
-        return measureId;
-    }
-
-    public void setMeasureId(Long measureId) {
-        this.measureId = measureId;
-    }
-
-    public JobState getJobState() {
-        return jobState;
-    }
-
-    public void setJobState(JobState jobState) {
-        this.jobState = jobState;
-    }
-
-    public Long getNextFireTime() {
-        return nextFireTime;
-    }
-
-    public void setNextFireTime(Long nextFireTime) {
-        this.nextFireTime = nextFireTime;
-    }
-
-    public Long getPreviousFireTime() {
-        return previousFireTime;
-    }
-
-    public void setPreviousFireTime(Long previousFireTime) {
-        this.previousFireTime = previousFireTime;
-    }
-
-    public String getCronExpression() {
-        return cronExpression;
-    }
-
-    public void setCronExpression(String cronExpression) {
-        this.cronExpression = cronExpression;
-    }
-
-    public ProcessType getProcessType() {
-        return processType;
-    }
-
-    public void setProcessType(ProcessType processType) {
-        this.processType = processType;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
deleted file mode 100644
index 80383d5..0000000
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.job.entity;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.commons.lang.StringUtils;
-import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
-import org.apache.griffin.core.util.JsonUtil;
-import org.apache.griffin.core.util.PropertiesUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.core.io.ClassPathResource;
-
-import javax.persistence.*;
-import javax.validation.constraints.NotNull;
-import java.io.IOException;
-import java.util.*;
-
-@Entity
-public class JobSchedule extends AbstractAuditableEntity {
-
-	private static final long serialVersionUID = -267127340001680745L;
-
-	private static final Logger LOGGER = LoggerFactory.getLogger(JobSchedule.class);
-
-    @NotNull
-    private Long measureId;
-
-    @NotNull
-    private String jobName;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    private String cronExpression;
-
-    @Transient
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    private JobState jobState;
-
-    @NotNull
-    private String timeZone;
-
-    @JsonIgnore
-    private String predicateConfig;
-
-    @Transient
-    private Map<String, Object> configMap = defaultPredicatesConfig();
-
-    @NotNull
-    @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
-    @JoinColumn(name = "job_schedule_id")
-    private List<JobDataSegment> segments = new ArrayList<>();
-
-    @JsonProperty("measure.id")
-    public Long getMeasureId() {
-        return measureId;
-    }
-
-    @JsonProperty("measure.id")
-    public void setMeasureId(Long measureId) {
-        this.measureId = measureId;
-    }
-
-    @JsonProperty("job.name")
-    public String getJobName() {
-        return jobName;
-    }
-
-    @JsonProperty("job.name")
-    public void setJobName(String jobName) {
-        if (StringUtils.isEmpty(jobName)) {
-            LOGGER.warn("Job name cannot be empty.");
-            throw new NullPointerException();
-        }
-        this.jobName = jobName;
-    }
-
-    @JsonProperty("cron.expression")
-    public String getCronExpression() {
-        return cronExpression;
-    }
-
-    @JsonProperty("cron.expression")
-    public void setCronExpression(String cronExpression) {
-        this.cronExpression = cronExpression;
-    }
-
-    public JobState getJobState() {
-        return jobState;
-    }
-
-    public void setJobState(JobState jobState) {
-        this.jobState = jobState;
-    }
-
-    @JsonProperty("cron.time.zone")
-    public String getTimeZone() {
-        return timeZone;
-    }
-
-    @JsonProperty("cron.time.zone")
-    public void setTimeZone(String timeZone) {
-        this.timeZone = timeZone;
-    }
-
-    @JsonProperty("data.segments")
-    public List<JobDataSegment> getSegments() {
-        return segments;
-    }
-
-    @JsonProperty("data.segments")
-    public void setSegments(List<JobDataSegment> segments) {
-        this.segments = segments;
-    }
-
-    @JsonProperty("predicate.config")
-    public Map<String, Object> getConfigMap() {
-        return configMap;
-    }
-
-    @JsonProperty("predicate.config")
-    public void setConfigMap(Map<String, Object> configMap) {
-        this.configMap = configMap;
-    }
-
-    @PrePersist
-    @PreUpdate
-    public void save() throws JsonProcessingException {
-        if (configMap != null) {
-            this.predicateConfig = JsonUtil.toJson(configMap);
-        }
-    }
-
-    @PostLoad
-    public void load() throws IOException {
-        if (!StringUtils.isEmpty(predicateConfig)) {
-            this.configMap = JsonUtil.toEntity(predicateConfig, new TypeReference<Map<String, Object>>() {
-            });
-        }
-    }
-
-    /**
-     * @return set default predicate config
-     * @throws JsonProcessingException json exception
-     */
-    private Map<String, Object> defaultPredicatesConfig() throws JsonProcessingException {
-        String path = "/application.properties";
-        Properties appConf = PropertiesUtil.getProperties(path, new ClassPathResource(path));
-        Map<String, Object> scheduleConf = new HashMap<>();
-        Map<String, Object> map = new HashMap<>();
-        map.put("interval", appConf.getProperty("predicate.job.interval"));
-        map.put("repeat", appConf.getProperty("predicate.job.repeat.count"));
-        scheduleConf.put("checkdonefile.schedule", map);
-        this.predicateConfig = JsonUtil.toJson(scheduleConf);
-        return scheduleConf;
-    }
-
-    public JobSchedule() throws JsonProcessingException {
-    }
-
-    public JobSchedule(Long measureId, String jobName, String cronExpression, String timeZone, List<JobDataSegment> segments) throws JsonProcessingException {
-        this.measureId = measureId;
-        this.jobName = jobName;
-        this.cronExpression = cronExpression;
-        this.timeZone = timeZone;
-        this.segments = segments;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java
index 8219af2..b8f06ef 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobState.java
@@ -19,6 +19,8 @@ under the License.
 
 package org.apache.griffin.core.job.entity;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
+
 /**
  * Encapsulating job scheduler state to reduce job startup and stop logical processing
  */
@@ -39,6 +41,12 @@ public class JobState {
      */
     private boolean toStop = false;
 
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private Long nextFireTime;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private Long previousFireTime;
+
     public String getState() {
         return state;
     }
@@ -62,4 +70,20 @@ public class JobState {
     public void setToStop(boolean toStop) {
         this.toStop = toStop;
     }
+
+    public Long getNextFireTime() {
+        return nextFireTime;
+    }
+
+    public void setNextFireTime(Long nextFireTime) {
+        this.nextFireTime = nextFireTime;
+    }
+
+    public Long getPreviousFireTime() {
+        return previousFireTime;
+    }
+
+    public void setPreviousFireTime(Long previousFireTime) {
+        this.previousFireTime = previousFireTime;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java
index 70a4acc..cecf7c2 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/StreamingJob.java
@@ -26,6 +26,11 @@ import javax.persistence.Entity;
 @DiscriminatorValue("griffinStreamingJob")
 public class StreamingJob extends AbstractJob {
 
+    @Override
+    public String getType() {
+        return "streaming";
+    }
+
     public StreamingJob() {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
index 8e2dbd7..e4dcd4e 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
@@ -25,10 +25,14 @@ import javax.persistence.Entity;
 @Entity
 @DiscriminatorValue("virtualJob")
 public class VirtualJob extends AbstractJob {
+    private static final long serialVersionUID = 1130038058433818835L;
 
-	private static final long serialVersionUID = 1130038058433818835L;
+    @Override
+    public String getType() {
+        return "virtual";
+    }
 
-	public VirtualJob() {
+    public VirtualJob() {
         super();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
deleted file mode 100644
index 49e5db9..0000000
--- a/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.job.repo;
-
-import org.apache.griffin.core.job.entity.JobSchedule;
-import org.springframework.data.repository.CrudRepository;
-
-public interface JobScheduleRepo extends CrudRepository<JobSchedule, Long> {
-
-    JobSchedule findByJobName(String jobName);
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties
index 92eb524..d77cbcf 100644
--- a/service/src/main/resources/application.properties
+++ b/service/src/main/resources/application.properties
@@ -17,13 +17,16 @@
 # under the License.
 #
 
-spring.datasource.url = jdbc:postgresql://localhost:5432/quartz?autoReconnect=true&useSSL=false
+spring.datasource.url = jdbc:h2:mem:quartz;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
 spring.datasource.username = griffin
 spring.datasource.password = 123456
-spring.jpa.generate-ddl=true
-
-spring.datasource.driver-class-name = org.postgresql.Driver
+spring.jpa.generate-ddl = true
 
+spring.datasource.driver-class-name = org.h2.Driver
+# Create the Quartz table structure
+spring.datasource.schema = classpath:init_quartz_h2.sql
+# Enable the H2 console; default path: http://localhost:8080/h2-console/
+spring.h2.console.enabled = true
 spring.jpa.show-sql = true
 
 # Hive metastore

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e1e7e3a9/service/src/test/java/org/apache/griffin/core/job/JobControllerTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/job/JobControllerTest.java b/service/src/test/java/org/apache/griffin/core/job/JobControllerTest.java
index 0a430f2..d2ec983 100644
--- a/service/src/test/java/org/apache/griffin/core/job/JobControllerTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/JobControllerTest.java
@@ -1,195 +1,196 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.job;
-
-import org.apache.griffin.core.exception.GriffinException;
-import org.apache.griffin.core.exception.GriffinExceptionHandler;
-import org.apache.griffin.core.exception.GriffinExceptionMessage;
-import org.apache.griffin.core.job.entity.*;
-import org.apache.griffin.core.util.JsonUtil;
-import org.apache.griffin.core.util.URLHelper;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.springframework.http.MediaType;
-import org.springframework.test.context.junit4.SpringRunner;
-import org.springframework.test.web.servlet.MockMvc;
-import org.springframework.test.web.servlet.setup.MockMvcBuilders;
-
-import java.util.Arrays;
-import java.util.Collections;
-
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_ID_DOES_NOT_EXIST;
-import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_NAME_DOES_NOT_EXIST;
-import static org.apache.griffin.core.util.EntityHelper.createJobSchedule;
-import static org.hamcrest.CoreMatchers.is;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doThrow;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
-import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
-@RunWith(SpringRunner.class)
-public class JobControllerTest {
-
-    private MockMvc mvc;
-
-    @Mock
-    private JobServiceImpl service;
-
-    @InjectMocks
-    private JobController controller;
-
-    @Before
-    public void setup() {
-        mvc = MockMvcBuilders
-                .standaloneSetup(controller)
-                .setControllerAdvice(new GriffinExceptionHandler())
-                .build();
-    }
-
-
-    @Test
-    public void testGetJobs() throws Exception {
-        JobDataBean jobBean = new JobDataBean();
-        jobBean.setJobName("job_name");
-        given(service.getAliveJobs("")).willReturn(Collections.singletonList(jobBean));
-
-        mvc.perform(get(URLHelper.API_VERSION_PATH + "/jobs").contentType(MediaType.APPLICATION_JSON))
-                .andExpect(status().isOk())
-                .andExpect(jsonPath("$.[0].jobName", is("job_name")));
-    }
-
-    @Test
-    public void testAddJobForSuccess() throws Exception {
-        JobSchedule jobSchedule = createJobSchedule();
-        jobSchedule.setId(1L);
-//        given(service.addJob(jobSchedule)).willReturn(jobSchedule);
-
-        mvc.perform(post(URLHelper.API_VERSION_PATH + "/jobs")
-                .contentType(MediaType.APPLICATION_JSON)
-                .content(JsonUtil.toJson(jobSchedule)))
-                .andExpect(status().isCreated());
-    }
-
-    @Test
-    public void testAddJobForFailureWithBadRequest() throws Exception {
-        JobSchedule jobSchedule = createJobSchedule();
-        given(service.addJob(jobSchedule))
-                .willThrow(new GriffinException.BadRequestException(GriffinExceptionMessage.MISSING_METRIC_NAME));
-
-        mvc.perform(post(URLHelper.API_VERSION_PATH + "/jobs")
-                .contentType(MediaType.APPLICATION_JSON)
-                .content(JsonUtil.toJson(jobSchedule)))
-                .andExpect(status().isBadRequest());
-    }
-
-    @Test
-    public void testAddJobForFailureWithTriggerKeyExist() throws Exception {
-        JobSchedule jobSchedule = createJobSchedule();
-        given(service.addJob(jobSchedule))
-                .willThrow(new GriffinException.ConflictException(GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST));
-
-        mvc.perform(post(URLHelper.API_VERSION_PATH + "/jobs")
-                .contentType(MediaType.APPLICATION_JSON)
-                .content(JsonUtil.toJson(jobSchedule)))
-                .andExpect(status().isConflict());
-    }
-
-    @Test
-    public void testDeleteJobByIdForSuccess() throws Exception {
-        doNothing().when(service).deleteJob(1L);
-
-        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs/1"))
-                .andExpect(status().isNoContent());
-    }
-
-    @Test
-    public void testDeleteJobByIdForFailureWithNotFound() throws Exception {
-        doThrow(new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST)).when(service).deleteJob(1L);
-
-        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs/1"))
-                .andExpect(status().isNotFound());
-    }
-
-    @Test
-    public void testDeleteJobByIdForFailureWithException() throws Exception {
-        doThrow(new GriffinException.ServiceException("Failed to delete job", new Exception()))
-                .when(service).deleteJob(1L);
-
-        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs/1"))
-                .andExpect(status().isInternalServerError());
-    }
-
-    @Test
-    public void testDeleteJobByNameForSuccess() throws Exception {
-        String jobName = "jobName";
-        doNothing().when(service).deleteJob(jobName);
-
-        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs").param("jobName", jobName))
-                .andExpect(status().isNoContent());
-    }
-
-    @Test
-    public void testDeleteJobByNameForFailureWithNotFound() throws Exception {
-        String jobName = "jobName";
-        doThrow(new GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST)).when(service).deleteJob(jobName);
-
-        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs").param("jobName", jobName))
-                .andExpect(status().isNotFound());
-    }
-
-    @Test
-    public void testDeleteJobByNameForFailureWithException() throws Exception {
-        String jobName = "jobName";
-        doThrow(new GriffinException.ServiceException("Failed to delete job", new Exception()))
-                .when(service).deleteJob(jobName);
-
-        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs").param("jobName", jobName))
-                .andExpect(status().isInternalServerError());
-    }
-
-    @Test
-    public void testFindInstancesOfJob() throws Exception {
-        int page = 0;
-        int size = 2;
-        JobInstanceBean jobInstance = new JobInstanceBean(1L, LivySessionStates.State.RUNNING, "", "", null, null);
-        given(service.findInstancesOfJob(1L, page, size)).willReturn(Arrays.asList(jobInstance));
-
-        mvc.perform(get(URLHelper.API_VERSION_PATH + "/jobs/instances").param("jobId", String.valueOf(1L))
-                .param("page", String.valueOf(page)).param("size", String.valueOf(size)))
-                .andExpect(status().isOk())
-                .andExpect(jsonPath("$.[0].state", is("RUNNING")));
-    }
-
-    @Test
-    public void testGetHealthInfo() throws Exception {
-        JobHealth jobHealth = new JobHealth(1, 3);
-        given(service.getHealthInfo()).willReturn(jobHealth);
-
-        mvc.perform(get(URLHelper.API_VERSION_PATH + "/jobs/health"))
-                .andExpect(status().isOk())
-                .andExpect(jsonPath("$.healthyJobCount", is(1)));
-    }
-}
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//
+//package org.apache.griffin.core.job;
+//
+//import org.apache.griffin.core.exception.GriffinException;
+//import org.apache.griffin.core.exception.GriffinExceptionHandler;
+//import org.apache.griffin.core.exception.GriffinExceptionMessage;
+//import org.apache.griffin.core.job.entity.*;
+//import org.apache.griffin.core.util.JsonUtil;
+//import org.apache.griffin.core.util.URLHelper;
+//import org.junit.Before;
+//import org.junit.Test;
+//import org.junit.runner.RunWith;
+//import org.mockito.InjectMocks;
+//import org.mockito.Mock;
+//import org.springframework.http.MediaType;
+//import org.springframework.test.context.junit4.SpringRunner;
+//import org.springframework.test.web.servlet.MockMvc;
+//import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+//
+//import java.util.Arrays;
+//import java.util.Collections;
+//
+//import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_ID_DOES_NOT_EXIST;
+//import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_NAME_DOES_NOT_EXIST;
+//import static org.apache.griffin.core.util.EntityHelper.createGriffinJob;
+//import static org.apache.griffin.core.util.EntityHelper.createJobSchedule;
+//import static org.hamcrest.CoreMatchers.is;
+//import static org.mockito.BDDMockito.given;
+//import static org.mockito.Mockito.doNothing;
+//import static org.mockito.Mockito.doThrow;
+//import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*;
+//import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
+//import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
+//import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+//
+//@RunWith(SpringRunner.class)
+//public class JobControllerTest {
+//
+//    private MockMvc mvc;
+//
+//    @Mock
+//    private JobServiceImpl service;
+//
+//    @InjectMocks
+//    private JobController controller;
+//
+//    @Before
+//    public void setup() {
+//        mvc = MockMvcBuilders
+//                .standaloneSetup(controller)
+//                .setControllerAdvice(new GriffinExceptionHandler())
+//                .build();
+//    }
+//
+//
+//    @Test
+//    public void testGetJobs() throws Exception {
+//        AbstractJob job = createJob();
+//        jobBean.setJobName("job_name");
+//        given(service.getAliveJobs("")).willReturn(Collections.singletonList(jobBean));
+//
+//        mvc.perform(get(URLHelper.API_VERSION_PATH + "/jobs").contentType(MediaType.APPLICATION_JSON))
+//                .andExpect(status().isOk())
+//                .andExpect(jsonPath("$.[0].jobName", is("job_name")));
+//    }
+//
+//    @Test
+//    public void testAddJobForSuccess() throws Exception {
+//        JobSchedule jobSchedule = createJobSchedule();
+//        jobSchedule.setId(1L);
+////        given(service.addJob(jobSchedule)).willReturn(jobSchedule);
+//
+//        mvc.perform(post(URLHelper.API_VERSION_PATH + "/jobs")
+//                .contentType(MediaType.APPLICATION_JSON)
+//                .content(JsonUtil.toJson(jobSchedule)))
+//                .andExpect(status().isCreated());
+//    }
+//
+//    @Test
+//    public void testAddJobForFailureWithBadRequest() throws Exception {
+//        JobSchedule jobSchedule = createJobSchedule();
+//        given(service.addJob(jobSchedule))
+//                .willThrow(new GriffinException.BadRequestException(GriffinExceptionMessage.MISSING_METRIC_NAME));
+//
+//        mvc.perform(post(URLHelper.API_VERSION_PATH + "/jobs")
+//                .contentType(MediaType.APPLICATION_JSON)
+//                .content(JsonUtil.toJson(jobSchedule)))
+//                .andExpect(status().isBadRequest());
+//    }
+//
+//    @Test
+//    public void testAddJobForFailureWithTriggerKeyExist() throws Exception {
+//        JobSchedule jobSchedule = createJobSchedule();
+//        given(service.addJob(jobSchedule))
+//                .willThrow(new GriffinException.ConflictException(GriffinExceptionMessage.QUARTZ_JOB_ALREADY_EXIST));
+//
+//        mvc.perform(post(URLHelper.API_VERSION_PATH + "/jobs")
+//                .contentType(MediaType.APPLICATION_JSON)
+//                .content(JsonUtil.toJson(jobSchedule)))
+//                .andExpect(status().isConflict());
+//    }
+//
+//    @Test
+//    public void testDeleteJobByIdForSuccess() throws Exception {
+//        doNothing().when(service).deleteJob(1L);
+//
+//        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs/1"))
+//                .andExpect(status().isNoContent());
+//    }
+//
+//    @Test
+//    public void testDeleteJobByIdForFailureWithNotFound() throws Exception {
+//        doThrow(new GriffinException.NotFoundException(JOB_ID_DOES_NOT_EXIST)).when(service).deleteJob(1L);
+//
+//        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs/1"))
+//                .andExpect(status().isNotFound());
+//    }
+//
+//    @Test
+//    public void testDeleteJobByIdForFailureWithException() throws Exception {
+//        doThrow(new GriffinException.ServiceException("Failed to delete job", new Exception()))
+//                .when(service).deleteJob(1L);
+//
+//        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs/1"))
+//                .andExpect(status().isInternalServerError());
+//    }
+//
+//    @Test
+//    public void testDeleteJobByNameForSuccess() throws Exception {
+//        String jobName = "jobName";
+//        doNothing().when(service).deleteJob(jobName);
+//
+//        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs").param("jobName", jobName))
+//                .andExpect(status().isNoContent());
+//    }
+//
+//    @Test
+//    public void testDeleteJobByNameForFailureWithNotFound() throws Exception {
+//        String jobName = "jobName";
+//        doThrow(new GriffinException.NotFoundException(JOB_NAME_DOES_NOT_EXIST)).when(service).deleteJob(jobName);
+//
+//        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs").param("jobName", jobName))
+//                .andExpect(status().isNotFound());
+//    }
+//
+//    @Test
+//    public void testDeleteJobByNameForFailureWithException() throws Exception {
+//        String jobName = "jobName";
+//        doThrow(new GriffinException.ServiceException("Failed to delete job", new Exception()))
+//                .when(service).deleteJob(jobName);
+//
+//        mvc.perform(delete(URLHelper.API_VERSION_PATH + "/jobs").param("jobName", jobName))
+//                .andExpect(status().isInternalServerError());
+//    }
+//
+//    @Test
+//    public void testFindInstancesOfJob() throws Exception {
+//        int page = 0;
+//        int size = 2;
+//        JobInstanceBean jobInstance = new JobInstanceBean(1L, LivySessionStates.State.RUNNING, "", "", null, null);
+//        given(service.findInstancesOfJob(1L, page, size)).willReturn(Arrays.asList(jobInstance));
+//
+//        mvc.perform(get(URLHelper.API_VERSION_PATH + "/jobs/instances").param("jobId", String.valueOf(1L))
+//                .param("page", String.valueOf(page)).param("size", String.valueOf(size)))
+//                .andExpect(status().isOk())
+//                .andExpect(jsonPath("$.[0].state", is("RUNNING")));
+//    }
+//
+//    @Test
+//    public void testGetHealthInfo() throws Exception {
+//        JobHealth jobHealth = new JobHealth(1, 3);
+//        given(service.getHealthInfo()).willReturn(jobHealth);
+//
+//        mvc.perform(get(URLHelper.API_VERSION_PATH + "/jobs/health"))
+//                .andExpect(status().isOk())
+//                .andExpect(jsonPath("$.healthyJobCount", is(1)));
+//    }
+//}