You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/09 09:43:31 UTC
[04/11] incubator-griffin git commit: upgrade new version
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
new file mode 100644
index 0000000..131fe03
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
@@ -0,0 +1,188 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+import org.apache.griffin.core.util.JsonUtil;
+import org.apache.griffin.core.util.PropertiesUtil;
+import org.quartz.CronExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Configurable;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.stereotype.Component;
+
+import javax.persistence.*;
+import javax.validation.constraints.NotNull;
+import java.io.IOException;
+import java.util.*;
+
+@Configurable(preConstruction = true)
+@Component
+@Entity
+public class JobSchedule extends AbstractAuditableEntity {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(JobSchedule.class);
+
+ @NotNull
+ private Long measureId;
+
+ @NotNull
+ private String jobName;
+
+ @NotNull
+ private String cronExpression;
+
+ @NotNull
+ private String timeZone;
+
+ @JsonIgnore
+ @Access(AccessType.PROPERTY)
+ private String predicateConfig;
+
+ @Transient
+ private Map<String, Object> configMap = defaultPredicatesConfig();
+
+ @NotNull
+ @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
+ @JoinColumn(name = "job_schedule_id")
+ private List<JobDataSegment> segments = new ArrayList<>();
+
+ @JsonProperty("measure.id")
+ public Long getMeasureId() {
+ return measureId;
+ }
+
+ @JsonProperty("measure.id")
+ public void setMeasureId(Long measureId) {
+ this.measureId = measureId;
+ }
+
+ @JsonProperty("job.name")
+ public String getJobName() {
+ return jobName;
+ }
+
+ @JsonProperty("job.name")
+ public void setJobName(String jobName) {
+ if (StringUtils.isEmpty(jobName)) {
+ LOGGER.error("Job name cannot be empty.");
+ throw new NullPointerException();
+ }
+ this.jobName = jobName;
+ }
+
+ @JsonProperty("cron.expression")
+ public String getCronExpression() {
+ return cronExpression;
+ }
+
+ @JsonProperty("cron.expression")
+ public void setCronExpression(String cronExpression) {
+ if (StringUtils.isEmpty(cronExpression) || !isCronExpressionValid(cronExpression)) {
+ LOGGER.error("Cron expression is invalid.Please check your cron expression.");
+ throw new IllegalArgumentException();
+ }
+ this.cronExpression = cronExpression;
+ }
+
+ @JsonProperty("cron.time.zone")
+ public String getTimeZone() {
+ return timeZone;
+ }
+
+ @JsonProperty("cron.time.zone")
+ public void setTimeZone(String timeZone) {
+ this.timeZone = timeZone;
+ }
+
+ @JsonProperty("data.segments")
+ public List<JobDataSegment> getSegments() {
+ return segments;
+ }
+
+ @JsonProperty("data.segments")
+ public void setSegments(List<JobDataSegment> segments) {
+ this.segments = segments;
+ }
+
+ private String getPredicateConfig() {
+ return predicateConfig;
+ }
+
+ private void setPredicateConfig(String config) throws IOException {
+ this.predicateConfig = config;
+ this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, Object>>() {
+ });
+ }
+
+ @JsonProperty("predicate.config")
+ public Map<String, Object> getConfigMap() throws IOException {
+ return configMap;
+ }
+
+ @JsonProperty("predicate.config")
+ private void setConfigMap(Map<String, Object> configMap) throws JsonProcessingException {
+ this.configMap = configMap;
+ this.predicateConfig = JsonUtil.toJson(configMap);
+ }
+
+ /**
+ * @return set default predicate config
+ * @throws JsonProcessingException json exception
+ */
+ private Map<String, Object> defaultPredicatesConfig() throws JsonProcessingException {
+ String path = "/application.properties";
+ Properties appConf = PropertiesUtil.getProperties(path,new ClassPathResource(path));
+ Map<String, Object> scheduleConf = new HashMap<>();
+ Map<String, Object> map = new HashMap<>();
+ map.put("interval", appConf.getProperty("predicate.job.interval"));
+ map.put("repeat", appConf.getProperty("predicate.job.repeat.count"));
+ scheduleConf.put("checkdonefile.schedule", map);
+ setConfigMap(scheduleConf);
+ return scheduleConf;
+ }
+
+ private boolean isCronExpressionValid(String cronExpression) {
+ if (!CronExpression.isValidExpression(cronExpression)) {
+ LOGGER.error("Cron expression {} is invalid.", cronExpression);
+ return false;
+ }
+ return true;
+ }
+
+ public JobSchedule() throws JsonProcessingException {
+ }
+
+ public JobSchedule(Long measureId, String jobName, String cronExpression, Map configMap, List<JobDataSegment> segments) throws JsonProcessingException {
+ this.measureId = measureId;
+ this.jobName = jobName;
+ this.cronExpression = cronExpression;
+ setConfigMap(configMap);
+ this.segments = segments;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java b/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java
new file mode 100644
index 0000000..208fa8c
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java
@@ -0,0 +1,148 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+public class LivyConf implements Serializable {
+
+ private String file;
+
+ private String className;
+
+ private List<String> args;
+
+ private String name;
+
+ private String queue;
+
+ private Long numExecutors;
+
+ private Long executorCores;
+
+ private String driverMemory;
+
+ private String executorMemory;
+
+ private Map<String, String> conf;
+
+ private List<String> jars;
+
+ private List<String> files;
+
+ public String getFile() {
+ return file;
+ }
+
+ public void setFile(String file) {
+ this.file = file;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public void setClassName(String className) {
+ this.className = className;
+ }
+
+ public List<String> getArgs() {
+ return args;
+ }
+
+ public void setArgs(List<String> args) {
+ this.args = args;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getQueue() {
+ return queue;
+ }
+
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+
+ public Long getNumExecutors() {
+ return numExecutors;
+ }
+
+ public void setNumExecutors(Long numExecutors) {
+ this.numExecutors = numExecutors;
+ }
+
+ public Long getExecutorCores() {
+ return executorCores;
+ }
+
+ public void setExecutorCores(Long executorCores) {
+ this.executorCores = executorCores;
+ }
+
+ public String getDriverMemory() {
+ return driverMemory;
+ }
+
+ public void setDriverMemory(String driverMemory) {
+ this.driverMemory = driverMemory;
+ }
+
+ public String getExecutorMemory() {
+ return executorMemory;
+ }
+
+ public void setExecutorMemory(String executorMemory) {
+ this.executorMemory = executorMemory;
+ }
+
+ public Map<String, String> getConf() {
+ return conf;
+ }
+
+ public void setConf(Map<String, String> conf) {
+ this.conf = conf;
+ }
+
+ public List<String> getJars() {
+ return jars;
+ }
+
+ public void setJars(List<String> jars) {
+ this.jars = jars;
+ }
+
+ public List<String> getFiles() {
+ return files;
+ }
+
+ public void setFiles(List<String> files) {
+ this.files = files;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
index 773bd98..01e5070 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
@@ -38,10 +38,16 @@ public class LivySessionStates {
error,
dead,
success,
- unknown
+ unknown,
+ finding,
+ not_found,
+ found
}
- public static SessionState toSessionState(State state) {
+ private static SessionState toSessionState(State state) {
+ if (state == null) {
+ return null;
+ }
switch (state) {
case not_started:
return new SessionState.NotStarted();
@@ -69,22 +75,17 @@ public class LivySessionStates {
}
public static boolean isActive(State state) {
- if (State.unknown.equals(state)) {
- // set unknown isactive() as false.
+ if (State.unknown.equals(state) || State.finding.equals(state) || State.not_found.equals(state) || State.found.equals(state)) {
+ // set unknown isActive() as false.
return false;
}
SessionState sessionState = toSessionState(state);
- if (sessionState == null) {
- return false;
- } else {
- return sessionState.isActive();
- }
+ return sessionState != null && sessionState.isActive();
}
public static boolean isHealthy(State state) {
- if (State.error.equals(state) || State.dead.equals(state) || State.shutting_down.equals(state)) {
- return false;
- }
- return true;
+ return !(State.error.equals(state) || State.dead.equals(state) ||
+ State.shutting_down.equals(state) || State.finding.equals(state) ||
+ State.not_found.equals(state) || State.found.equals(state));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
new file mode 100644
index 0000000..0f5a624
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+
+package org.apache.griffin.core.job.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+import org.apache.griffin.core.util.JsonUtil;
+
+import javax.persistence.Access;
+import javax.persistence.AccessType;
+import javax.persistence.Entity;
+import javax.persistence.Transient;
+import java.io.IOException;
+import java.util.Map;
+
+@Entity
+public class SegmentPredicate extends AbstractAuditableEntity {
+
+ private String type;
+
+ @JsonIgnore
+ @Access(AccessType.PROPERTY)
+ private String config;
+
+ @Transient
+ private Map<String, String> configMap;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getConfig() {
+ return config;
+ }
+
+ public void setConfig(String config) throws IOException {
+ this.config = config;
+ this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() {
+ });
+ }
+
+ @JsonProperty("config")
+ public Map<String, String> getConfigMap() throws IOException {
+ return configMap;
+ }
+
+ @JsonProperty("config")
+ public void setConfigMap(Map<String, String> configMap) throws JsonProcessingException {
+ this.configMap = configMap;
+ this.config = JsonUtil.toJson(configMap);
+ }
+
+ public SegmentPredicate() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
new file mode 100644
index 0000000..b8ca5cf
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+
+package org.apache.griffin.core.job.entity;
+
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+
+@Entity
+public class SegmentRange extends AbstractAuditableEntity {
+
+ @Column(name = "data_begin")
+ private String begin = "1h";
+
+ private String length = "1h";
+
+
+ public String getBegin() {
+ return begin;
+ }
+
+ public void setBegin(String begin) {
+ this.begin = begin;
+ }
+
+ public String getLength() {
+ return length;
+ }
+
+ public void setLength(String length) {
+ this.length = length;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java b/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java
deleted file mode 100644
index b5925f6..0000000
--- a/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.job.entity;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-public class SparkJobDO implements Serializable {
-
- private String file;
-
- private String className;
-
- private List<String> args;
-
- private String name;
-
- private String queue;
-
- private Long numExecutors;
-
- private Long executorCores;
-
- private String driverMemory;
-
- private String executorMemory;
-
- private Map<String, String> conf;
-
- private List<String> jars;
-
- private List<String> files;
-
- public String getFile() {
- return file;
- }
-
- public void setFile(String file) {
- this.file = file;
- }
-
- public String getClassName() {
- return className;
- }
-
- public void setClassName(String className) {
- this.className = className;
- }
-
- public List<String> getArgs() {
- return args;
- }
-
- public void setArgs(List<String> args) {
- this.args = args;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public String getQueue() {
- return queue;
- }
-
- public void setQueue(String queue) {
- this.queue = queue;
- }
-
- public Long getNumExecutors() {
- return numExecutors;
- }
-
- public void setNumExecutors(Long numExecutors) {
- this.numExecutors = numExecutors;
- }
-
- public Long getExecutorCores() {
- return executorCores;
- }
-
- public void setExecutorCores(Long executorCores) {
- this.executorCores = executorCores;
- }
-
- public String getDriverMemory() {
- return driverMemory;
- }
-
- public void setDriverMemory(String driverMemory) {
- this.driverMemory = driverMemory;
- }
-
- public String getExecutorMemory() {
- return executorMemory;
- }
-
- public void setExecutorMemory(String executorMemory) {
- this.executorMemory = executorMemory;
- }
-
- public Map<String, String> getConf() {
- return conf;
- }
-
- public void setConf(Map<String, String> conf) {
- this.conf = conf;
- }
-
- public List<String> getJars() {
- return jars;
- }
-
- public void setJars(List<String> jars) {
- this.jars = jars;
- }
-
- public List<String> getFiles() {
- return files;
- }
-
- public void setFiles(List<String> files) {
- this.files = files;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
new file mode 100644
index 0000000..ad98603
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import javax.persistence.Entity;
+
+@Entity
+public class VirtualJob extends AbstractJob {
+
+ public VirtualJob() {
+ super();
+ }
+
+ public VirtualJob(String jobName, Long measureId, String metricName) {
+ super(jobName, measureId, metricName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
new file mode 100644
index 0000000..cc2ff15
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.factory;
+
+import org.quartz.spi.TriggerFiredBundle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.scheduling.quartz.SpringBeanJobFactory;
+
+public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory
+ implements ApplicationContextAware {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AutowiringSpringBeanJobFactory.class);
+
+ private transient AutowireCapableBeanFactory beanFactory;
+
+ @Override
+ public void setApplicationContext(final ApplicationContext context) {
+ beanFactory = context.getAutowireCapableBeanFactory();
+ }
+
+ @Override
+ protected Object createJobInstance(final TriggerFiredBundle bundle) {
+
+ try {
+ final Object job = super.createJobInstance(bundle);
+ beanFactory.autowireBean(job);
+ return job;
+
+ } catch (Exception e) {
+ LOGGER.error("fail to create job instance. {}", e.getMessage());
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
new file mode 100644
index 0000000..8af39f4
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.factory;
+
+import org.apache.griffin.core.job.FileExistPredicator;
+import org.apache.griffin.core.job.Predicator;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+
+public class PredicatorFactory {
+ public static Predicator newPredicateInstance(SegmentPredicate segPredicate) {
+ Predicator predicate = null;
+ switch (segPredicate.getType()) {
+ case "file.exist":
+ predicate = new FileExistPredicator(segPredicate);
+ break;
+ default:
+ break;
+ }
+ return predicate;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java
new file mode 100644
index 0000000..aaaa77d
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.GriffinJob;
+
+public interface GriffinJobRepo extends JobRepo<GriffinJob> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java
new file mode 100644
index 0000000..48dd3b4
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.JobDataSegment;
+import org.springframework.data.repository.CrudRepository;
+
+public interface JobDataSegmentRepo extends CrudRepository<JobDataSegment, Long> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
index 610d282..1714789 100644
--- a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
@@ -18,45 +18,31 @@ under the License.
*/
package org.apache.griffin.core.job.repo;
-import org.apache.griffin.core.job.entity.JobInstance;
-import org.apache.griffin.core.job.entity.LivySessionStates;
+import org.apache.griffin.core.job.entity.JobInstanceBean;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
-import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Transactional;
import java.util.List;
+public interface JobInstanceRepo extends CrudRepository<JobInstanceBean, Long> {
-@Repository
-public interface JobInstanceRepo extends CrudRepository<JobInstance, Long> {
- /**
- * @param group is group name
- * @param name is job name
- * @param pageable page info
- * @return all job instances scheduled at different time using the same prototype job,
- * the prototype job is determined by SCHED_NAME, group name and job name in table QRTZ_JOB_DETAILS.
- */
- @Query("select s from JobInstance s " +
- "where s.groupName= ?1 and s.jobName=?2 ")
- List<JobInstance> findByGroupNameAndJobName(String group, String name, Pageable pageable);
+ @Query("select DISTINCT s from JobInstanceBean s " +
+ "where s.state in ('starting', 'not_started', 'recovering', 'idle', 'running', 'busy')")
+ List<JobInstanceBean> findByActiveState();
- @Query("select s from JobInstance s " +
- "where s.groupName= ?1 and s.jobName=?2 ")
- List<JobInstance> findByGroupNameAndJobName(String group, String name);
+ JobInstanceBean findByPredicateName(String name);
- @Query("select DISTINCT s.groupName, s.jobName from JobInstance s")
- List<Object> findGroupWithJobName();
+ @Query("select s from JobInstanceBean s where job_id = ?1")
+ List<JobInstanceBean> findByJobId(Long jobId, Pageable pageable);
- @Modifying
- @Query("delete from JobInstance s " +
- "where s.groupName= ?1 and s.jobName=?2 ")
- void deleteByGroupAndJobName(String groupName, String jobName);
+ List<JobInstanceBean> findByExpireTmsLessThanEqual(Long expireTms);
+ @Transactional
@Modifying
- @Query("update JobInstance s " +
- "set s.state= ?2, s.appId= ?3, s.appUri= ?4 where s.id= ?1")
- void update(Long id, LivySessionStates.State state, String appId, String appUri);
+ @Query("delete from JobInstanceBean j where j.expireTms <= ?1")
+ int deleteByExpireTimestamp(Long expireTms);
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
new file mode 100644
index 0000000..a3fcce3
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.AbstractJob;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.CrudRepository;
+
+import java.util.List;
+
+public interface JobRepo<T extends AbstractJob> extends CrudRepository<T, Long> {
+
+ @Query("select count(j) from #{#entityName} j where j.jobName = ?1 and j.deleted = ?2")
+ int countByJobNameAndDeleted(String jobName, Boolean deleted);
+
+ List<T> findByDeleted(boolean deleted);
+
+ List<T> findByJobNameAndDeleted(String jobName, boolean deleted);
+
+ List<T> findByMeasureIdAndDeleted(Long measureId, boolean deleted);
+
+ T findByIdAndDeleted(Long jobId, boolean deleted);
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
new file mode 100644
index 0000000..1b360e4
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.JobSchedule;
+import org.springframework.data.repository.CrudRepository;
+
+public interface JobScheduleRepo extends CrudRepository<JobSchedule, Long> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java
new file mode 100644
index 0000000..914a1ff
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.VirtualJob;
+
+public interface VirtualJobRepo extends JobRepo<VirtualJob> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
new file mode 100644
index 0000000..f38982a
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
@@ -0,0 +1,102 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.job.entity.VirtualJob;
+import org.apache.griffin.core.job.repo.VirtualJobRepo;
+import org.apache.griffin.core.measure.entity.ExternalMeasure;
+import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.measure.repo.ExternalMeasureRepo;
+import org.apache.griffin.core.util.GriffinOperationMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component("externalOperation")
+public class ExternalMeasureOperationImpl implements MeasureOperation {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExternalMeasureOperationImpl.class);
+
+ @Autowired
+ private ExternalMeasureRepo measureRepo;
+ @Autowired
+ private VirtualJobRepo jobRepo;
+
+ @Override
+ public GriffinOperationMessage create(Measure measure) {
+ ExternalMeasure em = (ExternalMeasure) measure;
+ if (StringUtils.isBlank(em.getMetricName())) {
+ LOGGER.error("Failed to create external measure {}. Its metric name is blank.", measure.getName());
+ return GriffinOperationMessage.CREATE_MEASURE_FAIL;
+ }
+ try {
+ em.setVirtualJob(new VirtualJob());
+ em = measureRepo.save(em);
+ VirtualJob vj = genVirtualJob(em, em.getVirtualJob());
+ jobRepo.save(vj);
+ return GriffinOperationMessage.CREATE_MEASURE_SUCCESS;
+ } catch (Exception e) {
+ LOGGER.error("Failed to create new measure {}.{}", em.getName(), e.getMessage());
+ }
+ return GriffinOperationMessage.CREATE_MEASURE_FAIL;
+ }
+
+ @Override
+ public GriffinOperationMessage update(Measure measure) {
+ ExternalMeasure latestMeasure = (ExternalMeasure) measure;
+ if (StringUtils.isBlank(latestMeasure.getMetricName())) {
+ LOGGER.error("Failed to create external measure {}. Its metric name is blank.", measure.getName());
+ return GriffinOperationMessage.UPDATE_MEASURE_FAIL;
+ }
+ try {
+ ExternalMeasure originMeasure = measureRepo.findOne(latestMeasure.getId());
+ VirtualJob vj = genVirtualJob(latestMeasure, originMeasure.getVirtualJob());
+ latestMeasure.setVirtualJob(vj);
+ measureRepo.save(latestMeasure);
+ return GriffinOperationMessage.UPDATE_MEASURE_SUCCESS;
+ } catch (Exception e) {
+ LOGGER.error("Failed to update measure. {}", e.getMessage());
+ }
+ return GriffinOperationMessage.UPDATE_MEASURE_FAIL;
+ }
+
+ @Override
+ public Boolean delete(Measure measure) {
+ try {
+ ExternalMeasure em = (ExternalMeasure) measure;
+ em.setDeleted(true);
+ em.getVirtualJob().setDeleted(true);
+ measureRepo.save(em);
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("Failed to delete measure. {}", e.getMessage());
+ }
+ return false;
+
+ }
+
+ private VirtualJob genVirtualJob(ExternalMeasure em, VirtualJob vj) {
+ vj.setMeasureId(em.getId());
+ vj.setJobName(em.getName());
+ vj.setMetricName(em.getMetricName());
+ return vj;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
new file mode 100644
index 0000000..88c5409
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
@@ -0,0 +1,114 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.job.JobServiceImpl;
+import org.apache.griffin.core.measure.entity.DataConnector;
+import org.apache.griffin.core.measure.entity.DataSource;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.measure.repo.DataConnectorRepo;
+import org.apache.griffin.core.measure.repo.MeasureRepo;
+import org.apache.griffin.core.util.GriffinOperationMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Component("griffinOperation")
+public class GriffinMeasureOperationImpl implements MeasureOperation {
+ private static final Logger LOGGER = LoggerFactory.getLogger(GriffinMeasureOperationImpl.class);
+
+ @Autowired
+ private MeasureRepo<Measure> measureRepo;
+ @Autowired
+ private DataConnectorRepo dcRepo;
+ @Autowired
+ private JobServiceImpl jobService;
+
+
+ @Override
+ public GriffinOperationMessage create(Measure measure) {
+ if (!isConnectorNamesValid((GriffinMeasure) measure)) {
+ return GriffinOperationMessage.CREATE_MEASURE_FAIL;
+ }
+ try {
+ measureRepo.save(measure);
+ return GriffinOperationMessage.CREATE_MEASURE_SUCCESS;
+ } catch (Exception e) {
+ LOGGER.error("Failed to create new measure {}.", measure.getName(), e);
+ }
+ return GriffinOperationMessage.CREATE_MEASURE_FAIL;
+ }
+
+ @Override
+ public GriffinOperationMessage update(Measure measure) {
+ try {
+ measureRepo.save(measure);
+ return GriffinOperationMessage.UPDATE_MEASURE_SUCCESS;
+ } catch (Exception e) {
+ LOGGER.error("Failed to update measure. {}", e.getMessage());
+ }
+ return GriffinOperationMessage.UPDATE_MEASURE_FAIL;
+ }
+
+ @Override
+ public Boolean delete(Measure measure) {
+ boolean pauseStatus = jobService.deleteJobsRelateToMeasure(measure.getId());
+ if (!pauseStatus) {
+ return false;
+ }
+ measure.setDeleted(true);
+ measureRepo.save(measure);
+ return true;
+ }
+
+ private boolean isConnectorNamesValid(GriffinMeasure measure) {
+ List<String> names = getConnectorNames(measure);
+ if (names.size() == 0) {
+ LOGGER.warn("Connector names cannot be empty.");
+ return false;
+ }
+ List<DataConnector> connectors = dcRepo.findByConnectorNames(names);
+ if (!CollectionUtils.isEmpty(connectors)) {
+ LOGGER.warn("Failed to create new measure {}. It's connector names already exist. ", measure.getName());
+ return false;
+ }
+ return true;
+ }
+
+ private List<String> getConnectorNames(GriffinMeasure measure) {
+ List<String> names = new ArrayList<>();
+ for (DataSource source : measure.getDataSources()) {
+ for (DataConnector dc : source.getConnectors()) {
+ String name = dc.getName();
+ if (!StringUtils.isEmpty(name)) {
+ names.add(name);
+ }
+ }
+ }
+ return names;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
index fae0169..3b557ca 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
@@ -37,17 +37,17 @@ public class MeasureController {
return measureService.getAllAliveMeasures();
}
- @RequestMapping(value = "/measure/{id}", method = RequestMethod.GET)
+ @RequestMapping(value = "/measures/{id}", method = RequestMethod.GET)
public Measure getMeasureById(@PathVariable("id") long id) {
return measureService.getMeasureById(id);
}
- @RequestMapping(value = "/measure/{id}", method = RequestMethod.DELETE)
+ @RequestMapping(value = "/measures/{id}", method = RequestMethod.DELETE)
public GriffinOperationMessage deleteMeasureById(@PathVariable("id") Long id) {
return measureService.deleteMeasureById(id);
}
- @RequestMapping(value = "/measure", method = RequestMethod.PUT)
+ @RequestMapping(value = "/measures", method = RequestMethod.PUT)
public GriffinOperationMessage updateMeasure(@RequestBody Measure measure) {
return measureService.updateMeasure(measure);
}
@@ -57,7 +57,7 @@ public class MeasureController {
return measureService.getAliveMeasuresByOwner(owner);
}
- @RequestMapping(value = "/measure", method = RequestMethod.POST)
+ @RequestMapping(value = "/measures", method = RequestMethod.POST)
public GriffinOperationMessage createMeasure(@RequestBody Measure measure) {
return measureService.createMeasure(measure);
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
new file mode 100644
index 0000000..80f1f30
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure;
+
+
+import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.util.GriffinOperationMessage;
+
+public interface MeasureOperation {
+
+ GriffinOperationMessage create(Measure measure);
+
+ GriffinOperationMessage update(Measure measure);
+
+ Boolean delete(Measure measure);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java
index abe36d9..499ee8e 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java
@@ -19,14 +19,12 @@ under the License.
package org.apache.griffin.core.measure;
-import org.apache.griffin.core.job.JobService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
-import java.io.Serializable;
import java.util.List;
import java.util.Map;
@@ -37,9 +35,6 @@ public class MeasureOrgController {
@Autowired
private MeasureOrgService measureOrgService;
- @Autowired
- private JobService jobService;
-
@RequestMapping(value = "/org", method = RequestMethod.GET)
public List<String> getOrgs() {
return measureOrgService.getOrgs();
@@ -59,9 +54,4 @@ public class MeasureOrgController {
public Map<String, List<String>> getMeasureNamesGroupByOrg() {
return measureOrgService.getMeasureNamesGroupByOrg();
}
-
- @RequestMapping(value = "/org/measure/jobs", method = RequestMethod.GET)
- public Map<String, Map<String, List<Map<String, Serializable>>>> getMeasureWithJobsGroupByOrg() {
- return measureOrgService.getMeasureWithJobDetailsGroupByOrg(jobService.getJobDetailsGroupByMeasureId());
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
index f45c636..754f3d1 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
@@ -31,5 +31,5 @@ public interface MeasureOrgService {
Map<String, List<String>> getMeasureNamesGroupByOrg();
- Map<String, Map<String, List<Map<String, Serializable>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Serializable>>> jobDetailsGroupByMeasure);
+ Map<String, Map<String, List<Map<String, Object>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Object>>> jobDetailsGroupByMeasure);
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
index d4cb6a9..1d64830 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
@@ -19,12 +19,12 @@ under the License.
package org.apache.griffin.core.measure;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.apache.griffin.core.measure.entity.Measure;
-import org.apache.griffin.core.measure.repo.MeasureRepo;
+import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -34,7 +34,7 @@ import java.util.Map;
public class MeasureOrgServiceImpl implements MeasureOrgService {
@Autowired
- private MeasureRepo measureRepo;
+ private GriffinMeasureRepo measureRepo;
@Override
public List<String> getOrgs() {
@@ -43,20 +43,20 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
@Override
public List<String> getMetricNameListByOrg(String org) {
- return measureRepo.findNameByOrganization(org,false);
+ return measureRepo.findNameByOrganization(org, false);
}
@Override
public Map<String, List<String>> getMeasureNamesGroupByOrg() {
Map<String, List<String>> orgWithMetricsMap = new HashMap<>();
- List<Measure> measures = measureRepo.findByDeleted(false);
+ List<GriffinMeasure> measures = measureRepo.findByDeleted(false);
if (measures == null) {
return null;
}
for (Measure measure : measures) {
String orgName = measure.getOrganization();
String measureName = measure.getName();
- List<String> measureList = orgWithMetricsMap.getOrDefault(orgName, new ArrayList<String>());
+ List<String> measureList = orgWithMetricsMap.getOrDefault(orgName, new ArrayList<>());
measureList.add(measureName);
orgWithMetricsMap.put(orgName, measureList);
}
@@ -64,9 +64,9 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
}
@Override
- public Map<String, Map<String, List<Map<String, Serializable>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Serializable>>> jobDetails) {
- Map<String, Map<String, List<Map<String, Serializable>>>> result = new HashMap<>();
- List<Measure> measures = measureRepo.findByDeleted(false);
+ public Map<String, Map<String, List<Map<String, Object>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Object>>> jobDetails) {
+ Map<String, Map<String, List<Map<String, Object>>>> result = new HashMap<>();
+ List<GriffinMeasure> measures = measureRepo.findByDeleted(false);
if (measures == null) {
return null;
}
@@ -74,8 +74,8 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
String orgName = measure.getOrganization();
String measureName = measure.getName();
String measureId = measure.getId().toString();
- List<Map<String, Serializable>> jobList = jobDetails.getOrDefault(measureId, new ArrayList<>());
- Map<String, List<Map<String, Serializable>>> measureWithJobs = result.getOrDefault(orgName, new HashMap<>());
+ List<Map<String, Object>> jobList = jobDetails.getOrDefault(measureId, new ArrayList<>());
+ Map<String, List<Map<String, Object>>> measureWithJobs = result.getOrDefault(orgName, new HashMap<>());
measureWithJobs.put(measureName, jobList);
result.put(orgName, measureWithJobs);
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
index 0e20b4f..a330d0a 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
@@ -24,11 +24,10 @@ import org.apache.griffin.core.measure.entity.Measure;
import org.apache.griffin.core.util.GriffinOperationMessage;
import java.util.List;
-import java.util.Map;
public interface MeasureService {
- Iterable<Measure> getAllAliveMeasures();
+ List<Measure> getAllAliveMeasures();
Measure getMeasureById(long id);
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
index 8c088c8..ecb9fdd 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
@@ -20,17 +20,16 @@ under the License.
package org.apache.griffin.core.measure;
-import org.apache.griffin.core.job.JobServiceImpl;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.apache.griffin.core.measure.entity.Measure;
import org.apache.griffin.core.measure.repo.MeasureRepo;
import org.apache.griffin.core.util.GriffinOperationMessage;
-import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.util.CollectionUtils;
import java.util.List;
@@ -39,79 +38,76 @@ public class MeasureServiceImpl implements MeasureService {
private static final Logger LOGGER = LoggerFactory.getLogger(MeasureServiceImpl.class);
@Autowired
- private JobServiceImpl jobService;
+ private MeasureRepo<Measure> measureRepo;
@Autowired
- private MeasureRepo measureRepo;
+ @Qualifier("griffinOperation")
+ private MeasureOperation griffinOp;
+ @Autowired
+ @Qualifier("externalOperation")
+ private MeasureOperation externalOp;
@Override
- public Iterable<Measure> getAllAliveMeasures() {
+ public List<Measure> getAllAliveMeasures() {
return measureRepo.findByDeleted(false);
}
@Override
- public Measure getMeasureById(@PathVariable("id") long id) {
+ public Measure getMeasureById(long id) {
return measureRepo.findByIdAndDeleted(id, false);
}
@Override
- public GriffinOperationMessage deleteMeasureById(Long measureId) {
- if (!measureRepo.exists(measureId)) {
- return GriffinOperationMessage.RESOURCE_NOT_FOUND;
- } else {
- Measure measure = measureRepo.findOne(measureId);
- try {
- //pause all jobs related to the measure
- jobService.deleteJobsRelateToMeasure(measure);
- measure.setDeleted(true);
- measureRepo.save(measure);
- } catch (SchedulerException e) {
- LOGGER.error("Delete measure id: {} name: {} failure. {}", measure.getId(), measure.getName(), e.getMessage());
- return GriffinOperationMessage.DELETE_MEASURE_BY_ID_FAIL;
- }
-
- return GriffinOperationMessage.DELETE_MEASURE_BY_ID_SUCCESS;
- }
+ public List<Measure> getAliveMeasuresByOwner(String owner) {
+ return measureRepo.findByOwnerAndDeleted(owner, false);
}
@Override
public GriffinOperationMessage createMeasure(Measure measure) {
List<Measure> aliveMeasureList = measureRepo.findByNameAndDeleted(measure.getName(), false);
- if (aliveMeasureList.size() == 0) {
- try {
- if (measureRepo.save(measure) != null) {
- return GriffinOperationMessage.CREATE_MEASURE_SUCCESS;
- } else {
- return GriffinOperationMessage.CREATE_MEASURE_FAIL;
- }
- } catch (Exception e) {
- LOGGER.info("Failed to create new measure {}.{}", measure.getName(), e.getMessage());
- return GriffinOperationMessage.CREATE_MEASURE_FAIL;
- }
-
- } else {
- LOGGER.info("Failed to create new measure {}, it already exists.", measure.getName());
+ if (!CollectionUtils.isEmpty(aliveMeasureList)) {
+ LOGGER.warn("Failed to create new measure {}, it already exists.", measure.getName());
return GriffinOperationMessage.CREATE_MEASURE_FAIL_DUPLICATE;
}
+ MeasureOperation op = getOperation(measure);
+ return op.create(measure);
}
@Override
- public List<Measure> getAliveMeasuresByOwner(String owner) {
- return measureRepo.findByOwnerAndDeleted(owner, false);
+ public GriffinOperationMessage updateMeasure(Measure measure) {
+ Measure m = measureRepo.findByIdAndDeleted(measure.getId(), false);
+ if (m == null) {
+ return GriffinOperationMessage.RESOURCE_NOT_FOUND;
+ }
+ if (!m.getType().equals(measure.getType())) {
+ LOGGER.error("Can't update measure to different type.");
+ return GriffinOperationMessage.UPDATE_MEASURE_FAIL;
+ }
+ MeasureOperation op = getOperation(measure);
+ return op.update(measure);
}
@Override
- public GriffinOperationMessage updateMeasure(@RequestBody Measure measure) {
- if (measureRepo.findByIdAndDeleted(measure.getId(), false) == null) {
+ public GriffinOperationMessage deleteMeasureById(Long measureId) {
+ Measure measure = measureRepo.findByIdAndDeleted(measureId, false);
+ if (measure == null) {
return GriffinOperationMessage.RESOURCE_NOT_FOUND;
- } else {
- try {
- measureRepo.save(measure);
- } catch (Exception e) {
- LOGGER.error("Failed to update measure. {}", e.getMessage());
- return GriffinOperationMessage.UPDATE_MEASURE_FAIL;
+ }
+ try {
+ MeasureOperation op = getOperation(measure);
+ if (op.delete(measure)) {
+ return GriffinOperationMessage.DELETE_MEASURE_BY_ID_SUCCESS;
}
+ } catch (Exception e) {
+ LOGGER.error("Delete measure id: {} name: {} failure. {}", measure.getId(), measure.getName(), e.getMessage());
+ }
+ return GriffinOperationMessage.DELETE_MEASURE_BY_ID_FAIL;
+ }
- return GriffinOperationMessage.UPDATE_MEASURE_SUCCESS;
+ private MeasureOperation getOperation(Measure measure) {
+ if (measure instanceof GriffinMeasure) {
+ return griffinOp;
}
+ return externalOp;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
index a36c240..3c4abf5 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
@@ -20,16 +20,21 @@ under the License.
package org.apache.griffin.core.measure.entity;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
import org.apache.griffin.core.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
-import javax.persistence.Entity;
-import javax.persistence.Transient;
+import javax.persistence.*;
+import javax.validation.constraints.NotNull;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
@Entity
@@ -38,30 +43,84 @@ public class DataConnector extends AbstractAuditableEntity {
private final static Logger LOGGER = LoggerFactory.getLogger(DataConnector.class);
+ @NotNull
+ private String name;
+
private String type;
private String version;
- private String config;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private String dataUnit;
@JsonIgnore
@Transient
- private Map<String, String> configInMaps;
+ private String defaultDataUnit = "365000d";
+
+ @JsonIgnore
+ @Access(AccessType.PROPERTY)
+ private String config;
+
+ @Transient
+ private Map<String, String> configMap;
+
+ @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
+ @JoinColumn(name = "data_connector_id")
+ private List<SegmentPredicate> predicates = new ArrayList<>();
+
+ public List<SegmentPredicate> getPredicates() {
+ return predicates;
+ }
+
+ public void setPredicates(List<SegmentPredicate> predicates) {
+ this.predicates = predicates;
+ }
+
+ @JsonProperty("config")
+ public Map<String, String> getConfigMap() throws IOException {
+ return configMap;
+ }
+
+ @JsonProperty("config")
+ public void setConfigMap(Map<String, String> configMap) throws JsonProcessingException {
+ this.configMap = configMap;
+ this.config = JsonUtil.toJson(configMap);
+ }
- public Map<String, String> getConfigInMaps() throws IOException {
- if (this.configInMaps == null && !StringUtils.isEmpty(config)) {
- this.configInMaps = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() {});
+ public void setConfig(String config) throws IOException {
+ this.config = config;
+ this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() {
+ });
+ }
+
+ public String getConfig() throws IOException {
+ return config;
+ }
+
+
+ @JsonProperty("data.unit")
+ public String getDataUnit() {
+ if (dataUnit != null) {
+ return dataUnit;
}
- return configInMaps;
+ return defaultDataUnit;
+ }
+
+ @JsonProperty("data.unit")
+ public void setDataUnit(String dataUnit) {
+ this.dataUnit = dataUnit;
}
- public void setConfig(Map<String, String> configInMaps) throws JsonProcessingException {
- this.configInMaps = configInMaps;
- this.config = JsonUtil.toJson(configInMaps);
+ public String getName() {
+ return name;
}
- public Map<String, String> getConfig() throws IOException {
- return getConfigInMaps();
+ public void setName(String name) {
+ if (StringUtils.isEmpty(name)) {
+ LOGGER.error("Connector name cannot be empty.");
+ throw new NullPointerException();
+ }
+ this.name = name;
}
public String getType() {
@@ -84,22 +143,19 @@ public class DataConnector extends AbstractAuditableEntity {
public DataConnector() {
}
- public DataConnector(String type, String version, String config) {
+ public DataConnector(String name, String type, String version, String config) throws IOException {
+ this.name = name;
this.type = type;
this.version = version;
this.config = config;
- TypeReference<Map<String, String>> mapType = new TypeReference<Map<String, String>>() {
- };
- try {
- this.configInMaps = JsonUtil.toEntity(config, mapType);
- } catch (IOException e) {
- LOGGER.error("Error in converting json to map. {}", e.getMessage());
- }
+ this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() {
+ });
}
@Override
public String toString() {
return "DataConnector{" +
+ "name=" + name +
"type=" + type +
", version='" + version + '\'' +
", config=" + config +
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
index 14619cb..0466992 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
@@ -20,7 +20,11 @@ under the License.
package org.apache.griffin.core.measure.entity;
+
+import org.springframework.util.CollectionUtils;
+
import javax.persistence.*;
+import java.util.ArrayList;
import java.util.List;
@Entity
@@ -30,8 +34,8 @@ public class DataSource extends AbstractAuditableEntity {
private String name;
@OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
- @JoinColumn(name = "dataSource_id")
- private List<DataConnector> connectors;
+ @JoinColumn(name = "data_source_id")
+ private List<DataConnector> connectors = new ArrayList<>();
public String getName() {
return name;
@@ -46,6 +50,9 @@ public class DataSource extends AbstractAuditableEntity {
}
public void setConnectors(List<DataConnector> connectors) {
+ if (CollectionUtils.isEmpty(connectors)) {
+ throw new NullPointerException("Data connector can not be empty.");
+ }
this.connectors = connectors;
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
index 2a70636..75a39ce 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
@@ -24,6 +24,7 @@ import org.hibernate.annotations.Fetch;
import org.hibernate.annotations.FetchMode;
import javax.persistence.*;
+import java.util.ArrayList;
import java.util.List;
@@ -34,7 +35,7 @@ public class EvaluateRule extends AbstractAuditableEntity {
@OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
@JoinColumn(name = "evaluateRule_id")
@Fetch(FetchMode.SUBSELECT)
- private List<Rule> rules;
+ private List<Rule> rules = new ArrayList<>();
public List<Rule> getRules() {
return rules;
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java
new file mode 100644
index 0000000..eb4a19d
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.griffin.core.job.entity.VirtualJob;
+
+import javax.persistence.CascadeType;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.OneToOne;
+
+/**
+ * Measures to publish metrics that processed externally
+ */
+@Entity
+public class ExternalMeasure extends Measure {
+
+ private String metricName;
+
+ @JsonIgnore
+ @OneToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
+ private VirtualJob virtualJob;
+
+ public ExternalMeasure() {
+ super();
+ }
+
+ public ExternalMeasure(String name, String description, String organization, String owner, String metricName) {
+ super(name, description, organization, owner);
+ this.metricName = metricName;
+ }
+
+ public String getMetricName() {
+ return metricName;
+ }
+
+ public void setMetricName(String metricName) {
+ this.metricName = metricName;
+ }
+
+ public VirtualJob getVirtualJob() {
+ return virtualJob;
+ }
+
+ public void setVirtualJob(VirtualJob virtualJob) {
+ this.virtualJob = virtualJob;
+ }
+
+ @Override
+ public String getType() {
+ return "external";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
new file mode 100644
index 0000000..3c5c602
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
@@ -0,0 +1,115 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure.entity;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.collections.CollectionUtils;
+
+import javax.persistence.*;
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Measures processed on Griffin
+ */
+@Entity
+public class GriffinMeasure extends Measure {
+
+ private String processType;
+
+ @Transient
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private Long timestamp;
+
+
+ @NotNull
+ @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
+ @JoinColumn(name = "measure_id")
+ private List<DataSource> dataSources = new ArrayList<>();
+
+ @NotNull
+ @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
+ @JoinColumn(name = "evaluate_rule_id")
+ private EvaluateRule evaluateRule;
+
+ @JsonProperty("process.type")
+ public String getProcessType() {
+ return processType;
+ }
+
+ @JsonProperty("process.type")
+ public void setProcessType(String processType) {
+ this.processType = processType;
+ }
+
+ @JsonProperty("data.sources")
+ public List<DataSource> getDataSources() {
+ return dataSources;
+ }
+
+ @JsonProperty("data.sources")
+ public void setDataSources(List<DataSource> dataSources) {
+ if (CollectionUtils.isEmpty(dataSources)) {
+ throw new NullPointerException("Data source can not be empty.");
+ }
+ this.dataSources = dataSources;
+ }
+
+ @JsonProperty("evaluate.rule")
+ public EvaluateRule getEvaluateRule() {
+ return evaluateRule;
+ }
+
+ @JsonProperty("evaluate.rule")
+ public void setEvaluateRule(EvaluateRule evaluateRule) {
+ if (evaluateRule == null || CollectionUtils.isEmpty(evaluateRule.getRules())) {
+ throw new NullPointerException("Evaluate rule can not be empty.");
+ }
+ this.evaluateRule = evaluateRule;
+ }
+
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(Long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String getType() {
+ return "griffin";
+ }
+
+ public GriffinMeasure() {
+ super();
+ }
+
+ public GriffinMeasure(Long measureId,String name, String description, String organization, String processType, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {
+ super(name, description, organization, owner);
+ this.setId(measureId);
+ this.processType = processType;
+ this.dataSources = dataSources;
+ this.evaluateRule = evaluateRule;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
index d8afba4..cf2daec 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
@@ -19,39 +19,31 @@ under the License.
package org.apache.griffin.core.measure.entity;
-import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import javax.persistence.*;
-import java.util.List;
+import javax.persistence.Entity;
+import javax.persistence.Inheritance;
+import javax.persistence.InheritanceType;
+import javax.validation.constraints.NotNull;
@Entity
-public class Measure extends AbstractAuditableEntity {
+@Inheritance(strategy = InheritanceType.JOINED)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+@JsonSubTypes({@JsonSubTypes.Type(value = GriffinMeasure.class, name = "griffin"), @JsonSubTypes.Type(value = ExternalMeasure.class, name = "external")})
+public abstract class Measure extends AbstractAuditableEntity {
private static final long serialVersionUID = -4748881017029815714L;
- private String name;
+ @NotNull
+ protected String name;
- private String description;
+ protected String description;
- private String organization;
+ protected String organization;
- private String processType;
+ protected String owner;
- /**
- * record triggered time of measure
- */
- private Long triggerTimeStamp = -1L;
-
-
- @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
- @JoinColumn(name = "measure_id")
- private List<DataSource> dataSources;
-
- @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
- @JoinColumn(name = "evaluateRule_id")
- private EvaluateRule evaluateRule;
-
- private String owner;
- private Boolean deleted = false;
+ protected Boolean deleted = false;
public String getName() {
return name;
@@ -85,34 +77,6 @@ public class Measure extends AbstractAuditableEntity {
this.owner = owner;
}
- @JsonProperty("process.type")
- public String getProcessType() {
- return processType;
- }
-
- @JsonProperty("process.type")
- public void setProcessType(String processType) {
- this.processType = processType;
- }
-
- @JsonProperty("data.sources")
- public List<DataSource> getDataSources() {
- return dataSources;
- }
-
- @JsonProperty("data.sources")
- public void setDataSources(List<DataSource> dataSources) {
- this.dataSources = dataSources;
- }
-
- public EvaluateRule getEvaluateRule() {
- return evaluateRule;
- }
-
- public void setEvaluateRule(EvaluateRule evaluateRule) {
- this.evaluateRule = evaluateRule;
- }
-
public Boolean getDeleted() {
return this.deleted;
}
@@ -121,26 +85,15 @@ public class Measure extends AbstractAuditableEntity {
this.deleted = deleted;
}
- @JsonProperty("timestamp")
- public Long getTriggerTimeStamp() {
- return triggerTimeStamp;
- }
-
- @JsonProperty("timestamp")
- public void setTriggerTimeStamp(Long triggerTimeStamp) {
- this.triggerTimeStamp = triggerTimeStamp;
- }
-
public Measure() {
}
- public Measure(String name, String description, String organization, String processType, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {
+ public Measure(String name, String description, String organization, String owner) {
this.name = name;
this.description = description;
this.organization = organization;
- this.processType = processType;
this.owner = owner;
- this.dataSources = dataSources;
- this.evaluateRule = evaluateRule;
}
+
+ public abstract String getType();
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
index b060bc4..f0c6516 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
@@ -23,12 +23,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.commons.lang.StringUtils;
import org.apache.griffin.core.util.JsonUtil;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.Transient;
+import javax.persistence.*;
import java.io.IOException;
import java.util.Map;
@@ -43,10 +40,18 @@ public class Rule extends AbstractAuditableEntity {
private String dqType;
- @Column(length = 1024)
+ @Column(length = 10 * 1024)
private String rule;
@JsonIgnore
+ private String name;
+
+ @JsonIgnore
+ private String description;
+
+ @JsonIgnore
+ @Access(AccessType.PROPERTY)
+ @Column(length = 10 * 1024)
private String details;
@Transient
@@ -86,15 +91,14 @@ public class Rule extends AbstractAuditableEntity {
return details;
}
- public void setDetails(String details) {
+ private void setDetails(String details) throws IOException {
this.details = details;
+ detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() {
+ });
}
@JsonProperty("details")
- public Map<String, Object> getDetailsMap() throws IOException {
- if (detailsMap == null && !StringUtils.isEmpty(details)) {
- detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() {});
- }
+ public Map<String, Object> getDetailsMap() {
return detailsMap;
}
@@ -104,6 +108,22 @@ public class Rule extends AbstractAuditableEntity {
this.details = JsonUtil.toJson(details);
}
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
public Rule() {
}