You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/08/10 10:28:54 UTC
incubator-griffin git commit: update measure fields to support the new
format and unit tests
Repository: incubator-griffin
Updated Branches:
refs/heads/master 23ff999cd -> 7b749ad72
Update measure fields to support the new format and unit tests
1. Update env_batch.json and env_streaming.json ("persist" becomes "sinks"; "info.cache" is replaced by "griffin.checkpoint")
2. Rule
- add "inDataFrameName" and "outDataFrameName"; remove "name"
- add an "out" param array and move the "metric" and "record" params inside it
DataSource
- add boolean field "baseline"
- rename "cache" to "checkpoint"
DataConnector
- add "dataFrameName"
Measure
- add "sinks" string array
- change Rule's dqType from String to the DqType enum
JobServiceImpl
- rename "persist" to "sinks"
- compare the literal string "hdfs" case-insensitively
3. Update measure unit tests and fix the predicate unit-test bug (create the test root directory before creating the test file)
Author: ahutsunshine <ah...@gmail.com>
Closes #389 from ahutsunshine/master.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/7b749ad7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/7b749ad7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/7b749ad7
Branch: refs/heads/master
Commit: 7b749ad72a78eb4244bcf47a80a52f2a6e3f222f
Parents: 23ff999
Author: ahutsunshine <ah...@gmail.com>
Authored: Fri Aug 10 18:28:48 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Fri Aug 10 18:28:48 2018 +0800
----------------------------------------------------------------------
.../griffin/core/job/BatchJobOperatorImpl.java | 2 +-
.../apache/griffin/core/job/JobController.java | 2 +-
.../apache/griffin/core/job/JobInstance.java | 6 +-
.../org/apache/griffin/core/job/JobService.java | 2 +-
.../apache/griffin/core/job/JobServiceImpl.java | 13 ++-
.../griffin/core/job/entity/JobDataSegment.java | 14 +--
.../core/measure/entity/DataConnector.java | 19 ++-
.../griffin/core/measure/entity/DataSource.java | 50 +++++---
.../core/measure/entity/GriffinMeasure.java | 5 +-
.../griffin/core/measure/entity/Measure.java | 65 +++++++++--
.../griffin/core/measure/entity/Rule.java | 115 +++++++++----------
.../measure/entity/StreamingPreProcess.java | 23 +++-
service/src/main/resources/env/env_batch.json | 50 +++-----
.../src/main/resources/env/env_streaming.json | 24 ++--
.../core/job/FileExistPredicatorTest.java | 4 +
.../core/measure/repo/MeasureRepoTest.java | 19 ++-
.../apache/griffin/core/util/EntityHelper.java | 42 ++++---
17 files changed, 275 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
index 41f5142..bc73cd8 100644
--- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -315,7 +315,7 @@ public class BatchJobOperatorImpl implements JobOperator {
private boolean isValidBaseLine(List<JobDataSegment> segments) {
assert segments != null;
for (JobDataSegment jds : segments) {
- if (jds.isBaseline()) {
+ if (jds.isAsTsBaseline()) {
return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index ebfdd91..64b8e42 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -93,7 +93,7 @@ public class JobController {
@RequestMapping(path = "/jobs/download", method = RequestMethod.GET)
public ResponseEntity<Resource> download(@RequestParam("jobName") String jobName, @RequestParam("ts") long timestamp) throws Exception {
- String path = jobService.getJobHdfsPersistPath(jobName, timestamp);
+ String path = jobService.getJobHdfsSinksPath(jobName, timestamp);
InputStreamResource resource = new InputStreamResource(FSUtil.getMissSampleInputStream(path));
return ResponseEntity.ok().
header("content-disposition", "attachment; filename = sampleMissingData.json")
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
index 18a5a96..ab67c81 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java
@@ -140,7 +140,7 @@ public class JobInstance implements Job {
private void setSourcesPartitionsAndPredicates(List<DataSource> sources) throws Exception {
boolean isFirstBaseline = true;
for (JobDataSegment jds : job.getSegments()) {
- if (jds.isBaseline() && isFirstBaseline) {
+ if (jds.isAsTsBaseline() && isFirstBaseline) {
Long tsOffset = TimeUtil.str2Long(jds.getSegmentRange().getBegin());
measure.setTimestamp(jobStartTime + tsOffset);
isFirstBaseline = false;
@@ -347,7 +347,7 @@ public class JobInstance implements Job {
private void preProcessMeasure() throws IOException {
for (DataSource source : measure.getDataSources()) {
- Map cacheMap = source.getCacheMap();
+ Map cacheMap = source.getCheckpointMap();
//to skip batch job
if (cacheMap == null) {
return;
@@ -357,7 +357,7 @@ public class JobInstance implements Job {
cache = cache.replaceAll("\\$\\{SOURCE_NAME}", source.getName());
cache = cache.replaceAll("\\$\\{TARGET_NAME}", source.getName());
cacheMap = toEntity(cache, Map.class);
- source.setCacheMap(cacheMap);
+ source.setCheckpointMap(cacheMap);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java b/service/src/main/java/org/apache/griffin/core/job/JobService.java
index 65766e2..58e541f 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobService.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java
@@ -44,5 +44,5 @@ public interface JobService {
JobHealth getHealthInfo();
- String getJobHdfsPersistPath(String jobName, long timestamp);
+ String getJobHdfsSinksPath(String jobName, long timestamp);
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index b041c0b..5617065 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -545,24 +545,25 @@ public class JobServiceImpl implements JobService {
}
@Override
- public String getJobHdfsPersistPath(String jobName, long timestamp) {
+ public String getJobHdfsSinksPath(String jobName, long timestamp) {
List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(jobName, false);
if (jobList.size() == 0) {
return null;
}
if (jobList.get(0).getType().toLowerCase().equals("batch")) {
- return getPersistPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + "";
+ return getSinksPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + "";
}
- return getPersistPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + "";
+ return getSinksPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + "";
}
- private String getPersistPath(String jsonString) {
+ private String getSinksPath(String jsonString) {
try {
JSONObject obj = new JSONObject(jsonString);
- JSONArray persistArray = obj.getJSONArray("persist");
+ JSONArray persistArray = obj.getJSONArray("sinks");
for (int i = 0; i < persistArray.length(); i++) {
- if (persistArray.getJSONObject(i).get("type").equals("hdfs")) {
+ Object type = persistArray.getJSONObject(i).get("type");
+ if (type instanceof String && "hdfs".equalsIgnoreCase(String.valueOf(type))) {
return persistArray.getJSONObject(i).getJSONObject("config").getString("path");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
index 62abe3e..a990abe 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java
@@ -43,19 +43,19 @@ public class JobDataSegment extends AbstractAuditableEntity {
@NotNull
private String dataConnectorName;
- private boolean baseline = false;
+ private boolean asTsBaseline = false;
@OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "segment_range_id")
private SegmentRange segmentRange = new SegmentRange();
@JsonProperty("as.baseline")
- public boolean isBaseline() {
- return baseline;
+ public boolean isAsTsBaseline() {
+ return asTsBaseline;
}
- public void setBaseline(boolean baseline) {
- this.baseline = baseline;
+ public void setAsTsBaseline(boolean asTsBaseline) {
+ this.asTsBaseline = asTsBaseline;
}
@JsonProperty("segment.range")
@@ -85,12 +85,12 @@ public class JobDataSegment extends AbstractAuditableEntity {
public JobDataSegment(String dataConnectorName, boolean baseline) {
this.dataConnectorName = dataConnectorName;
- this.baseline = baseline;
+ this.asTsBaseline = baseline;
}
public JobDataSegment(String dataConnectorName, boolean baseline, SegmentRange segmentRange) {
this.dataConnectorName = dataConnectorName;
- this.baseline = baseline;
+ this.asTsBaseline = baseline;
this.segmentRange = segmentRange;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
index 2ec3ed7..2a63b57 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
@@ -73,6 +73,9 @@ public class DataConnector extends AbstractAuditableEntity {
private String version;
@JsonInclude(JsonInclude.Include.NON_NULL)
+ private String dataFrameName;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
private String dataUnit;
@JsonInclude(JsonInclude.Include.NON_NULL)
@@ -131,6 +134,15 @@ public class DataConnector extends AbstractAuditableEntity {
return config;
}
+ @JsonProperty("dataframe.name")
+ public String getDataFrameName() {
+ return dataFrameName;
+ }
+
+ public void setDataFrameName(String dataFrameName) {
+ this.dataFrameName = dataFrameName;
+ }
+
@JsonProperty("data.unit")
public String getDataUnit() {
return dataUnit;
@@ -204,16 +216,19 @@ public class DataConnector extends AbstractAuditableEntity {
public DataConnector() {
}
- public DataConnector(String name, DataType type, String version, String config) throws IOException {
+ public DataConnector(String name, DataType type, String version,
+ String config, String dataFrameName) throws IOException {
this.name = name;
this.type = type;
this.version = version;
this.config = config;
this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, Object>>() {
});
+ this.dataFrameName = dataFrameName;
}
- public DataConnector(String name, String dataUnit, Map configMap, List<SegmentPredicate> predicates) {
+ public DataConnector(String name, String dataUnit, Map configMap,
+ List<SegmentPredicate> predicates) {
this.name = name;
this.dataUnit = dataUnit;
this.configMap = configMap;
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
index 59878cc..79525b2 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
@@ -55,13 +55,15 @@ public class DataSource extends AbstractAuditableEntity {
@JoinColumn(name = "data_source_id")
private List<DataConnector> connectors = new ArrayList<>();
+ private boolean baseline = false;
+
@JsonIgnore
@Column(length = 1024)
- private String cache;
+ private String checkpoint;
@Transient
@JsonInclude(JsonInclude.Include.NON_NULL)
- private Map<String, Object> cacheMap;
+ private Map<String, Object> checkpointMap;
public String getName() {
@@ -83,36 +85,44 @@ public class DataSource extends AbstractAuditableEntity {
this.connectors = connectors;
}
- private String getCache() {
- return cache;
+ public boolean isBaseline() {
+ return baseline;
+ }
+
+ public void setBaseline(boolean baseline) {
+ this.baseline = baseline;
+ }
+
+ private String getCheckpoint() {
+ return checkpoint;
}
- private void setCache(String cache) {
- this.cache = cache;
+ private void setCheckpoint(String checkpoint) {
+ this.checkpoint = checkpoint;
}
- @JsonProperty("cache")
- public Map<String, Object> getCacheMap() {
- return cacheMap;
+ @JsonProperty("checkpoint")
+ public Map<String, Object> getCheckpointMap() {
+ return checkpointMap;
}
- public void setCacheMap(Map<String, Object> cacheMap) {
- this.cacheMap = cacheMap;
+ public void setCheckpointMap(Map<String, Object> checkpointMap) {
+ this.checkpointMap = checkpointMap;
}
@PrePersist
@PreUpdate
public void save() throws JsonProcessingException {
- if (cacheMap != null) {
- this.cache = JsonUtil.toJson(cacheMap);
+ if (checkpointMap != null) {
+ this.checkpoint = JsonUtil.toJson(checkpointMap);
}
}
@PostLoad
public void load() throws IOException {
- if (!StringUtils.isEmpty(cache)) {
- this.cacheMap = JsonUtil.toEntity(cache, new TypeReference<Map<String, Object>>() {
+ if (!StringUtils.isEmpty(checkpoint)) {
+ this.checkpointMap = JsonUtil.toEntity(checkpoint, new TypeReference<Map<String, Object>>() {
});
}
}
@@ -124,4 +134,14 @@ public class DataSource extends AbstractAuditableEntity {
this.name = name;
this.connectors = connectors;
}
+
+ public DataSource(String name, boolean baseline,
+ Map<String, Object> checkpointMap,
+ List<DataConnector> connectors) {
+ this.name = name;
+ this.baseline = baseline;
+ this.checkpointMap = checkpointMap;
+ this.connectors = connectors;
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
index 6dd3172..e4a1e41 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
@@ -156,11 +156,14 @@ public class GriffinMeasure extends Measure {
super();
}
- public GriffinMeasure(String name, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {
+ public GriffinMeasure(String name, String owner, List<DataSource> dataSources,
+ EvaluateRule evaluateRule,
+ List<String> sinksList) {
this.name = name;
this.owner = owner;
this.dataSources = dataSources;
this.evaluateRule = evaluateRule;
+ setSinksList(sinksList);
}
public GriffinMeasure(Long measureId, String name, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
index fcecc1d..a831ea3 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
@@ -19,17 +19,18 @@ under the License.
package org.apache.griffin.core.measure.entity;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-import javax.persistence.Entity;
-import javax.persistence.EnumType;
-import javax.persistence.Enumerated;
-import javax.persistence.Inheritance;
-import javax.persistence.InheritanceType;
+import com.fasterxml.jackson.annotation.*;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.util.JsonUtil;
+
+import javax.persistence.*;
import javax.validation.constraints.NotNull;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
@Entity
@Inheritance(strategy = InheritanceType.JOINED)
@@ -53,6 +54,13 @@ public abstract class Measure extends AbstractAuditableEntity {
@JsonInclude(JsonInclude.Include.NON_NULL)
private String organization;
+ @Transient
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private List<String> sinksList = Arrays.asList("ELASTICSEARCH", "HDFS");
+
+ @JsonIgnore
+ private String sinks;
+
private boolean deleted = false;
public String getName() {
@@ -96,6 +104,23 @@ public abstract class Measure extends AbstractAuditableEntity {
this.owner = owner;
}
+ @JsonProperty("sinks")
+ public List<String> getSinksList() {
+ return sinksList;
+ }
+
+ public void setSinksList(List<String> sinksList) {
+ this.sinksList = sinksList;
+ }
+
+ private String getSinks() {
+ return sinks;
+ }
+
+ private void setSinks(String sinks) {
+ this.sinks = sinks;
+ }
+
public boolean isDeleted() {
return deleted;
}
@@ -104,6 +129,26 @@ public abstract class Measure extends AbstractAuditableEntity {
this.deleted = deleted;
}
+ @PrePersist
+ @PreUpdate
+ public void save() throws JsonProcessingException {
+ if (sinksList != null) {
+ this.sinks = JsonUtil.toJson(sinksList);
+ } else {
+ this.sinks = null;
+ }
+ }
+
+ @PostLoad
+ public void load() throws IOException {
+ if (!StringUtils.isEmpty(sinks)) {
+ this.sinksList = JsonUtil.toEntity(sinks, new TypeReference<List<String>>() {
+ });
+ } else {
+ this.sinksList = null;
+ }
+ }
+
public Measure() {
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
index 033af2e..10203e2 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
@@ -26,13 +26,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.PostLoad;
-import javax.persistence.PrePersist;
-import javax.persistence.PreUpdate;
-import javax.persistence.Transient;
+import javax.persistence.*;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang.StringUtils;
@@ -49,14 +45,18 @@ public class Rule extends AbstractAuditableEntity {
@NotNull
private String dslType;
- @NotNull
- private String dqType;
+ @Enumerated(EnumType.STRING)
+ private DqType dqType;
@Column(length = 8 * 1024)
@NotNull
private String rule;
- private String name;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private String inDataFrameName;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private String outDataFrameName;
@JsonIgnore
// @Access(AccessType.PROPERTY)
@@ -67,21 +67,12 @@ public class Rule extends AbstractAuditableEntity {
@JsonInclude(JsonInclude.Include.NON_NULL)
private Map<String, Object> detailsMap;
- @JsonIgnore
-// @Access(AccessType.PROPERTY)
- private String metric;
-
@Transient
@JsonInclude(JsonInclude.Include.NON_NULL)
- private Map<String, Object> metricMap;
+ private List<Map<String, Object>> outList;
@JsonIgnore
-// @Access(AccessType.PROPERTY)
- private String record;
-
- @Transient
- @JsonInclude(JsonInclude.Include.NON_NULL)
- private Map<String, Object> recordMap;
+ private String out;
@JsonProperty("dsl.type")
public String getDslType() {
@@ -93,11 +84,11 @@ public class Rule extends AbstractAuditableEntity {
}
@JsonProperty("dq.type")
- public String getDqType() {
+ public DqType getDqType() {
return dqType;
}
- public void setDqType(String dqType) {
+ public void setDqType(DqType dqType) {
this.dqType = dqType;
}
@@ -109,31 +100,31 @@ public class Rule extends AbstractAuditableEntity {
this.rule = rule;
}
- @JsonProperty("details")
- public Map<String, Object> getDetailsMap() {
- return detailsMap;
+ @JsonProperty("in.dataframe.name")
+ public String getInDataFrameName() {
+ return inDataFrameName;
}
- public void setDetailsMap(Map<String, Object> detailsMap) {
- this.detailsMap = detailsMap;
+ public void setInDataFrameName(String inDataFrameName) {
+ this.inDataFrameName = inDataFrameName;
}
- @JsonProperty("metric")
- public Map<String, Object> getMetricMap() {
- return metricMap;
+ @JsonProperty("out.dataframe.name")
+ public String getOutDataFrameName() {
+ return outDataFrameName;
}
- public void setMetricMap(Map<String, Object> metricMap) {
- this.metricMap = metricMap;
+ public void setOutDataFrameName(String outDataFrameName) {
+ this.outDataFrameName = outDataFrameName;
}
- @JsonProperty("record")
- public Map<String, Object> getRecordMap() {
- return recordMap;
+ @JsonProperty("details")
+ public Map<String, Object> getDetailsMap() {
+ return detailsMap;
}
- public void setRecordMap(Map<String, Object> recordMap) {
- this.recordMap = recordMap;
+ public void setDetailsMap(Map<String, Object> detailsMap) {
+ this.detailsMap = detailsMap;
}
private String getDetails() {
@@ -144,24 +135,21 @@ public class Rule extends AbstractAuditableEntity {
this.details = details;
}
- private void setMetric(String metric) {
- this.metric = metric;
- }
-
- private String getRecord() {
- return record;
+ @JsonProperty("out")
+ public List<Map<String, Object>> getOutList() {
+ return outList;
}
- private void setRecord(String record) {
- this.record = record;
+ public void setOutList(List<Map<String, Object>> outList) {
+ this.outList = outList;
}
- public String getName() {
- return name;
+ private String getOut() {
+ return out;
}
- public void setName(String name) {
- this.name = name;
+ private void setOut(String out) {
+ this.out = out;
}
@PrePersist
@@ -170,13 +158,9 @@ public class Rule extends AbstractAuditableEntity {
if (detailsMap != null) {
this.details = JsonUtil.toJson(detailsMap);
}
- if (metricMap != null) {
- this.metric = JsonUtil.toJson(metricMap);
+ if (outList != null) {
+ this.out = JsonUtil.toJson(outList);
}
- if (recordMap != null) {
- this.record = JsonUtil.toJson(recordMap);
- }
-
}
@PostLoad
@@ -185,12 +169,8 @@ public class Rule extends AbstractAuditableEntity {
this.detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() {
});
}
- if (!StringUtils.isEmpty(metric)) {
- this.metricMap = JsonUtil.toEntity(metric, new TypeReference<Map<String, Object>>() {
- });
- }
- if (!StringUtils.isEmpty(record)) {
- this.recordMap = JsonUtil.toEntity(record, new TypeReference<Map<String, Object>>() {
+ if (!StringUtils.isEmpty(out)) {
+ this.outList = JsonUtil.toEntity(out, new TypeReference<List<Map<String, Object>>>() {
});
}
}
@@ -198,10 +178,21 @@ public class Rule extends AbstractAuditableEntity {
public Rule() {
}
- public Rule(String dslType, String dqType, String rule, Map<String, Object> detailsMap) throws JsonProcessingException {
+ public Rule(String dslType, DqType dqType, String rule, Map<String, Object> detailsMap) throws JsonProcessingException {
this.dslType = dslType;
this.dqType = dqType;
this.rule = rule;
+ this.detailsMap = detailsMap;
this.details = JsonUtil.toJson(detailsMap);
}
+
+ public Rule(String dslType, DqType dqType, String rule,
+ String inDataFrameName, String outDataFrameName,
+ Map<String, Object> detailsMap,
+ List<Map<String, Object>> outList) throws JsonProcessingException {
+ this(dslType, dqType, rule, detailsMap);
+ this.inDataFrameName = inDataFrameName;
+ this.outDataFrameName = outDataFrameName;
+ this.outList = outList;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java b/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
index 9902c14..4be8e6c 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java
@@ -43,7 +43,9 @@ public class StreamingPreProcess extends AbstractAuditableEntity {
private String dslType;
- private String name;
+ private String inDataFrameName;
+
+ private String outDataFrameName;
private String rule;
@@ -64,14 +66,25 @@ public class StreamingPreProcess extends AbstractAuditableEntity {
this.dslType = dslType;
}
- public String getName() {
- return name;
+ @JsonProperty("in.dataframe.name")
+ public String getInDataFrameName() {
+ return inDataFrameName;
+ }
+
+ public void setInDataFrameName(String inDataFrameName) {
+ this.inDataFrameName = inDataFrameName;
}
- public void setName(String name) {
- this.name = name;
+ @JsonProperty("out.dataframe.name")
+ public String getOutDataFrameName() {
+ return outDataFrameName;
}
+ public void setOutDataFrameName(String outDataFrameName) {
+ this.outDataFrameName = outDataFrameName;
+ }
+
+
public String getRule() {
return rule;
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/resources/env/env_batch.json
----------------------------------------------------------------------
diff --git a/service/src/main/resources/env/env_batch.json b/service/src/main/resources/env/env_batch.json
index 1e3160f..72a3839 100644
--- a/service/src/main/resources/env/env_batch.json
+++ b/service/src/main/resources/env/env_batch.json
@@ -1,55 +1,31 @@
{
"spark": {
- "log.level": "WARN",
- "checkpoint.dir": "hdfs:///griffin/checkpoint/${JOB_NAME}",
- "batch.interval": "20s",
- "process.interval": "1m",
- "config": {
- "spark.default.parallelism": 4,
- "spark.task.maxFailures": 5,
- "spark.streaming.kafkaMaxRatePerPartition": 1000,
- "spark.streaming.concurrentJobs": 4,
- "spark.yarn.maxAppAttempts": 5,
- "spark.yarn.am.attemptFailuresValidityInterval": "1h",
- "spark.yarn.max.executor.failures": 120,
- "spark.yarn.executor.failuresValidityInterval": "1h",
- "spark.hadoop.fs.hdfs.impl.disable.cache": true
- }
+ "log.level": "WARN"
},
- "persist": [
+ "sinks": [
{
- "type": "log",
+ "type": "CONSOLE",
"config": {
- "max.log.lines": 2
+ "max.log.lines": 10
}
},
{
- "type": "hdfs",
+ "type": "HDFS",
"config": {
- "path": "hdfs:///griffin/persist"
+ "path": "hdfs:///griffin/persist",
+ "max.persist.lines": 10000,
+ "max.lines.per.file": 10000
}
},
{
- "type": "http",
+ "type": "ELASTICSEARCH",
"config": {
"method": "post",
- "api": "http://es:9200/griffin/accuracy"
- }
- }
- ],
- "info.cache": [
- {
- "type": "zk",
- "config": {
- "hosts": "zk:2181",
- "namespace": "griffin/infocache",
- "lock.path": "lock",
- "mode": "persist",
- "init.clear": false,
- "close.clear": false
+ "api": "http://es:9200/griffin/accuracy",
+ "connection.timeout": "1m",
+ "retry": 10
}
}
],
- "cleaner": {
- }
+ "griffin.checkpoint": []
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/resources/env/env_streaming.json
----------------------------------------------------------------------
diff --git a/service/src/main/resources/env/env_streaming.json b/service/src/main/resources/env/env_streaming.json
index 5c4b190..6508af1 100644
--- a/service/src/main/resources/env/env_streaming.json
+++ b/service/src/main/resources/env/env_streaming.json
@@ -3,8 +3,8 @@
"log.level": "WARN",
"checkpoint.dir": "hdfs:///griffin/checkpoint/${JOB_NAME}",
"init.clear": true,
- "batch.interval": "30s",
- "process.interval": "3m",
+ "batch.interval": "1m",
+ "process.interval": "5m",
"config": {
"spark.default.parallelism": 4,
"spark.task.maxFailures": 5,
@@ -17,28 +17,30 @@
"spark.hadoop.fs.hdfs.impl.disable.cache": true
}
},
- "persist": [
+ "sinks": [
{
- "type": "log",
+ "type": "CONSOLE",
"config": {
- "max.log.lines": 2
+ "max.log.lines": 100
}
},
{
- "type": "hdfs",
+ "type": "HDFS",
"config": {
- "path": "hdfs:///griffin/persist"
+ "path": "hdfs:///griffin/persist",
+ "max.persist.lines": 10000,
+ "max.lines.per.file": 10000
}
},
{
- "type": "http",
+ "type": "ELASTICSEARCH",
"config": {
"method": "post",
"api": "http://es:9200/griffin/accuracy"
}
}
],
- "info.cache": [
+ "griffin.checkpoint": [
{
"type": "zk",
"config": {
@@ -50,7 +52,5 @@
"close.clear": false
}
}
- ],
- "cleaner": {
- }
+ ]
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java b/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java
index 0d144be..af981e3 100644
--- a/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java
+++ b/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java
@@ -21,7 +21,11 @@ public class FileExistPredicatorTest {
@BeforeClass
public static void mkFile() throws IOException {
+ File fileDirectory = new File(rootPath); // to fix createFileExclusively exception
File file = new File(rootPath + fileName);
+ if (!fileDirectory.exists()) {
+ fileDirectory.mkdir();
+ }
if (!file.exists()) {
file.createNewFile();
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
index aa80608..ef37f97 100644
--- a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
+++ b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java
@@ -21,11 +21,12 @@ package org.apache.griffin.core.measure.repo;
import static org.apache.griffin.core.util.EntityHelper.createGriffinMeasure;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
import java.util.List;
import org.apache.griffin.core.config.EclipseLinkJpaConfigForTest;
-import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.measure.entity.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -57,7 +58,21 @@ public class MeasureRepoTest {
public void testFindByNameAndDeleted() {
String name = "m1";
List<Measure> measures = measureRepo.findByNameAndDeleted(name, false);
- assertThat(measures.get(0).getName()).isEqualTo(name);
+ GriffinMeasure m = (GriffinMeasure) measures.get(0);
+ assertThat(m.getName()).isEqualTo(name); // keep verifying the lookup-by-name itself
+ List<DataSource> sources = m.getDataSources();
+ DataConnector connector = sources.get(0).getConnectors().get(0);
+ Rule rule = m.getEvaluateRule().getRules().get(0);
+ assertEquals(2, m.getSinksList().size()); // JUnit order: (expected, actual)
+ assertThat(sources.get(0).isBaseline()).isTrue();
+ assertEquals(1, sources.get(0).getCheckpointMap().size());
+ assertEquals("kafka", connector.getDataFrameName());
+ assertEquals(3, connector.getConfigMap().size());
+ assertEquals(DqType.ACCURACY, rule.getDqType());
+ assertEquals("in", rule.getInDataFrameName());
+ assertEquals("out", rule.getOutDataFrameName());
+ assertEquals(1, rule.getDetailsMap().size());
+ assertEquals(2, rule.getOutList().size());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
----------------------------------------------------------------------
diff --git a/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java b/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
index e2254d1..4c28ea7 100644
--- a/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
+++ b/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java
@@ -43,12 +43,7 @@ import org.apache.griffin.core.job.entity.LivySessionStates;
import org.apache.griffin.core.job.entity.SegmentPredicate;
import org.apache.griffin.core.job.entity.SegmentRange;
import org.apache.griffin.core.job.entity.VirtualJob;
-import org.apache.griffin.core.measure.entity.DataConnector;
-import org.apache.griffin.core.measure.entity.DataSource;
-import org.apache.griffin.core.measure.entity.EvaluateRule;
-import org.apache.griffin.core.measure.entity.ExternalMeasure;
-import org.apache.griffin.core.measure.entity.GriffinMeasure;
-import org.apache.griffin.core.measure.entity.Rule;
+import org.apache.griffin.core.measure.entity.*;
import org.quartz.JobDataMap;
import org.quartz.JobKey;
import org.quartz.SimpleTrigger;
@@ -81,17 +76,34 @@ public class EntityHelper {
public static GriffinMeasure createGriffinMeasure(String name,
DataConnector dcSource,
DataConnector dcTarget) throws Exception {
- DataSource dataSource = new DataSource("source", Arrays.asList(dcSource));
- DataSource targetSource = new DataSource("target", Arrays.asList(dcTarget));
+ DataSource dataSource = new DataSource("source", true, createCheckpointMap(), Arrays.asList(dcSource)); // "source" is the baseline data source
+ DataSource targetSource = new DataSource("target", false, createCheckpointMap(), Arrays.asList(dcTarget)); // "target" is not a baseline
List<DataSource> dataSources = new ArrayList<>();
dataSources.add(dataSource);
dataSources.add(targetSource);
- String rules = "source.id=target.id AND source.name=target.name AND source.age=target.age";
- Map<String, Object> map = new HashMap<>();
- map.put("detail", "detail info");
- Rule rule = new Rule("griffin-dsl", "ACCURACY", rules, map);
+ Rule rule = createRule(); // single ACCURACY rule fixture
EvaluateRule evaluateRule = new EvaluateRule(Arrays.asList(rule));
- return new GriffinMeasure(name, "test", dataSources, evaluateRule);
+ return new GriffinMeasure(name, "test", dataSources, evaluateRule, Arrays.asList("ELASTICSEARCH", "HDFS")); // two sinks; MeasureRepoTest expects size 2
+ }
+
+ private static Rule createRule() throws JsonProcessingException {
+ Map<String, Object> details = new HashMap<>();
+ details.put("detail", "detail");
+ Map<String, Object> metricOut = new HashMap<>();
+ metricOut.put("type", "metric");
+ metricOut.put("name", "accu");
+ Map<String, Object> recordOut = new HashMap<>();
+ recordOut.put("type", "record");
+ recordOut.put("name", "missRecords");
+ String expression = "source.id=target.id AND source.name=target.name AND source.age=target.age";
+ List<Map<String, Object>> outs = Arrays.asList(metricOut, recordOut);
+ return new Rule("griffin-dsl", DqType.ACCURACY, expression, "in", "out", details, outs); // ACCURACY rule with one metric and one record output
+ }
+
+ private static Map<String, Object> createCheckpointMap() {
+ Map<String, Object> checkpoint = new HashMap<>();
+ checkpoint.put("info.path", "source"); // single-entry checkpoint config shared by both data sources
+ return checkpoint;
}
public static DataConnector createDataConnector(String name,
@@ -102,14 +114,14 @@ public class EntityHelper {
config.put("database", database);
config.put("table.name", table);
config.put("where", where);
- return new DataConnector(name, "1h", config, null);
+ return new DataConnector(name, DataConnector.DataType.HIVE, "1.2", JsonUtil.toJson(config), "kafka");
}
public static DataConnector createDataConnector(String name,
String database,
String table,
String where,
- SegmentPredicate predicate) throws IOException {
+ SegmentPredicate predicate) {
HashMap<String, String> config = new HashMap<>();
config.put("database", database);
config.put("table.name", table);