You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2017/06/05 08:40:07 UTC
incubator-griffin git commit: 20170605start time modify
Repository: incubator-griffin
Updated Branches:
refs/heads/master 9e408994b -> 95e6350d1
20170605start time modify
Author: Chen <xi...@lm-shc-16501061.corp.ebay.com>
Closes #58 from justACT/20170605startTimeModify.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/95e6350d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/95e6350d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/95e6350d
Branch: refs/heads/master
Commit: 95e6350d1103accc23ae4315ec592d346db6cc5f
Parents: 9e40899
Author: Chen <xi...@lm-shc-16501061.corp.ebay.com>
Authored: Mon Jun 5 16:39:57 2017 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Jun 5 16:39:57 2017 +0800
----------------------------------------------------------------------
.../griffin/core/measure/DataConnector.java | 4 +-
.../core/schedule/SchedulerController.java | 41 ++++++++++++--------
.../griffin/core/schedule/SparkSubmitJob.java | 3 +-
.../src/main/resources/application.properties | 4 +-
.../core/schedule/SparkSubmitJobTest.java | 12 ++++--
5 files changed, 40 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/95e6350d/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java
index 7143c36..cd7e0a4 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java
@@ -95,8 +95,8 @@ public class DataConnector extends AuditableEntity {
this.type = type;
}
- public String getConfig() {
- return config;
+ public Map<String,String> getConfig() {
+ return getConfigInMaps();
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/95e6350d/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java
index 147b71d..beae5b7 100644
--- a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java
+++ b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java
@@ -31,9 +31,9 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.io.Serializable;
import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.*;
-import static org.quartz.DateBuilder.futureDate;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.JobKey.jobKey;
import static org.quartz.TriggerBuilder.newTrigger;
@@ -121,8 +121,17 @@ public class SchedulerController {
@RequestBody SchedulerRequestBody schedulerRequestBody) {
//@RequestParam(value = "sourcePat", required = false) String sourcePat,
//@RequestParam(value = "targetPat", required = false) String targetPat
- int jobStartTime = 0;
+// int jobStartTime = 0;
+ Date jobStartTime=null;
int periodTime = 0;
+ SimpleDateFormat format=new SimpleDateFormat("YYYYMMdd HH:mm:ss");
+ try{
+ periodTime = Integer.parseInt(schedulerRequestBody.getPeriodTime());
+ jobStartTime=format.parse(schedulerRequestBody.getJobStartTime());
+ }catch (Exception e){
+ LOGGER.info("jobStartTime or periodTime format error! "+e);
+ return false;
+ }
try {
Scheduler scheduler = factory.getObject();
TriggerKey triggerKey = triggerKey(jobName, groupName);
@@ -134,6 +143,7 @@ public class SchedulerController {
JobDetail jobDetail;
if (scheduler.checkExists(jobKey)) {
jobDetail = scheduler.getJobDetail(jobKey);
+ setJobDetail(jobDetail,schedulerRequestBody,measureName,groupName,jobName);
scheduler.addJob(jobDetail, true);
} else {
jobDetail = newJob(SparkSubmitJob.class)
@@ -141,18 +151,7 @@ public class SchedulerController {
.withIdentity(jobKey)
.build();
- jobDetail.getJobDataMap().put("measure", measureName);
- jobDetail.getJobDataMap().put("sourcePat", schedulerRequestBody.getSourcePat());
- jobDetail.getJobDataMap().put("targetPat", schedulerRequestBody.getTargetPat());
- jobDetail.getJobDataMap().put("dataStartTimestamp", schedulerRequestBody.getDataStartTimestamp());
- jobDetail.getJobDataMap().put("jobStartTime", schedulerRequestBody.getJobStartTime());
- jobDetail.getJobDataMap().put("periodTime", schedulerRequestBody.getPeriodTime());
- jobDetail.getJobDataMap().put("lastTime", "");
- jobDetail.getJobDataMap().put("groupName",groupName);
- jobDetail.getJobDataMap().put("jobName",jobName);
-
- jobStartTime = Integer.parseInt(schedulerRequestBody.getJobStartTime());
- periodTime = Integer.parseInt(schedulerRequestBody.getPeriodTime());
+ setJobDetail(jobDetail,schedulerRequestBody,measureName,groupName,jobName);
scheduler.addJob(jobDetail, false);
}
@@ -164,9 +163,8 @@ public class SchedulerController {
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(periodTime)
.repeatForever())
- .startAt(futureDate(jobStartTime, DateBuilder.IntervalUnit.SECOND))
+ .startAt(jobStartTime)
.build();
-
scheduler.scheduleJob(trigger);
return true;
@@ -177,6 +175,17 @@ public class SchedulerController {
}
}
+ private void setJobDetail(JobDetail jobDetail,SchedulerRequestBody schedulerRequestBody,String measureName,String groupName,String jobName){
+ jobDetail.getJobDataMap().put("measure", measureName);
+ jobDetail.getJobDataMap().put("sourcePat", schedulerRequestBody.getSourcePat());
+ jobDetail.getJobDataMap().put("targetPat", schedulerRequestBody.getTargetPat());
+ jobDetail.getJobDataMap().put("dataStartTimestamp", schedulerRequestBody.getDataStartTimestamp());
+ jobDetail.getJobDataMap().put("jobStartTime", schedulerRequestBody.getJobStartTime());
+ jobDetail.getJobDataMap().put("periodTime", schedulerRequestBody.getPeriodTime());
+ jobDetail.getJobDataMap().put("lastTime", "");
+ jobDetail.getJobDataMap().put("groupName",groupName);
+ jobDetail.getJobDataMap().put("jobName",jobName);
+ }
@RequestMapping(value = "/groups/{group}/jobs/{name}", method = RequestMethod.DELETE)
public boolean deleteJob(@PathVariable String group, @PathVariable String name) {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/95e6350d/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java
index 1c89deb..e9ccb4a 100644
--- a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java
@@ -109,7 +109,7 @@ public class SparkSubmitJob implements Job {
//prepare current system timestamp
long currentSystemTimestamp = System.currentTimeMillis();
- if (sourcePattern != null && !sourcePattern.isEmpty()) {
+ if (sourcePattern != null && !sourcePattern.equals("")) {
sourcePatternItemSet = sourcePattern.split("-");
long currentTimstamp = setCurrentTimestamp(currentSystemTimestamp);
setDataConnectorPartitions(measure.getSource(), sourcePatternItemSet, partitionItemSet, currentTimstamp);
@@ -125,6 +125,7 @@ public class SparkSubmitJob implements Job {
RestTemplate restTemplate = new RestTemplate();
setSparkJobDO();
// String result = restTemplate.postForObject(uri, sparkJobDO, String.class);
+ logger.info("measure: \n"+measure);
String result = restTemplate.postForObject(uri, sparkJobDO, String.class);
logger.info(result);
ScheduleResult scheduleResult=new ScheduleResult();
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/95e6350d/service/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties
index 685ca8a..6eba104 100644
--- a/service/src/main/resources/application.properties
+++ b/service/src/main/resources/application.properties
@@ -6,7 +6,7 @@ spring.datasource.driver-class-name=com.mysql.jdbc.Driver
## Hibernate ddl auto (validate,create, create-drop, update)
-spring.jpa.hibernate.ddl-auto = create-drop
+spring.jpa.hibernate.ddl-auto = update
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
#
@@ -19,4 +19,4 @@ hive.metastore.uris = thrift://localhost:9083
hive.metastore.dbname = default
# kafka schema registry
-kafka.schema.registry.url = http://localhost:8081
+kafka.schema.registry.url = http://localhost:8081
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/95e6350d/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java
index 56a8440..18283b4 100644
--- a/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java
+++ b/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java
@@ -29,6 +29,8 @@ import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -102,11 +104,15 @@ public class SparkSubmitJobTest {
public void test_genPartitions(){
String[] patternItemSet={"YYYYMMDD","HH"};
String[] partitionItemSet={"date","hour"};
- long timestamp=1460174400000l;
+// long timestamp=1460174400000l;
+ long timestamp=System.currentTimeMillis();
Map<String,String> par=ssj.genPartitions(patternItemSet,partitionItemSet,timestamp);
Map<String,String> verifyMap=new HashMap<>();
- verifyMap.put("date","20160409");
- verifyMap.put("hour","12");
+// verifyMap.put("date","20160409");
+ SimpleDateFormat sdf = new SimpleDateFormat("YYYYMMdd");
+ verifyMap.put("date",sdf.format(new Date(timestamp)));
+ SimpleDateFormat sdf1 = new SimpleDateFormat("HH");
+ verifyMap.put("hour",sdf1.format(new Date(timestamp)));
assertEquals(verifyMap,par);
}