You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/09 09:43:32 UTC
[05/11] incubator-griffin git commit: upgrade new version
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index b878854..90c18ec 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -23,34 +23,38 @@ import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang.StringUtils;
import org.apache.griffin.core.error.exception.GriffinException.GetHealthInfoFailureException;
import org.apache.griffin.core.error.exception.GriffinException.GetJobsFailureException;
-import org.apache.griffin.core.job.entity.JobHealth;
-import org.apache.griffin.core.job.entity.JobInstance;
-import org.apache.griffin.core.job.entity.JobRequestBody;
-import org.apache.griffin.core.job.entity.LivySessionStates;
+import org.apache.griffin.core.job.entity.*;
+import org.apache.griffin.core.job.repo.GriffinJobRepo;
import org.apache.griffin.core.job.repo.JobInstanceRepo;
+import org.apache.griffin.core.job.repo.JobScheduleRepo;
+import org.apache.griffin.core.measure.entity.DataSource;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.apache.griffin.core.measure.entity.Measure;
-import org.apache.griffin.core.measure.repo.MeasureRepo;
+import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
import org.apache.griffin.core.util.GriffinOperationMessage;
import org.apache.griffin.core.util.JsonUtil;
import org.quartz.*;
-import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.interceptor.TransactionAspectSupport;
+import org.springframework.util.CollectionUtils;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
-import java.io.Serializable;
+import java.text.ParseException;
import java.util.*;
-import static org.apache.griffin.core.util.GriffinOperationMessage.*;
+import static org.apache.griffin.core.util.GriffinOperationMessage.CREATE_JOB_FAIL;
+import static org.apache.griffin.core.util.GriffinOperationMessage.CREATE_JOB_SUCCESS;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.JobKey.jobKey;
import static org.quartz.TriggerBuilder.newTrigger;
@@ -59,200 +63,305 @@ import static org.quartz.TriggerKey.triggerKey;
@Service
public class JobServiceImpl implements JobService {
private static final Logger LOGGER = LoggerFactory.getLogger(JobServiceImpl.class);
+ static final String JOB_SCHEDULE_ID = "jobScheduleId";
+ static final String GRIFFIN_JOB_ID = "griffinJobId";
+ static final int MAX_PAGE_SIZE = 1024;
+ static final int DEFAULT_PAGE_SIZE = 10;
@Autowired
private SchedulerFactoryBean factory;
@Autowired
private JobInstanceRepo jobInstanceRepo;
@Autowired
- private Properties sparkJobProps;
+ @Qualifier("livyConf")
+ private Properties livyConf;
@Autowired
- private MeasureRepo measureRepo;
+ private GriffinMeasureRepo measureRepo;
+ @Autowired
+ private GriffinJobRepo jobRepo;
+ @Autowired
+ private JobScheduleRepo jobScheduleRepo;
private RestTemplate restTemplate;
-
public JobServiceImpl() {
restTemplate = new RestTemplate();
}
@Override
- public List<Map<String, Serializable>> getAliveJobs() {
+ public List<JobDataBean> getAliveJobs() {
Scheduler scheduler = factory.getObject();
- List<Map<String, Serializable>> list = new ArrayList<>();
+ List<JobDataBean> dataList = new ArrayList<>();
try {
- for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.anyGroup())) {
- Map jobInfoMap = getJobInfoMap(scheduler, jobKey);
- if (jobInfoMap.size() != 0 && !isJobDeleted(scheduler, jobKey)) {
- list.add(jobInfoMap);
+ List<GriffinJob> jobs = jobRepo.findByDeleted(false);
+ for (GriffinJob job : jobs) {
+ JobDataBean jobData = genJobData(scheduler, jobKey(job.getQuartzName(), job.getQuartzGroup()), job);
+ if (jobData != null) {
+ dataList.add(jobData);
}
}
- } catch (SchedulerException e) {
- LOGGER.error("failed to get running jobs.{}", e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to get running jobs.", e);
throw new GetJobsFailureException();
}
- return list;
+ return dataList;
}
- private boolean isJobDeleted(Scheduler scheduler, JobKey jobKey) throws SchedulerException {
- JobDataMap jobDataMap = scheduler.getJobDetail(jobKey).getJobDataMap();
- return jobDataMap.getBooleanFromString("deleted");
- }
-
- private Map getJobInfoMap(Scheduler scheduler, JobKey jobKey) throws SchedulerException {
+ private JobDataBean genJobData(Scheduler scheduler, JobKey jobKey, GriffinJob job) throws SchedulerException {
List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
- Map<String, Serializable> jobInfoMap = new HashMap<>();
- if (triggers == null || triggers.size() == 0) {
- return jobInfoMap;
- }
- JobDetail jd = scheduler.getJobDetail(jobKey);
- Date nextFireTime = triggers.get(0).getNextFireTime();
- Date previousFireTime = triggers.get(0).getPreviousFireTime();
- Trigger.TriggerState triggerState = scheduler.getTriggerState(triggers.get(0).getKey());
-
- jobInfoMap.put("jobName", jobKey.getName());
- jobInfoMap.put("groupName", jobKey.getGroup());
- if (nextFireTime != null) {
- jobInfoMap.put("nextFireTime", nextFireTime.getTime());
- } else {
- jobInfoMap.put("nextFireTime", -1);
- }
- if (previousFireTime != null) {
- jobInfoMap.put("previousFireTime", previousFireTime.getTime());
- } else {
- jobInfoMap.put("previousFireTime", -1);
+ if (CollectionUtils.isEmpty(triggers)) {
+ return null;
}
- jobInfoMap.put("triggerState", triggerState);
- jobInfoMap.put("measureId", jd.getJobDataMap().getString("measureId"));
- jobInfoMap.put("sourcePattern", jd.getJobDataMap().getString("sourcePattern"));
- jobInfoMap.put("targetPattern", jd.getJobDataMap().getString("targetPattern"));
- if (StringUtils.isNotEmpty(jd.getJobDataMap().getString("blockStartTimestamp"))) {
- jobInfoMap.put("blockStartTimestamp", jd.getJobDataMap().getString("blockStartTimestamp"));
+ JobDataBean jobData = new JobDataBean();
+ Trigger trigger = triggers.get(0);
+ setTriggerTime(trigger, jobData);
+ jobData.setJobId(job.getId());
+ jobData.setJobName(job.getJobName());
+ jobData.setMeasureId(job.getMeasureId());
+ jobData.setTriggerState(scheduler.getTriggerState(trigger.getKey()));
+ jobData.setCronExpression(getCronExpression(triggers));
+ return jobData;
+ }
+
+ private String getCronExpression(List<Trigger> triggers) {
+ for (Trigger trigger : triggers) {
+ if (trigger instanceof CronTrigger) {
+ return ((CronTrigger) trigger).getCronExpression();
+ }
}
- jobInfoMap.put("jobStartTime", jd.getJobDataMap().getString("jobStartTime"));
- jobInfoMap.put("interval", jd.getJobDataMap().getString("interval"));
- return jobInfoMap;
+ return null;
+ }
+
+ private void setTriggerTime(Trigger trigger, JobDataBean jobBean) throws SchedulerException {
+ Date nextFireTime = trigger.getNextFireTime();
+ Date previousFireTime = trigger.getPreviousFireTime();
+ jobBean.setNextFireTime(nextFireTime != null ? nextFireTime.getTime() : -1);
+ jobBean.setPreviousFireTime(previousFireTime != null ? previousFireTime.getTime() : -1);
}
@Override
- public GriffinOperationMessage addJob(String groupName, String jobName, Long measureId, JobRequestBody jobRequestBody) {
- int interval;
- Date jobStartTime;
+ public GriffinOperationMessage addJob(JobSchedule js) {
+ Long measureId = js.getMeasureId();
+ GriffinMeasure measure = getMeasureIfValid(measureId);
+ if (measure != null) {
+ return addJob(js, measure);
+ }
+ return CREATE_JOB_FAIL;
+ }
+
+ private GriffinOperationMessage addJob(JobSchedule js, GriffinMeasure measure) {
+ String qName = js.getJobName() + "_" + System.currentTimeMillis();
+ String qGroup = getQuartzGroupName();
try {
- interval = Integer.parseInt(jobRequestBody.getInterval());
- jobStartTime = new Date(Long.parseLong(jobRequestBody.getJobStartTime()));
- setJobStartTime(jobStartTime, interval);
-
- Scheduler scheduler = factory.getObject();
- TriggerKey triggerKey = triggerKey(jobName, groupName);
- if (scheduler.checkExists(triggerKey)) {
- LOGGER.error("the triggerKey({},{}) has been used.", jobName, groupName);
- return CREATE_JOB_FAIL;
+ if (addJob(js, measure, qName, qGroup)) {
+ return CREATE_JOB_SUCCESS;
}
+ } catch (Exception e) {
+ LOGGER.error("Add job exception happens.", e);
+ TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
+ }
+ return CREATE_JOB_FAIL;
+ }
+
+ private boolean addJob(JobSchedule js, GriffinMeasure measure, String qName, String qGroup) throws SchedulerException, ParseException {
+ Scheduler scheduler = factory.getObject();
+ TriggerKey triggerKey = triggerKey(qName, qGroup);
+ if (!isJobScheduleParamValid(js, measure)) {
+ return false;
+ }
+ if (scheduler.checkExists(triggerKey)) {
+ return false;
+ }
+ GriffinJob job = saveGriffinJob(measure.getId(), js.getJobName(), qName, qGroup);
+ return job != null && saveAndAddQuartzJob(scheduler, triggerKey, js, job);
+ }
+
+ private String getQuartzGroupName() {
+ return "BA";
+ }
+
+ private boolean isJobScheduleParamValid(JobSchedule js, GriffinMeasure measure) throws SchedulerException {
+ if (!isJobNameValid(js.getJobName())) {
+ return false;
+ }
+ if (!isBaseLineValid(js.getSegments())) {
+ return false;
+ }
+ List<String> names = getConnectorNames(measure);
+ return isConnectorNamesValid(js.getSegments(), names);
+ }
+
+ private boolean isJobNameValid(String jobName) { // valid = non-empty and not used by any non-deleted job
+ if (StringUtils.isEmpty(jobName)) {
+ LOGGER.warn("Job name cannot be empty.");
+ return false;
+ }
+ int size = jobRepo.countByJobNameAndDeleted(jobName, false); // count of non-deleted jobs with this name
+ if (size > 0) {
+ LOGGER.warn("Job name already exists.");
+ return false;
+ }
+ return true;
+ }
- if (!isMeasureIdAvailable(measureId)) {
- LOGGER.error("The measure id {} does't exist.", measureId);
- return CREATE_JOB_FAIL;
+ private boolean isBaseLineValid(List<JobDataSegment> segments) {
+ for (JobDataSegment jds : segments) {
+ if (jds.getBaseline()) {
+ return true;
}
+ }
+ LOGGER.warn("Please set segment timestamp baseline in as.baseline field.");
+ return false;
+ }
- JobDetail jobDetail = addJobDetail(scheduler, groupName, jobName, measureId, jobRequestBody);
- scheduler.scheduleJob(newTriggerInstance(triggerKey, jobDetail, interval, jobStartTime));
- return GriffinOperationMessage.CREATE_JOB_SUCCESS;
- } catch (NumberFormatException e) {
- LOGGER.info("jobStartTime or interval format error! {}", e.getMessage());
- return CREATE_JOB_FAIL;
- } catch (SchedulerException e) {
- LOGGER.error("SchedulerException when add job. {}", e.getMessage());
- return CREATE_JOB_FAIL;
+ private boolean isConnectorNamesValid(List<JobDataSegment> segments, List<String> names) {
+ for (JobDataSegment segment : segments) {
+ if (!isConnectorNameValid(segment.getDataConnectorName(), names)) {
+ return false;
+ }
}
+ return true;
}
- private Boolean isMeasureIdAvailable(long measureId) {
- Measure measure = measureRepo.findOne(measureId);
- if (measure != null && !measure.getDeleted()) {
- return true;
+ private boolean isConnectorNameValid(String param, List<String> names) {
+ for (String name : names) {
+ if (name.equals(param)) {
+ return true;
+ }
}
+ LOGGER.warn("Param {} is a illegal string. Please input one of strings in {}.", param, names);
return false;
}
- private JobDetail addJobDetail(Scheduler scheduler, String groupName, String jobName, Long measureId, JobRequestBody jobRequestBody) throws SchedulerException {
- JobKey jobKey = jobKey(jobName, groupName);
+ private List<String> getConnectorNames(GriffinMeasure measure) { // all connector names of the measure; throws IllegalArgumentException on a repeated name
+ List<String> names = new ArrayList<>();
+ Set<String> sets = new HashSet<>();
+ List<DataSource> sources = measure.getDataSources();
+ for (DataSource source : sources) {
+ source.getConnectors().forEach(dc -> {
+ names.add(dc.getName()); // keep every occurrence so repeats stay countable
+ });
+ }
+ sets.addAll(names);
+ if (sets.size() < names.size()) { // fewer distinct names than collected names means a repeat
+ LOGGER.error("Connector names cannot be repeated.");
+ throw new IllegalArgumentException();
+ }
+ return names;
+ }
+
+ private GriffinMeasure getMeasureIfValid(Long measureId) {
+ Measure measure = measureRepo.findByIdAndDeleted(measureId, false);
+ if (measure == null) {
+ LOGGER.warn("The measure id {} isn't valid. Maybe it doesn't exist or is deleted.", measureId);
+ return null;
+ }
+ return (GriffinMeasure) measure;
+ }
+
+ private GriffinJob saveGriffinJob(Long measureId, String jobName, String qName, String qGroup) {
+ GriffinJob job = new GriffinJob(measureId, jobName, qName, qGroup, false);
+ return jobRepo.save(job);
+ }
+
+ private boolean saveAndAddQuartzJob(Scheduler scheduler, TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws SchedulerException, ParseException {
+ js = jobScheduleRepo.save(js);
+ JobDetail jobDetail = addJobDetail(scheduler, triggerKey, js, job);
+ scheduler.scheduleJob(genTriggerInstance(triggerKey, jobDetail, js));
+ return true;
+ }
+
+
+ private Trigger genTriggerInstance(TriggerKey triggerKey, JobDetail jd, JobSchedule js) throws ParseException {
+ return newTrigger()
+ .withIdentity(triggerKey)
+ .forJob(jd)
+ .withSchedule(CronScheduleBuilder.cronSchedule(new CronExpression(js.getCronExpression()))
+ .inTimeZone(TimeZone.getTimeZone(js.getTimeZone()))
+ )
+ .build();
+ }
+
+ private JobDetail addJobDetail(Scheduler scheduler, TriggerKey triggerKey, JobSchedule js, GriffinJob job) throws SchedulerException {
+ JobKey jobKey = jobKey(triggerKey.getName(), triggerKey.getGroup());
JobDetail jobDetail;
- if (scheduler.checkExists(jobKey)) {
+ Boolean isJobKeyExist = scheduler.checkExists(jobKey);
+ if (isJobKeyExist) {
jobDetail = scheduler.getJobDetail(jobKey);
- setJobData(jobDetail, jobRequestBody, measureId, groupName, jobName);
- scheduler.addJob(jobDetail, true);
} else {
- jobDetail = newJob(SparkSubmitJob.class)
- .storeDurably()
- .withIdentity(jobKey)
- .build();
- //set JobData
- setJobData(jobDetail, jobRequestBody, measureId, groupName, jobName);
- scheduler.addJob(jobDetail, false);
+ jobDetail = newJob(JobInstance.class).storeDurably().withIdentity(jobKey).build();
}
+ setJobDataMap(jobDetail, js, job);
+ scheduler.addJob(jobDetail, isJobKeyExist);
return jobDetail;
}
- private Trigger newTriggerInstance(TriggerKey triggerKey, JobDetail jobDetail, int interval, Date jobStartTime) throws SchedulerException {
- Trigger trigger = newTrigger()
- .withIdentity(triggerKey)
- .forJob(jobDetail)
- .withSchedule(SimpleScheduleBuilder.simpleSchedule()
- .withIntervalInSeconds(interval)
- .repeatForever())
- .startAt(jobStartTime)
- .build();
- return trigger;
+
+ private void setJobDataMap(JobDetail jd, JobSchedule js, GriffinJob job) {
+ jd.getJobDataMap().put(JOB_SCHEDULE_ID, js.getId().toString());
+ jd.getJobDataMap().put(GRIFFIN_JOB_ID, job.getId().toString());
}
- private void setJobStartTime(Date jobStartTime, int interval) {
- long currentTimestamp = System.currentTimeMillis();
- long jobStartTimestamp = jobStartTime.getTime();
- //if jobStartTime is before currentTimestamp, reset it with a future time
- if (jobStartTime.before(new Date(currentTimestamp))) {
- long n = (currentTimestamp - jobStartTimestamp) / (long) (interval * 1000);
- jobStartTimestamp = jobStartTimestamp + (n + 1) * (long) (interval * 1000);
- jobStartTime.setTime(jobStartTimestamp);
+ private boolean pauseJob(List<JobInstanceBean> instances) {
+ if (CollectionUtils.isEmpty(instances)) {
+ return true;
+ }
+ List<JobInstanceBean> deletedInstances = new ArrayList<>();
+ boolean pauseStatus = true;
+ for (JobInstanceBean instance : instances) {
+ boolean status = pauseJob(instance, deletedInstances);
+ pauseStatus = pauseStatus && status;
}
+ jobInstanceRepo.save(deletedInstances);
+ return pauseStatus;
}
- private void setJobData(JobDetail jobDetail, JobRequestBody jobRequestBody, Long measureId, String groupName, String jobName) {
- jobDetail.getJobDataMap().put("groupName", groupName);
- jobDetail.getJobDataMap().put("jobName", jobName);
- jobDetail.getJobDataMap().put("measureId", measureId.toString());
- jobDetail.getJobDataMap().put("sourcePattern", jobRequestBody.getSourcePattern());
- jobDetail.getJobDataMap().put("targetPattern", jobRequestBody.getTargetPattern());
- jobDetail.getJobDataMap().put("blockStartTimestamp", jobRequestBody.getBlockStartTimestamp());
- jobDetail.getJobDataMap().put("jobStartTime", jobRequestBody.getJobStartTime());
- jobDetail.getJobDataMap().put("interval", jobRequestBody.getInterval());
- jobDetail.getJobDataMap().put("lastBlockStartTimestamp", "");
- jobDetail.getJobDataMap().putAsString("deleted", false);
+ private boolean pauseJob(JobInstanceBean instance, List<JobInstanceBean> deletedInstances) {
+ boolean status;
+ try {
+ status = pauseJob(instance.getPredicateGroup(), instance.getPredicateName());
+ if (status) {
+ instance.setDeleted(true);
+ deletedInstances.add(instance);
+ }
+ } catch (SchedulerException e) {
+ LOGGER.error("Pause predicate job({},{}) failure.", instance.getId(), instance.getPredicateName());
+ status = false;
+ }
+ return status;
}
@Override
- public GriffinOperationMessage pauseJob(String group, String name) {
- try {
- Scheduler scheduler = factory.getObject();
- scheduler.pauseJob(new JobKey(name, group));
- return GriffinOperationMessage.PAUSE_JOB_SUCCESS;
- } catch (SchedulerException | NullPointerException e) {
- LOGGER.error("{} {}", GriffinOperationMessage.PAUSE_JOB_FAIL, e.getMessage());
- return GriffinOperationMessage.PAUSE_JOB_FAIL;
+ public boolean pauseJob(String group, String name) throws SchedulerException {
+ Scheduler scheduler = factory.getObject();
+ JobKey jobKey = new JobKey(name, group);
+ if (!scheduler.checkExists(jobKey)) {
+ LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName());
+ return false;
}
+ scheduler.pauseJob(jobKey);
+ return true;
}
- private GriffinOperationMessage setJobDeleted(String group, String name) {
- try {
- Scheduler scheduler = factory.getObject();
- JobDetail jobDetail = scheduler.getJobDetail(new JobKey(name, group));
- jobDetail.getJobDataMap().putAsString("deleted", true);
- scheduler.addJob(jobDetail, true);
- return GriffinOperationMessage.SET_JOB_DELETED_STATUS_SUCCESS;
- } catch (SchedulerException | NullPointerException e) {
- LOGGER.error("{} {}", GriffinOperationMessage.PAUSE_JOB_FAIL, e.getMessage());
- return GriffinOperationMessage.SET_JOB_DELETED_STATUS_FAIL;
+ private boolean setJobDeleted(GriffinJob job) throws SchedulerException {
+ job.setDeleted(true);
+ jobRepo.save(job);
+ return true;
+ }
+
+ private boolean deletePredicateJob(GriffinJob job) throws SchedulerException { // deletes the predicate job of every non-deleted instance; true only if all deletions succeed
+ boolean pauseStatus = true;
+ List<JobInstanceBean> instances = job.getJobInstances();
+ for (JobInstanceBean instance : instances) {
+ if (!instance.getDeleted()) {
+ pauseStatus = deleteJob(instance.getPredicateGroup(), instance.getPredicateName()) && pauseStatus; // call first so an earlier failure cannot short-circuit this deletion
+ instance.setDeleted(true);
+ if (instance.getState().equals(LivySessionStates.State.finding)) {
+ instance.setState(LivySessionStates.State.not_found);
+ }
+ }
}
+ return pauseStatus;
}
/**
@@ -260,18 +369,60 @@ public class JobServiceImpl implements JobService {
* 1. pause these jobs
* 2. set these jobs as deleted status
*
- * @param group job group name
- * @param name job name
+ * @param jobId griffin job id
+ * @return custom information
+ */
+ @Override
+ public GriffinOperationMessage deleteJob(Long jobId) {
+ GriffinJob job = jobRepo.findByIdAndDeleted(jobId, false);
+ return deleteJob(job) ? GriffinOperationMessage.DELETE_JOB_SUCCESS : GriffinOperationMessage.DELETE_JOB_FAIL;
+ }
+
+ /**
+ * logically delete
+ *
+ * @param name griffin job name which may not be unique.
* @return custom information
*/
@Override
- public GriffinOperationMessage deleteJob(String group, String name) {
- //logically delete
- if (pauseJob(group, name).equals(PAUSE_JOB_SUCCESS) &&
- setJobDeleted(group, name).equals(SET_JOB_DELETED_STATUS_SUCCESS)) {
- return GriffinOperationMessage.DELETE_JOB_SUCCESS;
+ public GriffinOperationMessage deleteJob(String name) {
+ List<GriffinJob> jobs = jobRepo.findByJobNameAndDeleted(name, false);
+ if (CollectionUtils.isEmpty(jobs)) {
+ LOGGER.warn("There is no job with '{}' name.", name);
+ return GriffinOperationMessage.DELETE_JOB_FAIL;
}
- return GriffinOperationMessage.DELETE_JOB_FAIL;
+ for (GriffinJob job : jobs) {
+ if (!deleteJob(job)) {
+ return GriffinOperationMessage.DELETE_JOB_FAIL;
+ }
+ }
+ return GriffinOperationMessage.DELETE_JOB_SUCCESS;
+ }
+
+ private boolean deleteJob(GriffinJob job) {
+ if (job == null) {
+ LOGGER.warn("Griffin job does not exist.");
+ return false;
+ }
+ try {
+ if (pauseJob(job.getQuartzGroup(), job.getQuartzName()) && deletePredicateJob(job) && setJobDeleted(job)) {
+ return true;
+ }
+ } catch (Exception e) {
+ LOGGER.error("Delete job failure.", e);
+ }
+ return false;
+ }
+
+ private boolean deleteJob(String group, String name) throws SchedulerException { // removes the quartz job (group, name) if it is present
+ Scheduler scheduler = factory.getObject();
+ JobKey jobKey = new JobKey(name, group);
+ if (!scheduler.checkExists(jobKey)) { // was inverted: existing jobs were reported missing and never deleted
+ LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey.getName());
+ return true; // nothing left to delete; same outcome the absent-job path produced before the fix
+ }
+ scheduler.deleteJob(jobKey);
+ return true;
}
/**
@@ -279,74 +430,62 @@ public class JobServiceImpl implements JobService {
* 1. search jobs related to measure
* 2. deleteJob
*
- * @param measure measure data quality between source and target dataset
- * @throws SchedulerException quartz throws if schedule has problem
+ * @param measureId measure id
*/
- public void deleteJobsRelateToMeasure(Measure measure) throws SchedulerException {
- Scheduler scheduler = factory.getObject();
- //get all jobs
- for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.anyGroup())) {
- JobDetail jobDetail = scheduler.getJobDetail(jobKey);
- JobDataMap jobDataMap = jobDetail.getJobDataMap();
- if (jobDataMap.getString("measureId").equals(measure.getId().toString())) {
- //select jobs related to measureId
- deleteJob(jobKey.getGroup(), jobKey.getName());
- LOGGER.info("{} {} is paused and logically deleted.", jobKey.getGroup(), jobKey.getName());
- }
+ public boolean deleteJobsRelateToMeasure(Long measureId) {
+ List<GriffinJob> jobs = jobRepo.findByMeasureIdAndDeleted(measureId, false);
+ if (CollectionUtils.isEmpty(jobs)) {
+ LOGGER.warn("Measure id {} has no related jobs.", measureId);
+ return false;
+ }
+ for (GriffinJob job : jobs) {
+ deleteJob(job);
}
+ return true;
}
@Override
- public List<JobInstance> findInstancesOfJob(String group, String jobName, int page, int size) {
- try {
- Scheduler scheduler = factory.getObject();
- JobKey jobKey = new JobKey(jobName, group);
- if (!scheduler.checkExists(jobKey) || isJobDeleted(scheduler, jobKey)) {
- return new ArrayList<>();
- }
- } catch (SchedulerException e) {
- LOGGER.error("Quartz schedule error. {}", e.getMessage());
+ public List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size) {
+ AbstractJob job = jobRepo.findByIdAndDeleted(jobId, false);
+ if (job == null) {
+ LOGGER.warn("Job id {} does not exist.", jobId);
return new ArrayList<>();
}
- //query and return instances
- Pageable pageRequest = new PageRequest(page, size, Sort.Direction.DESC, "timestamp");
- return jobInstanceRepo.findByGroupNameAndJobName(group, jobName, pageRequest);
+ size = size > MAX_PAGE_SIZE ? MAX_PAGE_SIZE : size;
+ size = size <= 0 ? DEFAULT_PAGE_SIZE : size;
+ Pageable pageable = new PageRequest(page, size, Sort.Direction.DESC, "tms");
+ return jobInstanceRepo.findByJobId(jobId, pageable);
+ }
+
+ @Scheduled(fixedDelayString = "${jobInstance.expired.milliseconds}")
+ public void deleteExpiredJobInstance() {
+ List<JobInstanceBean> instances = jobInstanceRepo.findByExpireTmsLessThanEqual(System.currentTimeMillis());
+ if (!pauseJob(instances)) {
+ LOGGER.error("Pause job failure.");
+ return;
+ }
+ jobInstanceRepo.deleteByExpireTimestamp(System.currentTimeMillis());
+ LOGGER.info("Delete expired job instances success.");
}
@Scheduled(fixedDelayString = "${jobInstance.fixedDelay.in.milliseconds}")
public void syncInstancesOfAllJobs() {
- List<Object> groupJobList = jobInstanceRepo.findGroupWithJobName();
- for (Object groupJobObj : groupJobList) {
- try {
- Object[] groupJob = (Object[]) groupJobObj;
- if (groupJob != null && groupJob.length == 2) {
- syncInstancesOfJob(groupJob[0].toString(), groupJob[1].toString());
- }
- } catch (Exception e) {
- LOGGER.error("schedule update instances of all jobs failed. {}", e.getMessage());
+ List<JobInstanceBean> beans = jobInstanceRepo.findByActiveState();
+ if (!CollectionUtils.isEmpty(beans)) {
+ for (JobInstanceBean jobInstance : beans) {
+ syncInstancesOfJob(jobInstance);
}
}
}
+
/**
- * call livy to update part of jobInstance table data associated with group and jobName in mysql.
+ * call livy to update the job instance table data in mysql for the given job instance.
*
- * @param group group name of jobInstance
- * @param jobName job name of jobInstance
+ * @param jobInstance job instance livy info
*/
- private void syncInstancesOfJob(String group, String jobName) {
- //update all instance info belongs to this group and job.
- List<JobInstance> jobInstanceList = jobInstanceRepo.findByGroupNameAndJobName(group, jobName);
- for (JobInstance jobInstance : jobInstanceList) {
- if (LivySessionStates.isActive(jobInstance.getState())) {
- String uri = sparkJobProps.getProperty("livy.uri") + "/" + jobInstance.getSessionId();
- setJobInstanceInfo(jobInstance, uri, group, jobName);
- }
-
- }
- }
-
- private void setJobInstanceInfo(JobInstance jobInstance, String uri, String group, String jobName) {
+ private void syncInstancesOfJob(JobInstanceBean jobInstance) {
+ String uri = livyConf.getProperty("livy.uri") + "/" + jobInstance.getSessionId();
TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() {
};
try {
@@ -354,28 +493,31 @@ public class JobServiceImpl implements JobService {
HashMap<String, Object> resultMap = JsonUtil.toEntity(resultStr, type);
setJobInstanceIdAndUri(jobInstance, resultMap);
} catch (RestClientException e) {
- LOGGER.error("spark session {} has overdue, set state as unknown!\n {}", jobInstance.getSessionId(), e.getMessage());
+ LOGGER.error("Spark session {} has overdue, set state as unknown!\n {}", jobInstance.getSessionId(), e.getMessage());
setJobInstanceUnknownStatus(jobInstance);
} catch (IOException e) {
- LOGGER.error("jobInstance jsonStr convert to map failed. {}", e.getMessage());
+ LOGGER.error("Job instance json converts to map failed. {}", e.getMessage());
} catch (IllegalArgumentException e) {
- LOGGER.warn("Livy status is illegal. {}", group, jobName, e.getMessage());
+ LOGGER.error("Livy status is illegal. {}", e.getMessage());
}
}
- private void setJobInstanceIdAndUri(JobInstance jobInstance, HashMap<String, Object> resultMap) throws IllegalArgumentException {
+
+ private void setJobInstanceIdAndUri(JobInstanceBean instance, HashMap<String, Object> resultMap) {
if (resultMap != null && resultMap.size() != 0 && resultMap.get("state") != null) {
- jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString()));
+ instance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString()));
if (resultMap.get("appId") != null) {
- jobInstance.setAppId(resultMap.get("appId").toString());
- jobInstance.setAppUri(sparkJobProps.getProperty("spark.uri") + "/cluster/app/" + resultMap.get("appId").toString());
+ String appId = String.valueOf(resultMap.get("appId"));
+ String appUri = livyConf.getProperty("spark.uri") + "/cluster/app/" + appId;
+ instance.setAppId(appId);
+ instance.setAppUri(appUri);
}
- jobInstanceRepo.save(jobInstance);
+ jobInstanceRepo.save(instance);
}
}
- private void setJobInstanceUnknownStatus(JobInstance jobInstance) {
+ private void setJobInstanceUnknownStatus(JobInstanceBean jobInstance) {
//if server cannot get session from Livy, set State as unknown.
jobInstance.setState(LivySessionStates.State.unknown);
jobInstanceRepo.save(jobInstance);
@@ -388,55 +530,42 @@ public class JobServiceImpl implements JobService {
*/
@Override
public JobHealth getHealthInfo() {
- Scheduler scheduler = factory.getObject();
- int jobCount = 0;
- int notHealthyCount = 0;
- try {
- Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.anyGroup());
- for (JobKey jobKey : jobKeys) {
- List<Trigger> triggers = (List<Trigger>) scheduler.getTriggersOfJob(jobKey);
- if (triggers != null && triggers.size() != 0 && !isJobDeleted(scheduler, jobKey)) {
- jobCount++;
- notHealthyCount = getJobNotHealthyCount(notHealthyCount, jobKey);
- }
- }
- } catch (SchedulerException e) {
- LOGGER.error(e.getMessage());
- throw new GetHealthInfoFailureException();
+ JobHealth jobHealth = new JobHealth();
+ List<GriffinJob> jobs = jobRepo.findByDeleted(false);
+ for (GriffinJob job : jobs) {
+ jobHealth = getHealthInfo(jobHealth, job);
}
- return new JobHealth(jobCount - notHealthyCount, jobCount);
+ return jobHealth;
}
- private int getJobNotHealthyCount(int notHealthyCount, JobKey jobKey) {
- if (!isJobHealthy(jobKey)) {
- notHealthyCount++;
+ private JobHealth getHealthInfo(JobHealth jobHealth, GriffinJob job) {
+ List<Trigger> triggers = getTriggers(job);
+ if (!CollectionUtils.isEmpty(triggers)) {
+ jobHealth.setJobCount(jobHealth.getJobCount() + 1);
+ if (isJobHealthy(job.getId())) {
+ jobHealth.setHealthyJobCount(jobHealth.getHealthyJobCount() + 1);
+ }
}
- return notHealthyCount;
+ return jobHealth;
}
- private Boolean isJobHealthy(JobKey jobKey) {
- Pageable pageRequest = new PageRequest(0, 1, Sort.Direction.DESC, "timestamp");
- JobInstance latestJobInstance;
- List<JobInstance> jobInstances = jobInstanceRepo.findByGroupNameAndJobName(jobKey.getGroup(), jobKey.getName(), pageRequest);
- if (jobInstances != null && jobInstances.size() > 0) {
- latestJobInstance = jobInstances.get(0);
- if (LivySessionStates.isHealthy(latestJobInstance.getState())) {
- return true;
- }
+ private List<Trigger> getTriggers(GriffinJob job) {
+ JobKey jobKey = new JobKey(job.getQuartzName(), job.getQuartzGroup());
+ List<Trigger> triggers;
+ try {
+ triggers = (List<Trigger>) factory.getObject().getTriggersOfJob(jobKey);
+ } catch (SchedulerException e) {
+ LOGGER.error("Job schedule exception. {}", e.getMessage());
+ throw new GetHealthInfoFailureException();
}
- return false;
+ return triggers;
}
- @Override
- public Map<String, List<Map<String, Serializable>>> getJobDetailsGroupByMeasureId() {
- Map<String, List<Map<String, Serializable>>> jobDetailsMap = new HashMap<>();
- List<Map<String, Serializable>> jobInfoList = getAliveJobs();
- for (Map<String, Serializable> jobInfo : jobInfoList) {
- String measureId = (String) jobInfo.get("measureId");
- List<Map<String, Serializable>> jobs = jobDetailsMap.getOrDefault(measureId, new ArrayList<>());
- jobs.add(jobInfo);
- jobDetailsMap.put(measureId, jobs);
- }
- return jobDetailsMap;
+ private Boolean isJobHealthy(Long jobId) {
+ Pageable pageable = new PageRequest(0, 1, Sort.Direction.DESC, "tms");
+ List<JobInstanceBean> instances = jobInstanceRepo.findByJobId(jobId,pageable);
+ return !CollectionUtils.isEmpty(instances) && LivySessionStates.isHealthy(instances.get(0).getState());
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/Predicator.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/Predicator.java b/service/src/main/java/org/apache/griffin/core/job/Predicator.java
new file mode 100644
index 0000000..dd9e105
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/Predicator.java
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job;
+
+import java.io.IOException;
+
+/**
+ * A single yes/no check that callers evaluate before going ahead with a job.
+ */
+@FunctionalInterface
+public interface Predicator {
+    /**
+     * Evaluates the check.
+     *
+     * @return {@code true} if the condition is satisfied, {@code false} otherwise
+     * @throws IOException if the check cannot be carried out
+     */
+    boolean predicate() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
index a1e1e9d..e089d15 100644
--- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java
@@ -21,205 +21,128 @@ package org.apache.griffin.core.job;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.commons.lang.StringUtils;
-import org.apache.griffin.core.job.entity.JobInstance;
+import org.apache.griffin.core.job.entity.JobInstanceBean;
+import org.apache.griffin.core.job.entity.LivyConf;
import org.apache.griffin.core.job.entity.LivySessionStates;
-import org.apache.griffin.core.job.entity.SparkJobDO;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+import org.apache.griffin.core.job.factory.PredicatorFactory;
import org.apache.griffin.core.job.repo.JobInstanceRepo;
-import org.apache.griffin.core.measure.entity.DataConnector;
-import org.apache.griffin.core.measure.entity.DataSource;
-import org.apache.griffin.core.measure.entity.Measure;
-import org.apache.griffin.core.measure.repo.MeasureRepo;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.apache.griffin.core.util.JsonUtil;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.util.CollectionUtils;
import org.springframework.web.client.RestTemplate;
import java.io.IOException;
-import java.text.SimpleDateFormat;
import java.util.*;
+import static org.apache.griffin.core.job.JobInstance.*;
+
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class SparkSubmitJob implements Job {
private static final Logger LOGGER = LoggerFactory.getLogger(SparkSubmitJob.class);
+ private static final String SPARK_JOB_JARS_SPLIT = ";";
@Autowired
- private MeasureRepo measureRepo;
- @Autowired
private JobInstanceRepo jobInstanceRepo;
@Autowired
- private Properties sparkJobProps;
-
- /**
- * partitionItems
- * for example
- * partitionItems like "date","hour",...
- */
- private String[] partitionItems;
- /**
- * sourcePatternItems targetPatternItems
- * for example
- * sourcePatternItems or targetPatternItems is like "YYYYMMDD","HH",...
- */
- private String[] sourcePatternItems, targetPatternItems;
+ @Qualifier("livyConf")
+ private Properties livyConfProps;
+ @Autowired
+ private JobServiceImpl jobService;
- private Measure measure;
- private String sourcePattern, targetPattern;
- private String blockStartTimestamp, lastBlockStartTimestamp;
- private String interval;
- private String uri;
+ private GriffinMeasure measure;
+ private String livyUri;
+ private List<SegmentPredicate> mPredicts;
+ private JobInstanceBean jobInstance;
private RestTemplate restTemplate = new RestTemplate();
- private SparkJobDO sparkJobDO = new SparkJobDO();
+ private LivyConf livyConf = new LivyConf();
- public SparkSubmitJob() {
- }
-
- /**
- * execute method is used to submit sparkJobDO to Livy.
- *
- * @param context Job execution context
- */
+ /**
+ * Quartz entry point. Initialises the job parameters and the Livy request, then evaluates
+ * the segment predicates. If any predicate is not satisfied, only the job instance state is
+ * updated and nothing is submitted; otherwise the job is posted and the job instance saved.
+ * Every exception is logged here and not propagated.
+ *
+ * @param context Job execution context
+ */
@Override
public void execute(JobExecutionContext context) {
JobDetail jd = context.getJobDetail();
- String groupName = jd.getJobDataMap().getString("groupName");
- String jobName = jd.getJobDataMap().getString("jobName");
- initParam(jd);
- //prepare current system timestamp
- long currentBlockStartTimestamp = setCurrentBlockStartTimestamp(System.currentTimeMillis());
- LOGGER.info("currentBlockStartTimestamp: {}", currentBlockStartTimestamp);
try {
- if (StringUtils.isNotEmpty(sourcePattern)) {
- setAllDataConnectorPartitions(measure.getDataSources(), sourcePattern.split("-"), partitionItems, "source", currentBlockStartTimestamp);
- }
- if (StringUtils.isNotEmpty(targetPattern)) {
- setAllDataConnectorPartitions(measure.getDataSources(), targetPattern.split("-"), partitionItems, "target", currentBlockStartTimestamp);
+ initParam(jd);
+ setLivyConf();
+ if (!success(mPredicts)) {
+ updateJobInstanceState(context);
+ return;
}
+ saveJobInstance(jd);
} catch (Exception e) {
- LOGGER.error("Can not execute job.Set partitions error. {}", e.getMessage());
- return;
+ LOGGER.error("Post spark task error.", e);
}
- jd.getJobDataMap().put("lastBlockStartTimestamp", currentBlockStartTimestamp + "");
- setSparkJobDO();
- String result;
- try {
- result = restTemplate.postForObject(uri, sparkJobDO, String.class);
- } catch (Exception e) {
- LOGGER.error("Post spark task error. {}", e.getMessage());
- return;
- }
- LOGGER.info(result);
- saveJobInstance(groupName, jobName, result);
}
- private void initParam(JobDetail jd) {
- /**
- * the field measureId is generated from `setJobData` in `JobServiceImpl`
- */
- String measureId = jd.getJobDataMap().getString("measureId");
- measure = measureRepo.findOne(Long.valueOf(measureId));
- if (measure == null) {
- LOGGER.error("Measure with id {} is not find!", measureId);
- return;
+ /**
+ * Saves the job instance as not_found and deleted once the trigger has fired more times
+ * than its repeat count; does nothing before that.
+ * NOTE(review): assumes the firing trigger is always a SimpleTrigger - the cast fails otherwise.
+ *
+ * @param context execution context supplying the trigger that fired
+ * @throws IOException propagated from saving the job instance
+ */
+ private void updateJobInstanceState(JobExecutionContext context) throws IOException {
+ SimpleTrigger trigger = (SimpleTrigger) context.getTrigger();
+ int allowedRepeats = trigger.getRepeatCount();
+ int timesFired = trigger.getTimesTriggered();
+ boolean repeatsExhausted = timesFired > allowedRepeats;
+ if (repeatsExhausted) {
+ saveJobInstance(null, LivySessionStates.State.not_found, true);
}
- setMeasureInstanceName(measure, jd);
- partitionItems = sparkJobProps.getProperty("sparkJob.dateAndHour").split(",");
- uri = sparkJobProps.getProperty("livy.uri");
- sourcePattern = jd.getJobDataMap().getString("sourcePattern");
- targetPattern = jd.getJobDataMap().getString("targetPattern");
- blockStartTimestamp = jd.getJobDataMap().getString("blockStartTimestamp");
- lastBlockStartTimestamp = jd.getJobDataMap().getString("lastBlockStartTimestamp");
- LOGGER.info("lastBlockStartTimestamp:{}", lastBlockStartTimestamp);
- interval = jd.getJobDataMap().getString("interval");
- }
-
- private void setMeasureInstanceName(Measure measure, JobDetail jd) {
- // in order to keep metric name unique, we set measure name as jobName at present
- measure.setName(jd.getJobDataMap().getString("jobName"));
}
- private void setAllDataConnectorPartitions(List<DataSource> sources, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) throws IOException {
- if (sources == null) {
- return;
- }
- for (DataSource dataSource : sources) {
- setDataSourcePartitions(dataSource, patternItemSet, partitionItems, sourceName, timestamp);
+ /**
+ * Posts the prepared Livy configuration to the Livy URI.
+ *
+ * @return the response body, or null if the request failed for any reason
+ */
+ private String post2Livy() {
+ String result;
+ try {
+ result = restTemplate.postForObject(livyUri, livyConf, String.class);
+ LOGGER.info(result);
+ } catch (Exception e) {
+ // Callers only see null on failure, so log the throwable itself to keep the stack trace.
+ LOGGER.error("Post to livy error. {}", e.getMessage(), e);
+ result = null;
}
+ return result;
}
- private void setDataSourcePartitions(DataSource dataSource, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) throws IOException {
- String name = dataSource.getName();
- for (DataConnector dataConnector : dataSource.getConnectors()) {
- if (sourceName.equals(name)) {
- setDataConnectorPartitions(dataConnector, patternItemSet, partitionItems, timestamp);
- }
+ /**
+ * Checks whether every segment predicate is satisfied.
+ *
+ * @param predicates predicates to evaluate; null or empty counts as success
+ * @return true if there is nothing to check or every predicate passes; false as soon as
+ * one predicate fails or its evaluation throws
+ * @throws IOException declared for callers; evaluation errors themselves are caught and logged
+ */
+ private boolean success(List<SegmentPredicate> predicates) throws IOException {
+ if (CollectionUtils.isEmpty(predicates)) {
+ return true;
}
- }
+ for (SegmentPredicate segPredicate : predicates) {
+ Predicator predicator = PredicatorFactory.newPredicateInstance(segPredicate);
+ try {
+ if (!predicator.predicate()) {
+ return false;
+ }
+ } catch (Exception e) {
+ // A check that blows up still counts as "not satisfied", but record why instead of hiding it.
+ LOGGER.error("Predicate check error.", e);
+ return false;
+ }
- private void setDataConnectorPartitions(DataConnector dc, String[] patternItemSet, String[] partitionItems, long timestamp) throws IOException {
- Map<String, String> partitionItemMap = genPartitionMap(patternItemSet, partitionItems, timestamp);
- /**
- * partitions must be a string like: "dt=20170301, hour=12"
- * partitionItemMap.toString() is like "{dt=20170301, hour=12}"
- */
- String partitions = partitionItemMap.toString().substring(1, partitionItemMap.toString().length() - 1);
- partitions = partitions.replaceAll(",", " AND ");
- Map<String, String> configMap = dc.getConfigInMaps();
- //config should not be null
- configMap.put("where", partitions);
- try {
- dc.setConfig(configMap);
- } catch (JsonProcessingException e) {
- LOGGER.error(e.getMessage());
}
+ return true;
}
-
- private Map<String, String> genPartitionMap(String[] patternItemSet, String[] partitionItems, long timestamp) {
- /**
- * patternItemSet:{YYYYMMdd,HH}
- * partitionItems:{dt,hour}
- * partitionItemMap:{dt=20170804,hour=09}
- */
- int comparableSizeMin = Math.min(patternItemSet.length, partitionItems.length);
- Map<String, String> partitionItemMap = new HashMap<>();
- for (int i = 0; i < comparableSizeMin; i++) {
- /**
- * in order to get a standard date like 20170427 01 (YYYYMMdd-HH)
- */
- String pattern = patternItemSet[i].replace("mm", "MM");
- pattern = pattern.replace("DD", "dd");
- pattern = pattern.replace("hh", "HH");
- SimpleDateFormat sdf = new SimpleDateFormat(pattern);
- partitionItemMap.put(partitionItems[i], sdf.format(new Date(timestamp)));
- }
- return partitionItemMap;
+ /**
+ * Loads everything one execution needs from the configuration and the job data map:
+ * the Livy URI, the stored job instance, the measure, the predicates and the measure name.
+ *
+ * @param jd job detail whose data map carries the predicate job name, measure JSON and predicates JSON
+ * @throws IOException if the measure or predicates JSON cannot be parsed
+ */
+ private void initParam(JobDetail jd) throws IOException {
+ mPredicts = new ArrayList<>();
+ livyUri = livyConfProps.getProperty("livy.uri");
+ JobDataMap dataMap = jd.getJobDataMap();
+ String predicateJobName = dataMap.getString(PREDICATE_JOB_NAME);
+ jobInstance = jobInstanceRepo.findByPredicateName(predicateJobName);
+ String measureJson = dataMap.getString(MEASURE_KEY);
+ measure = JsonUtil.toEntity(measureJson, GriffinMeasure.class);
+ String predicatesJson = dataMap.getString(PREDICATES_KEY);
+ setPredicts(predicatesJson);
+ setMeasureInstanceName(measure, jd);
}
-
- private long setCurrentBlockStartTimestamp(long currentSystemTimestamp) {
- long currentBlockStartTimestamp = 0;
- if (StringUtils.isNotEmpty(lastBlockStartTimestamp)) {
- try {
- currentBlockStartTimestamp = Long.parseLong(lastBlockStartTimestamp) + Integer.parseInt(interval) * 1000;
- } catch (Exception e) {
- LOGGER.info("lastBlockStartTimestamp or interval format problem! {}", e.getMessage());
- }
- } else {
- if (StringUtils.isNotEmpty(blockStartTimestamp)) {
- try {
- currentBlockStartTimestamp = Long.parseLong(blockStartTimestamp);
- } catch (Exception e) {
- LOGGER.info("blockStartTimestamp format problem! {}", e.getMessage());
- }
- } else {
- currentBlockStartTimestamp = currentSystemTimestamp;
+ /**
+ * Parses the predicates JSON (a list of objects with "type" and "config" entries) and
+ * appends one SegmentPredicate per element to mPredicts. Nothing is added when the
+ * parsed list is null.
+ *
+ * @param json JSON text describing the predicates
+ * @throws IOException if the JSON cannot be parsed
+ */
+ private void setPredicts(String json) throws IOException {
+ // Fully parameterized type reference instead of the raw List<Map>.
+ List<Map<String, Object>> maps = JsonUtil.toEntity(json, new TypeReference<List<Map<String, Object>>>() {
+ });
+ if (maps != null) {
+ for (Map<String, Object> map : maps) {
+ SegmentPredicate sp = new SegmentPredicate();
+ sp.setType((String) map.get("type"));
+ sp.setConfigMap((Map<String, String>) map.get("config"));
+ mPredicts.add(sp);
}
}
- return currentBlockStartTimestamp;
+ }
+
+ /**
+ * Renames the measure to the job name stored in the job data map.
+ */
+ private void setMeasureInstanceName(GriffinMeasure measure, JobDetail jd) {
+ // the job name doubles as the measure name so that metric names stay unique for now
+ String jobName = jd.getJobDataMap().getString(JOB_NAME);
+ measure.setName(jobName);
}
private String escapeCharacter(String str, String regex) {
@@ -227,67 +150,85 @@ public class SparkSubmitJob implements Job {
return str.replaceAll(regex, escapeCh);
}
- private void setSparkJobDO() {
- sparkJobDO.setFile(sparkJobProps.getProperty("sparkJob.file"));
- sparkJobDO.setClassName(sparkJobProps.getProperty("sparkJob.className"));
+ /**
+ * Fills the Livy request in four steps: basic parameters, arguments, jars and the conf map.
+ *
+ * @throws JsonProcessingException declared by the argument step, which serialises the measure
+ */
+ private void setLivyConf() throws JsonProcessingException {
+ setLivyParams();
+ setLivyArgs();
+ setLivyJars();
+ setPropConf();
+ }
+ /**
+ * Copies the "sparkJob.*" properties (file, class name, name, queue, executor count and
+ * cores, driver and executor memory) into the Livy request and sets an empty files list.
+ */
+ private void setLivyParams() {
+ Properties props = livyConfProps;
+ livyConf.setFile(props.getProperty("sparkJob.file"));
+ livyConf.setClassName(props.getProperty("sparkJob.className"));
+ livyConf.setName(props.getProperty("sparkJob.name"));
+ livyConf.setQueue(props.getProperty("sparkJob.queue"));
+ String numExecutors = props.getProperty("sparkJob.numExecutors");
+ livyConf.setNumExecutors(Long.parseLong(numExecutors));
+ String executorCores = props.getProperty("sparkJob.executorCores");
+ livyConf.setExecutorCores(Long.parseLong(executorCores));
+ livyConf.setDriverMemory(props.getProperty("sparkJob.driverMemory"));
+ livyConf.setExecutorMemory(props.getProperty("sparkJob.executorMemory"));
+ livyConf.setFiles(new ArrayList<>());
+ }
+
+ /**
+ * Builds the three job arguments: property "sparkJob.args_1", the measure serialised to
+ * JSON with backticks escaped, and property "sparkJob.args_3".
+ *
+ * @throws JsonProcessingException if the measure cannot be serialised
+ */
+ private void setLivyArgs() throws JsonProcessingException {
List<String> args = new ArrayList<>();
- args.add(sparkJobProps.getProperty("sparkJob.args_1"));
- measure.setTriggerTimeStamp(System.currentTimeMillis());
+ args.add(livyConfProps.getProperty("sparkJob.args_1"));
String measureJson = JsonUtil.toJsonWithFormat(measure);
- // to fix livy bug: ` will be ignored by livy
+ // to fix livy bug: character ` will be ignored by livy
String finalMeasureJson = escapeCharacter(measureJson, "\\`");
+ LOGGER.info(finalMeasureJson);
args.add(finalMeasureJson);
- args.add(sparkJobProps.getProperty("sparkJob.args_3"));
- sparkJobDO.setArgs(args);
+ args.add(livyConfProps.getProperty("sparkJob.args_3"));
+ livyConf.setArgs(args);
+ }
- sparkJobDO.setName(sparkJobProps.getProperty("sparkJob.name"));
- sparkJobDO.setQueue(sparkJobProps.getProperty("sparkJob.queue"));
- sparkJobDO.setNumExecutors(Long.parseLong(sparkJobProps.getProperty("sparkJob.numExecutors")));
- sparkJobDO.setExecutorCores(Long.parseLong(sparkJobProps.getProperty("sparkJob.executorCores")));
- sparkJobDO.setDriverMemory(sparkJobProps.getProperty("sparkJob.driverMemory"));
- sparkJobDO.setExecutorMemory(sparkJobProps.getProperty("sparkJob.executorMemory"));
+ /**
+ * Splits property "sparkJob.jars" on the jar separator and sets the resulting list on the
+ * Livy request. A missing property yields an empty list; entries are trimmed and blank
+ * entries are skipped so they never reach Livy as jar paths.
+ */
+ private void setLivyJars() {
+ List<String> jars = new ArrayList<>();
+ String jarProp = livyConfProps.getProperty("sparkJob.jars");
+ if (jarProp != null) {
+ for (String jar : jarProp.split(SPARK_JOB_JARS_SPLIT)) {
+ String trimmed = jar.trim();
+ if (!trimmed.isEmpty()) {
+ jars.add(trimmed);
+ }
+ }
+ }
+ livyConf.setJars(jars);
+ }
+ /**
+ * Sets the Livy "conf" map, which holds a single entry: "spark.yarn.dist.files", read from
+ * the property of the same name.
+ */
+ private void setPropConf() {
Map<String, String> conf = new HashMap<>();
- conf.put("spark.jars.packages", sparkJobProps.getProperty("sparkJob.spark.jars.packages"));
- sparkJobDO.setConf(conf);
-
- List<String> jars = new ArrayList<>();
- jars.add(sparkJobProps.getProperty("sparkJob.jars_1"));
- jars.add(sparkJobProps.getProperty("sparkJob.jars_2"));
- jars.add(sparkJobProps.getProperty("sparkJob.jars_3"));
- sparkJobDO.setJars(jars);
+ conf.put("spark.yarn.dist.files", livyConfProps.getProperty("spark.yarn.dist.files"));
+ livyConf.setConf(conf);
+ }
- List<String> files = new ArrayList<>();
- sparkJobDO.setFiles(files);
+ /**
+ * Posts the job to Livy and saves the job instance with state "found". When Livy returned
+ * a response, the job identified by the detail's key is paused first and the outcome of
+ * that pause is stored with the instance.
+ *
+ * @param jd detail of the currently executing job
+ * @throws SchedulerException if pausing the job fails
+ * @throws IOException if the Livy response cannot be parsed
+ */
+ private void saveJobInstance(JobDetail jd) throws SchedulerException, IOException {
+ String livyResponse = post2Livy();
+ boolean paused = false;
+ if (livyResponse != null) {
+ JobKey key = jd.getKey();
+ paused = jobService.pauseJob(key.getGroup(), key.getName());
+ LOGGER.info("Delete predicate job {}.", paused ? "success" : "failure");
+ }
+ saveJobInstance(livyResponse, LivySessionStates.State.found, paused);
}
- public void saveJobInstance(String groupName, String jobName, String result) {
+ /**
+ * Parses the Livy response (when there is one), applies it together with the given state
+ * and pause status to the job instance, and saves the instance.
+ *
+ * @param result raw Livy response, or null when nothing was posted or the post failed
+ * @param state state to store unless the response supplies its own
+ * @param pauseStatus value stored as the instance's deleted flag
+ * @throws IOException if the response cannot be parsed
+ */
+ private void saveJobInstance(String result, LivySessionStates.State state, Boolean pauseStatus) throws IOException {
TypeReference<HashMap<String, Object>> type = new TypeReference<HashMap<String, Object>>() {
};
- try {
- Map<String, Object> resultMap = JsonUtil.toEntity(result, type);
- if (resultMap != null) {
- JobInstance jobInstance = genJobInstance(groupName, jobName, resultMap);
- jobInstanceRepo.save(jobInstance);
- }
- } catch (IOException e) {
- LOGGER.error("jobInstance jsonStr convert to map failed. {}", e.getMessage());
- } catch (IllegalArgumentException e) {
- LOGGER.warn("Livy status is illegal. {}", e.getMessage());
+ Map<String, Object> resultMap = null;
+ if (result != null) {
+ resultMap = JsonUtil.toEntity(result, type);
}
+ setJobInstance(resultMap, state, pauseStatus);
+ jobInstanceRepo.save(jobInstance);
}
- private JobInstance genJobInstance(String groupName, String jobName, Map<String, Object> resultMap) throws IllegalArgumentException {
- JobInstance jobInstance = new JobInstance();
- jobInstance.setGroupName(groupName);
- jobInstance.setJobName(jobName);
- jobInstance.setTimestamp(System.currentTimeMillis());
- jobInstance.setSessionId(Integer.parseInt(resultMap.get("id").toString()));
- jobInstance.setState(LivySessionStates.State.valueOf(resultMap.get("state").toString()));
+ /**
+ * Applies a state, the deleted flag and, when available, the fields of a Livy response
+ * ("state", "id", "appId") to the job instance.
+ *
+ * @param resultMap parsed Livy response, or null when there is none
+ * @param state state to use when the response has no usable "state" entry
+ * @param pauseStatus value stored as the instance's deleted flag
+ */
+ private void setJobInstance(Map<String, Object> resultMap, LivySessionStates.State state, Boolean pauseStatus) {
+ jobInstance.setState(state);
+ jobInstance.setDeleted(pauseStatus);
+ if (resultMap == null) {
+ return;
+ }
+ Object livyState = resultMap.get("state");
+ if (livyState != null) {
+ try {
+ jobInstance.setState(LivySessionStates.State.valueOf(livyState.toString()));
+ } catch (IllegalArgumentException e) {
+ // An unrecognised state must not stop the instance from being saved; keep the fallback state.
+ LOGGER.warn("Livy status is illegal. {}", e.getMessage());
+ }
+ }
+ if (resultMap.get("id") != null) {
+ jobInstance.setSessionId(Long.parseLong(resultMap.get("id").toString()));
+ }
if (resultMap.get("appId") != null) {
jobInstance.setAppId(resultMap.get("appId").toString());
}
- return jobInstance;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
new file mode 100644
index 0000000..21ceec9
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/AbstractJob.java
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+
+import javax.persistence.*;
+
+/**
+ * Base entity for jobs stored in the "job" table. Subclasses share that single table and are
+ * told apart by the "type" discriminator column.
+ */
+@Entity
+@Table(name = "job")
+@Inheritance(strategy = InheritanceType.SINGLE_TABLE)
+@DiscriminatorColumn(name = "type")
+public abstract class AbstractJob extends AbstractAuditableEntity {
+    private static final long serialVersionUID = 7569493377868453677L;
+
+    /** Id of the measure this job runs. */
+    protected Long measureId;
+
+    protected String jobName;
+
+    protected String metricName;
+
+    /** Deletion flag; false for a new job. */
+    protected Boolean deleted = false;
+
+    AbstractJob() {
+    }
+
+    /** Creates a job without a metric name. */
+    AbstractJob(Long measureId, String jobName, boolean deleted) {
+        this.measureId = measureId;
+        this.jobName = jobName;
+        this.deleted = deleted;
+    }
+
+    /** Creates a job with a metric name; the deleted flag keeps its default. */
+    AbstractJob(String jobName, Long measureId, String metricName) {
+        this.jobName = jobName;
+        this.measureId = measureId;
+        this.metricName = metricName;
+    }
+
+    public Long getMeasureId() {
+        return measureId;
+    }
+
+    public void setMeasureId(Long measureId) {
+        this.measureId = measureId;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getMetricName() {
+        return metricName;
+    }
+
+    public void setMetricName(String metricName) {
+        this.metricName = metricName;
+    }
+
+    public Boolean getDeleted() {
+        return deleted;
+    }
+
+    public void setDeleted(Boolean deleted) {
+        this.deleted = deleted;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
new file mode 100644
index 0000000..65d8e15
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/GriffinJob.java
@@ -0,0 +1,79 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import javax.persistence.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Job entity (discriminator value "griffin_job") that records the name and group of its
+ * Quartz job and owns the list of its job instances.
+ */
+@Entity
+@DiscriminatorValue("griffin_job")
+public class GriffinJob extends AbstractJob {
+
+    @Column(name = "quartz_job_name")
+    private String quartzName;
+
+    @Column(name = "quartz_group_name")
+    private String quartzGroup;
+
+    @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}, orphanRemoval = true)
+    @JoinColumn(name = "job_id")
+    private List<JobInstanceBean> jobInstances = new ArrayList<>();
+
+    public GriffinJob() {
+        super();
+    }
+
+    /** Creates a job whose metric name equals its job name. */
+    public GriffinJob(Long measureId, String jobName, String qJobName, String qGroupName, boolean deleted) {
+        super(measureId, jobName, deleted);
+        this.metricName = jobName;
+        this.quartzName = qJobName;
+        this.quartzGroup = qGroupName;
+    }
+
+    /** Same as the five-argument constructor, additionally setting the entity id. */
+    public GriffinJob(Long jobId, Long measureId, String jobName, String qJobName, String qGroupName, boolean deleted) {
+        this(measureId, jobName, qJobName, qGroupName, deleted);
+        setId(jobId);
+    }
+
+    public String getQuartzName() {
+        return quartzName;
+    }
+
+    public void setQuartzName(String quartzName) {
+        this.quartzName = quartzName;
+    }
+
+    public String getQuartzGroup() {
+        return quartzGroup;
+    }
+
+    public void setQuartzGroup(String quartzGroup) {
+        this.quartzGroup = quartzGroup;
+    }
+
+    public List<JobInstanceBean> getJobInstances() {
+        return jobInstances;
+    }
+
+    public void setJobInstances(List<JobInstanceBean> jobInstances) {
+        this.jobInstances = jobInstances;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
new file mode 100644
index 0000000..b27ab91
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataBean.java
@@ -0,0 +1,98 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+
+import org.quartz.Trigger;
+
+/**
+ * Plain data holder describing a job: its ids and name, the state of its trigger, the
+ * previous and next fire times, and the cron expression.
+ */
+public class JobDataBean {
+
+    private Long jobId;
+
+    private String jobName;
+
+    private Long measureId;
+
+    private Trigger.TriggerState triggerState;
+
+    private Long nextFireTime;
+
+    private Long previousFireTime;
+
+    private String cronExpression;
+
+    // --- identity ---
+
+    public Long getJobId() {
+        return jobId;
+    }
+
+    public void setJobId(Long jobId) {
+        this.jobId = jobId;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public Long getMeasureId() {
+        return measureId;
+    }
+
+    public void setMeasureId(Long measureId) {
+        this.measureId = measureId;
+    }
+
+    // --- scheduling ---
+
+    public Trigger.TriggerState getTriggerState() {
+        return triggerState;
+    }
+
+    public void setTriggerState(Trigger.TriggerState triggerState) {
+        this.triggerState = triggerState;
+    }
+
+    public String getCronExpression() {
+        return cronExpression;
+    }
+
+    public void setCronExpression(String cronExpression) {
+        this.cronExpression = cronExpression;
+    }
+
+    public Long getPreviousFireTime() {
+        return previousFireTime;
+    }
+
+    public void setPreviousFireTime(Long previousFireTime) {
+        this.previousFireTime = previousFireTime;
+    }
+
+    public Long getNextFireTime() {
+        return nextFireTime;
+    }
+
+    public void setNextFireTime(Long nextFireTime) {
+        this.nextFireTime = nextFireTime;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
new file mode 100644
index 0000000..7009b5d
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.*;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Entity naming a data connector, with an optional segment range and a baseline flag.
+ * JSON property names are "data.connector.name", "segment.range" and "as.baseline".
+ */
+@Entity
+public class JobDataSegment extends AbstractAuditableEntity {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JobDataSegment.class);
+
+    @NotNull
+    private String dataConnectorName;
+
+    /** Baseline flag; false unless set. */
+    private Boolean baseline = false;
+
+    @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
+    @JoinColumn(name = "segment_range_id")
+    private SegmentRange segmentRange;
+
+    @JsonProperty("as.baseline")
+    public Boolean getBaseline() {
+        return baseline;
+    }
+
+    @JsonProperty("as.baseline")
+    public void setBaseline(Boolean baseline) {
+        this.baseline = baseline;
+    }
+
+    @JsonProperty("segment.range")
+    public SegmentRange getSegmentRange() {
+        return segmentRange;
+    }
+
+    @JsonProperty("segment.range")
+    public void setSegmentRange(SegmentRange segmentRange) {
+        this.segmentRange = segmentRange;
+    }
+
+    @JsonProperty("data.connector.name")
+    public String getDataConnectorName() {
+        return dataConnectorName;
+    }
+
+    /**
+     * Sets the data connector name.
+     *
+     * @param dataConnectorName connector name; must be neither null nor empty
+     * @throws NullPointerException if the name is null or empty
+     */
+    @JsonProperty("data.connector.name")
+    public void setDataConnectorName(String dataConnectorName) {
+        if (StringUtils.isEmpty(dataConnectorName)) {
+            LOGGER.error(" Data connector name is invalid. Please check your connector name.");
+            // Same exception type as before, now with a message so the stack trace explains itself.
+            throw new NullPointerException("Data connector name must not be null or empty.");
+        }
+        this.dataConnectorName = dataConnectorName;
+    }
+
+    public JobDataSegment() {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java
index 9d2a654..ecb5feb 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobHealth.java
@@ -40,6 +40,8 @@ public class JobHealth {
}
+ /**
+ * Creates a health summary with both counters explicitly set to zero.
+ */
public JobHealth() {
+ this.healthyJobCount = 0;
+ this.jobCount = 0;
}
public JobHealth(int healthyJobCount, int jobCount) {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java
deleted file mode 100644
index 2cb5949..0000000
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstance.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.job.entity;
-
-import org.apache.griffin.core.job.entity.LivySessionStates.State;
-import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
-
-import javax.persistence.*;
-
-@Entity
-public class JobInstance extends AbstractAuditableEntity {
-
- private static final long serialVersionUID = -4748881017029815874L;
-
- private String groupName;
- private String jobName;
- private int sessionId;
- @Enumerated(EnumType.STRING)
- private State state;
- private String appId;
- @Lob
- @Column(length = 1024)
- private String appUri;
- private long timestamp;
-
- public String getGroupName() {
- return groupName;
- }
-
- public void setGroupName(String groupName) {
- this.groupName = groupName;
- }
-
- public String getJobName() {
- return jobName;
- }
-
- public void setJobName(String jobName) {
- this.jobName = jobName;
- }
-
- public int getSessionId() {
- return sessionId;
- }
-
- public void setSessionId(int sessionId) {
- this.sessionId = sessionId;
- }
-
- public State getState() {
- return state;
- }
-
- public void setState(State state) {
- this.state = state;
- }
-
- public String getAppId() {
- return appId;
- }
-
- public void setAppId(String appId) {
- this.appId = appId;
- }
-
- public String getAppUri() {
- return appUri;
- }
-
- public void setAppUri(String appUri) {
- this.appUri = appUri;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
-
- public JobInstance() {
- }
-
- public JobInstance(String groupName, String jobName, int sessionId, State state, String appId, String appUri, long timestamp) {
- this.groupName = groupName;
- this.jobName = jobName;
- this.sessionId = sessionId;
- this.state = state;
- this.appId = appId;
- this.appUri = appUri;
- this.timestamp = timestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
new file mode 100644
index 0000000..ff4d444
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobInstanceBean.java
@@ -0,0 +1,156 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.griffin.core.job.entity.LivySessionStates.State;
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+
+@Entity // JPA entity; columns are derived from the fields and @Column annotations below
+public class JobInstanceBean extends AbstractAuditableEntity { // holds a job instance's session id, state, application id/URI, two timestamps and the predicate job identity
+
+ private static final long serialVersionUID = -4748881017029815874L; // NOTE(review): same value as the removed JobInstance class — confirm the reuse is intended
+
+ private Long sessionId; // boxed type: stays null when built with the predicate constructor
+
+ @Enumerated(EnumType.STRING) // persisted by enum constant name rather than ordinal
+ private State state; // LivySessionStates.State
+
+ private String appId;
+
+ @Column(length = 10 * 1024) // column length of 10240
+ private String appUri;
+
+ @Column(name = "timestamp") // column name differs from the Java field name
+ private Long tms; // presumably epoch milliseconds — TODO confirm unit with callers
+
+ @Column(name = "expire_timestamp")
+ private Long expireTms; // presumably same unit as tms — TODO confirm
+
+ @Column(name = "predicate_group_name")
+ private String predicateGroup;
+
+ @Column(name = "predicate_job_name")
+ private String predicateName;
+
+ @Column(name = "predicate_job_deleted")
+ private Boolean deleted = false; // defaults to false; neither constructor changes it. Presumably marks a removed predicate job — confirm
+
+ public Long getSessionId() {
+ return sessionId;
+ }
+
+ public void setSessionId(Long sessionId) {
+ this.sessionId = sessionId;
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ public void setState(State state) {
+ this.state = state;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+
+ public String getAppUri() {
+ return appUri;
+ }
+
+ public void setAppUri(String appUri) {
+ this.appUri = appUri;
+ }
+
+ @JsonProperty("timestamp") // JSON name is "timestamp" although the Java property is "tms"
+ public Long getTms() {
+ return tms;
+ }
+
+ @JsonProperty("timestamp") // accepts the same JSON key the getter emits
+ public void setTms(Long tms) {
+ this.tms = tms;
+ }
+
+ @JsonProperty("expireTimestamp") // JSON name is "expireTimestamp" although the Java property is "expireTms"
+ public Long getExpireTms() {
+ return expireTms;
+ }
+
+ @JsonProperty("expireTimestamp") // accepts the same JSON key the getter emits
+ public void setExpireTms(Long expireTms) {
+ this.expireTms = expireTms;
+ }
+
+ public String getPredicateGroup() {
+ return predicateGroup;
+ }
+
+ public void setPredicateGroup(String predicateGroup) {
+ this.predicateGroup = predicateGroup;
+ }
+
+ public String getPredicateName() {
+ return predicateName;
+ }
+
+ public void setPredicateName(String predicateName) {
+ this.predicateName = predicateName;
+ }
+
+ public Boolean getDeleted() {
+ return deleted;
+ }
+
+ public void setDeleted(Boolean deleted) {
+ this.deleted = deleted;
+ }
+
+ public JobInstanceBean() { // no-arg constructor: every field keeps its declared default (deleted = false, the rest null)
+ }
+
+ public JobInstanceBean(State state, String pName, String pGroup, Long tms, Long expireTms) { // predicate variant: sets state, predicate name/group and both timestamps; sessionId, appId and appUri stay null
+ this.state = state;
+ this.predicateName = pName;
+ this.predicateGroup = pGroup;
+ this.tms = tms;
+ this.expireTms = expireTms;
+ }
+
+ public JobInstanceBean(Long sessionId, State state, String appId, String appUri, Long timestamp, Long expireTms) { // session variant: sets session/app details and both timestamps; predicate name and group stay null
+ this.sessionId = sessionId;
+ this.state = state;
+ this.appId = appId;
+ this.appUri = appUri;
+ this.tms = timestamp; // the parameter is called "timestamp" but is stored in the "tms" field
+ this.expireTms = expireTms;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java
deleted file mode 100644
index 0d0ea40..0000000
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobRequestBody.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.core.job.entity;
-
-public class JobRequestBody {
- private String sourcePattern;
- private String targetPattern;
- private String blockStartTimestamp;
- private String jobStartTime;
- private String interval;
-
- public String getSourcePattern() {
- return sourcePattern;
- }
-
- public void setSourcePattern(String sourcePattern) {
- this.sourcePattern = sourcePattern;
- }
-
- public String getTargetPattern() {
- return targetPattern;
- }
-
- public void setTargetPattern(String targetPattern) {
- this.targetPattern = targetPattern;
- }
-
- public String getBlockStartTimestamp() {
- return blockStartTimestamp;
- }
-
- public void setBlockStartTimestamp(String blockStartTimestamp) {
- this.blockStartTimestamp = blockStartTimestamp;
- }
-
- public String getJobStartTime() {
- return jobStartTime;
- }
-
- public void setJobStartTime(String jobStartTime) {
- this.jobStartTime = jobStartTime;
- }
-
- public String getInterval() {
- return interval;
- }
-
- public void setInterval(String interval) {
- this.interval = interval;
- }
-
- public JobRequestBody() {
- }
-
- public JobRequestBody(String sourcePattern, String targetPattern, String blockStartTimestamp, String jobStartTime, String interval) {
- this.sourcePattern = sourcePattern;
- this.targetPattern = targetPattern;
- this.blockStartTimestamp = blockStartTimestamp;
- this.jobStartTime = jobStartTime;
- this.interval = interval;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- JobRequestBody that = (JobRequestBody) o;
-
- if (sourcePattern != null ? !sourcePattern.equals(that.sourcePattern) : that.sourcePattern != null) {
- return false;
- }
- if (targetPattern != null ? !targetPattern.equals(that.targetPattern) : that.targetPattern != null) {
- return false;
- }
- if (blockStartTimestamp != null ? !blockStartTimestamp.equals(that.blockStartTimestamp) : that.blockStartTimestamp != null) {
- return false;
- }
- if (jobStartTime != null ? !jobStartTime.equals(that.jobStartTime) : that.jobStartTime != null){
- return false;
- }
- return interval != null ? interval.equals(that.interval) : that.interval == null;
- }
-
- @Override
- public int hashCode() {
- int result = sourcePattern != null ? sourcePattern.hashCode() : 0;
- result = 31 * result + (targetPattern != null ? targetPattern.hashCode() : 0);
- result = 31 * result + (blockStartTimestamp != null ? blockStartTimestamp.hashCode() : 0);
- result = 31 * result + (jobStartTime != null ? jobStartTime.hashCode() : 0);
- result = 31 * result + (interval != null ? interval.hashCode() : 0);
- return result;
- }
-}