You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2017/06/05 08:40:07 UTC

incubator-griffin git commit: 20170605start time modify

Repository: incubator-griffin
Updated Branches:
  refs/heads/master 9e408994b -> 95e6350d1


20170605start time modify

Author: Chen <xi...@lm-shc-16501061.corp.ebay.com>

Closes #58 from justACT/20170605startTimeModify.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/95e6350d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/95e6350d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/95e6350d

Branch: refs/heads/master
Commit: 95e6350d1103accc23ae4315ec592d346db6cc5f
Parents: 9e40899
Author: Chen <xi...@lm-shc-16501061.corp.ebay.com>
Authored: Mon Jun 5 16:39:57 2017 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Jun 5 16:39:57 2017 +0800

----------------------------------------------------------------------
 .../griffin/core/measure/DataConnector.java     |  4 +-
 .../core/schedule/SchedulerController.java      | 41 ++++++++++++--------
 .../griffin/core/schedule/SparkSubmitJob.java   |  3 +-
 .../src/main/resources/application.properties   |  4 +-
 .../core/schedule/SparkSubmitJobTest.java       | 12 ++++--
 5 files changed, 40 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/95e6350d/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java
index 7143c36..cd7e0a4 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/DataConnector.java
@@ -95,8 +95,8 @@ public class DataConnector extends AuditableEntity  {
         this.type = type;
     }
 
-    public String getConfig() {
-        return config;
+    public Map<String,String> getConfig() { // getter now returns a Map<String,String> (previously a String)
+        return getConfigInMaps(); // delegates to getConfigInMaps() instead of returning the config field directly
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/95e6350d/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java
index 147b71d..beae5b7 100644
--- a/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java
+++ b/service/src/main/java/org/apache/griffin/core/schedule/SchedulerController.java
@@ -31,9 +31,9 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import java.io.Serializable;
 import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.*;
 
-import static org.quartz.DateBuilder.futureDate;
 import static org.quartz.JobBuilder.newJob;
 import static org.quartz.JobKey.jobKey;
 import static org.quartz.TriggerBuilder.newTrigger;
@@ -121,8 +121,17 @@ public class SchedulerController {
                            @RequestBody SchedulerRequestBody schedulerRequestBody) {
         //@RequestParam(value = "sourcePat", required = false) String sourcePat,
         //@RequestParam(value = "targetPat", required = false) String targetPat
-        int jobStartTime = 0;
+//        int jobStartTime = 0;
+        Date jobStartTime=null;
         int periodTime = 0;
+        SimpleDateFormat format=new SimpleDateFormat("YYYYMMdd HH:mm:ss");
+        try{
+            periodTime = Integer.parseInt(schedulerRequestBody.getPeriodTime());
+            jobStartTime=format.parse(schedulerRequestBody.getJobStartTime());
+        }catch (Exception e){
+            LOGGER.info("jobStartTime or periodTime format error! "+e);
+            return false;
+        }
         try {
             Scheduler scheduler = factory.getObject();
             TriggerKey triggerKey = triggerKey(jobName, groupName);
@@ -134,6 +143,7 @@ public class SchedulerController {
             JobDetail jobDetail;
             if (scheduler.checkExists(jobKey)) {
                 jobDetail = scheduler.getJobDetail(jobKey);
+                setJobDetail(jobDetail,schedulerRequestBody,measureName,groupName,jobName);
                 scheduler.addJob(jobDetail, true);
             } else {
                 jobDetail = newJob(SparkSubmitJob.class)
@@ -141,18 +151,7 @@ public class SchedulerController {
                         .withIdentity(jobKey)
                         .build();
 
-                jobDetail.getJobDataMap().put("measure", measureName);
-                jobDetail.getJobDataMap().put("sourcePat", schedulerRequestBody.getSourcePat());
-                jobDetail.getJobDataMap().put("targetPat", schedulerRequestBody.getTargetPat());
-                jobDetail.getJobDataMap().put("dataStartTimestamp", schedulerRequestBody.getDataStartTimestamp());
-                jobDetail.getJobDataMap().put("jobStartTime", schedulerRequestBody.getJobStartTime());
-                jobDetail.getJobDataMap().put("periodTime", schedulerRequestBody.getPeriodTime());
-                jobDetail.getJobDataMap().put("lastTime", "");
-                jobDetail.getJobDataMap().put("groupName",groupName);
-                jobDetail.getJobDataMap().put("jobName",jobName);
-
-                jobStartTime = Integer.parseInt(schedulerRequestBody.getJobStartTime());
-                periodTime = Integer.parseInt(schedulerRequestBody.getPeriodTime());
+                setJobDetail(jobDetail,schedulerRequestBody,measureName,groupName,jobName);
                 scheduler.addJob(jobDetail, false);
             }
 
@@ -164,9 +163,8 @@ public class SchedulerController {
                     .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                             .withIntervalInSeconds(periodTime)
                             .repeatForever())
-                    .startAt(futureDate(jobStartTime, DateBuilder.IntervalUnit.SECOND))
+                    .startAt(jobStartTime)
                     .build();
-
             scheduler.scheduleJob(trigger);
 
             return true;
@@ -177,6 +175,17 @@ public class SchedulerController {
         }
     }
 
+    private void setJobDetail(JobDetail jobDetail,SchedulerRequestBody schedulerRequestBody,String measureName,String groupName,String jobName){ // copies the measure name, the request-body fields and the job identity into jobDetail's JobDataMap
+        jobDetail.getJobDataMap().put("measure", measureName);
+        jobDetail.getJobDataMap().put("sourcePat", schedulerRequestBody.getSourcePat());
+        jobDetail.getJobDataMap().put("targetPat", schedulerRequestBody.getTargetPat());
+        jobDetail.getJobDataMap().put("dataStartTimestamp", schedulerRequestBody.getDataStartTimestamp());
+        jobDetail.getJobDataMap().put("jobStartTime", schedulerRequestBody.getJobStartTime()); // stored exactly as returned by the request body; no parsing happens in this method
+        jobDetail.getJobDataMap().put("periodTime", schedulerRequestBody.getPeriodTime());
+        jobDetail.getJobDataMap().put("lastTime", ""); // always written as an empty string, overwriting any value already present under this key
+        jobDetail.getJobDataMap().put("groupName",groupName);
+        jobDetail.getJobDataMap().put("jobName",jobName);
+    }
 
     @RequestMapping(value = "/groups/{group}/jobs/{name}", method = RequestMethod.DELETE)
     public boolean deleteJob(@PathVariable String group, @PathVariable String name) {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/95e6350d/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java
index 1c89deb..e9ccb4a 100644
--- a/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java
+++ b/service/src/main/java/org/apache/griffin/core/schedule/SparkSubmitJob.java
@@ -109,7 +109,7 @@ public class SparkSubmitJob implements Job {
         //prepare current system timestamp
         long currentSystemTimestamp = System.currentTimeMillis();
 
-        if (sourcePattern != null && !sourcePattern.isEmpty()) {
+        if (sourcePattern != null && !sourcePattern.equals("")) {
             sourcePatternItemSet = sourcePattern.split("-");
             long currentTimstamp = setCurrentTimestamp(currentSystemTimestamp);
             setDataConnectorPartitions(measure.getSource(), sourcePatternItemSet, partitionItemSet, currentTimstamp);
@@ -125,6 +125,7 @@ public class SparkSubmitJob implements Job {
         RestTemplate restTemplate = new RestTemplate();
         setSparkJobDO();
 //        String result = restTemplate.postForObject(uri, sparkJobDO, String.class);
+        logger.info("measure: \n"+measure);
         String result = restTemplate.postForObject(uri, sparkJobDO, String.class);
         logger.info(result);
         ScheduleResult scheduleResult=new ScheduleResult();

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/95e6350d/service/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/service/src/main/resources/application.properties b/service/src/main/resources/application.properties
index 685ca8a..6eba104 100644
--- a/service/src/main/resources/application.properties
+++ b/service/src/main/resources/application.properties
@@ -6,7 +6,7 @@ spring.datasource.driver-class-name=com.mysql.jdbc.Driver
 
 ## Hibernate ddl auto (validate,create, create-drop, update)
 
-spring.jpa.hibernate.ddl-auto = create-drop
+spring.jpa.hibernate.ddl-auto = update
 spring.jpa.show-sql=true
 spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
 #
@@ -19,4 +19,4 @@ hive.metastore.uris = thrift://localhost:9083
 hive.metastore.dbname = default
 
 # kafka schema registry
-kafka.schema.registry.url = http://localhost:8081
+kafka.schema.registry.url = http://localhost:8081
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/95e6350d/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java b/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java
index 56a8440..18283b4 100644
--- a/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java
+++ b/service/src/test/java/org/apache/griffin/core/schedule/SparkSubmitJobTest.java
@@ -29,6 +29,8 @@ import org.quartz.JobExecutionContext;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -102,11 +104,15 @@ public class SparkSubmitJobTest {
     public void test_genPartitions(){
         String[] patternItemSet={"YYYYMMDD","HH"};
         String[] partitionItemSet={"date","hour"};
-        long timestamp=1460174400000l;
+//        long timestamp=1460174400000l;
+        long timestamp=System.currentTimeMillis(); // input is the current time, so the expected values below are computed instead of hard-coded
         Map<String,String> par=ssj.genPartitions(patternItemSet,partitionItemSet,timestamp);
         Map<String,String> verifyMap=new HashMap<>();
-        verifyMap.put("date","20160409");
-        verifyMap.put("hour","12");
+//        verifyMap.put("date","20160409");
+        SimpleDateFormat sdf = new SimpleDateFormat("YYYYMMdd"); // NOTE(review): in SimpleDateFormat 'Y' is the week year and 'y' the calendar year; confirm "YYYY" is intended and matches what genPartitions produces
+        verifyMap.put("date",sdf.format(new Date(timestamp))); // expected value for the "date" partition
+        SimpleDateFormat sdf1 = new SimpleDateFormat("HH"); // "HH" formats the hour of day as 00-23
+        verifyMap.put("hour",sdf1.format(new Date(timestamp))); // expected value for the "hour" partition
         assertEquals(verifyMap,par);
     }