You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/09 09:43:32 UTC

[05/11] incubator-griffin git commit: upgrade new version

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index b878854..90c18ec 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -23,34 +23,38 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.commons.lang.StringUtils;
 import org.apache.griffin.core.error.exception.GriffinException.GetHealthInfoFailureException;
 import org.apache.griffin.core.error.exception.GriffinException.GetJobsFailureException;
-import org.apache.griffin.core.job.entity.JobHealth;
-import org.apache.griffin.core.job.entity.JobInstance;
-import org.apache.griffin.core.job.entity.JobRequestBody;
-import org.apache.griffin.core.job.entity.LivySessionStates;
+import org.apache.griffin.core.job.entity.*;
+import org.apache.griffin.core.job.repo.GriffinJobRepo;
 import org.apache.griffin.core.job.repo.JobInstanceRepo;
+import org.apache.griffin.core.job.repo.JobScheduleRepo;
+import org.apache.griffin.core.measure.entity.DataSource;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
 import org.apache.griffin.core.measure.entity.Measure;
-import org.apache.griffin.core.measure.repo.MeasureRepo;
+import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
 import org.apache.griffin.core.util.GriffinOperationMessage;
 import org.apache.griffin.core.util.JsonUtil;
 import org.quartz.*;
-import org.quartz.impl.matchers.GroupMatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.data.domain.PageRequest;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.domain.Sort;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.quartz.SchedulerFactoryBean;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.interceptor.TransactionAspectSupport;
+import org.springframework.util.CollectionUtils;
 import org.springframework.web.client.RestClientException;
 import org.springframework.web.client.RestTemplate;
 
 import java.io.IOException;
-import java.io.Serializable;
+import java.text.ParseException;
 import java.util.*;
 
-import static org.apache.griffin.core.util.GriffinOperationMessage.*;
+import static org.apache.griffin.core.util.GriffinOperationMessage.CREATE_JOB_FAIL;
+import static org.apache.griffin.core.util.GriffinOperationMessage.CREATE_JOB_SUCCESS;
 import static org.quartz.JobBuilder.newJob;
 import static org.quartz.JobKey.jobKey;
 import static org.quartz.TriggerBuilder.newTrigger;
@@ -59,200 +63,305 @@ import static org.quartz.TriggerKey.triggerKey;
 @Service
 public class JobServiceImpl implements JobService {
     private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class);
+    static final String JOB_SCHEDULE_ID = "jobScheduleId";
+    static final String GRIFFIN_JOB_ID = "griffinJobId";
+    static final int MAX_PAGE_SIZE = 1024;
+    static final int DEFAULT_PAGE_SIZE = 10;
 
     @Autowired
     private SchedulerFactoryBean factory;
     @Autowired
     private JobInstanceRepo jobInstanceRepo;
     @Autowired
-    private Properties sparkJobProps;
+    @Qualifier("livyConf")
+    private Properties livyConf;
     @Autowired
-    private MeasureRepo measureRepo;
+    private GriffinMeasureRepo measureRepo;
+    @Autowired
+    private GriffinJobRepo jobRepo;
+    @Autowired
+    private JobScheduleRepo jobScheduleRepo;
 
     private RestTemplate restTemplate;
 
-
     public JobServiceImpl() {
         restTemplate = new RestTemplate();
     }
 
     @Override
-    public List<Map<String, Serializable>> getAliveJobs() {
+    public List<JobDataBean> getAliveJobs() {
         Scheduler scheduler = factory.getObject();
-        List<Map<String, Serializable>> list = new ArrayList<>();
+        List<JobDataBean> dataList = new ArrayList<>();
         try {
-            for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.anyGroup())) {
-                Map jobInfoMap = getJobInfoMap(scheduler, jobKey);
-                if (jobInfoMap.size() != 0 && !isJobDeleted(scheduler, jobKey)) {
-                    list.add(jobInfoMap);
+            List<GriffinJob> jobs = jobRepo.findByDeleted(false);
+            for (GriffinJob job : jobs) {
+                JobDataBean jobData = genJobData(scheduler, jobKey(job.getQuartzName(), job.getQuartzGroup()), job);
+                if (jobData != null) {
+                    dataList.add(jobData);
                 }
             }
-        } catch (SchedulerException e) {
-            LOGGER.error("failed to get running jobs.{}", e.getMessage());
+        } catch (Exception e) {
+            LOGGER.error("Failed to get running jobs.", e);
             throw new GetJobsFailureException();
         }
-        return list;
+        return dataList;
     }
 
-    private boolean isJobDeleted(Scheduler scheduler, JobKey jobKey) throws SchedulerException {
-        JobDataMap jobDataMap = scheduler.getJobDetail(jobKey).getJobDataMap();
-        return jobDataMap.getBooleanFromString("deleted");
-    }
-
-    private Map getJobInfoMap(Scheduler scheduler, JobKey jobKey) throws SchedulerException {
+    private JobDataBean genJobData(Scheduler scheduler, JobKey jobKey, GriffinJob job) throws SchedulerException {
         List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
-        Map<String, Serializable> jobInfoMap = new HashMap<>();
-        if (triggers == null || triggers.size() == 0) {
-            return jobInfoMap;
-        }
-        JobDetail jd = scheduler.getJobDetail(jobKey);
-        Date nextFireTime = triggers.get(0).getNextFireTime();
-        Date previousFireTime = triggers.get(0).getPreviousFireTime();
-        Trigger.TriggerState triggerState = scheduler.getTriggerState(triggers.get(0).getKey());
-
-        jobInfoMap.put("jobName", jobKey.getName());
-        jobInfoMap.put("groupName", jobKey.getGroup());
-        if (nextFireTime != null) {
-            jobInfoMap.put("nextFireTime", nextFireTime.getTime());
-        } else {
-            jobInfoMap.put("nextFireTime", -1);
-        }
-        if (previousFireTime != null) {
-            jobInfoMap.put("previousFireTime", previousFireTime.getTime());
-        } else {
-            jobInfoMap.put("previousFireTime", -1);
+        if (CollectionUtils.isEmpty(triggers)) {
+            return null;
         }
-        jobInfoMap.put("triggerState", triggerState);
-        jobInfoMap.put("measureId", jd.getJobDataMap().getString("measureId"));
-        jobInfoMap.put("sourcePattern", jd.getJobDataMap().getString("sourcePattern"));
-        jobInfoMap.put("targetPattern", jd.getJobDataMap().getString("targetPattern"));
-        if (StringUtils.isNotEmpty(jd.getJobDataMap().getString("blockStartTimestamp"))) {
-            jobInfoMap.put("blockStartTimestamp", jd.getJobDataMap().getString("blockStartTimestamp"));
+        JobDataBean jobData = new JobDataBean();
+        Trigger trigger = triggers.get(0);
+        setTriggerTime(trigger, jobData);
+        jobData.setJobId(job.getId());
+        jobData.setJobName(job.getJobName());
+        jobData.setMeasureId(job.getMeasureId());
+        jobData.setTriggerState(scheduler.getTriggerState(trigger.getKey()));
+        jobData.setCronExpression(getCronExpression(triggers));
+        return jobData;
+    }
+
+    private String getCronExpression(List<Trigger> triggers) {
+        for (Trigger trigger : triggers) {
+            if (trigger instanceof CronTrigger) {
+                return ((CronTrigger) trigger).getCronExpression();
+            }
         }
-        jobInfoMap.put("jobStartTime", jd.getJobDataMap().getString("jobStartTime"));
-        jobInfoMap.put("interval", jd.getJobDataMap().getString("interval"));
-        return jobInfoMap;
+        return null;
+    }
+
+    private void setTriggerTime(Trigger trigger, JobDataBean jobBean) throws SchedulerException {
+        Date nextFireTime = trigger.getNextFireTime();
+        Date previousFireTime = trigger.getPreviousFireTime();
+        jobBean.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() : -1);
+        jobBean.setPreviousFireTime(previousFireTime != null ? previousFireTime.getTime() : -1);
     }
 
     @Override
-    public GriffinOperationMessage addJob(String groupName, String jobName, Long measureId, JobRequestBody jobRequestBody) {
-        int interval;
-        Date jobStartTime;
+    public GriffinOperationMessage addJob(JobSchedule js) {
+        Long measureId = js.getMeasureId();
+        GriffinMeasure measure = getMeasureIfValid(measureId);
+        if (measure != null) {
+            return addJob(js, measure);
+        }
+        return CREATE_JOB_FAIL;
+    }
+
+    private GriffinOperationMessage addJob(JobSchedule js, GriffinMeasure measure) {
+        String qName = js.getJobName() + "_" + System.currentTimeMillis();
+        String qGroup = getQuartzGroupName();
         try {
-            interval = Integer.parseInt(jobRequestBody.getInterval());
-            jobStartTime = new Date(Long.parseLong(jobRequestBody.getJobStartTime()));
-            setJobStartTime(jobStartTime, interval);
-
-            Scheduler scheduler = factory.getObject();
-            TriggerKey triggerKey = triggerKey(jobName, groupName);
-            if (scheduler.checkExists(triggerKey)) {
-                LOGGER.error("the triggerKey({},{})  has been used.", jobName, groupName);
-                return CREATE_JOB_FAIL;
+            if (addJob(js, measure, qName, qGroup)) {
+                return CREATE_JOB_SUCCESS;
             }
+        } catch (Exception e) {
+            LOGGER.error("Add job exception happens.", e);
+            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
+        }
+        return CREATE_JOB_FAIL;
+    }
+
+    private boolean addJob(JobSchedule js, GriffinMeasure measure, String qName, String qGroup) throws SchedulerException, ParseException {
+        Scheduler scheduler = factory.getObject();
+        TriggerKey triggerKey = triggerKey(qName, qGroup);
+        if (!isJobScheduleParamValid(js, measure)) {
+            return false;
+        }
+        if (scheduler.checkExists(triggerKey)) {
+            return false;
+        }
+        GriffinJob job = saveGriffinJob(measure.getId(), js.getJobName(), qName, qGroup);
+        return job != null && saveAndAddQuartzJob(scheduler, triggerKey, js, job);
+    }
+
+    private String getQuartzGroupName() {
+        return "BA";
+    }
+
+    private boolean isJobScheduleParamValid(JobSchedule js, GriffinMeasure measure) throws SchedulerException {
+        if (!isJobNameValid(js.getJobName())) {
+            return false;
+        }
+        if (!isBaseLineValid(js.getSegments())) {
+            return false;
+        }
+        List<String> names = getConnectorNames(measure);
+        return isConnectorNamesValid(js.getSegments(), names);
+    }
+
+    private boolean isJobNameValid(String jobName) {
+        if (StringUtils.isEmpty(jobName)) {
+            LOGGER.warn("Job name cannot be empty.");
+            return false;
+        }
+        int size = jobRepo.countByJobNameAndDeleted(jobName, false);
+        if (size > 0) {
+            LOGGER.warn("Job name already exits.");
+            return false;
+        }
+        return true;
+    }
 
-            if (!isMeasureIdAvailable(measureId)) {
-                LOGGER.error("The measure id {} does't exist.", measureId);
-                return CREATE_JOB_FAIL;
+    private boolean isBaseLineValid(List<JobDataSegment> segments) {
+        for (JobDataSegment jds : segments) {
+            if (jds.getBaseline()) {
+                return true;
             }
+        }
+        LOGGER.warn("Please set segment timestamp baseline in as.baseline field.");
+        return false;
+    }
 
-            JobDetail jobDetail = addJobDetail(scheduler, groupName, jobName, measureId, jobRequestBody);
-            scheduler.scheduleJob(newTriggerInstance(triggerKey, jobDetail, interval, jobStartTime));
-            return GriffinOperationMessage.CREATE_JOB_SUCCESS;
-        } catch (NumberFormatException e) {
-            LOGGER.info("jobStartTime or interval format error! {}", e.getMessage());
-            return CREATE_JOB_FAIL;
-        } catch (SchedulerException e) {
-            LOGGER.error("SchedulerException when add job. {}", e.getMessage());
-            return CREATE_JOB_FAIL;
+    private boolean isConnectorNamesValid(List<JobDataSegment> segments, List<String> names) {
+        for (JobDataSegment segment : segments) {
+            if (!isConnectorNameValid(segment.getDataConnectorName(), names)) {
+                return false;
+            }
         }
+        return true;
     }
 
-    private Boolean isMeasureIdAvailable(long measureId) {
-        Measure measure = measureRepo.findOne(measureId);
-        if (measure != null && !measure.getDeleted()) {
-            return true;
+    private boolean isConnectorNameValid(String param, List<String> names) {
+        for (String name : names) {
+            if (name.equals(param)) {
+                return true;
+            }
         }
+        LOGGER.warn("Param {} is a illegal string. Please input one of strings in {}.", param, names);
         return false;
     }
 
-    private JobDetail addJobDetail(Scheduler scheduler, String groupName, String jobName, Long measureId, JobRequestBody jobRequestBody) throws SchedulerException {
-        JobKey jobKey = jobKey(jobName, groupName);
+    private List<String> getConnectorNames(GriffinMeasure measure) {
+        List<String> names = new ArrayList<>();
+        Set<String> sets = new HashSet<>();
+        List<DataSource> sources = measure.getDataSources();
+        for (DataSource source : sources) {
+            source.getConnectors().forEach(dc -> {
+                sets.add(dc.getName());
+            });
+        }
+        names.addAll(sets);
+        if (names.size() < sets.size()) {
+            LOGGER.error("Connector names cannot be repeated.");
+            throw new IllegalArgumentException();
+        }
+        return names;
+    }
+
+    private GriffinMeasure getMeasureIfValid(Long measureId) {
+        Measure measure = measureRepo.findByIdAndDeleted(measureId, false);
+        if (measure == null) {
+            LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't exist or is deleted.", measureId);
+            return null;
+        }
+        return (GriffinMeasure) measure;
+    }
+
+    private GriffinJob saveGriffinJob(Long measureId, String jobName, String qName, String qGroup) {
+        GriffinJob job = new GriffinJob(measureId, jobName, qName, qGroup, false);
+        return jobRepo.save(job);
+    }
+
+    private boolean saveAndAddQuartzJob(Scheduler scheduler, TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws SchedulerException, ParseException {
+        js = jobScheduleRepo.save(js);
+        JobDetail jobDetail = addJobDetail(scheduler, triggerKey, js, job);
+        scheduler.scheduleJob(genTriggerInstance(triggerKey, jobDetail, js));
+        return true;
+    }
+
+
+    private Trigger genTriggerInstance(TriggerKey triggerKey, JobDetail jd, JobSchedule js) throws ParseException {
+        return newTrigger()
+                .withIdentity(triggerKey)
+                .forJob(jd)
+                .withSchedule(CronScheduleBuilder.cronSchedule(new CronExpression(js.getCronExpression()))
+                        .inTimeZone(TimeZone.getTimeZone(js.getTimeZone()))
+                )
+                .build();
+    }
+
+    private JobDetail addJobDetail(Scheduler scheduler, TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws SchedulerException {
+        JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup());
         JobDetail jobDetail;
-        if (scheduler.checkExists(jobKey)) {
+        Boolean isJobKeyExist = scheduler.checkExists(jobKey);
+        if (isJobKeyExist) {
             jobDetail = scheduler.getJobDetail(jobKey);
-            setJobData(jobDetail, jobRequestBody, measureId, groupName, jobName);
-            scheduler.addJob(jobDetail, true);
         } else {
-            jobDetail = newJob(SparkSubmitJob.class)
-                    .storeDurably()
-                    .withIdentity(jobKey)
-                    .build();
-            //set JobData
-            setJobData(jobDetail, jobRequestBody, measureId, groupName, jobName);
-            scheduler.addJob(jobDetail, false);
+            jobDetail = newJob(JobInstance.class).storeDurably().withIdentity(jobKey).build();
         }
+        setJobDataMap(jobDetail, js, job);
+        scheduler.addJob(jobDetail, isJobKeyExist);
         return jobDetail;
     }
 
-    private Trigger newTriggerInstance(TriggerKey triggerKey, JobDetail jobDetail, int interval, Date jobStartTime) throws SchedulerException {
-        Trigger trigger = newTrigger()
-                .withIdentity(triggerKey)
-                .forJob(jobDetail)
-                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
-                        .withIntervalInSeconds(interval)
-                        .repeatForever())
-                .startAt(jobStartTime)
-                .build();
-        return trigger;
+
+    private void setJobDataMap(JobDetail jd, JobSchedule js, GriffinJob job) {
+        jd.getJobDataMap().put(JOB_SCHEDULE_ID, js.getId().toString());
+        jd.getJobDataMap().put(GRIFFIN_JOB_ID, job.getId().toString());
     }
 
-    private void setJobStartTime(Date jobStartTime, int interval) {
-        long currentTimestamp = System.currentTimeMillis();
-        long jobStartTimestamp = jobStartTime.getTime();
-        //if jobStartTime is before currentTimestamp, reset it with a future time
-        if (jobStartTime.before(new Date(currentTimestamp))) {
-            long n = (currentTimestamp - jobStartTimestamp) / (long) (interval * 1000);
-            jobStartTimestamp = jobStartTimestamp + (n + 1) * (long) (interval * 1000);
-            jobStartTime.setTime(jobStartTimestamp);
+    private boolean pauseJob(List<JobInstanceBean> instances) {
+        if (CollectionUtils.isEmpty(instances)) {
+            return true;
+        }
+        List<JobInstanceBean> deletedInstances = new ArrayList<>();
+        boolean pauseStatus = true;
+        for (JobInstanceBean instance : instances) {
+            boolean status = pauseJob(instance, deletedInstances);
+            pauseStatus = pauseStatus && status;
         }
+        jobInstanceRepo.save(deletedInstances);
+        return pauseStatus;
     }
 
-    private void setJobData(JobDetail jobDetail, JobRequestBody jobRequestBody, Long measureId, String groupName, String jobName) {
-        jobDetail.getJobDataMap().put("groupName", groupName);
-        jobDetail.getJobDataMap().put("jobName", jobName);
-        jobDetail.getJobDataMap().put("measureId", measureId.toString());
-        jobDetail.getJobDataMap().put("sourcePattern", jobRequestBody.getSourcePattern());
-        jobDetail.getJobDataMap().put("targetPattern", jobRequestBody.getTargetPattern());
-        jobDetail.getJobDataMap().put("blockStartTimestamp", jobRequestBody.getBlockStartTimestamp());
-        jobDetail.getJobDataMap().put("jobStartTime", jobRequestBody.getJobStartTime());
-        jobDetail.getJobDataMap().put("interval", jobRequestBody.getInterval());
-        jobDetail.getJobDataMap().put("lastBlockStartTimestamp", "");
-        jobDetail.getJobDataMap().putAsString("deleted", false);
+    private boolean pauseJob(JobInstanceBean instance, List<JobInstanceBean> deletedInstances) {
+        boolean status;
+        try {
+            status = pauseJob(instance.getPredicateGroup(), instance.getPredicateName());
+            if (status) {
+                instance.setDeleted(true);
+                deletedInstances.add(instance);
+            }
+        } catch (SchedulerException e) {
+            LOGGER.error("Pause predicate job({},{}) failure.", instance.getId(), instance.getPredicateName());
+            status = false;
+        }
+        return status;
     }
 
     @Override
-    public GriffinOperationMessage pauseJob(String group, String name) {
-        try {
-            Scheduler scheduler = factory.getObject();
-            scheduler.pauseJob(new JobKey(name, group));
-            return GriffinOperationMessage.PAUSE_JOB_SUCCESS;
-        } catch (SchedulerException | NullPointerException e) {
-            LOGGER.error("{} {}", GriffinOperationMessage.PAUSE_JOB_FAIL, e.getMessage());
-            return GriffinOperationMessage.PAUSE_JOB_FAIL;
+    public boolean pauseJob(String group, String name) throws SchedulerException {
+        Scheduler scheduler = factory.getObject();
+        JobKey jobKey = new JobKey(name, group);
+        if (!scheduler.checkExists(jobKey)) {
+            LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName());
+            return false;
         }
+        scheduler.pauseJob(jobKey);
+        return true;
     }
 
-    private GriffinOperationMessage setJobDeleted(String group, String name) {
-        try {
-            Scheduler scheduler = factory.getObject();
-            JobDetail jobDetail = scheduler.getJobDetail(new JobKey(name, group));
-            jobDetail.getJobDataMap().putAsString("deleted", true);
-            scheduler.addJob(jobDetail, true);
-            return GriffinOperationMessage.SET_JOB_DELETED_STATUS_SUCCESS;
-        } catch (SchedulerException | NullPointerException e) {
-            LOGGER.error("{} {}", GriffinOperationMessage.PAUSE_JOB_FAIL, e.getMessage());
-            return GriffinOperationMessage.SET_JOB_DELETED_STATUS_FAIL;
+    private boolean setJobDeleted(GriffinJob job) throws SchedulerException {
+        job.setDeleted(true);
+        jobRepo.save(job);
+        return true;
+    }
+
+    private boolean deletePredicateJob(GriffinJob job) throws SchedulerException {
+        boolean pauseStatus = true;
+        List<JobInstanceBean> instances = job.getJobInstances();
+        for (JobInstanceBean instance : instances) {
+            if (!instance.getDeleted()) {
+                pauseStatus = pauseStatus && deleteJob(instance.getPredicateGroup(), instance.getPredicateName());
+                instance.setDeleted(true);
+                if (instance.getState().equals(LivySessionStates.State.finding)) {
+                    instance.setState(LivySessionStates.State.not_found);
+                }
+            }
         }
+        return pauseStatus;
     }
 
     /**
@@ -260,18 +369,60 @@ public class JobServiceImpl implements JobService {
      * 1. pause these jobs
      * 2. set these jobs as deleted status
      *
-     * @param group job group name
-     * @param name  job name
+     * @param jobId griffin job id
+     * @return custom information
+     */
+    @Override
+    public GriffinOperationMessage deleteJob(Long jobId) {
+        GriffinJob job = jobRepo.findByIdAndDeleted(jobId, false);
+        return deleteJob(job) ? GriffinOperationMessage.DELETE_JOB_SUCCESS : GriffinOperationMessage.DELETE_JOB_FAIL;
+    }
+
+    /**
+     * logically delete
+     *
+     * @param name griffin job name which may not be unique.
      * @return custom information
      */
     @Override
-    public GriffinOperationMessage deleteJob(String group, String name) {
-        //logically delete
-        if (pauseJob(group, name).equals(PAUSE_JOB_SUCCESS) &&
-                setJobDeleted(group, name).equals(SET_JOB_DELETED_STATUS_SUCCESS)) {
-            return GriffinOperationMessage.DELETE_JOB_SUCCESS;
+    public GriffinOperationMessage deleteJob(String name) {
+        List<GriffinJob> jobs = jobRepo.findByJobNameAndDeleted(name, false);
+        if (CollectionUtils.isEmpty(jobs)) {
+            LOGGER.warn("There is no job with '{}' name.", name);
+            return GriffinOperationMessage.DELETE_JOB_FAIL;
         }
-        return GriffinOperationMessage.DELETE_JOB_FAIL;
+        for (GriffinJob job : jobs) {
+            if (!deleteJob(job)) {
+                return GriffinOperationMessage.DELETE_JOB_FAIL;
+            }
+        }
+        return GriffinOperationMessage.DELETE_JOB_SUCCESS;
+    }
+
+    private boolean deleteJob(GriffinJob job) {
+        if (job == null) {
+            LOGGER.warn("Griffin job does not exist.");
+            return false;
+        }
+        try {
+            if (pauseJob(job.getQuartzGroup(), job.getQuartzName()) && deletePredicateJob(job) && setJobDeleted(job)) {
+                return true;
+            }
+        } catch (Exception e) {
+            LOGGER.error("Delete job failure.", e);
+        }
+        return false;
+    }
+
+    private boolean deleteJob(String group, String name) throws SchedulerException {
+        Scheduler scheduler = factory.getObject();
+        JobKey jobKey = new JobKey(name, group);
+        if (scheduler.checkExists(jobKey)) {
+            LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName());
+            return false;
+        }
+        scheduler.deleteJob(jobKey);
+        return true;
     }
 
     /**
@@ -279,74 +430,62 @@ public class JobServiceImpl implements JobService {
      * 1. search jobs related to measure
      * 2. deleteJob
      *
-     * @param measure measure data quality between source and target dataset
-     * @throws SchedulerException quartz throws if schedule has problem
+     * @param measureId measure id
      */
-    public void deleteJobsRelateToMeasure(Measure measure) throws SchedulerException {
-        Scheduler scheduler = factory.getObject();
-        //get all jobs
-        for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.anyGroup())) {
-            JobDetail jobDetail = scheduler.getJobDetail(jobKey);
-            JobDataMap jobDataMap = jobDetail.getJobDataMap();
-            if (jobDataMap.getString("measureId").equals(measure.getId().toString())) {
-                //select jobs related to measureId
-                deleteJob(jobKey.getGroup(), jobKey.getName());
-                LOGGER.info("{} {} is paused and logically deleted.", jobKey.getGroup(), jobKey.getName());
-            }
+    public boolean deleteJobsRelateToMeasure(Long measureId) {
+        List<GriffinJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, false);
+        if (CollectionUtils.isEmpty(jobs)) {
+            LOGGER.warn("Measure id {} has no related jobs.", measureId);
+            return false;
+        }
+        for (GriffinJob job : jobs) {
+            deleteJob(job);
         }
+        return true;
     }
 
     @Override
-    public List<JobInstance> findInstancesOfJob(String group, String jobName, int page, int size) {
-        try {
-            Scheduler scheduler = factory.getObject();
-            JobKey jobKey = new JobKey(jobName, group);
-            if (!scheduler.checkExists(jobKey) || isJobDeleted(scheduler, jobKey)) {
-                return new ArrayList<>();
-            }
-        } catch (SchedulerException e) {
-            LOGGER.error("Quartz schedule error. {}", e.getMessage());
+    public List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size) {
+        AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
+        if (job == null) {
+            LOGGER.warn("Job id {} does not exist.", jobId);
             return new ArrayList<>();
         }
-        //query and return instances
-        Pageable pageRequest = new PageRequest(page, size, Sort.Direction.DESC, "timestamp");
-        return jobInstanceRepo.findByGroupNameAndJobName(group, jobName, pageRequest);
+        size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size;
+        size = size <= 0 ? DEFAULT_PAGE_SIZE : size;
+        Pageable pageable = new PageRequest(page, size, Sort.Direction.DESC, "tms");
+        return jobInstanceRepo.findByJobId(jobId, pageable);
+    }
+
+    @Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}")
+    public void deleteExpiredJobInstance() {
+        List<JobInstanceBean> instances = jobInstanceRepo.findByExpireTmsLessThanEqual(System.currentTimeMillis());
+        if (!pauseJob(instances)) {
+            LOGGER.error("Pause job failure.");
+            return;
+        }
+        jobInstanceRepo.deleteByExpireTimestamp(System.currentTimeMillis());
+        LOGGER.info("Delete expired job instances success.");
     }
 
     @Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}")
     public void syncInstancesOfAllJobs() {
-        List<Object> groupJobList = jobInstanceRepo.findGroupWithJobName();
-        for (Object groupJobObj : groupJobList) {
-            try {
-                Object[] groupJob = (Object[]) groupJobObj;
-                if (groupJob != null && groupJob.length == 2) {
-                    syncInstancesOfJob(groupJob[0].toString(), groupJob[1].toString());
-                }
-            } catch (Exception e) {
-                LOGGER.error("schedule update instances of all jobs failed. {}", e.getMessage());
+        List<JobInstanceBean> beans = jobInstanceRepo.findByActiveState();
+        if (!CollectionUtils.isEmpty(beans)) {
+            for (JobInstanceBean jobInstance : beans) {
+                syncInstancesOfJob(jobInstance);
             }
         }
     }
 
+
     /**
-     * call livy to update part of jobInstance table data associated with group and jobName in mysql.
+     * call livy to update part of job instance table data associated with group and jobName in mysql.
      *
-     * @param group   group name of jobInstance
-     * @param jobName job name of jobInstance
+     * @param jobInstance job instance livy info
      */
-    private void syncInstancesOfJob(String group, String jobName) {
-        //update all instance info belongs to this group and job.
-        List<JobInstance> jobInstanceList = jobInstanceRepo.findByGroupNameAndJobName(group, jobName);
-        for (JobInstance jobInstance : jobInstanceList) {
-            if (LivySessionStates.isActive(jobInstance.getState())) {
-                String uri = sparkJobProps.getProperty("livy.uri") + "/" + jobInstance.getSessionId();
-                setJobInstanceInfo(jobInstance, uri, group, jobName);
-            }
-
-        }
-    }
-
-    private void setJobInstanceInfo(JobInstance jobInstance, String uri, String group, String jobName) {
+    private void syncInstancesOfJob(JobInstanceBean jobInstance) {
+        String uri = livyConf.getProperty("livy.uri") + "/" + jobInstance.getSessionId();
         TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() {
         };
         try {
@@ -354,28 +493,31 @@ public class JobServiceImpl implements JobService {
             HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr, type);
             setJobInstanceIdAndUri(jobInstance, resultMap);
         } catch (RestClientException e) {
-            LOGGER.error("spark session {} has overdue, set state as unknown!\n {}", jobInstance.getSessionId(), e.getMessage());
+            LOGGER.error("Spark session {} has overdue, set state as unknown!\n {}", jobInstance.getSessionId(), e.getMessage());
             setJobInstanceUnknownStatus(jobInstance);
         } catch (IOException e) {
-            LOGGER.error("jobInstance jsonStr convert to map failed. {}", e.getMessage());
+            LOGGER.error("Job instance json converts to map failed. {}", e.getMessage());
         } catch (IllegalArgumentException e) {
-            LOGGER.warn("Livy status is illegal. {}", group, jobName, e.getMessage());
+            LOGGER.error("Livy status is illegal. {}", e.getMessage());
         }
     }
 
-    private void setJobInstanceIdAndUri(JobInstance jobInstance, HashMap<String, Object> resultMap) throws IllegalArgumentException {
+
+    private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String, Object> resultMap) {
         if (resultMap != null && resultMap.size() != 0 && resultMap.get("state") != null) {
-            jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString()));
+            instance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString()));
             if (resultMap.get("appId") != null) {
-                jobInstance.setAppId(resultMap.get("appId").toString());
-                jobInstance.setAppUri(sparkJobProps.getProperty("spark.uri") + "/cluster/app/" + resultMap.get("appId").toString());
+                String appId = String.valueOf(resultMap.get("appId"));
+                String appUri = livyConf.getProperty("spark.uri") + "/cluster/app/" + appId;
+                instance.setAppId(appId);
+                instance.setAppUri(appUri);
             }
-            jobInstanceRepo.save(jobInstance);
+            jobInstanceRepo.save(instance);
         }
 
     }
 
-    private void setJobInstanceUnknownStatus(JobInstance jobInstance) {
+    private void setJobInstanceUnknownStatus(JobInstanceBean jobInstance) {
         //if server cannot get session from Livy, set State as unknown.
         jobInstance.setState(LivySessionStates.State.unknown);
         jobInstanceRepo.save(jobInstance);
@@ -388,55 +530,42 @@ public class JobServiceImpl implements JobService {
      */
     @Override
     public JobHealth getHealthInfo() {
-        Scheduler scheduler = factory.getObject();
-        int jobCount = 0;
-        int notHealthyCount = 0;
-        try {
-            Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.anyGroup());
-            for (JobKey jobKey : jobKeys) {
-                List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
-                if (triggers != null && triggers.size() != 0 && !isJobDeleted(scheduler, jobKey)) {
-                    jobCount++;
-                    notHealthyCount = getJobNotHealthyCount(notHealthyCount, jobKey);
-                }
-            }
-        } catch (SchedulerException e) {
-            LOGGER.error(e.getMessage());
-            throw new GetHealthInfoFailureException();
+        JobHealth jobHealth = new JobHealth();
+        List<GriffinJob> jobs = jobRepo.findByDeleted(false);
+        for (GriffinJob job : jobs) {
+            jobHealth = getHealthInfo(jobHealth, job);
         }
-        return new JobHealth(jobCount - notHealthyCount, jobCount);
+        return jobHealth;
     }
 
-    private int getJobNotHealthyCount(int notHealthyCount, JobKey jobKey) {
-        if (!isJobHealthy(jobKey)) {
-            notHealthyCount++;
+    private JobHealth getHealthInfo(JobHealth jobHealth, GriffinJob job) {
+        List<Trigger> triggers = getTriggers(job);
+        if (!CollectionUtils.isEmpty(triggers)) {
+            jobHealth.setJobCount(jobHealth.getJobCount() + 1);
+            if (isJobHealthy(job.getId())) {
+                jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1);
+            }
         }
-        return notHealthyCount;
+        return jobHealth;
     }
 
-    private Boolean isJobHealthy(JobKey jobKey) {
-        Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, "timestamp");
-        JobInstance latestJobInstance;
-        List<JobInstance> jobInstances = jobInstanceRepo.findByGroupNameAndJobName(jobKey.getGroup(), jobKey.getName(), pageRequest);
-        if (jobInstances != null && jobInstances.size() > 0) {
-            latestJobInstance = jobInstances.get(0);
-            if (LivySessionStates.isHealthy(latestJobInstance.getState())) {
-                return true;
-            }
+    private List<Trigger> getTriggers(GriffinJob job) {
+        JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup());
+        List<Trigger> triggers;
+        try {
+            triggers = (List<Trigger>) factory.getObject().getTriggersOfJob(jobKey);
+        } catch (SchedulerException e) {
+            LOGGER.error("Job schedule exception. {}", e.getMessage());
+            throw new GetHealthInfoFailureException();
         }
-        return false;
+        return triggers;
     }
 
-    @Override
-    public Map<String, List<Map<String, Serializable>>> getJobDetailsGroupByMeasureId() {
-        Map<String, List<Map<String, Serializable>>> jobDetailsMap = new HashMap<>();
-        List<Map<String, Serializable>> jobInfoList = getAliveJobs();
-        for (Map<String, Serializable> jobInfo : jobInfoList) {
-            String measureId = (String) jobInfo.get("measureId");
-            List<Map<String, Serializable>> jobs = jobDetailsMap.getOrDefault(measureId, new ArrayList<>());
-            jobs.add(jobInfo);
-            jobDetailsMap.put(measureId, jobs);
-        }
-        return jobDetailsMap;
+    private Boolean isJobHealthy(Long jobId) {
+        Pageable pageable = new PageRequest(0, 1, Sort.Direction.DESC, "tms");
+        List<JobInstanceBean> instances = jobInstanceRepo.findByJobId(jobId,pageable);
+        return !CollectionUtils.isEmpty(instances) && LivySessionStates.isHealthy(instances.get(0).getState());
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/Predicator.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/Predicator.java b/service/src/main/java/org/apache/griffin/core/job/Predicator.java
new file mode 100644
index 0000000..dd9e105
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/Predicator.java
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job;
+
+import java.io.IOException;
+
+public interface Predicator {
+    boolean predicate() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index a1e1e9d..e089d15 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -21,205 +21,128 @@ package org.apache.griffin.core.job;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.commons.lang.StringUtils;
-import org.apache.griffin.core.job.entity.JobInstance;
+import org.apache.griffin.core.job.entity.JobInstanceBean;
+import org.apache.griffin.core.job.entity.LivyConf;
 import org.apache.griffin.core.job.entity.LivySessionStates;
-import org.apache.griffin.core.job.entity.SparkJobDO;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+import org.apache.griffin.core.job.factory.PredicatorFactory;
 import org.apache.griffin.core.job.repo.JobInstanceRepo;
-import org.apache.griffin.core.measure.entity.DataConnector;
-import org.apache.griffin.core.measure.entity.DataSource;
-import org.apache.griffin.core.measure.entity.Measure;
-import org.apache.griffin.core.measure.repo.MeasureRepo;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
 import org.apache.griffin.core.util.JsonUtil;
 import org.quartz.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.util.CollectionUtils;
 import org.springframework.web.client.RestTemplate;
 
 import java.io.IOException;
-import java.text.SimpleDateFormat;
 import java.util.*;
 
+import static org.apache.griffin.core.job.JobInstance.*;
+
 @PersistJobDataAfterExecution
 @DisallowConcurrentExecution
 public class SparkSubmitJob implements Job {
     private static final Logger LOGGER = LoggerFactory.getLogger(SparkSubmitJob.class);
+    private static final String SPARK_JOB_JARS_SPLIT = ";";
 
     @Autowired
-    private MeasureRepo measureRepo;
-    @Autowired
     private JobInstanceRepo jobInstanceRepo;
     @Autowired
-    private Properties sparkJobProps;
-
-    /**
-     * partitionItems
-     * for example
-     * partitionItems like "date","hour",...
-     */
-    private String[] partitionItems;
-    /**
-     * sourcePatternItems targetPatternItems
-     * for example
-     * sourcePatternItems or targetPatternItems is like "YYYYMMDD","HH",...
-     */
-    private String[] sourcePatternItems, targetPatternItems;
+    @Qualifier("livyConf")
+    private Properties livyConfProps;
+    @Autowired
+    private JobServiceImpl jobService;
 
-    private Measure measure;
-    private String sourcePattern, targetPattern;
-    private String blockStartTimestamp, lastBlockStartTimestamp;
-    private String interval;
-    private String uri;
+    private GriffinMeasure measure;
+    private String livyUri;
+    private List<SegmentPredicate> mPredicts;
+    private JobInstanceBean jobInstance;
     private RestTemplate restTemplate = new RestTemplate();
-    private SparkJobDO sparkJobDO = new SparkJobDO();
+    private LivyConf livyConf = new LivyConf();
 
-    public SparkSubmitJob() {
-    }
-
-    /**
-     * execute method is used to submit sparkJobDO to Livy.
-     *
-     * @param context Job execution context
-     */
     @Override
     public void execute(JobExecutionContext context) {
         JobDetail jd = context.getJobDetail();
-        String groupName = jd.getJobDataMap().getString("groupName");
-        String jobName = jd.getJobDataMap().getString("jobName");
-        initParam(jd);
-        //prepare current system timestamp
-        long currentBlockStartTimestamp = setCurrentBlockStartTimestamp(System.currentTimeMillis());
-        LOGGER.info("currentBlockStartTimestamp: {}", currentBlockStartTimestamp);
         try {
-            if (StringUtils.isNotEmpty(sourcePattern)) {
-                setAllDataConnectorPartitions(measure.getDataSources(), sourcePattern.split("-"), partitionItems, "source", currentBlockStartTimestamp);
-            }
-            if (StringUtils.isNotEmpty(targetPattern)) {
-                setAllDataConnectorPartitions(measure.getDataSources(), targetPattern.split("-"), partitionItems, "target", currentBlockStartTimestamp);
+            initParam(jd);
+            setLivyConf();
+            if (!success(mPredicts)) {
+                updateJobInstanceState(context);
+                return;
             }
+            saveJobInstance(jd);
         } catch (Exception e) {
-            LOGGER.error("Can not execute job.Set partitions error. {}", e.getMessage());
-            return;
+            LOGGER.error("Post spark task error.", e);
         }
-        jd.getJobDataMap().put("lastBlockStartTimestamp", currentBlockStartTimestamp + "");
-        setSparkJobDO();
-        String result;
-        try {
-            result = restTemplate.postForObject(uri, sparkJobDO, String.class);
-        } catch (Exception e) {
-            LOGGER.error("Post spark task error. {}", e.getMessage());
-            return;
-        }
-        LOGGER.info(result);
-        saveJobInstance(groupName, jobName, result);
     }
 
-    private void initParam(JobDetail jd) {
-        /**
-         * the field measureId is generated from `setJobData` in `JobServiceImpl`
-         */
-        String measureId = jd.getJobDataMap().getString("measureId");
-        measure = measureRepo.findOne(Long.valueOf(measureId));
-        if (measure == null) {
-            LOGGER.error("Measure with id {} is not find!", measureId);
-            return;
+    private void updateJobInstanceState(JobExecutionContext context) throws IOException {
+        SimpleTrigger simpleTrigger = (SimpleTrigger) context.getTrigger();
+        int repeatCount = simpleTrigger.getRepeatCount();
+        int fireCount = simpleTrigger.getTimesTriggered();
+        if (fireCount > repeatCount) {
+            saveJobInstance(null, LivySessionStates.State.not_found, true);
         }
-        setMeasureInstanceName(measure, jd);
-        partitionItems = sparkJobProps.getProperty("sparkJob.dateAndHour").split(",");
-        uri = sparkJobProps.getProperty("livy.uri");
-        sourcePattern = jd.getJobDataMap().getString("sourcePattern");
-        targetPattern = jd.getJobDataMap().getString("targetPattern");
-        blockStartTimestamp = jd.getJobDataMap().getString("blockStartTimestamp");
-        lastBlockStartTimestamp = jd.getJobDataMap().getString("lastBlockStartTimestamp");
-        LOGGER.info("lastBlockStartTimestamp:{}", lastBlockStartTimestamp);
-        interval = jd.getJobDataMap().getString("interval");
-    }
-
-    private void setMeasureInstanceName(Measure measure, JobDetail jd) {
-        // in order to keep metric name unique, we set measure name as jobName at present
-        measure.setName(jd.getJobDataMap().getString("jobName"));
     }
 
-    private void setAllDataConnectorPartitions(List<DataSource> sources, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) throws IOException {
-        if (sources == null) {
-            return;
-        }
-        for (DataSource dataSource : sources) {
-            setDataSourcePartitions(dataSource, patternItemSet, partitionItems, sourceName, timestamp);
+    private String post2Livy() {
+        String result;
+        try {
+            result = restTemplate.postForObject(livyUri, livyConf, String.class);
+            LOGGER.info(result);
+        } catch (Exception e) {
+            LOGGER.error("Post to livy error. {}", e.getMessage());
+            result = null;
         }
+        return result;
     }
 
-    private void setDataSourcePartitions(DataSource dataSource, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) throws IOException {
-        String name = dataSource.getName();
-        for (DataConnector dataConnector : dataSource.getConnectors()) {
-            if (sourceName.equals(name)) {
-                setDataConnectorPartitions(dataConnector, patternItemSet, partitionItems, timestamp);
-            }
+    private boolean success(List<SegmentPredicate> predicates) throws IOException {
+        if (CollectionUtils.isEmpty(predicates)) {
+            return true;
         }
-    }
+        for (SegmentPredicate segPredicate : predicates) {
+            Predicator predicator = PredicatorFactory.newPredicateInstance(segPredicate);
+            try {
+                if (!predicator.predicate()) {
+                    return false;
+                }
+            } catch (Exception e) {
+                return false;
+            }
 
-    private void setDataConnectorPartitions(DataConnector dc, String[] patternItemSet, String[] partitionItems, long timestamp) throws IOException {
-        Map<String, String> partitionItemMap = genPartitionMap(patternItemSet, partitionItems, timestamp);
-        /**
-         * partitions must be a string like: "dt=20170301, hour=12"
-         * partitionItemMap.toString() is like "{dt=20170301, hour=12}"
-         */
-        String partitions = partitionItemMap.toString().substring(1, partitionItemMap.toString().length() - 1);
-        partitions = partitions.replaceAll(",", " AND ");
-        Map<String, String> configMap = dc.getConfigInMaps();
-        //config should not be null
-        configMap.put("where", partitions);
-        try {
-            dc.setConfig(configMap);
-        } catch (JsonProcessingException e) {
-            LOGGER.error(e.getMessage());
         }
+        return true;
     }
 
-
-    private Map<String, String> genPartitionMap(String[] patternItemSet, String[] partitionItems, long timestamp) {
-        /**
-         * patternItemSet:{YYYYMMdd,HH}
-         * partitionItems:{dt,hour}
-         * partitionItemMap:{dt=20170804,hour=09}
-         */
-        int comparableSizeMin = Math.min(patternItemSet.length, partitionItems.length);
-        Map<String, String> partitionItemMap = new HashMap<>();
-        for (int i = 0; i < comparableSizeMin; i++) {
-            /**
-             * in order to get a standard date like 20170427 01 (YYYYMMdd-HH)
-             */
-            String pattern = patternItemSet[i].replace("mm", "MM");
-            pattern = pattern.replace("DD", "dd");
-            pattern = pattern.replace("hh", "HH");
-            SimpleDateFormat sdf = new SimpleDateFormat(pattern);
-            partitionItemMap.put(partitionItems[i], sdf.format(new Date(timestamp)));
-        }
-        return partitionItemMap;
+    private void initParam(JobDetail jd) throws IOException {
+        mPredicts = new ArrayList<>();
+        livyUri = livyConfProps.getProperty("livy.uri");
+        jobInstance = jobInstanceRepo.findByPredicateName(jd.getJobDataMap().getString(PREDICATE_JOB_NAME));
+        measure = JsonUtil.toEntity(jd.getJobDataMap().getString(MEASURE_KEY), GriffinMeasure.class);
+        setPredicts(jd.getJobDataMap().getString(PREDICATES_KEY));
+        setMeasureInstanceName(measure, jd);
     }
 
-
-    private long setCurrentBlockStartTimestamp(long currentSystemTimestamp) {
-        long currentBlockStartTimestamp = 0;
-        if (StringUtils.isNotEmpty(lastBlockStartTimestamp)) {
-            try {
-                currentBlockStartTimestamp = Long.parseLong(lastBlockStartTimestamp) + Integer.parseInt(interval) * 1000;
-            } catch (Exception e) {
-                LOGGER.info("lastBlockStartTimestamp or interval format problem! {}", e.getMessage());
-            }
-        } else {
-            if (StringUtils.isNotEmpty(blockStartTimestamp)) {
-                try {
-                    currentBlockStartTimestamp = Long.parseLong(blockStartTimestamp);
-                } catch (Exception e) {
-                    LOGGER.info("blockStartTimestamp format problem! {}", e.getMessage());
-                }
-            } else {
-                currentBlockStartTimestamp = currentSystemTimestamp;
+    private void setPredicts(String json) throws IOException {
+        List<Map<String, Object>> maps = JsonUtil.toEntity(json, new TypeReference<List<Map>>() {
+        });
+        if (maps != null) {
+            for (Map<String, Object> map : maps) {
+                SegmentPredicate sp = new SegmentPredicate();
+                sp.setType((String) map.get("type"));
+                sp.setConfigMap((Map<String, String>) map.get("config"));
+                mPredicts.add(sp);
             }
         }
-        return currentBlockStartTimestamp;
+    }
+
+    private void setMeasureInstanceName(GriffinMeasure measure, JobDetail jd) {
+        // in order to keep metric name unique, we set job name as measure name at present
+        measure.setName(jd.getJobDataMap().getString(JOB_NAME));
     }
 
     private String escapeCharacter(String str, String regex) {
@@ -227,67 +150,85 @@ public class SparkSubmitJob implements Job {
         return str.replaceAll(regex, escapeCh);
     }
 
-    private void setSparkJobDO() {
-        sparkJobDO.setFile(sparkJobProps.getProperty("sparkJob.file"));
-        sparkJobDO.setClassName(sparkJobProps.getProperty("sparkJob.className"));
+    private void setLivyConf() throws JsonProcessingException {
+        setLivyParams();
+        setLivyArgs();
+        setLivyJars();
+        setPropConf();
+    }
 
+    private void setLivyParams() {
+        livyConf.setFile(livyConfProps.getProperty("sparkJob.file"));
+        livyConf.setClassName(livyConfProps.getProperty("sparkJob.className"));
+        livyConf.setName(livyConfProps.getProperty("sparkJob.name"));
+        livyConf.setQueue(livyConfProps.getProperty("sparkJob.queue"));
+        livyConf.setNumExecutors(Long.parseLong(livyConfProps.getProperty("sparkJob.numExecutors")));
+        livyConf.setExecutorCores(Long.parseLong(livyConfProps.getProperty("sparkJob.executorCores")));
+        livyConf.setDriverMemory(livyConfProps.getProperty("sparkJob.driverMemory"));
+        livyConf.setExecutorMemory(livyConfProps.getProperty("sparkJob.executorMemory"));
+        livyConf.setFiles(new ArrayList<>());
+    }
+
+    private void setLivyArgs() throws JsonProcessingException {
         List<String> args = new ArrayList<>();
-        args.add(sparkJobProps.getProperty("sparkJob.args_1"));
-        measure.setTriggerTimeStamp(System.currentTimeMillis());
+        args.add(livyConfProps.getProperty("sparkJob.args_1"));
         String measureJson = JsonUtil.toJsonWithFormat(measure);
-        // to fix livy bug: ` will be ignored by livy
+        // to fix livy bug: character ` will be ignored by livy
         String finalMeasureJson = escapeCharacter(measureJson, "\\`");
+        LOGGER.info(finalMeasureJson);
         args.add(finalMeasureJson);
-        args.add(sparkJobProps.getProperty("sparkJob.args_3"));
-        sparkJobDO.setArgs(args);
+        args.add(livyConfProps.getProperty("sparkJob.args_3"));
+        livyConf.setArgs(args);
+    }
 
-        sparkJobDO.setName(sparkJobProps.getProperty("sparkJob.name"));
-        sparkJobDO.setQueue(sparkJobProps.getProperty("sparkJob.queue"));
-        sparkJobDO.setNumExecutors(Long.parseLong(sparkJobProps.getProperty("sparkJob.numExecutors")));
-        sparkJobDO.setExecutorCores(Long.parseLong(sparkJobProps.getProperty("sparkJob.executorCores")));
-        sparkJobDO.setDriverMemory(sparkJobProps.getProperty("sparkJob.driverMemory"));
-        sparkJobDO.setExecutorMemory(sparkJobProps.getProperty("sparkJob.executorMemory"));
+    private void setLivyJars() {
+        String jarProp = livyConfProps.getProperty("sparkJob.jars");
+        List<String> jars = Arrays.asList(jarProp.split(SPARK_JOB_JARS_SPLIT));
+        livyConf.setJars(jars);
+    }
 
+    private void setPropConf() {
         Map<String, String> conf = new HashMap<>();
-        conf.put("spark.jars.packages", sparkJobProps.getProperty("sparkJob.spark.jars.packages"));
-        sparkJobDO.setConf(conf);
-
-        List<String> jars = new ArrayList<>();
-        jars.add(sparkJobProps.getProperty("sparkJob.jars_1"));
-        jars.add(sparkJobProps.getProperty("sparkJob.jars_2"));
-        jars.add(sparkJobProps.getProperty("sparkJob.jars_3"));
-        sparkJobDO.setJars(jars);
+        conf.put("spark.yarn.dist.files", livyConfProps.getProperty("spark.yarn.dist.files"));
+        livyConf.setConf(conf);
+    }
 
-        List<String> files = new ArrayList<>();
-        sparkJobDO.setFiles(files);
+    private void saveJobInstance(JobDetail jd) throws SchedulerException, IOException {
+        String result = post2Livy();
+        boolean pauseStatus = false;
+        if (result != null) {
+            pauseStatus = jobService.pauseJob(jd.getKey().getGroup(), jd.getKey().getName());
+            LOGGER.info("Delete predicate job {}.", pauseStatus ? "success" : "failure");
+        }
+        saveJobInstance(result, LivySessionStates.State.found, pauseStatus);
     }
 
-    public void saveJobInstance(String groupName, String jobName, String result) {
+    private void saveJobInstance(String result, LivySessionStates.State state, Boolean pauseStatus) throws IOException {
         TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() {
         };
-        try {
-            Map<String, Object> resultMap = JsonUtil.toEntity(result, type);
-            if (resultMap != null) {
-                JobInstance jobInstance = genJobInstance(groupName, jobName, resultMap);
-                jobInstanceRepo.save(jobInstance);
-            }
-        } catch (IOException e) {
-            LOGGER.error("jobInstance jsonStr convert to map failed. {}", e.getMessage());
-        } catch (IllegalArgumentException e) {
-            LOGGER.warn("Livy status is illegal. {}", e.getMessage());
+        Map<String, Object> resultMap = null;
+        if (result != null) {
+            resultMap = JsonUtil.toEntity(result, type);
         }
+        setJobInstance(resultMap, state, pauseStatus);
+        jobInstanceRepo.save(jobInstance);
     }
 
-    private JobInstance genJobInstance(String groupName, String jobName, Map<String, Object> resultMap) throws IllegalArgumentException {
-        JobInstance jobInstance = new JobInstance();
-        jobInstance.setGroupName(groupName);
-        jobInstance.setJobName(jobName);
-        jobInstance.setTimestamp(System.currentTimeMillis());
-        jobInstance.setSessionId(Integer.parseInt(resultMap.get("id").toString()));
-        jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString()));
+    private void setJobInstance(Map<String, Object> resultMap, LivySessionStates.State state, Boolean pauseStatus) {
+        jobInstance.setState(state);
+        jobInstance.setDeleted(pauseStatus);
+        if (resultMap == null) {
+            return;
+        }
+        if (resultMap.get("state") != null) {
+            jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString()));
+        }
+        if (resultMap.get("id") != null) {
+            jobInstance.setSessionId(Long.parseLong(resultMap.get("id").toString()));
+        }
         if (resultMap.get("appId") != null) {
             jobInstance.setAppId(resultMap.get("appId").toString());
         }
-        return jobInstance;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
new file mode 100644
index 0000000..21ceec9
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+
+import javax.persistence.*;
+
+@Entity
+@Table(name = "job")
+@Inheritance(strategy = InheritanceType.SINGLE_TABLE)
+@DiscriminatorColumn(name = "type")
+public abstract class AbstractJob extends AbstractAuditableEntity {
+    private static final long serialVersionUID = 7569493377868453677L;
+
+    protected Long measureId;
+
+    protected String jobName;
+
+    protected String metricName;
+
+    protected Boolean deleted = false;
+
+    AbstractJob() {
+    }
+
+    AbstractJob(Long measureId, String jobName, boolean deleted) {
+        this.measureId = measureId;
+        this.jobName = jobName;
+        this.deleted = deleted;
+    }
+
+    AbstractJob(String jobName, Long measureId, String metricName) {
+        this.jobName = jobName;
+        this.measureId = measureId;
+        this.metricName = metricName;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getMetricName() {
+        return metricName;
+    }
+
+    public void setMetricName(String metricName) {
+        this.metricName = metricName;
+    }
+
+    public Long getMeasureId() {
+        return measureId;
+    }
+
+    public void setMeasureId(Long measureId) {
+        this.measureId = measureId;
+    }
+
+    public Boolean getDeleted() {
+        return deleted;
+    }
+
+    public void setDeleted(Boolean deleted) {
+        this.deleted = deleted;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
new file mode 100644
index 0000000..65d8e15
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
@@ -0,0 +1,79 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import javax.persistence.*;
+import java.util.ArrayList;
+import java.util.List;
+
+@Entity
+@DiscriminatorValue("griffin_job")
+public class GriffinJob extends AbstractJob {
+
+    @Column(name = "quartz_job_name")
+    private String quartzName;
+
+    @Column(name = "quartz_group_name")
+    private String quartzGroup;
+
+    @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}, orphanRemoval = true)
+    @JoinColumn(name = "job_id")
+    private List<JobInstanceBean> jobInstances = new ArrayList<>();
+
+    public String getQuartzName() {
+        return quartzName;
+    }
+
+    public void setQuartzName(String quartzName) {
+        this.quartzName = quartzName;
+    }
+
+    public String getQuartzGroup() {
+        return quartzGroup;
+    }
+
+    public void setQuartzGroup(String quartzGroup) {
+        this.quartzGroup = quartzGroup;
+    }
+
+    public List<JobInstanceBean> getJobInstances() {
+        return jobInstances;
+    }
+
+    public void setJobInstances(List<JobInstanceBean> jobInstances) {
+        this.jobInstances = jobInstances;
+    }
+
+    public GriffinJob() {
+        super();
+    }
+
+    public GriffinJob(Long measureId, String jobName, String qJobName, String qGroupName, boolean deleted) {
+        super(measureId, jobName, deleted);
+        this.metricName = jobName;
+        this.quartzName = qJobName;
+        this.quartzGroup = qGroupName;
+    }
+
+    public GriffinJob(Long jobId, Long measureId, String jobName, String qJobName, String qGroupName, boolean deleted) {
+        this(measureId, jobName, qJobName, qGroupName, deleted);
+        setId(jobId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
new file mode 100644
index 0000000..b27ab91
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
@@ -0,0 +1,98 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+
+import org.quartz.Trigger;
+
+public class JobDataBean {
+
+    private Long jobId;
+
+    private String jobName;
+
+    private Long measureId;
+
+    private Trigger.TriggerState triggerState;
+
+    private Long nextFireTime;
+
+    private Long previousFireTime;
+
+    private String cronExpression;
+
+    public Long getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(Long jobId) {
+        this.jobId = jobId;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public Long getMeasureId() {
+        return measureId;
+    }
+
+    public void setMeasureId(Long measureId) {
+        this.measureId = measureId;
+    }
+
+    public Trigger.TriggerState getTriggerState() {
+        return triggerState;
+    }
+
+    public void setTriggerState(Trigger.TriggerState triggerState) {
+        this.triggerState = triggerState;
+    }
+
+    public Long getNextFireTime() {
+        return nextFireTime;
+    }
+
+    public void setNextFireTime(Long nextFireTime) {
+        this.nextFireTime = nextFireTime;
+    }
+
+    public Long getPreviousFireTime() {
+        return previousFireTime;
+    }
+
+    public void setPreviousFireTime(Long previousFireTime) {
+        this.previousFireTime = previousFireTime;
+    }
+
+    public String getCronExpression() {
+        return cronExpression;
+    }
+
+    public void setCronExpression(String cronExpression) {
+        this.cronExpression = cronExpression;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
new file mode 100644
index 0000000..7009b5d
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.*;
+import javax.validation.constraints.NotNull;
+
+@Entity
+public class JobDataSegment extends AbstractAuditableEntity {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JobDataSegment.class);
+
+    @NotNull
+    private String dataConnectorName;
+
+    private Boolean baseline = false;
+
+    @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
+    @JoinColumn(name = "segment_range_id")
+    private SegmentRange segmentRange;
+
+    @JsonProperty("as.baseline")
+    public Boolean getBaseline() {
+        return baseline;
+    }
+
+    @JsonProperty("as.baseline")
+    public void setBaseline(Boolean baseline) {
+        this.baseline = baseline;
+    }
+
+    @JsonProperty("segment.range")
+    public SegmentRange getSegmentRange() {
+        return segmentRange;
+    }
+
+    @JsonProperty("segment.range")
+    public void setSegmentRange(SegmentRange segmentRange) {
+        this.segmentRange = segmentRange;
+    }
+
+    @JsonProperty("data.connector.name")
+    public String getDataConnectorName() {
+        return dataConnectorName;
+    }
+
+    @JsonProperty("data.connector.name")
+    public void setDataConnectorName(String dataConnectorName) {
+        if (StringUtils.isEmpty(dataConnectorName)) {
+            LOGGER.error(" Data connector name is invalid. Please check your connector name.");
+            throw new NullPointerException();
+        }
+        this.dataConnectorName = dataConnectorName;
+    }
+
+    public JobDataSegment() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java
index 9d2a654..ecb5feb 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java
@@ -40,6 +40,8 @@ public class JobHealth {
     }
 
     public JobHealth() {
+        this.healthyJobCount = 0;
+        this.jobCount = 0;
     }
 
     public JobHealth(int healthyJobCount, int jobCount) {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java
deleted file mode 100644
index 2cb5949..0000000
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.job.entity;
-
-import org.apache.griffin.core.job.entity.LivySessionStates.State;
-import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
-
-import javax.persistence.*;
-
-@Entity
-public class JobInstance extends AbstractAuditableEntity {
-
-    private static final long serialVersionUID = -4748881017029815874L;
-
-    private String groupName;
-    private String jobName;
-    private int sessionId;
-    @Enumerated(EnumType.STRING)
-    private State state;
-    private String appId;
-    @Lob
-    @Column(length = 1024)
-    private String appUri;
-    private long timestamp;
-
-    public String getGroupName() {
-        return groupName;
-    }
-
-    public void setGroupName(String groupName) {
-        this.groupName = groupName;
-    }
-
-    public String getJobName() {
-        return jobName;
-    }
-
-    public void setJobName(String jobName) {
-        this.jobName = jobName;
-    }
-
-    public int getSessionId() {
-        return sessionId;
-    }
-
-    public void setSessionId(int sessionId) {
-        this.sessionId = sessionId;
-    }
-
-    public State getState() {
-        return state;
-    }
-
-    public void setState(State state) {
-        this.state = state;
-    }
-
-    public String getAppId() {
-        return appId;
-    }
-
-    public void setAppId(String appId) {
-        this.appId = appId;
-    }
-
-    public String getAppUri() {
-        return appUri;
-    }
-
-    public void setAppUri(String appUri) {
-        this.appUri = appUri;
-    }
-
-    public long getTimestamp() {
-        return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    public JobInstance() {
-    }
-
-    public JobInstance(String groupName, String jobName, int sessionId, State state, String appId, String appUri, long timestamp) {
-        this.groupName = groupName;
-        this.jobName = jobName;
-        this.sessionId = sessionId;
-        this.state = state;
-        this.appId = appId;
-        this.appUri = appUri;
-        this.timestamp = timestamp;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
new file mode 100644
index 0000000..ff4d444
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
@@ -0,0 +1,156 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.griffin.core.job.entity.LivySessionStates.State;
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+
+@Entity
+public class JobInstanceBean extends AbstractAuditableEntity {
+
+    private static final long serialVersionUID = -4748881017029815874L;
+
+    private Long sessionId;
+
+    @Enumerated(EnumType.STRING)
+    private State state;
+
+    private String appId;
+
+    @Column(length = 10 * 1024)
+    private String appUri;
+
+    @Column(name = "timestamp")
+    private Long tms;
+
+    @Column(name = "expire_timestamp")
+    private Long expireTms;
+
+    @Column(name = "predicate_group_name")
+    private String predicateGroup;
+
+    @Column(name = "predicate_job_name")
+    private String predicateName;
+
+    @Column(name = "predicate_job_deleted")
+    private Boolean deleted = false;
+
+    public Long getSessionId() {
+        return sessionId;
+    }
+
+    public void setSessionId(Long sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    public State getState() {
+        return state;
+    }
+
+    public void setState(State state) {
+        this.state = state;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+    public void setAppId(String appId) {
+        this.appId = appId;
+    }
+
+    public String getAppUri() {
+        return appUri;
+    }
+
+    public void setAppUri(String appUri) {
+        this.appUri = appUri;
+    }
+
+    @JsonProperty("timestamp")
+    public Long getTms() {
+        return tms;
+    }
+
+    @JsonProperty("timestamp")
+    public void setTms(Long tms) {
+        this.tms = tms;
+    }
+
+    @JsonProperty("expireTimestamp")
+    public Long getExpireTms() {
+        return expireTms;
+    }
+
+    @JsonProperty("expireTimestamp")
+    public void setExpireTms(Long expireTms) {
+        this.expireTms = expireTms;
+    }
+
+    public String getPredicateGroup() {
+        return predicateGroup;
+    }
+
+    public void setPredicateGroup(String predicateGroup) {
+        this.predicateGroup = predicateGroup;
+    }
+
+    public String getPredicateName() {
+        return predicateName;
+    }
+
+    public void setPredicateName(String predicateName) {
+        this.predicateName = predicateName;
+    }
+
+    public Boolean getDeleted() {
+        return deleted;
+    }
+
+    public void setDeleted(Boolean deleted) {
+        this.deleted = deleted;
+    }
+
+    public JobInstanceBean() {
+    }
+
+    public JobInstanceBean(State state, String pName, String pGroup, Long tms, Long expireTms) {
+        this.state = state;
+        this.predicateName = pName;
+        this.predicateGroup = pGroup;
+        this.tms = tms;
+        this.expireTms = expireTms;
+    }
+
+    public JobInstanceBean(Long sessionId, State state, String appId, String appUri, Long timestamp, Long expireTms) {
+        this.sessionId = sessionId;
+        this.state = state;
+        this.appId = appId;
+        this.appUri = appUri;
+        this.tms = timestamp;
+        this.expireTms = expireTms;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java
deleted file mode 100644
index 0d0ea40..0000000
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.core.job.entity;
-
-public class JobRequestBody {
-    private String sourcePattern;
-    private String targetPattern;
-    private String blockStartTimestamp;
-    private String jobStartTime;
-    private String interval;
-
-    public String getSourcePattern() {
-        return sourcePattern;
-    }
-
-    public void setSourcePattern(String sourcePattern) {
-        this.sourcePattern = sourcePattern;
-    }
-
-    public String getTargetPattern() {
-        return targetPattern;
-    }
-
-    public void setTargetPattern(String targetPattern) {
-        this.targetPattern = targetPattern;
-    }
-
-    public String getBlockStartTimestamp() {
-        return blockStartTimestamp;
-    }
-
-    public void setBlockStartTimestamp(String blockStartTimestamp) {
-        this.blockStartTimestamp = blockStartTimestamp;
-    }
-
-    public String getJobStartTime() {
-        return jobStartTime;
-    }
-
-    public void setJobStartTime(String jobStartTime) {
-        this.jobStartTime = jobStartTime;
-    }
-
-    public String getInterval() {
-        return interval;
-    }
-
-    public void setInterval(String interval) {
-        this.interval = interval;
-    }
-
-    public JobRequestBody() {
-    }
-
-    public JobRequestBody(String sourcePattern, String targetPattern, String blockStartTimestamp, String jobStartTime, String interval) {
-        this.sourcePattern = sourcePattern;
-        this.targetPattern = targetPattern;
-        this.blockStartTimestamp = blockStartTimestamp;
-        this.jobStartTime = jobStartTime;
-        this.interval = interval;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        JobRequestBody that = (JobRequestBody) o;
-
-        if (sourcePattern != null ? !sourcePattern.equals(that.sourcePattern) : that.sourcePattern != null) {
-            return false;
-        }
-        if (targetPattern != null ? !targetPattern.equals(that.targetPattern) : that.targetPattern != null) {
-            return false;
-        }
-        if (blockStartTimestamp != null ? !blockStartTimestamp.equals(that.blockStartTimestamp) : that.blockStartTimestamp != null) {
-            return false;
-        }
-        if (jobStartTime != null ? !jobStartTime.equals(that.jobStartTime) : that.jobStartTime != null){
-            return false;
-        }
-        return interval != null ? interval.equals(that.interval) : that.interval == null;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = sourcePattern != null ? sourcePattern.hashCode() : 0;
-        result = 31 * result + (targetPattern != null ? targetPattern.hashCode() : 0);
-        result = 31 * result + (blockStartTimestamp != null ? blockStartTimestamp.hashCode() : 0);
-        result = 31 * result + (jobStartTime != null ? jobStartTime.hashCode() : 0);
-        result = 31 * result + (interval != null ? interval.hashCode() : 0);
-        return result;
-    }
-}