You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/08/10 10:28:54 UTC

incubator-griffin git commit: update measure field to support new format and ut

Repository: incubator-griffin
Updated Branches:
  refs/heads/master 23ff999cd -> 7b749ad72


Update measure fields to support the new format, and update unit tests

1. Update env_batch.json and env_streaming.json
2. Rule
- add "inDataFrameName" and "outDataFrameName", remove "name"
- add "out" param array, move the "metric" and "record" params inside the "out" array
DataSource
- add boolean field "baseline"
- change "cache" to "checkpoint"
DataConnector
- add "dataFrameName"
Measure
- add "sinks" string array
- update dqType from String to enum
JobServiceImpl
- change "persist" to "sinks"
- compare literal string "hdfs" case-insensitively

3. Update measure unit tests and fix a bug in the predicate unit test

Author: ahutsunshine <ah...@gmail.com>

Closes #389 from ahutsunshine/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/7b749ad7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/7b749ad7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/7b749ad7

Branch: refs/heads/master
Commit: 7b749ad72a78eb4244bcf47a80a52f2a6e3f222f
Parents: 23ff999
Author: ahutsunshine <ah...@gmail.com>
Authored: Fri Aug 10 18:28:48 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Fri Aug 10 18:28:48 2018 +0800

----------------------------------------------------------------------
 .../griffin/core/job/BatchJobOperatorImpl.java  |   2 +-
 .../apache/griffin/core/job/JobController.java  |   2 +-
 .../apache/griffin/core/job/JobInstance.java    |   6 +-
 .../org/apache/griffin/core/job/JobService.java |   2 +-
 .../apache/griffin/core/job/JobServiceImpl.java |  13 ++-
 .../griffin/core/job/entity/JobDataSegment.java |  14 +--
 .../core/measure/entity/DataConnector.java      |  19 ++-
 .../griffin/core/measure/entity/DataSource.java |  50 +++++---
 .../core/measure/entity/GriffinMeasure.java     |   5 +-
 .../griffin/core/measure/entity/Measure.java    |  65 +++++++++--
 .../griffin/core/measure/entity/Rule.java       | 115 +++++++++----------
 .../measure/entity/StreamingPreProcess.java     |  23 +++-
 service/src/main/resources/env/env_batch.json   |  50 +++-----
 .../src/main/resources/env/env_streaming.json   |  24 ++--
 .../core/job/FileExistPredicatorTest.java       |   4 +
 .../core/measure/repo/MeasureRepoTest.java      |  19 ++-
 .../apache/griffin/core/util/EntityHelper.java  |  42 ++++---
 17 files changed, 275 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
index 41f5142..bc73cd8 100644
--- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -315,7 +315,7 @@ public class BatchJobOperatorImpl implements JobOperator {
     private boolean isValidBaseLine(List<JobDataSegment> segments) {
         assert segments != null;
         for (JobDataSegment jds : segments) {
-            if (jds.isBaseline()) {
+            if (jds.isAsTsBaseline()) {
                 return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index ebfdd91..64b8e42 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -93,7 +93,7 @@ public class JobController {
 
     @RequestMapping(path = "/jobs/download", method = RequestMethod.GET)
     public ResponseEntity<Resource> download(@RequestParam("jobName") String jobName, @RequestParam("ts") long timestamp) throws Exception {
-        String path = jobService.getJobHdfsPersistPath(jobName, timestamp);
+        String path = jobService.getJobHdfsSinksPath(jobName, timestamp);
         InputStreamResource resource = new InputStreamResource(FSUtil.getMissSampleInputStream(path));
         return ResponseEntity.ok().
                 header("content-disposition", "attachment; filename = sampleMissingData.json")

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index 18a5a96..ab67c81 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -140,7 +140,7 @@ public class JobInstance implements Job {
     private void setSourcesPartitionsAndPredicates(List<DataSource> sources) throws Exception {
         boolean isFirstBaseline = true;
         for (JobDataSegment jds : job.getSegments()) {
-            if (jds.isBaseline() && isFirstBaseline) {
+            if (jds.isAsTsBaseline() && isFirstBaseline) {
                 Long tsOffset = TimeUtil.str2Long(jds.getSegmentRange().getBegin());
                 measure.setTimestamp(jobStartTime + tsOffset);
                 isFirstBaseline = false;
@@ -347,7 +347,7 @@ public class JobInstance implements Job {
 
     private void preProcessMeasure() throws IOException {
         for (DataSource source : measure.getDataSources()) {
-            Map cacheMap = source.getCacheMap();
+            Map cacheMap = source.getCheckpointMap();
             //to skip batch job
             if (cacheMap == null) {
                 return;
@@ -357,7 +357,7 @@ public class JobInstance implements Job {
             cache = cache.replaceAll("\\$\\{SOURCE_NAME}", source.getName());
             cache = cache.replaceAll("\\$\\{TARGET_NAME}", source.getName());
             cacheMap = toEntity(cache, Map.class);
-            source.setCacheMap(cacheMap);
+            source.setCheckpointMap(cacheMap);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java b/service/src/main/java/org/apache/griffin/core/job/JobService.java
index 65766e2..58e541f 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobService.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java
@@ -44,5 +44,5 @@ public interface JobService {
 
     JobHealth getHealthInfo();
 
-    String getJobHdfsPersistPath(String jobName, long timestamp);
+    String getJobHdfsSinksPath(String jobName, long timestamp);
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index b041c0b..5617065 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -545,24 +545,25 @@ public class JobServiceImpl implements JobService {
     }
 
     @Override
-    public String getJobHdfsPersistPath(String jobName, long timestamp) {
+    public String getJobHdfsSinksPath(String jobName, long timestamp) {
         List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(jobName, false);
         if (jobList.size() == 0) {
             return null;
         }
         if (jobList.get(0).getType().toLowerCase().equals("batch")) {
-            return getPersistPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + "";
+            return getSinksPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + "";
         }
 
-        return getPersistPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + "";
+        return getSinksPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + "";
     }
 
-    private String getPersistPath(String jsonString) {
+    private String getSinksPath(String jsonString) {
         try {
             JSONObject obj = new JSONObject(jsonString);
-            JSONArray persistArray = obj.getJSONArray("persist");
+            JSONArray persistArray = obj.getJSONArray("sinks");
             for (int i = 0; i < persistArray.length(); i++) {
-                if (persistArray.getJSONObject(i).get("type").equals("hdfs")) {
+                Object type = persistArray.getJSONObject(i).get("type");
+                if (type instanceof String && "hdfs".equalsIgnoreCase(String.valueOf(type))) {
                     return persistArray.getJSONObject(i).getJSONObject("config").getString("path");
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
index 62abe3e..a990abe 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
@@ -43,19 +43,19 @@ public class JobDataSegment extends AbstractAuditableEntity {
     @NotNull
     private String dataConnectorName;
 
-    private boolean baseline = false;
+    private boolean asTsBaseline = false;
 
     @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
     @JoinColumn(name = "segment_range_id")
     private SegmentRange segmentRange = new SegmentRange();
 
     @JsonProperty("as.baseline")
-    public boolean isBaseline() {
-        return baseline;
+    public boolean isAsTsBaseline() {
+        return asTsBaseline;
     }
 
-    public void setBaseline(boolean baseline) {
-        this.baseline = baseline;
+    public void setAsTsBaseline(boolean asTsBaseline) {
+        this.asTsBaseline = asTsBaseline;
     }
 
     @JsonProperty("segment.range")
@@ -85,12 +85,12 @@ public class JobDataSegment extends AbstractAuditableEntity {
 
     public JobDataSegment(String dataConnectorName, boolean baseline) {
         this.dataConnectorName = dataConnectorName;
-        this.baseline = baseline;
+        this.asTsBaseline = baseline;
     }
 
     public JobDataSegment(String dataConnectorName, boolean baseline, SegmentRange segmentRange) {
         this.dataConnectorName = dataConnectorName;
-        this.baseline = baseline;
+        this.asTsBaseline = baseline;
         this.segmentRange = segmentRange;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
index 2ec3ed7..2a63b57 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
@@ -73,6 +73,9 @@ public class DataConnector extends AbstractAuditableEntity {
     private String version;
 
     @JsonInclude(JsonInclude.Include.NON_NULL)
+    private String dataFrameName;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
     private String dataUnit;
 
     @JsonInclude(JsonInclude.Include.NON_NULL)
@@ -131,6 +134,15 @@ public class DataConnector extends AbstractAuditableEntity {
         return config;
     }
 
+    @JsonProperty("dataframe.name")
+    public String getDataFrameName() {
+        return dataFrameName;
+    }
+
+    public void setDataFrameName(String dataFrameName) {
+        this.dataFrameName = dataFrameName;
+    }
+
     @JsonProperty("data.unit")
     public String getDataUnit() {
         return dataUnit;
@@ -204,16 +216,19 @@ public class DataConnector extends AbstractAuditableEntity {
     public DataConnector() {
     }
 
-    public DataConnector(String name, DataType type, String version, String config) throws IOException {
+    public DataConnector(String name, DataType type, String version,
+                         String config, String dataFrameName) throws IOException {
         this.name = name;
         this.type = type;
         this.version = version;
         this.config = config;
         this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, Object>>() {
         });
+        this.dataFrameName = dataFrameName;
     }
 
-    public DataConnector(String name, String dataUnit, Map configMap, List<SegmentPredicate> predicates) {
+    public DataConnector(String name, String dataUnit, Map configMap,
+                         List<SegmentPredicate> predicates) {
         this.name = name;
         this.dataUnit = dataUnit;
         this.configMap = configMap;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
index 59878cc..79525b2 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
@@ -55,13 +55,15 @@ public class DataSource extends AbstractAuditableEntity {
     @JoinColumn(name = "data_source_id")
     private List<DataConnector> connectors = new ArrayList<>();
 
+    private boolean baseline = false;
+
     @JsonIgnore
     @Column(length = 1024)
-    private String cache;
+    private String checkpoint;
 
     @Transient
     @JsonInclude(JsonInclude.Include.NON_NULL)
-    private Map<String, Object> cacheMap;
+    private Map<String, Object> checkpointMap;
 
 
     public String getName() {
@@ -83,36 +85,44 @@ public class DataSource extends AbstractAuditableEntity {
         this.connectors = connectors;
     }
 
-    private String getCache() {
-        return cache;
+    public boolean isBaseline() {
+        return baseline;
+    }
+
+    public void setBaseline(boolean baseline) {
+        this.baseline = baseline;
+    }
+
+    private String getCheckpoint() {
+        return checkpoint;
     }
 
-    private void setCache(String cache) {
-        this.cache = cache;
+    private void setCheckpoint(String checkpoint) {
+        this.checkpoint = checkpoint;
 
     }
 
-    @JsonProperty("cache")
-    public Map<String, Object> getCacheMap() {
-        return cacheMap;
+    @JsonProperty("checkpoint")
+    public Map<String, Object> getCheckpointMap() {
+        return checkpointMap;
     }
 
-    public void setCacheMap(Map<String, Object> cacheMap) {
-        this.cacheMap = cacheMap;
+    public void setCheckpointMap(Map<String, Object> checkpointMap) {
+        this.checkpointMap = checkpointMap;
     }
 
     @PrePersist
     @PreUpdate
     public void save() throws JsonProcessingException {
-        if (cacheMap != null) {
-            this.cache = JsonUtil.toJson(cacheMap);
+        if (checkpointMap != null) {
+            this.checkpoint = JsonUtil.toJson(checkpointMap);
         }
     }
 
     @PostLoad
     public void load() throws IOException {
-        if (!StringUtils.isEmpty(cache)) {
-            this.cacheMap = JsonUtil.toEntity(cache, new TypeReference<Map<String, Object>>() {
+        if (!StringUtils.isEmpty(checkpoint)) {
+            this.checkpointMap = JsonUtil.toEntity(checkpoint, new TypeReference<Map<String, Object>>() {
             });
         }
     }
@@ -124,4 +134,14 @@ public class DataSource extends AbstractAuditableEntity {
         this.name = name;
         this.connectors = connectors;
     }
+
+    public DataSource(String name, boolean baseline,
+                      Map<String, Object> checkpointMap,
+                      List<DataConnector> connectors) {
+        this.name = name;
+        this.baseline = baseline;
+        this.checkpointMap = checkpointMap;
+        this.connectors = connectors;
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
index 6dd3172..e4a1e41 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
@@ -156,11 +156,14 @@ public class GriffinMeasure extends Measure {
         super();
     }
 
-    public GriffinMeasure(String name, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {
+    public GriffinMeasure(String name, String owner, List<DataSource> dataSources,
+                          EvaluateRule evaluateRule,
+                          List<String> sinksList) {
         this.name = name;
         this.owner = owner;
         this.dataSources = dataSources;
         this.evaluateRule = evaluateRule;
+        setSinksList(sinksList);
     }
 
     public GriffinMeasure(Long measureId, String name, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
index fcecc1d..a831ea3 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
@@ -19,17 +19,18 @@ under the License.
 
 package org.apache.griffin.core.measure.entity;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-import javax.persistence.Entity;
-import javax.persistence.EnumType;
-import javax.persistence.Enumerated;
-import javax.persistence.Inheritance;
-import javax.persistence.InheritanceType;
+import com.fasterxml.jackson.annotation.*;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.util.JsonUtil;
+
+import javax.persistence.*;
 import javax.validation.constraints.NotNull;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 
 @Entity
 @Inheritance(strategy = InheritanceType.JOINED)
@@ -53,6 +54,13 @@ public abstract class Measure extends AbstractAuditableEntity {
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private String organization;
 
+    @Transient
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private List<String> sinksList = Arrays.asList("ELASTICSEARCH", "HDFS");
+
+    @JsonIgnore
+    private String sinks;
+
     private boolean deleted = false;
 
     public String getName() {
@@ -96,6 +104,23 @@ public abstract class Measure extends AbstractAuditableEntity {
         this.owner = owner;
     }
 
+    @JsonProperty("sinks")
+    public List<String> getSinksList() {
+        return sinksList;
+    }
+
+    public void setSinksList(List<String> sinksList) {
+        this.sinksList = sinksList;
+    }
+
+    private String getSinks() {
+        return sinks;
+    }
+
+    private void setSinks(String sinks) {
+        this.sinks = sinks;
+    }
+
     public boolean isDeleted() {
         return deleted;
     }
@@ -104,6 +129,26 @@ public abstract class Measure extends AbstractAuditableEntity {
         this.deleted = deleted;
     }
 
+    @PrePersist
+    @PreUpdate
+    public void save() throws JsonProcessingException {
+        if (sinksList != null) {
+            this.sinks = JsonUtil.toJson(sinksList);
+        } else {
+            this.sinks = null;
+        }
+    }
+
+    @PostLoad
+    public void load() throws IOException {
+        if (!StringUtils.isEmpty(sinks)) {
+            this.sinksList = JsonUtil.toEntity(sinks, new TypeReference<List<String>>() {
+            });
+        } else {
+            this.sinksList = null;
+        }
+    }
+
     public Measure() {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
index 033af2e..10203e2 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
@@ -26,13 +26,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.PostLoad;
-import javax.persistence.PrePersist;
-import javax.persistence.PreUpdate;
-import javax.persistence.Transient;
+import javax.persistence.*;
 import javax.validation.constraints.NotNull;
 
 import org.apache.commons.lang.StringUtils;
@@ -49,14 +45,18 @@ public class Rule extends AbstractAuditableEntity {
     @NotNull
     private String dslType;
 
-    @NotNull
-    private String dqType;
+    @Enumerated(EnumType.STRING)
+    private DqType dqType;
 
     @Column(length = 8 * 1024)
     @NotNull
     private String rule;
 
-    private String name;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private String inDataFrameName;
+
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private String outDataFrameName;
 
     @JsonIgnore
 //    @Access(AccessType.PROPERTY)
@@ -67,21 +67,12 @@ public class Rule extends AbstractAuditableEntity {
     @JsonInclude(JsonInclude.Include.NON_NULL)
     private Map<String, Object> detailsMap;
 
-    @JsonIgnore
-//    @Access(AccessType.PROPERTY)
-    private String metric;
-
     @Transient
     @JsonInclude(JsonInclude.Include.NON_NULL)
-    private Map<String, Object> metricMap;
+    private List<Map<String, Object>> outList;
 
     @JsonIgnore
-//    @Access(AccessType.PROPERTY)
-    private String record;
-
-    @Transient
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    private Map<String, Object> recordMap;
+    private String out;
 
     @JsonProperty("dsl.type")
     public String getDslType() {
@@ -93,11 +84,11 @@ public class Rule extends AbstractAuditableEntity {
     }
 
     @JsonProperty("dq.type")
-    public String getDqType() {
+    public DqType getDqType() {
         return dqType;
     }
 
-    public void setDqType(String dqType) {
+    public void setDqType(DqType dqType) {
         this.dqType = dqType;
     }
 
@@ -109,31 +100,31 @@ public class Rule extends AbstractAuditableEntity {
         this.rule = rule;
     }
 
-    @JsonProperty("details")
-    public Map<String, Object> getDetailsMap() {
-        return detailsMap;
+    @JsonProperty("in.dataframe.name")
+    public String getInDataFrameName() {
+        return inDataFrameName;
     }
 
-    public void setDetailsMap(Map<String, Object> detailsMap) {
-        this.detailsMap = detailsMap;
+    public void setInDataFrameName(String inDataFrameName) {
+        this.inDataFrameName = inDataFrameName;
     }
 
-    @JsonProperty("metric")
-    public Map<String, Object> getMetricMap() {
-        return metricMap;
+    @JsonProperty("out.dataframe.name")
+    public String getOutDataFrameName() {
+        return outDataFrameName;
     }
 
-    public void setMetricMap(Map<String, Object> metricMap) {
-        this.metricMap = metricMap;
+    public void setOutDataFrameName(String outDataFrameName) {
+        this.outDataFrameName = outDataFrameName;
     }
 
-    @JsonProperty("record")
-    public Map<String, Object> getRecordMap() {
-        return recordMap;
+    @JsonProperty("details")
+    public Map<String, Object> getDetailsMap() {
+        return detailsMap;
     }
 
-    public void setRecordMap(Map<String, Object> recordMap) {
-        this.recordMap = recordMap;
+    public void setDetailsMap(Map<String, Object> detailsMap) {
+        this.detailsMap = detailsMap;
     }
 
     private String getDetails() {
@@ -144,24 +135,21 @@ public class Rule extends AbstractAuditableEntity {
         this.details = details;
     }
 
-    private void setMetric(String metric) {
-        this.metric = metric;
-    }
-
-    private String getRecord() {
-        return record;
+    @JsonProperty("out")
+    public List<Map<String, Object>> getOutList() {
+        return outList;
     }
 
-    private void setRecord(String record) {
-        this.record = record;
+    public void setOutList(List<Map<String, Object>> outList) {
+        this.outList = outList;
     }
 
-    public String getName() {
-        return name;
+    private String getOut() {
+        return out;
     }
 
-    public void setName(String name) {
-        this.name = name;
+    private void setOut(String out) {
+        this.out = out;
     }
 
     @PrePersist
@@ -170,13 +158,9 @@ public class Rule extends AbstractAuditableEntity {
         if (detailsMap != null) {
             this.details = JsonUtil.toJson(detailsMap);
         }
-        if (metricMap != null) {
-            this.metric = JsonUtil.toJson(metricMap);
+        if (outList != null) {
+            this.out = JsonUtil.toJson(outList);
         }
-        if (recordMap != null) {
-            this.record = JsonUtil.toJson(recordMap);
-        }
-
     }
 
     @PostLoad
@@ -185,12 +169,8 @@ public class Rule extends AbstractAuditableEntity {
             this.detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() {
             });
         }
-        if (!StringUtils.isEmpty(metric)) {
-            this.metricMap = JsonUtil.toEntity(metric, new TypeReference<Map<String, Object>>() {
-            });
-        }
-        if (!StringUtils.isEmpty(record)) {
-            this.recordMap = JsonUtil.toEntity(record, new TypeReference<Map<String, Object>>() {
+        if (!StringUtils.isEmpty(out)) {
+            this.outList = JsonUtil.toEntity(out, new TypeReference<List<Map<String, Object>>>() {
             });
         }
     }
@@ -198,10 +178,21 @@ public class Rule extends AbstractAuditableEntity {
     public Rule() {
     }
 
-    public Rule(String dslType, String dqType, String rule, Map<String, Object> detailsMap) throws JsonProcessingException {
+    public Rule(String dslType, DqType dqType, String rule, Map<String, Object> detailsMap) throws JsonProcessingException {
         this.dslType = dslType;
         this.dqType = dqType;
         this.rule = rule;
+        this.detailsMap = detailsMap;
         this.details = JsonUtil.toJson(detailsMap);
     }
+
+    public Rule(String dslType, DqType dqType, String rule,
+                String inDataFrameName, String outDataFrameName,
+                Map<String, Object> detailsMap,
+                List<Map<String, Object>> outList) throws JsonProcessingException {
+        this(dslType, dqType, rule, detailsMap);
+        this.inDataFrameName = inDataFrameName;
+        this.outDataFrameName = outDataFrameName;
+        this.outList = outList;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java b/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
index 9902c14..4be8e6c 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
@@ -43,7 +43,9 @@ public class StreamingPreProcess extends AbstractAuditableEntity {
 
     private String dslType;
 
-    private String name;
+    private String inDataFrameName;
+
+    private String outDataFrameName;
 
     private String rule;
 
@@ -64,14 +66,25 @@ public class StreamingPreProcess extends AbstractAuditableEntity {
         this.dslType = dslType;
     }
 
-    public String getName() {
-        return name;
+    @JsonProperty("in.dataframe.name")
+    public String getInDataFrameName() {
+        return inDataFrameName;
+    }
+
+    public void setInDataFrameName(String inDataFrameName) {
+        this.inDataFrameName = inDataFrameName;
     }
 
-    public void setName(String name) {
-        this.name = name;
+    @JsonProperty("out.dataframe.name")
+    public String getOutDataFrameName() {
+        return outDataFrameName;
     }
 
+    public void setOutDataFrameName(String outDataFrameName) {
+        this.outDataFrameName = outDataFrameName;
+    }
+
+
     public String getRule() {
         return rule;
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/resources/env/env_batch.json
----------------------------------------------------------------------
diff --git a/service/src/main/resources/env/env_batch.json b/service/src/main/resources/env/env_batch.json
index 1e3160f..72a3839 100644
--- a/service/src/main/resources/env/env_batch.json
+++ b/service/src/main/resources/env/env_batch.json
@@ -1,55 +1,31 @@
 {
   "spark": {
-    "log.level": "WARN",
-    "checkpoint.dir": "hdfs:///griffin/checkpoint/${JOB_NAME}",
-    "batch.interval": "20s",
-    "process.interval": "1m",
-    "config": {
-      "spark.default.parallelism": 4,
-      "spark.task.maxFailures": 5,
-      "spark.streaming.kafkaMaxRatePerPartition": 1000,
-      "spark.streaming.concurrentJobs": 4,
-      "spark.yarn.maxAppAttempts": 5,
-      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
-      "spark.yarn.max.executor.failures": 120,
-      "spark.yarn.executor.failuresValidityInterval": "1h",
-      "spark.hadoop.fs.hdfs.impl.disable.cache": true
-    }
+    "log.level": "WARN"
   },
-  "persist": [
+  "sinks": [
     {
-      "type": "log",
+      "type": "CONSOLE",
       "config": {
-        "max.log.lines": 2
+        "max.log.lines": 10
       }
     },
     {
-      "type": "hdfs",
+      "type": "HDFS",
       "config": {
-        "path": "hdfs:///griffin/persist"
+        "path": "hdfs:///griffin/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
       }
     },
     {
-      "type": "http",
+      "type": "ELASTICSEARCH",
       "config": {
         "method": "post",
-        "api": "http://es:9200/griffin/accuracy"
-      }
-    }
-  ],
-  "info.cache": [
-    {
-      "type": "zk",
-      "config": {
-        "hosts": "zk:2181",
-        "namespace": "griffin/infocache",
-        "lock.path": "lock",
-        "mode": "persist",
-        "init.clear": false,
-        "close.clear": false
+        "api": "http://es:9200/griffin/accuracy",
+        "connection.timeout": "1m",
+        "retry": 10
       }
     }
   ],
-  "cleaner": {
-  }
+  "griffin.checkpoint": []
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/resources/env/env_streaming.json
----------------------------------------------------------------------
diff --git a/service/src/main/resources/env/env_streaming.json b/service/src/main/resources/env/env_streaming.json
index 5c4b190..6508af1 100644
--- a/service/src/main/resources/env/env_streaming.json
+++ b/service/src/main/resources/env/env_streaming.json
@@ -3,8 +3,8 @@
     "log.level": "WARN",
     "checkpoint.dir": "hdfs:///griffin/checkpoint/${JOB_NAME}",
     "init.clear": true,
-    "batch.interval": "30s",
-    "process.interval": "3m",
+    "batch.interval": "1m",
+    "process.interval": "5m",
     "config": {
       "spark.default.parallelism": 4,
       "spark.task.maxFailures": 5,
@@ -17,28 +17,30 @@
       "spark.hadoop.fs.hdfs.impl.disable.cache": true
     }
   },
-  "persist": [
+  "sinks": [
     {
-      "type": "log",
+      "type": "CONSOLE",
       "config": {
-        "max.log.lines": 2
+        "max.log.lines": 100
       }
     },
     {
-      "type": "hdfs",
+      "type": "HDFS",
       "config": {
-        "path": "hdfs:///griffin/persist"
+        "path": "hdfs:///griffin/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
       }
     },
     {
-      "type": "http",
+      "type": "ELASTICSEARCH",
       "config": {
         "method": "post",
         "api": "http://es:9200/griffin/accuracy"
       }
     }
   ],
-  "info.cache": [
+  "griffin.checkpoint": [
     {
       "type": "zk",
       "config": {
@@ -50,7 +52,5 @@
         "close.clear": false
       }
     }
-  ],
-  "cleaner": {
-  }
+  ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java b/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java
index 0d144be..af981e3 100644
--- a/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java
@@ -21,7 +21,11 @@ public class FileExistPredicatorTest {
 
     @BeforeClass
     public static void mkFile() throws IOException {
+        File fileDirectory = new File(rootPath); // to fix createFileExclusively exception
         File file = new File(rootPath + fileName);
+        if (!fileDirectory.exists()) {
+            fileDirectory.mkdir();
+        }
         if (!file.exists()) {
             file.createNewFile();
         }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
index aa80608..ef37f97 100644
--- a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
+++ b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
@@ -21,11 +21,12 @@ package org.apache.griffin.core.measure.repo;
 
 import static org.apache.griffin.core.util.EntityHelper.createGriffinMeasure;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
 
 import java.util.List;
 
 import org.apache.griffin.core.config.EclipseLinkJpaConfigForTest;
-import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.measure.entity.*;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -57,7 +58,21 @@ public class MeasureRepoTest {
     public void testFindByNameAndDeleted() {
         String name = "m1";
         List<Measure> measures = measureRepo.findByNameAndDeleted(name, false);
-        assertThat(measures.get(0).getName()).isEqualTo(name);
+        GriffinMeasure m = (GriffinMeasure) measures.get(0);
+
+        List<DataSource> sources = m.getDataSources();
+        DataConnector connector = sources.get(0).getConnectors().get(0);
+        Rule rule = m.getEvaluateRule().getRules().get(0);
+        assertEquals(m.getSinksList().size(), 2);
+        assertEquals(sources.get(0).isBaseline(), true);
+        assertEquals(sources.get(0).getCheckpointMap().size(), 1);
+        assertEquals(connector.getDataFrameName(), "kafka");
+        assertEquals(connector.getConfigMap().size(), 3);
+        assertEquals(rule.getDqType(), DqType.ACCURACY);
+        assertEquals(rule.getInDataFrameName(), "in");
+        assertEquals(rule.getOutDataFrameName(), "out");
+        assertEquals(rule.getDetailsMap().size(), 1);
+        assertEquals(rule.getOutList().size(), 2);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java b/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
index e2254d1..4c28ea7 100644
--- a/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
+++ b/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
@@ -43,12 +43,7 @@ import org.apache.griffin.core.job.entity.LivySessionStates;
 import org.apache.griffin.core.job.entity.SegmentPredicate;
 import org.apache.griffin.core.job.entity.SegmentRange;
 import org.apache.griffin.core.job.entity.VirtualJob;
-import org.apache.griffin.core.measure.entity.DataConnector;
-import org.apache.griffin.core.measure.entity.DataSource;
-import org.apache.griffin.core.measure.entity.EvaluateRule;
-import org.apache.griffin.core.measure.entity.ExternalMeasure;
-import org.apache.griffin.core.measure.entity.GriffinMeasure;
-import org.apache.griffin.core.measure.entity.Rule;
+import org.apache.griffin.core.measure.entity.*;
 import org.quartz.JobDataMap;
 import org.quartz.JobKey;
 import org.quartz.SimpleTrigger;
@@ -81,17 +76,34 @@ public class EntityHelper {
     public static GriffinMeasure createGriffinMeasure(String name,
                                                       DataConnector dcSource,
                                                       DataConnector dcTarget) throws Exception {
-        DataSource dataSource = new DataSource("source", Arrays.asList(dcSource));
-        DataSource targetSource = new DataSource("target", Arrays.asList(dcTarget));
+        DataSource dataSource = new DataSource("source", true, createCheckpointMap(), Arrays.asList(dcSource));
+        DataSource targetSource = new DataSource("target", false, createCheckpointMap(), Arrays.asList(dcTarget));
         List<DataSource> dataSources = new ArrayList<>();
         dataSources.add(dataSource);
         dataSources.add(targetSource);
-        String rules = "source.id=target.id AND source.name=target.name AND source.age=target.age";
-        Map<String, Object> map = new HashMap<>();
-        map.put("detail", "detail info");
-        Rule rule = new Rule("griffin-dsl", "ACCURACY", rules, map);
+        Rule rule = createRule();
         EvaluateRule evaluateRule = new EvaluateRule(Arrays.asList(rule));
-        return new GriffinMeasure(name, "test", dataSources, evaluateRule);
+        return new GriffinMeasure(name, "test", dataSources, evaluateRule, Arrays.asList("ELASTICSEARCH", "HDFS"));
+    }
+
+    private static Rule createRule() throws JsonProcessingException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("detail", "detail");
+        String rule = "source.id=target.id AND source.name=target.name AND source.age=target.age";
+        Map<String, Object> metricMap = new HashMap<>();
+        Map<String, Object> recordMap = new HashMap<>();
+        metricMap.put("type", "metric");
+        metricMap.put("name", "accu");
+        recordMap.put("type", "record");
+        recordMap.put("name", "missRecords");
+        List<Map<String, Object>> outList = Arrays.asList(metricMap, recordMap);
+        return new Rule("griffin-dsl", DqType.ACCURACY, rule, "in", "out", map, outList);
+    }
+
+    private static Map<String, Object> createCheckpointMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("info.path", "source");
+        return map;
     }
 
     public static DataConnector createDataConnector(String name,
@@ -102,14 +114,14 @@ public class EntityHelper {
         config.put("database", database);
         config.put("table.name", table);
         config.put("where", where);
-        return new DataConnector(name, "1h", config, null);
+        return new DataConnector(name, DataConnector.DataType.HIVE, "1.2", JsonUtil.toJson(config), "kafka");
     }
 
     public static DataConnector createDataConnector(String name,
                                                     String database,
                                                     String table,
                                                     String where,
-                                                    SegmentPredicate predicate) throws IOException {
+                                                    SegmentPredicate predicate) {
         HashMap<String, String> config = new HashMap<>();
         config.put("database", database);
         config.put("table.name", table);