You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/01/09 09:43:31 UTC

[04/11] incubator-griffin git commit: upgrade new version

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
new file mode 100644
index 0000000..131fe03
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobSchedule.java
@@ -0,0 +1,188 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+import org.apache.griffin.core.util.JsonUtil;
+import org.apache.griffin.core.util.PropertiesUtil;
+import org.quartz.CronExpression;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Configurable;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.stereotype.Component;
+
+import javax.persistence.*;
+import javax.validation.constraints.NotNull;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Persistent description of how a measure is scheduled as a job.
+ * <p>
+ * The entity carries the target measure id, the job name, a Quartz cron
+ * expression with its time zone, the job's data segments and a predicate
+ * configuration. The configuration is kept twice: as a map, exposed to JSON
+ * under {@code predicate.config}, and as its JSON text, which is hidden from
+ * JSON. The two private setters keep both forms in sync.
+ */
+@Configurable(preConstruction = true)
+@Component
+@Entity
+public class JobSchedule extends AbstractAuditableEntity {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(JobSchedule.class);
+
+    @NotNull
+    private Long measureId;
+
+    @NotNull
+    private String jobName;
+
+    @NotNull
+    private String cronExpression;
+
+    @NotNull
+    private String timeZone;
+
+    // JSON text of configMap. Declared with property access, so it is read and
+    // written through getPredicateConfig()/setPredicateConfig(String).
+    @JsonIgnore
+    @Access(AccessType.PROPERTY)
+    private String predicateConfig;
+
+    // Not persisted itself; starts out as the defaults built from application.properties.
+    @Transient
+    private Map<String, Object> configMap = defaultPredicatesConfig();
+
+    @NotNull
+    @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
+    @JoinColumn(name = "job_schedule_id")
+    private List<JobDataSegment> segments = new ArrayList<>();
+
+    @JsonProperty("measure.id")
+    public Long getMeasureId() {
+        return measureId;
+    }
+
+    @JsonProperty("measure.id")
+    public void setMeasureId(Long measureId) {
+        this.measureId = measureId;
+    }
+
+    @JsonProperty("job.name")
+    public String getJobName() {
+        return jobName;
+    }
+
+    /**
+     * Sets the job name.
+     *
+     * @param jobName non-empty job name
+     * @throws NullPointerException if {@code jobName} is null or empty
+     */
+    @JsonProperty("job.name")
+    public void setJobName(String jobName) {
+        if (StringUtils.isEmpty(jobName)) {
+            LOGGER.error("Job name cannot be empty.");
+            // The exception type is unchanged for existing callers; it now
+            // carries the reason instead of an empty message.
+            throw new NullPointerException("Job name cannot be empty.");
+        }
+        this.jobName = jobName;
+    }
+
+    @JsonProperty("cron.expression")
+    public String getCronExpression() {
+        return cronExpression;
+    }
+
+    /**
+     * Sets the cron expression after validating it with Quartz.
+     *
+     * @param cronExpression cron expression accepted by
+     *                       {@link CronExpression#isValidExpression(String)}
+     * @throws IllegalArgumentException if the expression is null, empty or invalid
+     */
+    @JsonProperty("cron.expression")
+    public void setCronExpression(String cronExpression) {
+        if (StringUtils.isEmpty(cronExpression) || !isCronExpressionValid(cronExpression)) {
+            LOGGER.error("Cron expression is invalid.Please check your cron expression.");
+            // Include the rejected value so the failure is diagnosable from the exception alone.
+            throw new IllegalArgumentException("Cron expression is invalid: " + cronExpression);
+        }
+        this.cronExpression = cronExpression;
+    }
+
+    @JsonProperty("cron.time.zone")
+    public String getTimeZone() {
+        return timeZone;
+    }
+
+    @JsonProperty("cron.time.zone")
+    public void setTimeZone(String timeZone) {
+        this.timeZone = timeZone;
+    }
+
+    @JsonProperty("data.segments")
+    public List<JobDataSegment> getSegments() {
+        return segments;
+    }
+
+    @JsonProperty("data.segments")
+    public void setSegments(List<JobDataSegment> segments) {
+        this.segments = segments;
+    }
+
+    private String getPredicateConfig() {
+        return predicateConfig;
+    }
+
+    /**
+     * Stores the JSON text and rebuilds the map view from it.
+     *
+     * @param config JSON text of the predicate configuration
+     * @throws IOException if the text cannot be converted to a map
+     */
+    private void setPredicateConfig(String config) throws IOException {
+        this.predicateConfig = config;
+        this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, Object>>() {
+        });
+    }
+
+    @JsonProperty("predicate.config")
+    public Map<String, Object> getConfigMap() throws IOException {
+        return configMap;
+    }
+
+    /**
+     * Stores the map and refreshes its JSON text form.
+     *
+     * @param configMap predicate configuration
+     * @throws JsonProcessingException if the map cannot be written as JSON
+     */
+    @JsonProperty("predicate.config")
+    private void setConfigMap(Map<String, Object> configMap) throws JsonProcessingException {
+        this.configMap = configMap;
+        this.predicateConfig = JsonUtil.toJson(configMap);
+    }
+
+    /**
+     * Builds the default predicate configuration from the classpath resource
+     * {@code /application.properties} and installs it through
+     * {@link #setConfigMap(Map)}.
+     * <p>
+     * The result has a single entry {@code checkdonefile.schedule} holding
+     * {@code interval} (property {@code predicate.job.interval}) and
+     * {@code repeat} (property {@code predicate.job.repeat.count}).
+     *
+     * @return the default configuration map
+     * @throws JsonProcessingException if the map cannot be written as JSON
+     */
+    private Map<String, Object> defaultPredicatesConfig() throws JsonProcessingException {
+        String path = "/application.properties";
+        // NOTE(review): assumes getProperties never returns null - confirm against PropertiesUtil.
+        Properties appConf = PropertiesUtil.getProperties(path,new ClassPathResource(path));
+        Map<String, Object> scheduleConf = new HashMap<>();
+        Map<String, Object> map = new HashMap<>();
+        map.put("interval", appConf.getProperty("predicate.job.interval"));
+        map.put("repeat", appConf.getProperty("predicate.job.repeat.count"));
+        scheduleConf.put("checkdonefile.schedule", map);
+        setConfigMap(scheduleConf);
+        return scheduleConf;
+    }
+
+    /**
+     * @param cronExpression expression to check
+     * @return {@code true} if Quartz accepts the expression; otherwise logs it and returns {@code false}
+     */
+    private boolean isCronExpressionValid(String cronExpression) {
+        if (!CronExpression.isValidExpression(cronExpression)) {
+            LOGGER.error("Cron expression {} is invalid.", cronExpression);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a schedule holding only the default predicate configuration.
+     *
+     * @throws JsonProcessingException declared because the {@code configMap}
+     *                                 field initializer can throw it
+     */
+    public JobSchedule() throws JsonProcessingException {
+    }
+
+    /**
+     * Creates a schedule from explicit values. The measure id, job name and
+     * cron expression are assigned directly, so the checks in
+     * {@link #setJobName(String)} and {@link #setCronExpression(String)} are
+     * not applied here. The time zone is left unset.
+     *
+     * @param measureId      id of the measure
+     * @param jobName        job name
+     * @param cronExpression cron expression
+     * @param configMap      predicate configuration, replacing the defaults
+     * @param segments       data segments of the job
+     * @throws JsonProcessingException if {@code configMap} cannot be written as JSON
+     */
+    public JobSchedule(Long measureId, String jobName, String cronExpression, Map configMap, List<JobDataSegment> segments) throws JsonProcessingException {
+        this.measureId = measureId;
+        this.jobName = jobName;
+        this.cronExpression = cronExpression;
+        setConfigMap(configMap);
+        this.segments = segments;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java b/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java
new file mode 100644
index 0000000..208fa8c
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/LivyConf.java
@@ -0,0 +1,148 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializable bean holding the parameters of a job submission: the file and
+ * class to run, its arguments, a name and queue, executor and memory
+ * settings, extra configuration entries, and additional jars and files.
+ * <p>
+ * NOTE(review): the field names look like the batch request of Apache Livy's
+ * REST API - confirm against the code that serializes this bean.
+ */
+public class LivyConf implements Serializable {
+
+    private String file;
+
+    private String className;
+
+    private List<String> args;
+
+    private String name;
+
+    private String queue;
+
+    private Long numExecutors;
+
+    private Long executorCores;
+
+    private String driverMemory;
+
+    private String executorMemory;
+
+    private Map<String, String> conf;
+
+    private List<String> jars;
+
+    private List<String> files;
+
+    // Plain accessors below: each getter returns the stored reference as-is
+    // and each setter replaces it without copying or validation.
+
+    public String getFile() {
+        return file;
+    }
+
+    public void setFile(String file) {
+        this.file = file;
+    }
+
+    public String getClassName() {
+        return className;
+    }
+
+    public void setClassName(String className) {
+        this.className = className;
+    }
+
+    public List<String> getArgs() {
+        return args;
+    }
+
+    public void setArgs(List<String> args) {
+        this.args = args;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    public Long getNumExecutors() {
+        return numExecutors;
+    }
+
+    public void setNumExecutors(Long numExecutors) {
+        this.numExecutors = numExecutors;
+    }
+
+    public Long getExecutorCores() {
+        return executorCores;
+    }
+
+    public void setExecutorCores(Long executorCores) {
+        this.executorCores = executorCores;
+    }
+
+    public String getDriverMemory() {
+        return driverMemory;
+    }
+
+    public void setDriverMemory(String driverMemory) {
+        this.driverMemory = driverMemory;
+    }
+
+    public String getExecutorMemory() {
+        return executorMemory;
+    }
+
+    public void setExecutorMemory(String executorMemory) {
+        this.executorMemory = executorMemory;
+    }
+
+    public Map<String, String> getConf() {
+        return conf;
+    }
+
+    public void setConf(Map<String, String> conf) {
+        this.conf = conf;
+    }
+
+    public List<String> getJars() {
+        return jars;
+    }
+
+    public void setJars(List<String> jars) {
+        this.jars = jars;
+    }
+
+    public List<String> getFiles() {
+        return files;
+    }
+
+    public void setFiles(List<String> files) {
+        this.files = files;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
index 773bd98..01e5070 100644
--- a/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/LivySessionStates.java
@@ -38,10 +38,16 @@ public class LivySessionStates {
         error,
         dead,
         success,
-        unknown
+        unknown,
+        finding,
+        not_found,
+        found
     }
 
-    public static SessionState toSessionState(State state) {
+    private static SessionState toSessionState(State state) {
+        if (state == null) {
+            return null;
+        }
         switch (state) {
             case not_started:
                 return new SessionState.NotStarted();
@@ -69,22 +75,17 @@ public class LivySessionStates {
     }
 
     public static boolean isActive(State state) {
-        if (State.unknown.equals(state)) {
-            // set unknown isactive() as false.
+        if (State.unknown.equals(state) || State.finding.equals(state) || State.not_found.equals(state) || State.found.equals(state)) {
+            // set unknown isActive() as false.
             return false;
         }
         SessionState sessionState = toSessionState(state);
-        if (sessionState == null) {
-            return false;
-        } else {
-            return sessionState.isActive();
-        }
+        return sessionState != null && sessionState.isActive();
     }
 
     public static boolean isHealthy(State state) {
-        if (State.error.equals(state) || State.dead.equals(state) || State.shutting_down.equals(state)) {
-            return false;
-        }
-        return true;
+        return !(State.error.equals(state) || State.dead.equals(state) ||
+                State.shutting_down.equals(state) || State.finding.equals(state) ||
+                State.not_found.equals(state) || State.found.equals(state));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
new file mode 100644
index 0000000..0f5a624
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentPredicate.java
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+
+package org.apache.griffin.core.job.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+import org.apache.griffin.core.util.JsonUtil;
+
+import javax.persistence.Access;
+import javax.persistence.AccessType;
+import javax.persistence.Entity;
+import javax.persistence.Transient;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Entity describing one predicate of a data segment: a type name plus a
+ * configuration. The configuration is held twice - as JSON text
+ * ({@code config}, hidden from JSON output) and as a map ({@code configMap},
+ * not persisted, exposed to JSON under the name {@code config}). Each setter
+ * updates both forms.
+ */
+@Entity
+public class SegmentPredicate extends AbstractAuditableEntity {
+
+    private String type;
+
+    // JSON text of configMap; declared with property access, so it is read
+    // and written through getConfig()/setConfig(String).
+    @JsonIgnore
+    @Access(AccessType.PROPERTY)
+    private String config;
+
+    // Map view of config; excluded from persistence.
+    @Transient
+    private Map<String, String> configMap;
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getConfig() {
+        return config;
+    }
+
+    /**
+     * Stores the JSON text and rebuilds the map view from it.
+     *
+     * @param config JSON text of the configuration
+     * @throws IOException if the text cannot be converted to a map
+     */
+    public void setConfig(String config) throws IOException {
+        this.config = config;
+        this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() {
+        });
+    }
+
+    @JsonProperty("config")
+    public Map<String, String> getConfigMap() throws IOException {
+        return configMap;
+    }
+
+    /**
+     * Stores the map and refreshes its JSON text form.
+     *
+     * @param configMap configuration entries
+     * @throws JsonProcessingException if the map cannot be written as JSON
+     */
+    @JsonProperty("config")
+    public void setConfigMap(Map<String, String> configMap) throws JsonProcessingException {
+        this.configMap = configMap;
+        this.config = JsonUtil.toJson(configMap);
+    }
+
+    public SegmentPredicate() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
new file mode 100644
index 0000000..b8ca5cf
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/SegmentRange.java
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+
+package org.apache.griffin.core.job.entity;
+
+import org.apache.griffin.core.measure.entity.AbstractAuditableEntity;
+
+import javax.persistence.Column;
+import javax.persistence.Entity;
+
+/**
+ * Entity holding the range of a data segment as two strings, {@code begin}
+ * and {@code length}. Both default to {@code "1h"}.
+ * NOTE(review): the values look like duration expressions - confirm the
+ * accepted format with the code that parses them.
+ */
+@Entity
+public class SegmentRange extends AbstractAuditableEntity {
+
+    // Stored in column "data_begin" rather than a column named after the field.
+    @Column(name = "data_begin")
+    private String begin = "1h";
+
+    private String length = "1h";
+
+
+    public String getBegin() {
+        return begin;
+    }
+
+    public void setBegin(String begin) {
+        this.begin = begin;
+    }
+
+    public String getLength() {
+        return length;
+    }
+
+    public void setLength(String length) {
+        this.length = length;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java b/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java
deleted file mode 100644
index b5925f6..0000000
--- a/service/src/main/java/org/apache/griffin/core/job/entity/SparkJobDO.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.core.job.entity;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-public class SparkJobDO implements Serializable {
-
-    private String file;
-
-    private String className;
-
-    private List<String> args;
-
-    private String name;
-
-    private String queue;
-
-    private Long numExecutors;
-
-    private Long executorCores;
-
-    private String driverMemory;
-
-    private String executorMemory;
-
-    private Map<String, String> conf;
-
-    private List<String> jars;
-
-    private List<String> files;
-
-    public String getFile() {
-        return file;
-    }
-
-    public void setFile(String file) {
-        this.file = file;
-    }
-
-    public String getClassName() {
-        return className;
-    }
-
-    public void setClassName(String className) {
-        this.className = className;
-    }
-
-    public List<String> getArgs() {
-        return args;
-    }
-
-    public void setArgs(List<String> args) {
-        this.args = args;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public String getQueue() {
-        return queue;
-    }
-
-    public void setQueue(String queue) {
-        this.queue = queue;
-    }
-
-    public Long getNumExecutors() {
-        return numExecutors;
-    }
-
-    public void setNumExecutors(Long numExecutors) {
-        this.numExecutors = numExecutors;
-    }
-
-    public Long getExecutorCores() {
-        return executorCores;
-    }
-
-    public void setExecutorCores(Long executorCores) {
-        this.executorCores = executorCores;
-    }
-
-    public String getDriverMemory() {
-        return driverMemory;
-    }
-
-    public void setDriverMemory(String driverMemory) {
-        this.driverMemory = driverMemory;
-    }
-
-    public String getExecutorMemory() {
-        return executorMemory;
-    }
-
-    public void setExecutorMemory(String executorMemory) {
-        this.executorMemory = executorMemory;
-    }
-
-    public Map<String, String> getConf() {
-        return conf;
-    }
-
-    public void setConf(Map<String, String> conf) {
-        this.conf = conf;
-    }
-
-    public List<String> getJars() {
-        return jars;
-    }
-
-    public void setJars(List<String> jars) {
-        this.jars = jars;
-    }
-
-    public List<String> getFiles() {
-        return files;
-    }
-
-    public void setFiles(List<String> files) {
-        this.files = files;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
new file mode 100644
index 0000000..ad98603
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/entity/VirtualJob.java
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.entity;
+
+import javax.persistence.Entity;
+
+/**
+ * Entity subtype of {@link AbstractJob} that adds no state or behaviour of
+ * its own; both constructors delegate to the superclass.
+ */
+@Entity
+public class VirtualJob extends AbstractJob {
+
+    public VirtualJob() {
+        super();
+    }
+
+    /**
+     * @param jobName    passed to the superclass constructor
+     * @param measureId  passed to the superclass constructor
+     * @param metricName passed to the superclass constructor
+     */
+    public VirtualJob(String jobName, Long measureId, String metricName) {
+        super(jobName, measureId, metricName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
new file mode 100644
index 0000000..cc2ff15
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/factory/AutowiringSpringBeanJobFactory.java
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.factory;
+
+import org.quartz.spi.TriggerFiredBundle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.scheduling.quartz.SpringBeanJobFactory;
+
+/**
+ * {@link SpringBeanJobFactory} that autowires every job instance it creates,
+ * using the bean factory of the application context it is given through
+ * {@link #setApplicationContext(ApplicationContext)}.
+ */
+public final class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory
+        implements ApplicationContextAware {
+    private static final Logger LOGGER = LoggerFactory.getLogger(AutowiringSpringBeanJobFactory.class);
+
+    private transient AutowireCapableBeanFactory beanFactory;
+
+    /**
+     * Remembers the autowire-capable bean factory of the given context.
+     *
+     * @param context application context supplying the bean factory
+     */
+    @Override
+    public void setApplicationContext(final ApplicationContext context) {
+        beanFactory = context.getAutowireCapableBeanFactory();
+    }
+
+    /**
+     * Creates the job instance through the superclass and autowires it.
+     *
+     * @param bundle trigger bundle describing the job to instantiate
+     * @return the autowired job instance, or {@code null} if creation or
+     * autowiring failed (the failure is logged with its stack trace)
+     */
+    @Override
+    protected Object createJobInstance(final TriggerFiredBundle bundle) {
+        try {
+            final Object job = super.createJobInstance(bundle);
+            beanFactory.autowireBean(job);
+            return job;
+        } catch (Exception e) {
+            // The trailing throwable has no placeholder, so SLF4J logs it with
+            // its stack trace and cause instead of only the message text.
+            LOGGER.error("fail to create job instance. {}", e.getMessage(), e);
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
new file mode 100644
index 0000000..8af39f4
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/factory/PredicatorFactory.java
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.factory;
+
+import org.apache.griffin.core.job.FileExistPredicator;
+import org.apache.griffin.core.job.Predicator;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
+
+/**
+ * Factory that maps a {@link SegmentPredicate} to the {@link Predicator}
+ * implementation registered for its type.
+ */
+public class PredicatorFactory {
+    /**
+     * Creates the predicator matching the type of the given predicate.
+     *
+     * @param segPredicate predicate definition; may be {@code null}
+     * @return a {@link FileExistPredicator} for type {@code "file.exist"};
+     * {@code null} if the predicate or its type is {@code null}, or if the
+     * type is not recognised
+     */
+    public static Predicator newPredicateInstance(SegmentPredicate segPredicate) {
+        // A switch on a null String throws NullPointerException, so treat a
+        // missing predicate or type like any other unrecognised type.
+        String type = segPredicate == null ? null : segPredicate.getType();
+        if (type == null) {
+            return null;
+        }
+        switch (type) {
+            case "file.exist":
+                return new FileExistPredicator(segPredicate);
+            default:
+                return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java
new file mode 100644
index 0000000..aaaa77d
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/GriffinJobRepo.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.GriffinJob;
+
+/**
+ * Repository for {@link GriffinJob} entities. Declares no methods of its own;
+ * everything is inherited from {@code JobRepo<GriffinJob>}.
+ */
+public interface GriffinJobRepo extends JobRepo<GriffinJob> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java
new file mode 100644
index 0000000..48dd3b4
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobDataSegmentRepo.java
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.JobDataSegment;
+import org.springframework.data.repository.CrudRepository;
+
+/**
+ * Repository for {@link JobDataSegment} entities keyed by {@code Long} id.
+ * Declares no methods of its own; everything is inherited from
+ * {@link CrudRepository}.
+ */
+public interface JobDataSegmentRepo extends CrudRepository<JobDataSegment, Long> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
index 610d282..1714789 100644
--- a/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobInstanceRepo.java
@@ -18,45 +18,31 @@ under the License.
 */
 package org.apache.griffin.core.job.repo;
 
-import org.apache.griffin.core.job.entity.JobInstance;
-import org.apache.griffin.core.job.entity.LivySessionStates;
+import org.apache.griffin.core.job.entity.JobInstanceBean;
 import org.springframework.data.domain.Pageable;
 import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.repository.CrudRepository;
-import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.util.List;
 
+public interface JobInstanceRepo extends CrudRepository<JobInstanceBean, Long> {
 
-@Repository
-public interface JobInstanceRepo extends CrudRepository<JobInstance, Long> {
-    /**
-     * @param group    is group name
-     * @param name     is job name
-     * @param pageable page info
-     * @return all job instances scheduled at different time using the same prototype job,
-     * the prototype job is determined by SCHED_NAME, group name and job name in table QRTZ_JOB_DETAILS.
-     */
-    @Query("select s from JobInstance s " +
-            "where s.groupName= ?1 and s.jobName=?2 ")
-    List<JobInstance> findByGroupNameAndJobName(String group, String name, Pageable pageable);
+    @Query("select DISTINCT s from JobInstanceBean s " +
+            "where s.state in ('starting', 'not_started', 'recovering', 'idle', 'running', 'busy')")
+    List<JobInstanceBean> findByActiveState();
 
-    @Query("select s from JobInstance s " +
-            "where s.groupName= ?1 and s.jobName=?2 ")
-    List<JobInstance> findByGroupNameAndJobName(String group, String name);
+    JobInstanceBean findByPredicateName(String name);
 
-    @Query("select DISTINCT s.groupName, s.jobName from JobInstance s")
-    List<Object> findGroupWithJobName();
+    @Query("select s from JobInstanceBean s where job_id = ?1")
+    List<JobInstanceBean> findByJobId(Long jobId, Pageable pageable);
 
-    @Modifying
-    @Query("delete from JobInstance s " +
-            "where s.groupName= ?1 and s.jobName=?2 ")
-    void deleteByGroupAndJobName(String groupName, String jobName);
+    List<JobInstanceBean> findByExpireTmsLessThanEqual(Long expireTms);
 
+    @Transactional
     @Modifying
-    @Query("update JobInstance s " +
-            "set s.state= ?2, s.appId= ?3, s.appUri= ?4 where s.id= ?1")
-    void update(Long id, LivySessionStates.State state, String appId, String appUri);
+    @Query("delete from JobInstanceBean j where j.expireTms <= ?1")
+    int deleteByExpireTimestamp(Long expireTms);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
new file mode 100644
index 0000000..a3fcce3
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobRepo.java
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.AbstractJob;
+import org.springframework.data.jpa.repository.Query;
+import org.springframework.data.repository.CrudRepository;
+
+import java.util.List;
+
+public interface JobRepo<T extends AbstractJob> extends CrudRepository<T, Long> {
+
+    @Query("select count(j) from #{#entityName} j where j.jobName = ?1 and j.deleted = ?2")
+    int countByJobNameAndDeleted(String jobName, Boolean deleted);
+
+    List<T> findByDeleted(boolean deleted);
+
+    List<T> findByJobNameAndDeleted(String jobName, boolean deleted);
+
+    List<T> findByMeasureIdAndDeleted(Long measureId, boolean deleted);
+
+    T findByIdAndDeleted(Long jobId, boolean deleted);
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
new file mode 100644
index 0000000..1b360e4
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/JobScheduleRepo.java
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.JobSchedule;
+import org.springframework.data.repository.CrudRepository;
+
+public interface JobScheduleRepo extends CrudRepository<JobSchedule, Long> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java b/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java
new file mode 100644
index 0000000..914a1ff
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/job/repo/VirtualJobRepo.java
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.job.repo;
+
+import org.apache.griffin.core.job.entity.VirtualJob;
+
+public interface VirtualJobRepo extends JobRepo<VirtualJob> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
new file mode 100644
index 0000000..f38982a
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/measure/ExternalMeasureOperationImpl.java
@@ -0,0 +1,140 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.job.entity.VirtualJob;
+import org.apache.griffin.core.job.repo.VirtualJobRepo;
+import org.apache.griffin.core.measure.entity.ExternalMeasure;
+import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.measure.repo.ExternalMeasureRepo;
+import org.apache.griffin.core.util.GriffinOperationMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * {@link MeasureOperation} implementation for {@link ExternalMeasure}s.
+ * Each external measure is stored together with a {@link VirtualJob} that
+ * receives the measure's id, name and metric name.
+ */
+@Component("externalOperation")
+public class ExternalMeasureOperationImpl implements MeasureOperation {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ExternalMeasureOperationImpl.class);
+
+    @Autowired
+    private ExternalMeasureRepo measureRepo;
+    @Autowired
+    private VirtualJobRepo jobRepo;
+
+    /**
+     * Persists a new external measure and the virtual job attached to it.
+     *
+     * @param measure measure to create; it is cast to {@link ExternalMeasure}
+     * @return {@code CREATE_MEASURE_SUCCESS} when both saves succeed,
+     * {@code CREATE_MEASURE_FAIL} when the metric name is blank or saving throws
+     */
+    @Override
+    public GriffinOperationMessage create(Measure measure) {
+        ExternalMeasure em = (ExternalMeasure) measure;
+        if (StringUtils.isBlank(em.getMetricName())) {
+            LOGGER.error("Failed to create external measure {}. Its metric name is blank.", measure.getName());
+            return GriffinOperationMessage.CREATE_MEASURE_FAIL;
+        }
+        try {
+            em.setVirtualJob(new VirtualJob());
+            // The measure is saved before genVirtualJob reads em.getId().
+            em = measureRepo.save(em);
+            VirtualJob vj = genVirtualJob(em, em.getVirtualJob());
+            jobRepo.save(vj);
+            return GriffinOperationMessage.CREATE_MEASURE_SUCCESS;
+        } catch (Exception e) {
+            // Pass the exception itself so the stack trace is logged.
+            LOGGER.error("Failed to create new measure {}.", em.getName(), e);
+        }
+        return GriffinOperationMessage.CREATE_MEASURE_FAIL;
+    }
+
+    /**
+     * Updates an existing external measure, re-using the virtual job of the stored measure.
+     *
+     * @param measure measure carrying the new values; it is cast to {@link ExternalMeasure}
+     * @return {@code UPDATE_MEASURE_SUCCESS} when the save succeeds,
+     * {@code UPDATE_MEASURE_FAIL} when the metric name is blank, the stored
+     * measure cannot be found, or saving throws
+     */
+    @Override
+    public GriffinOperationMessage update(Measure measure) {
+        ExternalMeasure latestMeasure = (ExternalMeasure) measure;
+        if (StringUtils.isBlank(latestMeasure.getMetricName())) {
+            LOGGER.error("Failed to update external measure {}. Its metric name is blank.", measure.getName());
+            return GriffinOperationMessage.UPDATE_MEASURE_FAIL;
+        }
+        try {
+            ExternalMeasure originMeasure = measureRepo.findOne(latestMeasure.getId());
+            if (originMeasure == null) {
+                LOGGER.error("Failed to update external measure {}. It does not exist.", measure.getName());
+                return GriffinOperationMessage.UPDATE_MEASURE_FAIL;
+            }
+            VirtualJob vj = genVirtualJob(latestMeasure, originMeasure.getVirtualJob());
+            latestMeasure.setVirtualJob(vj);
+            measureRepo.save(latestMeasure);
+            return GriffinOperationMessage.UPDATE_MEASURE_SUCCESS;
+        } catch (Exception e) {
+            LOGGER.error("Failed to update measure {}.", latestMeasure.getName(), e);
+        }
+        return GriffinOperationMessage.UPDATE_MEASURE_FAIL;
+    }
+
+    /**
+     * Flags the measure and its virtual job as deleted and saves the measure.
+     *
+     * @param measure measure to delete; it is cast to {@link ExternalMeasure}
+     * @return {@code true} when the flagged measure was saved, {@code false} if anything threw
+     */
+    @Override
+    public Boolean delete(Measure measure) {
+        try {
+            ExternalMeasure em = (ExternalMeasure) measure;
+            em.setDeleted(true);
+            em.getVirtualJob().setDeleted(true);
+            measureRepo.save(em);
+            return true;
+        } catch (Exception e) {
+            LOGGER.error("Failed to delete measure.", e);
+        }
+        return false;
+    }
+
+    /**
+     * Copies the measure's id, name and metric name onto the given virtual job.
+     *
+     * @param em measure to read from
+     * @param vj virtual job to fill in
+     * @return the same {@code vj} instance
+     */
+    private VirtualJob genVirtualJob(ExternalMeasure em, VirtualJob vj) {
+        vj.setMeasureId(em.getId());
+        vj.setJobName(em.getName());
+        vj.setMetricName(em.getMetricName());
+        return vj;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
new file mode 100644
index 0000000..88c5409
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/measure/GriffinMeasureOperationImpl.java
@@ -0,0 +1,153 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.job.JobServiceImpl;
+import org.apache.griffin.core.measure.entity.DataConnector;
+import org.apache.griffin.core.measure.entity.DataSource;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
+import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.measure.repo.DataConnectorRepo;
+import org.apache.griffin.core.measure.repo.MeasureRepo;
+import org.apache.griffin.core.util.GriffinOperationMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link MeasureOperation} implementation for {@link GriffinMeasure}s.
+ * Measures are stored through {@link MeasureRepo}; deleting a measure first
+ * calls {@code jobService.deleteJobsRelateToMeasure} for the measure's id.
+ */
+@Component("griffinOperation")
+public class GriffinMeasureOperationImpl implements MeasureOperation {
+    private static final Logger LOGGER = LoggerFactory.getLogger(GriffinMeasureOperationImpl.class);
+
+    @Autowired
+    private MeasureRepo<Measure> measureRepo;
+    @Autowired
+    private DataConnectorRepo dcRepo;
+    @Autowired
+    private JobServiceImpl jobService;
+
+    /**
+     * Saves a new measure after checking its connector names.
+     *
+     * @param measure measure to create; it is cast to {@link GriffinMeasure}
+     * @return {@code CREATE_MEASURE_SUCCESS} when the save succeeds,
+     * {@code CREATE_MEASURE_FAIL} when the connector names are rejected or saving throws
+     */
+    @Override
+    public GriffinOperationMessage create(Measure measure) {
+        if (!isConnectorNamesValid((GriffinMeasure) measure)) {
+            return GriffinOperationMessage.CREATE_MEASURE_FAIL;
+        }
+        try {
+            measureRepo.save(measure);
+            return GriffinOperationMessage.CREATE_MEASURE_SUCCESS;
+        } catch (Exception e) {
+            LOGGER.error("Failed to create new measure {}.", measure.getName(), e);
+        }
+        return GriffinOperationMessage.CREATE_MEASURE_FAIL;
+    }
+
+    /**
+     * Saves the given measure through the measure repository.
+     *
+     * @param measure measure carrying the new values
+     * @return {@code UPDATE_MEASURE_SUCCESS} when the save succeeds,
+     * {@code UPDATE_MEASURE_FAIL} when saving throws
+     */
+    @Override
+    public GriffinOperationMessage update(Measure measure) {
+        try {
+            measureRepo.save(measure);
+            return GriffinOperationMessage.UPDATE_MEASURE_SUCCESS;
+        } catch (Exception e) {
+            // Pass the exception itself so the stack trace is logged, as create() does.
+            LOGGER.error("Failed to update measure.", e);
+        }
+        return GriffinOperationMessage.UPDATE_MEASURE_FAIL;
+    }
+
+    /**
+     * Deletes the jobs related to the measure, then flags the measure as deleted and saves it.
+     *
+     * @param measure measure to delete
+     * @return {@code false} when {@code deleteJobsRelateToMeasure} reports failure (the
+     * measure is then left untouched), {@code true} once the flagged measure has been saved
+     */
+    @Override
+    public Boolean delete(Measure measure) {
+        boolean jobsDeleted = jobService.deleteJobsRelateToMeasure(measure.getId());
+        if (!jobsDeleted) {
+            return false;
+        }
+        measure.setDeleted(true);
+        measureRepo.save(measure);
+        return true;
+    }
+
+    /**
+     * Checks that the measure has at least one connector name and that
+     * {@code dcRepo.findByConnectorNames} returns no connector for those names.
+     *
+     * @param measure measure whose connector names are checked
+     * @return {@code true} when both checks pass
+     */
+    private boolean isConnectorNamesValid(GriffinMeasure measure) {
+        List<String> names = getConnectorNames(measure);
+        if (names.isEmpty()) {
+            LOGGER.warn("Connector names cannot be empty.");
+            return false;
+        }
+        List<DataConnector> connectors = dcRepo.findByConnectorNames(names);
+        if (!CollectionUtils.isEmpty(connectors)) {
+            LOGGER.warn("Failed to create new measure {}. Its connector names already exist. ", measure.getName());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Collects the non-empty names of all connectors of all data sources of the measure.
+     *
+     * @param measure measure to read the data sources from
+     * @return connector names in iteration order; empty when no connector has a name
+     */
+    private List<String> getConnectorNames(GriffinMeasure measure) {
+        List<String> names = new ArrayList<>();
+        for (DataSource source : measure.getDataSources()) {
+            for (DataConnector dc : source.getConnectors()) {
+                String name = dc.getName();
+                if (!StringUtils.isEmpty(name)) {
+                    names.add(name);
+                }
+            }
+        }
+        return names;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
index fae0169..3b557ca 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureController.java
@@ -37,17 +37,17 @@ public class MeasureController {
         return measureService.getAllAliveMeasures();
     }
 
-    @RequestMapping(value = "/measure/{id}", method = RequestMethod.GET)
+    @RequestMapping(value = "/measures/{id}", method = RequestMethod.GET)
     public Measure getMeasureById(@PathVariable("id") long id) {
         return measureService.getMeasureById(id);
     }
 
-    @RequestMapping(value = "/measure/{id}", method = RequestMethod.DELETE)
+    @RequestMapping(value = "/measures/{id}", method = RequestMethod.DELETE)
     public GriffinOperationMessage deleteMeasureById(@PathVariable("id") Long id) {
         return measureService.deleteMeasureById(id);
     }
 
-    @RequestMapping(value = "/measure", method = RequestMethod.PUT)
+    @RequestMapping(value = "/measures", method = RequestMethod.PUT)
     public GriffinOperationMessage updateMeasure(@RequestBody Measure measure) {
         return measureService.updateMeasure(measure);
     }
@@ -57,7 +57,7 @@ public class MeasureController {
         return measureService.getAliveMeasuresByOwner(owner);
     }
 
-    @RequestMapping(value = "/measure", method = RequestMethod.POST)
+    @RequestMapping(value = "/measures", method = RequestMethod.POST)
     public GriffinOperationMessage createMeasure(@RequestBody Measure measure) {
         return measureService.createMeasure(measure);
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
new file mode 100644
index 0000000..80f1f30
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOperation.java
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure;
+
+
+import org.apache.griffin.core.measure.entity.Measure;
+import org.apache.griffin.core.util.GriffinOperationMessage;
+
+public interface MeasureOperation {
+
+    GriffinOperationMessage create(Measure measure);
+
+    GriffinOperationMessage update(Measure measure);
+
+    Boolean delete(Measure measure);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java
index abe36d9..499ee8e 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgController.java
@@ -19,14 +19,12 @@ under the License.
 
 package org.apache.griffin.core.measure;
 
-import org.apache.griffin.core.job.JobService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
@@ -37,9 +35,6 @@ public class MeasureOrgController {
     @Autowired
     private MeasureOrgService measureOrgService;
 
-    @Autowired
-    private JobService jobService;
-
     @RequestMapping(value = "/org", method = RequestMethod.GET)
     public List<String> getOrgs() {
         return measureOrgService.getOrgs();
@@ -59,9 +54,4 @@ public class MeasureOrgController {
     public Map<String, List<String>> getMeasureNamesGroupByOrg() {
         return measureOrgService.getMeasureNamesGroupByOrg();
     }
-
-    @RequestMapping(value = "/org/measure/jobs", method = RequestMethod.GET)
-    public Map<String, Map<String, List<Map<String, Serializable>>>> getMeasureWithJobsGroupByOrg() {
-        return measureOrgService.getMeasureWithJobDetailsGroupByOrg(jobService.getJobDetailsGroupByMeasureId());
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
index f45c636..754f3d1 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgService.java
@@ -31,5 +31,5 @@ public interface MeasureOrgService {
 
     Map<String, List<String>> getMeasureNamesGroupByOrg();
 
-    Map<String, Map<String, List<Map<String, Serializable>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Serializable>>> jobDetailsGroupByMeasure);
+    Map<String, Map<String, List<Map<String, Object>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Object>>> jobDetailsGroupByMeasure);
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
index d4cb6a9..1d64830 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
@@ -19,12 +19,12 @@ under the License.
 
 package org.apache.griffin.core.measure;
 
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
 import org.apache.griffin.core.measure.entity.Measure;
-import org.apache.griffin.core.measure.repo.MeasureRepo;
+import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -34,7 +34,7 @@ import java.util.Map;
 public class MeasureOrgServiceImpl implements MeasureOrgService {
 
     @Autowired
-    private MeasureRepo measureRepo;
+    private GriffinMeasureRepo measureRepo;
 
     @Override
     public List<String> getOrgs() {
@@ -43,20 +43,20 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
 
     @Override
     public List<String> getMetricNameListByOrg(String org) {
-        return measureRepo.findNameByOrganization(org,false);
+        return measureRepo.findNameByOrganization(org, false);
     }
 
     @Override
     public Map<String, List<String>> getMeasureNamesGroupByOrg() {
         Map<String, List<String>> orgWithMetricsMap = new HashMap<>();
-        List<Measure> measures = measureRepo.findByDeleted(false);
+        List<GriffinMeasure> measures = measureRepo.findByDeleted(false);
         if (measures == null) {
             return null;
         }
         for (Measure measure : measures) {
             String orgName = measure.getOrganization();
             String measureName = measure.getName();
-            List<String> measureList = orgWithMetricsMap.getOrDefault(orgName, new ArrayList<String>());
+            List<String> measureList = orgWithMetricsMap.getOrDefault(orgName, new ArrayList<>());
             measureList.add(measureName);
             orgWithMetricsMap.put(orgName, measureList);
         }
@@ -64,9 +64,9 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
     }
 
     @Override
-    public Map<String, Map<String, List<Map<String, Serializable>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Serializable>>> jobDetails) {
-        Map<String, Map<String, List<Map<String, Serializable>>>> result = new HashMap<>();
-        List<Measure> measures = measureRepo.findByDeleted(false);
+    public Map<String, Map<String, List<Map<String, Object>>>> getMeasureWithJobDetailsGroupByOrg(Map<String, List<Map<String, Object>>> jobDetails) {
+        Map<String, Map<String, List<Map<String, Object>>>> result = new HashMap<>();
+        List<GriffinMeasure> measures = measureRepo.findByDeleted(false);
         if (measures == null) {
             return null;
         }
@@ -74,8 +74,8 @@ public class MeasureOrgServiceImpl implements MeasureOrgService {
             String orgName = measure.getOrganization();
             String measureName = measure.getName();
             String measureId = measure.getId().toString();
-            List<Map<String, Serializable>> jobList = jobDetails.getOrDefault(measureId, new ArrayList<>());
-            Map<String, List<Map<String, Serializable>>> measureWithJobs = result.getOrDefault(orgName, new HashMap<>());
+            List<Map<String, Object>> jobList = jobDetails.getOrDefault(measureId, new ArrayList<>());
+            Map<String, List<Map<String, Object>>> measureWithJobs = result.getOrDefault(orgName, new HashMap<>());
             measureWithJobs.put(measureName, jobList);
             result.put(orgName, measureWithJobs);
         }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
index 0e20b4f..a330d0a 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureService.java
@@ -24,11 +24,10 @@ import org.apache.griffin.core.measure.entity.Measure;
 import org.apache.griffin.core.util.GriffinOperationMessage;
 
 import java.util.List;
-import java.util.Map;
 
 public interface MeasureService {
 
-    Iterable<Measure> getAllAliveMeasures();
+    List<Measure> getAllAliveMeasures();
 
     Measure getMeasureById(long id);
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
index 8c088c8..ecb9fdd 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureServiceImpl.java
@@ -20,17 +20,16 @@ under the License.
 package org.apache.griffin.core.measure;
 
 
-import org.apache.griffin.core.job.JobServiceImpl;
+import org.apache.griffin.core.measure.entity.GriffinMeasure;
 import org.apache.griffin.core.measure.entity.Measure;
 import org.apache.griffin.core.measure.repo.MeasureRepo;
 import org.apache.griffin.core.util.GriffinOperationMessage;
-import org.quartz.SchedulerException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.util.CollectionUtils;
 
 import java.util.List;
 
@@ -39,79 +38,76 @@ public class MeasureServiceImpl implements MeasureService {
     private static final Logger LOGGER = LoggerFactory.getLogger(MeasureServiceImpl.class);
 
     @Autowired
-    private JobServiceImpl jobService;
+    private MeasureRepo<Measure> measureRepo;
     @Autowired
-    private MeasureRepo measureRepo;
+    @Qualifier("griffinOperation")
+    private MeasureOperation griffinOp;
+    @Autowired
+    @Qualifier("externalOperation")
+    private MeasureOperation externalOp;
 
     @Override
-    public Iterable<Measure> getAllAliveMeasures() {
+    public List<Measure> getAllAliveMeasures() { // "alive" here means the repository is queried with deleted == false
         return measureRepo.findByDeleted(false);
     }
 
     @Override
-    public Measure getMeasureById(@PathVariable("id") long id) {
+    public Measure getMeasureById(long id) { // searches non-deleted measures only; result may be null (other methods here null-check the same query)
         return measureRepo.findByIdAndDeleted(id, false);
     }
 
     @Override
-    public GriffinOperationMessage deleteMeasureById(Long measureId) {
-        if (!measureRepo.exists(measureId)) {
-            return GriffinOperationMessage.RESOURCE_NOT_FOUND;
-        } else {
-            Measure measure = measureRepo.findOne(measureId);
-            try {
-                //pause all jobs related to the measure
-                jobService.deleteJobsRelateToMeasure(measure);
-                measure.setDeleted(true);
-                measureRepo.save(measure);
-            } catch (SchedulerException e) {
-                LOGGER.error("Delete measure id: {} name: {} failure. {}", measure.getId(), measure.getName(), e.getMessage());
-                return GriffinOperationMessage.DELETE_MEASURE_BY_ID_FAIL;
-            }
-
-            return GriffinOperationMessage.DELETE_MEASURE_BY_ID_SUCCESS;
-        }
+    public List<Measure> getAliveMeasuresByOwner(String owner) { // filters by owner and by deleted == false
+        return measureRepo.findByOwnerAndDeleted(owner, false);
     }
 
     @Override
     public GriffinOperationMessage createMeasure(Measure measure) {
         List<Measure> aliveMeasureList = measureRepo.findByNameAndDeleted(measure.getName(), false);
-        if (aliveMeasureList.size() == 0) {
-            try {
-                if (measureRepo.save(measure) != null) {
-                    return GriffinOperationMessage.CREATE_MEASURE_SUCCESS;
-                } else {
-                    return GriffinOperationMessage.CREATE_MEASURE_FAIL;
-                }
-            } catch (Exception e) {
-                LOGGER.info("Failed to create new measure {}.{}", measure.getName(), e.getMessage());
-                return GriffinOperationMessage.CREATE_MEASURE_FAIL;
-            }
-
-        } else {
-            LOGGER.info("Failed to create new measure {}, it already exists.", measure.getName());
+        if (!CollectionUtils.isEmpty(aliveMeasureList)) { // a non-deleted measure with the same name exists: refuse the duplicate
+            LOGGER.warn("Failed to create new measure {}, it already exists.", measure.getName());
             return GriffinOperationMessage.CREATE_MEASURE_FAIL_DUPLICATE;
         }
+        MeasureOperation op = getOperation(measure); // operation bean chosen from the measure's concrete class
+        return op.create(measure); // the operation's result message is returned unchanged
     }
 
     @Override
-    public List<Measure> getAliveMeasuresByOwner(String owner) {
-        return measureRepo.findByOwnerAndDeleted(owner, false);
+    public GriffinOperationMessage updateMeasure(Measure measure) { // validates the target exists and keeps its type, then delegates
+        Measure m = measureRepo.findByIdAndDeleted(measure.getId(), false); // currently stored, non-deleted measure with the same id
+        if (m == null) {
+            return GriffinOperationMessage.RESOURCE_NOT_FOUND;
+        }
+        if (!m.getType().equals(measure.getType())) { // stored type and incoming type must match
+            LOGGER.error("Can't update measure to different type.");
+            return GriffinOperationMessage.UPDATE_MEASURE_FAIL;
+        }
+        MeasureOperation op = getOperation(measure); // operation bean chosen from the incoming measure's concrete class
+        return op.update(measure); // the operation's result message is returned unchanged
     }
 
     @Override
-    public GriffinOperationMessage updateMeasure(@RequestBody Measure measure) {
-        if (measureRepo.findByIdAndDeleted(measure.getId(), false) == null) {
+    public GriffinOperationMessage deleteMeasureById(Long measureId) {
+        Measure measure = measureRepo.findByIdAndDeleted(measureId, false);
+        if (measure == null) {
             return GriffinOperationMessage.RESOURCE_NOT_FOUND;
-        } else {
-            try {
-                measureRepo.save(measure);
-            } catch (Exception e) {
-                LOGGER.error("Failed to update measure. {}", e.getMessage());
-                return GriffinOperationMessage.UPDATE_MEASURE_FAIL;
+        }
+        try {
+            MeasureOperation op = getOperation(measure);
+            if (op.delete(measure)) {
+                return GriffinOperationMessage.DELETE_MEASURE_BY_ID_SUCCESS;
             }
+        } catch (Exception e) {
+            LOGGER.error("Delete measure id: {} name: {} failure. {}", measure.getId(), measure.getName(), e.getMessage());
+        }
+        return GriffinOperationMessage.DELETE_MEASURE_BY_ID_FAIL;
+    }
 
-            return GriffinOperationMessage.UPDATE_MEASURE_SUCCESS;
+    private MeasureOperation getOperation(Measure measure) { // selects the operation bean that handles this measure's concrete class
+        if (measure instanceof GriffinMeasure) {
+            return griffinOp; // bean injected with qualifier "griffinOperation"
         }
+        return externalOp; // every other Measure (including null) falls through to the "externalOperation" bean
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
index a36c240..3c4abf5 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java
@@ -20,16 +20,21 @@ under the License.
 package org.apache.griffin.core.measure.entity;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.job.entity.SegmentPredicate;
 import org.apache.griffin.core.util.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.StringUtils;
 
-import javax.persistence.Entity;
-import javax.persistence.Transient;
+import javax.persistence.*;
+import javax.validation.constraints.NotNull;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 @Entity
@@ -38,30 +43,84 @@ public class DataConnector extends AbstractAuditableEntity {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(DataConnector.class);
 
+    @NotNull
+    private String name;
+
     private String type;
 
     private String version;
 
-    private String config;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private String dataUnit;
 
     @JsonIgnore
     @Transient
-    private Map<String, String> configInMaps;
+    private String defaultDataUnit = "365000d";
+
+    @JsonIgnore
+    @Access(AccessType.PROPERTY)
+    private String config;
+
+    @Transient
+    private Map<String, String> configMap;
+
+    @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
+    @JoinColumn(name = "data_connector_id")
+    private List<SegmentPredicate> predicates = new ArrayList<>();
+
+    public List<SegmentPredicate> getPredicates() {
+        return predicates;
+    }
+
+    public void setPredicates(List<SegmentPredicate> predicates) {
+        this.predicates = predicates;
+    }
+
+    @JsonProperty("config") // the map form is what appears under the JSON key "config"
+    public Map<String, String> getConfigMap() throws IOException { // body only returns the field; nothing here throws IOException
+        return configMap;
+    }
+
+    @JsonProperty("config") // JSON key "config" is bound to the map form
+    public void setConfigMap(Map<String, String> configMap) throws JsonProcessingException {
+        this.configMap = configMap;
+        this.config = JsonUtil.toJson(configMap); // keep the string field in step with the map
+    }
 
-    public Map<String, String> getConfigInMaps() throws IOException {
-        if (this.configInMaps == null && !StringUtils.isEmpty(config)) {
-            this.configInMaps = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() {});
+    public void setConfig(String config) throws IOException { // NOTE(review): the config field declares @Access(PROPERTY), so presumably the persistence provider calls this on load — confirm
+        this.config = config;
+        this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() { // re-parse so the map mirrors the string
+        });
+    }
+
+    public String getConfig() throws IOException { // returns the stored JSON string as-is; the field is @JsonIgnore'd
+        return config;
+    }
+
+
+    @JsonProperty("data.unit") // exposed under the JSON key "data.unit"
+    public String getDataUnit() {
+        if (dataUnit != null) {
+            return dataUnit;
         }
-        return configInMaps;
+        return defaultDataUnit; // fallback when no unit was set; the field is initialised to "365000d"
+    }
+
+    @JsonProperty("data.unit")
+    public void setDataUnit(String dataUnit) {
+        this.dataUnit = dataUnit;
     }
 
-    public void setConfig(Map<String, String> configInMaps) throws JsonProcessingException {
-        this.configInMaps = configInMaps;
-        this.config = JsonUtil.toJson(configInMaps);
+    public String getName() {
+        return name;
     }
 
-    public Map<String, String> getConfig() throws IOException {
-        return getConfigInMaps();
+    public void setName(String name) { // rejects null or empty names: logs, then throws NullPointerException
+        if (StringUtils.isEmpty(name)) {
+            LOGGER.error("Connector name cannot be empty.");
+            throw new NullPointerException("Connector name cannot be empty."); // carry the reason in the exception, as the other entity validators do
+        }
+        this.name = name;
     }
 
     public String getType() {
@@ -84,22 +143,19 @@ public class DataConnector extends AbstractAuditableEntity {
     public DataConnector() {
     }
 
-    public DataConnector(String type, String version, String config) {
+    public DataConnector(String name, String type, String version, String config) throws IOException { // config is a JSON string; a parse failure now propagates as IOException
+        this.name = name; // NOTE(review): assigned directly, so the empty-name check in setName is not applied here — confirm intended
         this.type = type;
         this.version = version;
         this.config = config;
-        TypeReference<Map<String, String>> mapType = new TypeReference<Map<String, String>>() {
-        };
-        try {
-            this.configInMaps = JsonUtil.toEntity(config, mapType);
-        } catch (IOException e) {
-            LOGGER.error("Error in converting json to map. {}", e.getMessage());
-        }
+        this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() { // map form derived from the same JSON string
+        });
     }
 
     @Override
     public String toString() {
         return "DataConnector{" +
+                "name=" + name + ", " + // separator keeps the name and type entries from running together
                 "type=" + type +
                 ", version='" + version + '\'' +
                 ", config=" + config +

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
index 14619cb..0466992 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java
@@ -20,7 +20,11 @@ under the License.
 package org.apache.griffin.core.measure.entity;
 
 
+
+import org.springframework.util.CollectionUtils;
+
 import javax.persistence.*;
+import java.util.ArrayList;
 import java.util.List;
 
 @Entity
@@ -30,8 +34,8 @@ public class DataSource extends AbstractAuditableEntity {
     private String name;
 
     @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
-    @JoinColumn(name = "dataSource_id")
-    private List<DataConnector> connectors;
+    @JoinColumn(name = "data_source_id")
+    private List<DataConnector> connectors = new ArrayList<>();
 
     public String getName() {
         return name;
@@ -46,6 +50,9 @@ public class DataSource extends AbstractAuditableEntity {
     }
 
     public void setConnectors(List<DataConnector> connectors) {
+        if (CollectionUtils.isEmpty(connectors)) { // null and empty lists are both rejected
+            throw new NullPointerException("Data connector can not be empty."); // NOTE(review): NPE is also thrown for a non-null empty list
+        }
         this.connectors = connectors;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
index 2a70636..75a39ce 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/EvaluateRule.java
@@ -24,6 +24,7 @@ import org.hibernate.annotations.Fetch;
 import org.hibernate.annotations.FetchMode;
 
 import javax.persistence.*;
+import java.util.ArrayList;
 import java.util.List;
 
 
@@ -34,7 +35,7 @@ public class EvaluateRule extends AbstractAuditableEntity {
     @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
     @JoinColumn(name = "evaluateRule_id")
     @Fetch(FetchMode.SUBSELECT)
-    private List<Rule> rules;
+    private List<Rule> rules = new ArrayList<>();
 
     public List<Rule> getRules() {
         return rules;

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java
new file mode 100644
index 0000000..eb4a19d
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/ExternalMeasure.java
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.griffin.core.job.entity.VirtualJob;
+
+import javax.persistence.CascadeType;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.OneToOne;
+
+/**
+ * Measures that publish metrics which are processed externally.
+ */
+@Entity
+public class ExternalMeasure extends Measure {
+
+    private String metricName; // presumably the name under which the external metric is published — confirm with callers
+
+    @JsonIgnore // the job association is left out of the JSON representation
+    @OneToOne(fetch = FetchType.EAGER, cascade = CascadeType.ALL) // loaded eagerly; all persistence operations cascade to the job
+    private VirtualJob virtualJob;
+
+    public ExternalMeasure() { // no-arg constructor; only delegates to the base class
+        super();
+    }
+
+    public ExternalMeasure(String name, String description, String organization, String owner, String metricName) {
+        super(name, description, organization, owner); // common measure attributes are stored by Measure
+        this.metricName = metricName;
+    }
+
+    public String getMetricName() {
+        return metricName;
+    }
+
+    public void setMetricName(String metricName) {
+        this.metricName = metricName;
+    }
+
+    public VirtualJob getVirtualJob() {
+        return virtualJob;
+    }
+
+    public void setVirtualJob(VirtualJob virtualJob) {
+        this.virtualJob = virtualJob;
+    }
+
+    @Override
+    public String getType() { // same string as the "external" subtype name declared in Measure's @JsonSubTypes
+        return "external";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
new file mode 100644
index 0000000..3c5c602
--- /dev/null
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java
@@ -0,0 +1,115 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.griffin.core.measure.entity;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.commons.collections.CollectionUtils;
+
+import javax.persistence.*;
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Measures processed on Griffin; carries the data sources and the evaluate rule of the measure.
+ */
+@Entity
+public class GriffinMeasure extends Measure {
+
+    private String processType; // exposed in JSON as "process.type" (see accessors)
+
+    @Transient // not persisted
+    @JsonInclude(JsonInclude.Include.NON_NULL) // left out of JSON while null
+    private Long timestamp;
+
+
+    @NotNull
+    @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
+    @JoinColumn(name = "measure_id")
+    private List<DataSource> dataSources = new ArrayList<>(); // starts as an empty list rather than null
+
+    @NotNull
+    @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
+    @JoinColumn(name = "evaluate_rule_id")
+    private EvaluateRule evaluateRule;
+
+    @JsonProperty("process.type")
+    public String getProcessType() {
+        return processType;
+    }
+
+    @JsonProperty("process.type")
+    public void setProcessType(String processType) {
+        this.processType = processType;
+    }
+
+    @JsonProperty("data.sources")
+    public List<DataSource> getDataSources() {
+        return dataSources;
+    }
+
+    @JsonProperty("data.sources")
+    public void setDataSources(List<DataSource> dataSources) { // rejects null or empty lists with NullPointerException
+        if (CollectionUtils.isEmpty(dataSources)) {
+            throw new NullPointerException("Data source can not be empty.");
+        }
+        this.dataSources = dataSources;
+    }
+
+    @JsonProperty("evaluate.rule")
+    public EvaluateRule getEvaluateRule() {
+        return evaluateRule;
+    }
+
+    @JsonProperty("evaluate.rule")
+    public void setEvaluateRule(EvaluateRule evaluateRule) { // rejects a null rule, or one whose rule list is null or empty
+        if (evaluateRule == null || CollectionUtils.isEmpty(evaluateRule.getRules())) {
+            throw new NullPointerException("Evaluate rule can not be empty.");
+        }
+        this.evaluateRule = evaluateRule;
+    }
+
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(Long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public String getType() { // same string as the "griffin" subtype name declared in Measure's @JsonSubTypes
+        return "griffin";
+    }
+
+    public GriffinMeasure() { // no-arg constructor; only delegates to the base class
+        super();
+    }
+
+    public GriffinMeasure(Long measureId,String name, String description, String organization, String processType, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {
+        super(name, description, organization, owner); // common measure attributes are stored by Measure
+        this.setId(measureId);
+        this.processType = processType;
+        this.dataSources = dataSources; // NOTE(review): assigned directly, so the emptiness check in setDataSources is not applied — confirm intended
+        this.evaluateRule = evaluateRule; // NOTE(review): likewise bypasses the check in setEvaluateRule
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
index d8afba4..cf2daec 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java
@@ -19,39 +19,31 @@ under the License.
 
 package org.apache.griffin.core.measure.entity;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
-import javax.persistence.*;
-import java.util.List;
+import javax.persistence.Entity;
+import javax.persistence.Inheritance;
+import javax.persistence.InheritanceType;
+import javax.validation.constraints.NotNull;
 
 @Entity
-public class Measure extends AbstractAuditableEntity {
+@Inheritance(strategy = InheritanceType.JOINED)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
+@JsonSubTypes({@JsonSubTypes.Type(value = GriffinMeasure.class, name = "griffin"), @JsonSubTypes.Type(value = ExternalMeasure.class, name = "external")})
+public abstract class Measure extends AbstractAuditableEntity {
     private static final long serialVersionUID = -4748881017029815714L;
 
-    private String name;
+    @NotNull
+    protected String name;
 
-    private String description;
+    protected String description;
 
-    private String organization;
+    protected String organization;
 
-    private String processType;
+    protected String owner;
 
-    /**
-     * record triggered time of measure
-     */
-    private Long triggerTimeStamp = -1L;
-
-
-    @OneToMany(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
-    @JoinColumn(name = "measure_id")
-    private List<DataSource> dataSources;
-
-    @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE})
-    @JoinColumn(name = "evaluateRule_id")
-    private EvaluateRule evaluateRule;
-
-    private String owner;
-    private Boolean deleted = false;
+    protected Boolean deleted = false;
 
     public String getName() {
         return name;
@@ -85,34 +77,6 @@ public class Measure extends AbstractAuditableEntity {
         this.owner = owner;
     }
 
-    @JsonProperty("process.type")
-    public String getProcessType() {
-        return processType;
-    }
-
-    @JsonProperty("process.type")
-    public void setProcessType(String processType) {
-        this.processType = processType;
-    }
-
-    @JsonProperty("data.sources")
-    public List<DataSource> getDataSources() {
-        return dataSources;
-    }
-
-    @JsonProperty("data.sources")
-    public void setDataSources(List<DataSource> dataSources) {
-        this.dataSources = dataSources;
-    }
-
-    public EvaluateRule getEvaluateRule() {
-        return evaluateRule;
-    }
-
-    public void setEvaluateRule(EvaluateRule evaluateRule) {
-        this.evaluateRule = evaluateRule;
-    }
-
     public Boolean getDeleted() {
         return this.deleted;
     }
@@ -121,26 +85,15 @@ public class Measure extends AbstractAuditableEntity {
         this.deleted = deleted;
     }
 
-    @JsonProperty("timestamp")
-    public Long getTriggerTimeStamp() {
-        return triggerTimeStamp;
-    }
-
-    @JsonProperty("timestamp")
-    public void setTriggerTimeStamp(Long triggerTimeStamp) {
-        this.triggerTimeStamp = triggerTimeStamp;
-    }
-
     public Measure() {
     }
 
-    public Measure(String name, String description, String organization, String processType, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) {
+    public Measure(String name, String description, String organization, String owner) { // sets only the attributes shared by all measure types
         this.name = name;
         this.description = description;
         this.organization = organization;
-        this.processType = processType;
         this.owner = owner;
-        this.dataSources = dataSources;
-        this.evaluateRule = evaluateRule;
     }
+
+    public abstract String getType(); // GriffinMeasure returns "griffin" and ExternalMeasure "external", matching the subtype names in @JsonSubTypes above
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
index b060bc4..f0c6516 100644
--- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
+++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java
@@ -23,12 +23,9 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.commons.lang.StringUtils;
 import org.apache.griffin.core.util.JsonUtil;
 
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.Transient;
+import javax.persistence.*;
 import java.io.IOException;
 import java.util.Map;
 
@@ -43,10 +40,18 @@ public class Rule extends AbstractAuditableEntity {
 
     private String dqType;
 
-    @Column(length = 1024)
+    @Column(length = 10 * 1024)
     private String rule;
 
     @JsonIgnore
+    private String name;
+
+    @JsonIgnore
+    private String description;
+
+    @JsonIgnore
+    @Access(AccessType.PROPERTY)
+    @Column(length = 10 * 1024)
     private String details;
 
     @Transient
@@ -86,15 +91,14 @@ public class Rule extends AbstractAuditableEntity {
         return details;
     }
 
-    public void setDetails(String details) {
+    private void setDetails(String details) throws IOException { // NOTE(review): private, while the details field declares @Access(PROPERTY) — presumably invoked by the persistence provider; confirm
         this.details = details;
+        detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() { // parsed here so getDetailsMap can return the field directly
+        });
     }
 
     @JsonProperty("details")
-    public Map<String, Object> getDetailsMap() throws IOException {
-        if (detailsMap == null && !StringUtils.isEmpty(details)) {
-            detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() {});
-        }
+    public Map<String, Object> getDetailsMap() { // plain accessor: returns the map field without parsing the details string
         return detailsMap;
     }
 
@@ -104,6 +108,22 @@ public class Rule extends AbstractAuditableEntity {
         this.details = JsonUtil.toJson(details);
     }
 
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
     public Rule() {
     }