You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2023/01/12 11:08:15 UTC

[griffin] branch griffin-1.0.0-dev updated: Core: Process workflow with stage (#623)

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch griffin-1.0.0-dev
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/griffin-1.0.0-dev by this push:
     new 279dcfcb Core: Process workflow with stage (#623)
279dcfcb is described below

commit 279dcfcbd33ed4d1523e96d9072ec0dcf04320c3
Author: dabuliud <wa...@gmail.com>
AuthorDate: Thu Jan 12 19:08:09 2023 +0800

    Core: Process workflow with stage (#623)
    
    * stage
    
    * master init
    
    * init
    
    * factory
    
    * Add task assign strategy
    
    Co-authored-by: Warden <wa...@gmail.com>
---
 .../core/api/context/DQCApplicationContext.java    |  17 ++
 .../{worker => api}/context/WorkerContext.java     |   9 +-
 .../griffin/core/api/dao/DQBusinessRuleDao.java    |  11 +
 .../apache/griffin/core/api/dao/DQContentDao.java  |   7 +
 .../core/api/dao/DQContentInstanceMapDao.java      |  10 +
 .../griffin/core/api/entity/DQResoueceEnum.java    |   5 +
 .../core/api/entity/GriffinDQBusinessRule.java     |  10 +
 .../griffin/core/api/entity/GriffinDQContent.java  |  20 ++
 .../api/entity/GriffinDQContentInstanceMap.java    |  10 +
 .../griffin/core/api/entity/GriffinDQTable.java    |  13 ++
 .../apache/griffin/core/api/utils/SpringUtils.java |  23 ++
 .../griffin/core/master/GriffinMasterMain.java     |  20 ++
 .../core/master/service/TaskAssignService.java     |  30 +++
 .../master/strategy/AbstractAssignStrategy.java    |  14 ++
 .../master/strategy/AssignStrategyFactory.java     |  14 ++
 .../core/master/strategy/LooperAssignStrategy.java |  15 ++
 .../core/master/transport/DQCConnection.java       |  27 +++
 .../griffin/core/worker/GriffinWorkerMain.java     |  20 ++
 .../core/worker/client/DispatcherClient.java       |   6 +
 .../griffin/core/worker/dao/DQInstanceDao.java     |   3 +
 .../apache/griffin/core/worker/dao/DQStageDao.java |  11 +
 .../apache/griffin/core/worker/dao/DQTaskDao.java  |   2 +-
 .../core/worker/driver/PrestoTemplateDriver.java   |   2 +-
 .../core/worker/driver/SparkTemplateDriver.java    |   2 +-
 .../griffin/core/worker/driver/TemplateDriver.java |   4 +-
 .../griffin/core/worker/entity/bo/DQInstance.java  |  32 ++-
 .../core/worker/entity/bo/task/DQBaseTask.java     |  48 +++--
 .../core/worker/entity/bo/task/DQHiveTask.java     |  14 +-
 .../core/worker/entity/bo/task/DQKafkaTask.java    |  11 +-
 .../core/worker/entity/enums/DQInstanceStatus.java |   7 +-
 .../core/worker/entity/enums/DQStageStatus.java    |  18 ++
 .../core/worker/entity/enums/DQStageTypeEnum.java  |   5 +
 .../core/worker/entity/enums/DQTaskStatus.java     |   4 +-
 .../griffin/core/worker/entity/pojo/DQTable.java   |  14 ++
 .../core/worker/entity/pojo/rule/DQAlertRule.java  |   9 +
 .../worker/entity/pojo/rule/DQEvaluateRule.java    |  17 ++
 .../core/worker/entity/pojo/rule/DQRecordRule.java |  40 ++++
 .../entity/pojo/template/DQRecordBaseTemplate.java |   2 +-
 .../entity/pojo/template/DQRecordTemplate.java     |  10 +
 .../worker/exception/StageSubmitException.java     |   8 +
 .../core/worker/factory/DQInstanceFactory.java     |  56 +++++
 .../core/worker/factory/DQStageFactory.java        |  52 +++++
 .../griffin/core/worker/factory/DQTaskFactory.java |  55 +++++
 .../griffin/core/worker/factory/TaskFactory.java   |   9 -
 .../core/worker/factory/TemplateDriverFactory.java |  23 +-
 .../worker/schedule/TaskDispatcherScheduler.java   | 233 ++++++++-------------
 .../core/worker/service/DQInstanceService.java     |  42 +++-
 .../core/worker/service/DQStageService.java        |  58 +++++
 .../griffin/core/worker/service/DQTaskService.java |  37 ++--
 .../core/worker/service/WorkCoreService.java       |   2 +-
 .../griffin/core/worker/stage/DQAbstractStage.java |  67 ++++++
 .../griffin/core/worker/stage/DQAlertStage.java    |  33 +++
 .../griffin/core/worker/stage/DQEvaluateStage.java |  50 +++++
 .../griffin/core/worker/stage/DQRecordStage.java   | 106 ++++++++++
 .../apache/griffin/core/worker/stage/DQStage.java  |   9 +
 .../griffin/core/worker/utils/DQDateUtils.java     |  14 ++
 .../griffin/core/worker/utils/ExpressionUtils.java |   7 +
 .../griffin/core/worker/utils/MsgSender.java       |  11 +
 core/src/main/resources/application.yml            |   4 +
 59 files changed, 1160 insertions(+), 252 deletions(-)

diff --git a/core/src/main/java/org/apache/griffin/core/api/context/DQCApplicationContext.java b/core/src/main/java/org/apache/griffin/core/api/context/DQCApplicationContext.java
new file mode 100644
index 00000000..d2a2fea3
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/context/DQCApplicationContext.java
@@ -0,0 +1,17 @@
+package org.apache.griffin.core.api.context;
+
+import lombok.Data;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Master Runtime Env
+ * Scope: Singleton
+ */
+@Component
+@Data
+public class DQCApplicationContext {
+    private Map<String, WorkerContext> context = new ConcurrentHashMap<>();
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java b/core/src/main/java/org/apache/griffin/core/api/context/WorkerContext.java
similarity index 89%
rename from core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
rename to core/src/main/java/org/apache/griffin/core/api/context/WorkerContext.java
index 52280f36..48e5be75 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
+++ b/core/src/main/java/org/apache/griffin/core/api/context/WorkerContext.java
@@ -1,26 +1,23 @@
-package org.apache.griffin.core.worker.context;
+package org.apache.griffin.core.api.context;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
 import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
- * 上下文信息  全局唯一
+ * Worker Runtime Env
+ * Scope: Singleton
  */
 @Component
 public class WorkerContext {
 
     private final List<DQInstance> WAITTING_TASK_QUEUE;
-//    public static final List<DQBaseTask> runningTaskIdQueue = Lists.newCopyOnWriteArrayList();
-    // runningTaskIdList = RECORDING_TASK_LIST + EVALUATING_TASK_LIST + ALERTING_TASK_LIST
     private final List<DQInstance> RECORDING_TASK_QUEUE;
     private final LinkedBlockingQueue<DQInstance> EVALUATING_TASK_QUEUE;
     private final LinkedBlockingQueue<DQInstance> ALERTING_TASK_QUEUE;
diff --git a/core/src/main/java/org/apache/griffin/core/api/dao/DQBusinessRuleDao.java b/core/src/main/java/org/apache/griffin/core/api/dao/DQBusinessRuleDao.java
new file mode 100644
index 00000000..4197235f
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/dao/DQBusinessRuleDao.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.api.dao;
+
+import org.apache.griffin.core.api.entity.GriffinDQBusinessRule;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Component
+public interface DQBusinessRuleDao {
+    List<GriffinDQBusinessRule> getListByDqcId(Long dqcId);
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/dao/DQContentDao.java b/core/src/main/java/org/apache/griffin/core/api/dao/DQContentDao.java
new file mode 100644
index 00000000..73849142
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/dao/DQContentDao.java
@@ -0,0 +1,7 @@
+package org.apache.griffin.core.api.dao;
+
+import org.apache.griffin.core.api.entity.GriffinDQContent;
+
+public interface DQContentDao {
+    GriffinDQContent getById(Long id);
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/dao/DQContentInstanceMapDao.java b/core/src/main/java/org/apache/griffin/core/api/dao/DQContentInstanceMapDao.java
new file mode 100644
index 00000000..2ea57545
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/dao/DQContentInstanceMapDao.java
@@ -0,0 +1,10 @@
+package org.apache.griffin.core.api.dao;
+
+import org.apache.griffin.core.api.entity.GriffinDQContentInstanceMap;
+import org.springframework.stereotype.Component;
+
+@Component
+public interface DQContentInstanceMapDao {
+
+    GriffinDQContentInstanceMap getContentInstanceMapByInstanceId(Long id);
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/entity/DQResoueceEnum.java b/core/src/main/java/org/apache/griffin/core/api/entity/DQResoueceEnum.java
new file mode 100644
index 00000000..ba8fd35a
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/entity/DQResoueceEnum.java
@@ -0,0 +1,5 @@
+package org.apache.griffin.core.api.entity;
+
+public enum DQResoueceEnum {
+    HIVE, KAFKA;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQBusinessRule.java b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQBusinessRule.java
new file mode 100644
index 00000000..7fbd00b5
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQBusinessRule.java
@@ -0,0 +1,10 @@
+package org.apache.griffin.core.api.entity;
+
+import lombok.Data;
+
+@Data
+public class GriffinDQBusinessRule {
+
+    private Long id;
+    private Long dqcId;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContent.java b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContent.java
new file mode 100644
index 00000000..255f4f7b
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContent.java
@@ -0,0 +1,20 @@
+package org.apache.griffin.core.api.entity;
+
+import lombok.Data;
+
+/**
+ * DQContent: one table has only one dqcContent
+ */
+@Data
+public class GriffinDQContent {
+    // id
+    private Long id;
+    private String owner;
+
+
+    // table ID
+    private Long resourceId;
+    // tableName
+    private String tableName;
+    private DQResoueceEnum resoueceEnum;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContentInstanceMap.java b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContentInstanceMap.java
new file mode 100644
index 00000000..a7f7b711
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContentInstanceMap.java
@@ -0,0 +1,10 @@
+package org.apache.griffin.core.api.entity;
+
+import lombok.Data;
+
+@Data
+public class GriffinDQContentInstanceMap {
+    private Long id;
+    private Long instanceId;
+    private Long dqcId;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQTable.java b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQTable.java
new file mode 100644
index 00000000..4da913b6
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQTable.java
@@ -0,0 +1,13 @@
+package org.apache.griffin.core.api.entity;
+
+import lombok.Data;
+
+/**
+ * Table info
+ */
+@Data
+public class GriffinDQTable {
+    private Long id;
+    private String tableName;
+    private DQResoueceEnum resoueceEnum;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/utils/SpringUtils.java b/core/src/main/java/org/apache/griffin/core/api/utils/SpringUtils.java
new file mode 100644
index 00000000..ad4e1059
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/utils/SpringUtils.java
@@ -0,0 +1,23 @@
+package org.apache.griffin.core.api.utils;
+
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanNotOfRequiredTypeException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+public class SpringUtils implements ApplicationContextAware {
+    // Spring context
+    private static ApplicationContext applicationContext;
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        SpringUtils.applicationContext = applicationContext;
+    }
+
+    public static <T> T getObject(String name, Class<T> clazz) {
+        Object bean = applicationContext.getBean(name);
+        if (clazz.isInstance(bean)) return clazz.cast(bean);
+        throw new BeanNotOfRequiredTypeException(name, clazz, bean.getClass());
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/GriffinMasterMain.java b/core/src/main/java/org/apache/griffin/core/master/GriffinMasterMain.java
new file mode 100644
index 00000000..ed009a96
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/GriffinMasterMain.java
@@ -0,0 +1,20 @@
+package org.apache.griffin.core.master;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@SpringBootApplication
+@EnableScheduling
+public class GriffinMasterMain {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(GriffinMasterMain.class);
+
+    public static void main(String[] args) {
+        SpringApplication.run(GriffinMasterMain.class, args);
+        LOGGER.info("application started");
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java b/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java
new file mode 100644
index 00000000..a324973d
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java
@@ -0,0 +1,30 @@
+package org.apache.griffin.core.master.service;
+
+import org.apache.griffin.core.master.strategy.AbstractAssignStrategy;
+import org.apache.griffin.core.master.strategy.AssignStrategyFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import javax.annotation.PostConstruct;
+
+
+@Component
+public class TaskAssignService {
+
+    @Value("${task.assign.strategy}")
+    private String assignTaskStrategtClass;
+
+    private AbstractAssignStrategy strategy;
+
+    @PostConstruct
+    public void init() {
+        strategy = AssignStrategyFactory.getStrategy(assignTaskStrategtClass);
+        Assert.notNull(strategy, "Task Assign Strategy init failed");
+    }
+
+
+    public String assignTask(long instanceId) {
+        return strategy.assignTask(instanceId);
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/strategy/AbstractAssignStrategy.java b/core/src/main/java/org/apache/griffin/core/master/strategy/AbstractAssignStrategy.java
new file mode 100644
index 00000000..4cd844a7
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/strategy/AbstractAssignStrategy.java
@@ -0,0 +1,14 @@
+package org.apache.griffin.core.master.strategy;
+
+import org.apache.griffin.core.api.context.DQCApplicationContext;
+
+public abstract class AbstractAssignStrategy {
+
+    protected DQCApplicationContext dqcApplicationContext;
+
+    public AbstractAssignStrategy(DQCApplicationContext dqcApplicationContext) {
+        this.dqcApplicationContext = dqcApplicationContext;
+    }
+
+    public abstract String assignTask(long instanceId);
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/strategy/AssignStrategyFactory.java b/core/src/main/java/org/apache/griffin/core/master/strategy/AssignStrategyFactory.java
new file mode 100644
index 00000000..37cc8e7f
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/strategy/AssignStrategyFactory.java
@@ -0,0 +1,14 @@
+package org.apache.griffin.core.master.strategy;
+
+public class AssignStrategyFactory {
+
+    public static AbstractAssignStrategy getStrategy(String className) {
+        try {
+            Class clazz = Class.forName(className);
+            return (AbstractAssignStrategy) clazz.newInstance();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/strategy/LooperAssignStrategy.java b/core/src/main/java/org/apache/griffin/core/master/strategy/LooperAssignStrategy.java
new file mode 100644
index 00000000..4d85fc4a
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/strategy/LooperAssignStrategy.java
@@ -0,0 +1,15 @@
+package org.apache.griffin.core.master.strategy;
+
+import org.apache.griffin.core.api.context.DQCApplicationContext;
+
+public class LooperAssignStrategy extends AbstractAssignStrategy {
+
+    public LooperAssignStrategy(DQCApplicationContext dqcApplicationContext) {
+        super(dqcApplicationContext);
+    }
+
+    @Override
+    public String assignTask(long instanceId) {
+        return null;
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
new file mode 100644
index 00000000..0f7b902a
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
@@ -0,0 +1,27 @@
+package org.apache.griffin.core.master.transport;
+
+import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.Future;
+
+/**
+ * the obj has a socketChannel to worker node
+ */
+public class DQCConnection {
+    // worker hostName
+    private String hostName;
+    // worker hostIP
+    private String hostIP;
+    // worker hostPort
+    private int hostPort;
+    // todo
+    private ServerSocketChannel channel;
+
+    /**
+     * Send msg async
+     * @param msg message
+     * @return Future
+     */
+    public Future send(byte[] msg) {
+        return null;
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/GriffinWorkerMain.java b/core/src/main/java/org/apache/griffin/core/worker/GriffinWorkerMain.java
new file mode 100644
index 00000000..f19e7a5d
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/GriffinWorkerMain.java
@@ -0,0 +1,20 @@
+package org.apache.griffin.core.worker;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@SpringBootApplication
+@EnableScheduling
+public class GriffinWorkerMain {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(GriffinWorkerMain.class);
+
+    public static void main(String[] args) {
+        SpringApplication.run(GriffinWorkerMain.class, args);
+        LOGGER.info("application started");
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
index 5422f35e..dbd4bd39 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
@@ -3,6 +3,7 @@ package org.apache.griffin.core.worker.client;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.griffin.core.worker.entity.bo.DQInstance;
 import org.apache.griffin.core.worker.entity.dispatcher.*;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
 import org.apache.griffin.core.worker.entity.enums.DQErrorCode;
 import org.springframework.stereotype.Service;
 
@@ -17,6 +18,11 @@ public class DispatcherClient {
     }
 
 
+    public boolean canSubmitToSpecEngine(DQEngineEnum engine) {
+        // todo
+        return true;
+    }
+
     public JobStatusResponse getJobStatus(JobStatusRequest jobStatusRequest) {
         // todo parse job status
         return null;
diff --git a/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java b/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
index 11dd93f9..544adb36 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
@@ -6,6 +6,9 @@ import org.springframework.stereotype.Component;
 @Component
 public interface DQInstanceDao {
 
+    DQInstance getById(Long id);
+
     void updateDQInstanceStatus(DQInstance instance, int status);
 
+    void insert(DQInstance instance);
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/dao/DQStageDao.java b/core/src/main/java/org/apache/griffin/core/worker/dao/DQStageDao.java
new file mode 100644
index 00000000..0e37c0d8
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQStageDao.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.worker.dao;
+
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.apache.griffin.core.worker.stage.DQStage;
+
+public interface DQStageDao {
+    void updateDQStageStatus(DQStage stage, int status);
+
+    void insert(DQAbstractStage stage);
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
index c94074cb..f87fbc46 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
@@ -11,6 +11,6 @@ import java.util.List;
 public interface DQTaskDao {
 
     void updateDQTaskListStatus(List<DQBaseTask> tasks, int status);
-    void updateDQTaskListStatus(DQBaseTask task, int status);
+    void updateDQTaskStatus(DQBaseTask task, int status);
 
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
index 61ab2bdb..b5d2cf62 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
@@ -7,7 +7,7 @@ import java.util.Map;
 
 public class PrestoTemplateDriver extends TemplateDriver{
     @Override
-    public List<String> getRecordSql(DQRecordTemplate template, Map<String, String> params) {
+    public String getRecordSql(DQRecordTemplate template, Map<String, String> params) {
         return null;
     }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
index eaf17b4a..4d500832 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
@@ -7,7 +7,7 @@ import java.util.Map;
 
 public class SparkTemplateDriver extends TemplateDriver {
     @Override
-    public List<String> getRecordSql(DQRecordTemplate template, Map<String, String> params) {
+    public String getRecordSql(DQRecordTemplate template, Map<String, String> params) {
         return null;
     }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
index 85a23390..9e2aab97 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
@@ -18,9 +18,9 @@ public abstract class TemplateDriver {
     /**
      * 拼出的SQL 返回值 必须是 <ruleId, Partition, Metric>  这样可以做SQL合并
      */
-    public abstract List<String> getRecordSql(DQRecordTemplate template, Map<String, String> params);
+    public abstract String getRecordSql(DQRecordTemplate template, Map<String, String> params);
 
-    public List<String> getRecordSql(DQEngineEnum engine, DQRecordTemplate template, Map<String, String> params) {
+    public String getRecordSql(DQEngineEnum engine, DQRecordTemplate template, Map<String, String> params) {
         TemplateDriver templateDriver = templateDriverFactory.getTemplateDrvier(engine);
         return templateDriver.getRecordSql(template, params);
     }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
index af5ff201..e4608f9c 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
@@ -5,6 +5,9 @@ import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
 import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
 import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
 import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQAlertRule;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.apache.griffin.core.worker.stage.DQStage;
 
 import java.util.List;
 
@@ -15,6 +18,9 @@ import java.util.List;
 @Data
 public class DQInstance {
     private Long id;
+
+    private Long dqcId;
+
     // 实例状态
     private DQInstanceStatus status;
     // 记录状态年龄  状态更新是重置
@@ -24,6 +30,12 @@ public class DQInstance {
     //
     private long scanTimeStamp = 0L;
 
+    protected DQAlertRule dqAlertRule;
+
+    private DQAbstractStage recordingStage;
+    private DQAbstractStage evaluatingStage;
+    private DQAbstractStage alertingStage;
+
 
 
     public void setStatus(DQInstanceStatus status) {
@@ -44,16 +56,7 @@ public class DQInstance {
         return statusAge > 5;
     }
 
-    public boolean hasTaskToSubmit() {
-        boolean hasTaskToSubmit = false;
-        for (DQBaseTask dqBaseTask : subTaskList) {
-            if (dqBaseTask.getStatus() == DQTaskStatus.WAITTING) {
-                hasTaskToSubmit = true;
-                break;
-            }
-        }
-        return hasTaskToSubmit;
-    }
+
 
     public boolean isFinishRecord() {
         boolean isFinishRecord = true;
@@ -65,13 +68,4 @@ public class DQInstance {
         }
         return isFinishRecord;
     }
-
-    public void doEvaluteTask() {
-        subTaskList.forEach(DQBaseTask::evaluate);
-
-    }
-
-    public void doAlertTask() {
-        // 收敛告警信息 进行告警
-    }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
index fd01f036..fffc684d 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
@@ -3,13 +3,11 @@ package org.apache.griffin.core.worker.entity.bo.task;
 import com.beust.jcommander.internal.Lists;
 import lombok.Data;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.api.context.WorkerContext;
 import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
 import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
-import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
 import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
 import org.apache.griffin.core.worker.entity.pojo.Metric;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQAlertRule;
 import org.apache.griffin.core.worker.entity.pojo.rule.DQEvaluateRule;
 import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
 
@@ -20,42 +18,50 @@ import java.util.List;
  *   一个完整的任务包含
  *      record
  *      evaluate
- *      alert
  */
 @Data
 public abstract class DQBaseTask {
 
-    private long id;
-    private String owner;
-    private WorkerContext wc;
-    private DQEngineEnum engine;
-    private DQRecordRule recordRule;
-    private DQEvaluateRule dqEvaluateRule;
-    private DQAlertRule dqAlertRule;
-    private DQTaskStatus status;
-    private List<JobStatus> jobStatusList;
-    private List<Metric> metricList = Lists.newArrayList();
+    protected long id;
+    protected String owner;
+    protected WorkerContext wc;
+    protected DQEngineEnum engine;
+    protected DQRecordRule recordRule;
+    protected DQEvaluateRule dqEvaluateRule;
+    protected DQTaskStatus status;
+    protected List<JobStatus> jobStatusList;
+    protected List<Metric> metricList = Lists.newArrayList();
+    protected boolean needAlert = false;
+    protected Long businessTime;
     // 记录状态年龄  状态更新是重置
     private int statusAge;
     // 生成recordsql和 模板 + 参数 有关系
     // 生成SQL部分希望交给模板来做
     public List<Pair<Long, String>> record() {
         // before
-        List<Pair<Long, String>> partitionTimeAndSqlList = doRecord();
+        List<Pair<Long, String>> partitionTimeAndSqlList = getRecordInfo();
         // after
         return partitionTimeAndSqlList;
     }
+
     public void evaluate() {
-        doEvaluate();
+        needAlert = doEvaluate();
     }
-    public void alert() {
-        doAlert();
+
+    public String alert() {
+        return getAlertMsg();
     }
 
     // partitionTime And sql
-    public abstract List<Pair<Long, String>> doRecord();
-    public abstract boolean doEvaluate();
-    public abstract boolean doAlert();
+    public abstract List<Pair<Long, String>> getRecordInfo();
+
+    public boolean doEvaluate() {
+        return dqEvaluateRule.execute(metricList);
+    }
+
+    public String getAlertMsg() {
+        return "xxx is error";
+    }
 
     public void setStatus(DQTaskStatus status) {
         if (this.status != status) resetStatusAge();
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
index 62c6ef3b..be6aee5e 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
@@ -1,6 +1,7 @@
 package org.apache.griffin.core.worker.entity.bo.task;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
 
 import java.util.List;
 
@@ -10,17 +11,8 @@ import java.util.List;
 public class DQHiveTask extends DQBaseTask {
 
     @Override
-    public List<Pair<Long, String>> doRecord() {
-        return null;
+    public List<Pair<Long, String>> getRecordInfo() {
+        return recordRule.getPartitionAndRuleIdList(businessTime, engine);
     }
 
-    @Override
-    public boolean doEvaluate() {
-        return false;
-    }
-
-    @Override
-    public boolean doAlert() {
-        return false;
-    }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
index 356c0fac..c92906ce 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
@@ -6,17 +6,8 @@ import java.util.List;
 
 public class DQKafkaTask extends DQBaseTask {
     @Override
-    public List<Pair<Long, String>> doRecord() {
+    public List<Pair<Long, String>> getRecordInfo() {
         return null;
     }
 
-    @Override
-    public boolean doEvaluate() {
-        return false;
-    }
-
-    @Override
-    public boolean doAlert() {
-        return false;
-    }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
index 6e70721b..cd626a7e 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
@@ -3,11 +3,12 @@ package org.apache.griffin.core.worker.entity.enums;
 public enum DQInstanceStatus {
     ACCEPTED(0),
     WAITTING(1),
-    SUBMITTING(2), // 任务提交中
-    RUNNING(3),
+//    SUBMITTING(2), // 任务提交中
+//    RUNNING(3),
     RECORDING(4),
     EVALUATING(5),
-    EVALUATE_ALERTING(6), // Metric 需要告警
+    ALERTING(5),
+//    EVALUATE_ALERTING(6), // Metric 需要告警
     FAILED_ALERTING(7),   // 任务执行失败需要告警
     SUCCESS(8),
     FAILED(9);
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageStatus.java b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageStatus.java
new file mode 100644
index 00000000..a971c8e5
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageStatus.java
@@ -0,0 +1,18 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQStageStatus {
+    INIT(0),
+    RUNNABLE(1),
+    RUNNING(2),
+    FINISH(3);
+
+    private final int code;
+
+    DQStageStatus(int code) {
+        this.code = code;
+    }
+
+    public int getCode() {
+        return code;
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageTypeEnum.java b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageTypeEnum.java
new file mode 100644
index 00000000..93057a05
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageTypeEnum.java
@@ -0,0 +1,5 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQStageTypeEnum {
+    RECORD, EVALUATE, ALERT;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
index ab5e0716..11235a11 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
@@ -6,8 +6,8 @@ public enum DQTaskStatus {
     RECORDED(1),
     EVALUATING(2),
     EVALUATED(2),
-    ALERTING(3),
-    ALERTED(3),
+//    ALERTING(3),
+//    ALERTED(3),
     SUCCESS(4),
     FAILED(5);
 
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java
new file mode 100644
index 00000000..7be0c5ac
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java
@@ -0,0 +1,14 @@
+package org.apache.griffin.core.worker.entity.pojo;
+
+import lombok.Data;
+
+import java.util.concurrent.TimeUnit;
+
+@Data
+public class DQTable {
+    private Long id;
+    private String dbName;
+    private String tableName;
+    private String partition;
+    private TimeUnit unit;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
index 86d4fca8..37dd63f1 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
@@ -1,4 +1,13 @@
 package org.apache.griffin.core.worker.entity.pojo.rule;
 
+import java.util.List;
+
 public class DQAlertRule {
+    public int getSendType() {
+        return 0;
+    }
+
+    public List<String> getReceivers() {
+        return null;
+    }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
index 5fc51854..2ee5ec6a 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
@@ -1,4 +1,21 @@
 package org.apache.griffin.core.worker.entity.pojo.rule;
 
+import org.apache.griffin.core.worker.entity.pojo.Metric;
+import org.apache.griffin.core.worker.utils.ExpressionUtils;
+
+import java.util.List;
+
 public class DQEvaluateRule {
+
+    private String expression;
+
+    public boolean execute(List<Metric> metricList) {
+        for (Metric metric : metricList) {
+            double metricValue = metric.getMetric();
+            if (ExpressionUtils.evaluate(expression, metricValue)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
index 94deba4b..3cf5605d 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
@@ -1,9 +1,49 @@
 package org.apache.griffin.core.worker.entity.pojo.rule;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import lombok.Data;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+import org.apache.griffin.core.worker.entity.pojo.DQTable;
+import org.apache.griffin.core.worker.entity.pojo.template.DQRecordBaseTemplate;
 import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.core.worker.utils.DQDateUtils;
+
+import java.util.List;
+import java.util.Map;
 
 @Data
 public class DQRecordRule {
+
+    private Long id;
+    private Map<String, String> baseTemplateParams;
     private DQRecordTemplate template;
+    private DQTable table;
+
+
+    public List<Pair<Long, String>> getPartitionAndRuleIdList(Long businessTime, DQEngineEnum engine) {
+        // prepare params
+        List<Pair<Long, String>> partitionAndRuleIdList = Lists.newArrayList();
+        if (template instanceof DQRecordBaseTemplate) {
+            // base template
+            baseTemplateParams.put("partition_sql_info", DQDateUtils.praseBussinesstimeToPartitionSQL(businessTime, table.getPartition()));
+            String recordSql = template.getRecordSql(baseTemplateParams, engine);
+            partitionAndRuleIdList.add(Pair.of(businessTime, recordSql));
+        } else {
+            List<Integer> offsetList = getOffsetList();
+            offsetList.forEach(offset -> {
+                Map<String, String> templateParams = Maps.newHashMap(baseTemplateParams);
+                Long offsetBusinessTime = DQDateUtils.offsetTime(businessTime, offset, table.getUnit());
+                baseTemplateParams.put("partition_sql_info", DQDateUtils.praseBussinesstimeToPartitionSQL(offsetBusinessTime, table.getPartition()));
+                String recordSql = template.getRecordSql(templateParams, engine);
+                partitionAndRuleIdList.add(Pair.of(offsetBusinessTime, recordSql));
+            });
+        }
+        return partitionAndRuleIdList;
+    }
+
+    private List<Integer> getOffsetList() {
+        return null;
+    }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
index fbe51281..44646da8 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
@@ -1,4 +1,4 @@
 package org.apache.griffin.core.worker.entity.pojo.template;
 
-public class DQRecordBaseTemplate {
+public class DQRecordBaseTemplate extends DQRecordTemplate {
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
index d4b12047..49db5868 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
@@ -1,5 +1,15 @@
 package org.apache.griffin.core.worker.entity.pojo.template;
 
+import org.apache.griffin.core.worker.driver.TemplateDriver;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+
+import java.util.Map;
+
 public class DQRecordTemplate {
 
+    private TemplateDriver driver;
+
+    public String getRecordSql(Map<String, String> templateParams, DQEngineEnum engine) {
+        return driver.getRecordSql(engine, this, templateParams);
+    }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/exception/StageSubmitException.java b/core/src/main/java/org/apache/griffin/core/worker/exception/StageSubmitException.java
new file mode 100644
index 00000000..07a3e05e
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/exception/StageSubmitException.java
@@ -0,0 +1,8 @@
+package org.apache.griffin.core.worker.exception;
+
+public class StageSubmitException extends Exception {
+
+    public StageSubmitException(String message) {
+        super(message);
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java b/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
new file mode 100644
index 00000000..386bd8b9
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
@@ -0,0 +1,56 @@
+package org.apache.griffin.core.worker.factory;
+
+import org.apache.griffin.core.api.dao.DQBusinessRuleDao;
+import org.apache.griffin.core.api.dao.DQContentDao;
+import org.apache.griffin.core.api.entity.GriffinDQBusinessRule;
+import org.apache.griffin.core.api.entity.GriffinDQContent;
+import org.apache.griffin.core.worker.dao.DQInstanceDao;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQStageTypeEnum;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Component
+public class DQInstanceFactory {
+
+    @Autowired
+    private DQInstanceDao dqInstanceDao;
+    @Autowired
+    private DQContentDao dqContentDao;
+    @Autowired
+    private DQBusinessRuleDao dqBusinessRuleDao;
+    @Autowired
+    private DQStageFactory dqStageFactory;
+    @Autowired
+    private DQTaskFactory dqTaskFactory;
+
+    public DQInstance constructInstance(Long id, Long dqcId) {
+        DQInstance instance = new DQInstance();
+        instance.setId(id);
+        instance.setDqcId(dqcId);
+        GriffinDQContent griffinDQContent = dqContentDao.getById(dqcId);
+        // construct taskL
+        List<GriffinDQBusinessRule> businessRuleList = dqBusinessRuleDao.getListByDqcId(dqcId);
+        List<DQBaseTask> subTaskList = dqTaskFactory.constructTasks(griffinDQContent.getResoueceEnum(), businessRuleList);
+        instance.setSubTaskList(subTaskList);
+        // construct record stage
+        DQAbstractStage reordStage = dqStageFactory.constructStage(DQStageTypeEnum.RECORD, instance);
+        instance.setRecordingStage(reordStage);
+        // construct check stage
+        DQAbstractStage evaluateStage = dqStageFactory.constructStage(DQStageTypeEnum.EVALUATE, instance);
+        instance.setEvaluatingStage(evaluateStage);
+        // construct alert stage
+        DQAbstractStage alertStage = dqStageFactory.constructStage(DQStageTypeEnum.ALERT, instance);
+        instance.setAlertingStage(alertStage);
+        dqInstanceDao.insert(instance);
+        return instance;
+    }
+
+    public DQInstance recoveryInstance(DQInstance instance) {
+        return null;
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java b/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java
new file mode 100644
index 00000000..519c7162
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java
@@ -0,0 +1,52 @@
+package org.apache.griffin.core.worker.factory;
+
+import org.apache.griffin.core.api.utils.SpringUtils;
+import org.apache.griffin.core.worker.dao.DQStageDao;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.enums.DQStageTypeEnum;
+import org.apache.griffin.core.worker.service.DQStageService;
+import org.apache.griffin.core.worker.service.DQTaskService;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.apache.griffin.core.worker.stage.DQAlertStage;
+import org.apache.griffin.core.worker.stage.DQEvaluateStage;
+import org.apache.griffin.core.worker.stage.DQRecordStage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DQStageFactory {
+
+    private static final Logger log = LoggerFactory.getLogger(DQStageFactory.class);
+
+    @Autowired
+    private DQStageDao dqStageDao;
+
+    public DQAbstractStage constructStage(DQStageTypeEnum stageTypeEnum, DQInstance instance) {
+        DQAbstractStage stage = null;
+        switch (stageTypeEnum) {
+            case RECORD:
+                stage = new DQRecordStage();
+                break;
+            case EVALUATE:
+                stage = new DQEvaluateStage();
+                break;
+            case ALERT:
+                stage = new DQAlertStage();
+                break;
+            default:
+                break;
+        }
+        if (stage == null) {
+            log.error("DQStageFactory - Unknown Stage Type: {}", stageTypeEnum);
+            return stage;
+        }
+        stage.setInstance(instance);
+        stage.setSubTaskList(instance.getSubTaskList());
+        stage.setDqTaskService(SpringUtils.getObject("dqTaskService", DQTaskService.class));
+        stage.setDqStageService(SpringUtils.getObject("dqStageService", DQStageService.class));
+        dqStageDao.insert(stage);
+        return stage;
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java b/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java
new file mode 100644
index 00000000..dcb2ce07
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java
@@ -0,0 +1,55 @@
+package org.apache.griffin.core.worker.factory;
+
+import org.apache.griffin.core.api.entity.DQResoueceEnum;
+import org.apache.griffin.core.api.entity.GriffinDQBusinessRule;
+import org.apache.griffin.core.worker.dao.DQTaskDao;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Component
+public class DQTaskFactory {
+
+    private static final Logger log = LoggerFactory.getLogger(DQTaskFactory.class);
+
+    @Autowired
+    private DQTaskDao dqTaskDao;
+
+    public List<DQBaseTask> constructTasks(DQResoueceEnum resoueceEnum,
+                                           List<GriffinDQBusinessRule> businessRuleList) {
+        switch (resoueceEnum) {
+            // construct hive
+            case HIVE: return constructHiveTasks(businessRuleList);
+            // construct kafka
+            case KAFKA: return constructKafkaTasks(businessRuleList);
+        }
+        return null;
+    }
+
+    private List<DQBaseTask> constructHiveTasks(List<GriffinDQBusinessRule> businessRuleList) {
+        return businessRuleList.stream()
+                .map(this::constructHiveTask)
+                .collect(Collectors.toList());
+    }
+
+    private DQBaseTask constructHiveTask(GriffinDQBusinessRule businessRule) {
+        return null;
+    }
+
+    private List<DQBaseTask> constructKafkaTasks(List<GriffinDQBusinessRule> businessRuleList) {
+        return businessRuleList.stream()
+                .map(this::constructKafkaTask)
+                .collect(Collectors.toList());
+    }
+
+    private DQBaseTask constructKafkaTask(GriffinDQBusinessRule businessRule) {
+        // todo
+        return null;
+    }
+
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java b/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java
deleted file mode 100644
index b6b79f32..00000000
--- a/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.griffin.core.worker.factory;
-
-public class TaskFactory {
-
-
-
-
-
-}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java b/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
index 32970020..bc1c280a 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
@@ -1,12 +1,33 @@
 package org.apache.griffin.core.worker.factory;
 
+import com.google.common.collect.Maps;
+import org.apache.griffin.core.worker.driver.PrestoTemplateDriver;
+import org.apache.griffin.core.worker.driver.SparkTemplateDriver;
 import org.apache.griffin.core.worker.driver.TemplateDriver;
 import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
 import org.springframework.stereotype.Service;
 
+import java.util.Map;
+
 @Service
 public class TemplateDriverFactory {
+    Map<DQEngineEnum, TemplateDriver> engineDriverMap = Maps.newConcurrentMap();
+
     public TemplateDriver getTemplateDrvier(DQEngineEnum engine) {
-        return null;
+        TemplateDriver templateDriver = engineDriverMap.get(engine);
+        if (templateDriver != null) return templateDriver;
+
+        switch (engine) {
+            case PRESTO:
+                templateDriver =  new PrestoTemplateDriver();
+                break;
+            case SPARK:
+                templateDriver = new SparkTemplateDriver();
+                break;
+            default:
+                break;
+        }
+        engineDriverMap.put(engine, templateDriver);
+        return templateDriver;
     }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
index 79759a02..d29bcb81 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
@@ -3,13 +3,15 @@ package org.apache.griffin.core.worker.schedule;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.api.context.WorkerContext;
 import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
 import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
-import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
+import org.apache.griffin.core.worker.exception.StageSubmitException;
 import org.apache.griffin.core.worker.service.DQInstanceService;
-import org.apache.griffin.core.worker.service.DQTaskService;
+import org.apache.griffin.core.worker.service.DQStageService;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.apache.griffin.core.worker.stage.DQStage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -19,11 +21,12 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PostConstruct;
 import java.util.List;
 import java.util.Queue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 /**
- * 任务执行调度期 和 dispatcher交互
+ * schedule task
  */
 @Component
 public class TaskDispatcherScheduler {
@@ -31,7 +34,7 @@ public class TaskDispatcherScheduler {
 
     private WorkerContext wc;
     private DQInstanceService dqInstanceService;
-    private DQTaskService dqTaskService;
+    private DQStageService dqStageService;
 
     @Autowired
     public void setWc(WorkerContext wc) {
@@ -42,41 +45,40 @@ public class TaskDispatcherScheduler {
         this.dqInstanceService = dqInstanceService;
     }
     @Autowired
-    public void setDqTaskService(DQTaskService dqTaskService) {
-        this.dqTaskService = dqTaskService;
+    public void setDqStageService(DQStageService dqStageService) {
+        this.dqStageService = dqStageService;
     }
 
     @PostConstruct
     public void startEvaluetingAndAlertThread() {
-        // todo 用线程池运行
+        // todo submit to thread pool
         scanAlertingTask();
         scanEvaluatingTask();
     }
 
     /**
-     * 进行任务调度
+     * schedule task
+     * put task to the queue of recording task
      */
     @Scheduled(fixedDelay = 5 * 1000L)
     public void doTaskDispatcherScheduler() {
         log.info("doTaskDispatcherScheduler start.");
         List<DQInstance> waittingToRemoveFromWaitingList = Lists.newArrayList();
         Queue<DQInstance> waitingToRecordingDQInstanceQueue = Queues.newPriorityBlockingQueue(wc.getWAITTING_TASK_QUEUE());
-        // 从waitting队列获取任务
+
         try {
             while (true) {
                 try {
                     DQInstance dqInstance = waitingToRecordingDQInstanceQueue.poll();
-                    // 队列中无元素 结束循环
+                    // queue is empty, quit
                     if (dqInstance == null) break;
                     if (DQInstanceStatus.ACCEPTED != dqInstance.getStatus()) {
-                        // 非初始状态
-                        // 同步数据库状态 缓存中的实例状态可能不是最新的 从数据库构建最新的实例 替换缓存中的实例
+                        // State is not init, sync from database and reassign it
                         dqInstance = dqInstanceService.getById(dqInstance.getId());
-                        // 根据状态放到对应队列
+                        // assign task by status
                         waittingToRemoveFromWaitingList.add(dqInstance);
                     } else {
-                        // 提交到recording队列
-                        // 开始提交任务 放到recording队列中
+                        // normal, update status and remove from queue, then put it to the queue of recording task
                         if (dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.WAITTING)) {
                             waittingToRemoveFromWaitingList.add(dqInstance);
                         }
@@ -90,9 +92,9 @@ public class TaskDispatcherScheduler {
             // todo
             log.error("scanRecordingTask failed, ex:", e);
         } finally {
-            // 根据状态分发到指定队列
+            // assign task by status
             waittingToRemoveFromWaitingList.forEach(this::offerToSpecQueueByStatus);
-            // 从 原队列移除 队列移除
+            // remove from queue
             if (CollectionUtils.isNotEmpty(waittingToRemoveFromWaitingList)) wc.removeAll(wc.getWAITTING_TASK_QUEUE(), waittingToRemoveFromWaitingList);
         }
         log.info("doTaskDispatcherScheduler end.");
@@ -102,15 +104,16 @@ public class TaskDispatcherScheduler {
         DQInstanceStatus status = instance.getStatus();
         switch (status) {
             case WAITTING:
-            case RUNNING:
-            case SUBMITTING:
+//            case RUNNING:
+//            case SUBMITTING:
             case RECORDING:
                 wc.offerToRecordingTaskQueue(instance);
                 break;
             case EVALUATING:
                 wc.offerToEvaluatingTaskQueue(instance);
                 break;
-            case EVALUATE_ALERTING:
+//            case EVALUATE_ALERTING:
+            case ALERTING:
             case FAILED_ALERTING:
                 wc.offerToAlertingTaskQueue(instance);
                 break;
@@ -121,7 +124,7 @@ public class TaskDispatcherScheduler {
                 wc.addSuccessDQInstanceInfo(instance);
                 break;
             default:
-                // todo 未知状态 丢弃任务
+                // todo Unknown state Drop
                 log.warn("Unknown status, id : {}, status : {}, instance: {}", instance.getId(), status, instance);
                 break;
         }
@@ -154,138 +157,88 @@ public class TaskDispatcherScheduler {
     }
 
     private void processRecordingInstance(DQInstance dqInstance, List<DQInstance> waittingToRemoveFromRecordingList) {
-        // 检查当前环境是否有可以提交任务到dispatcher(并发度限制  需要根据提交的引擎计算)
-        DQInstanceStatus instanceStatus = dqInstance.getStatus();
-        List<DQBaseTask> subTaskList = dqInstance.getSubTaskList();
-        switch (instanceStatus) {
-            case ACCEPTED:
-            case WAITTING:
-                dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.SUBMITTING);
-                submitTaskToDispatcher(subTaskList);
-                if (!dqInstance.hasTaskToSubmit()) {
-                    dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.RUNNING);
-                }
-                break;
-            case SUBMITTING:
-                submitTaskToDispatcher(subTaskList);
-                // 检查是否所有任务都已经提交
-                if (!dqInstance.hasTaskToSubmit()) {
-                    dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.RUNNING);
-                }
-                break;
-            case RUNNING:
-                // 检查并更新任务状态
-                checkJobStatus(subTaskList);
-                if (dqInstance.isFinishRecord()) {
-                    // record 任务都完成了  准备移除
-                    waittingToRemoveFromRecordingList.add(dqInstance);
+        try {
+            DQAbstractStage recordingStage = dqInstance.getRecordingStage();
+            DQStageStatus stageStatus = recordingStage.getStatus();
+            if (stageStatus == DQStageStatus.RUNNABLE) {
+                if (!dqStageService.submitStage(recordingStage)) {
+                    throw new StageSubmitException("Submit stage failed!, instance id: " + dqInstance.getId());
+                } else {
+                    dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.RECORDING);
                 }
-                break;
-            default:
-                break;
-        }
-    }
-
-    private void checkJobStatus(List<DQBaseTask> subTaskList) {
-        subTaskList.forEach(task -> dqTaskService.checkJobStatus(task));
-    }
-
-    private void submitTaskToDispatcher(List<DQBaseTask> subTaskList) {
-        // 遍历 recording tasks 检查状态进行更新
-        subTaskList.forEach(task -> {
-            DQTaskStatus taskStatus = task.getStatus();
-            switch (taskStatus) {
-                case WAITTING:
-                    // 提交任务
-                    doSubmitTaskToDispatcher(task);
-                    if (task.isFailed()) {
-                        // 提交一直失败
-                        dqTaskService.updateTaskStatus(task, DQTaskStatus.FAILED);
-                    }
-                    break;
-                case RECORDING:
-                    // 查询结果
-                    boolean isFinished = dqTaskService.checkJobStatus(task);
-                    if (isFinished) {
-                        // 任务结束, 设置任务状态为record结束
-                        dqTaskService.updateTaskStatus(task, DQTaskStatus.RECORDED);
-                    }
-                    break;
-                default:
-                    // 其余状态不处理
-                    break;
+            } else if (stageStatus == DQStageStatus.FINISH) {
+                // if there is one task success, the instance should be EVALUATING;
+                // if all tasks are failed, the instance should be FAILED_ALERTING;
+                DQInstanceStatus instanceStatus = recordingStage.hasSuccess()? DQInstanceStatus.EVALUATING: DQInstanceStatus.FAILED_ALERTING;
+                dqInstanceService.updateStatus(dqInstance, instanceStatus);
+                waittingToRemoveFromRecordingList.add(dqInstance);
             }
-        });
-    }
-
-    private void doSubmitTaskToDispatcher(DQBaseTask task) {
-        // 并发度检查
-        if (!wc.canSubmitToSpecEngine(task.getEngine())) return;
-        if (dqTaskService.doSubmitRecordingTask(task)) {
-            // 任务提交成功  更新状态为recording
-            dqTaskService.updateTaskStatus(task, DQTaskStatus.RECORDING);
-        } else {
-            // 提交失败 记录一次失败
-            task.incrStatusAge();
+        } catch (Exception e) {
+            // todo rollback dqInstance status
+            log.error("e: ", e);
         }
     }
 
     public void scanEvaluatingTask() {
-        LinkedBlockingQueue<DQInstance> evaluating_task_queue = wc.getEVALUATING_TASK_QUEUE();
-        while (true) {
-            DQInstance dqInstance = null;
-            try {
-                // Evaluating 来一个处理一个
-                dqInstance = evaluating_task_queue.poll(5, TimeUnit.SECONDS);
-                if (dqInstance == null) continue;
-                // 根据状态打回任务
-                // 执行evaluating
-                if (dqInstance.getStatus() == DQInstanceStatus.EVALUATING) {
-                    dqInstance.doEvaluteTask();
-                } else {
-                    offerToSpecQueueByStatus(dqInstance);
-                }
-            } catch (Exception e) {
-                if (dqInstance != null) {
-                    log.error("scanEvaluatingTask doEvalute failed, id : {}, instance : {}, ex:", dqInstance.getId(), dqInstance, e);
-                    dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.FAILED_ALERTING);
+        Executors.newCachedThreadPool().execute(() -> {
+            LinkedBlockingQueue<DQInstance> evaluating_task_queue = wc.getEVALUATING_TASK_QUEUE();
+            while (true) {
+                DQInstance dqInstance = null;
+                try {
+                    dqInstance = evaluating_task_queue.poll(5, TimeUnit.SECONDS);
+                    if (dqInstance == null) continue;
+                    // do Evaluate
+                    DQStage evaluatingStage = dqInstance.getEvaluatingStage();
+                    if (dqInstance.getStatus() == DQInstanceStatus.EVALUATING) {
+                        dqStageService.executeStage(evaluatingStage);
+                        DQInstanceStatus dqInstanceStatus = evaluatingStage.hasSuccess() ? DQInstanceStatus.ALERTING : DQInstanceStatus.FAILED_ALERTING;
+                        dqInstanceService.updateStatus(dqInstance, dqInstanceStatus);
+                    }
                     offerToSpecQueueByStatus(dqInstance);
-                } else {
-                    log.error("scanEvaluatingTask poll instance failed. ex:", e);
+                } catch (Exception e) {
+                    if (dqInstance != null) {
+                        log.error("scanEvaluatingTask doEvalute failed, id : {}, instance : {}, ex:", dqInstance.getId(), dqInstance, e);
+                        dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.FAILED_ALERTING);
+                        offerToSpecQueueByStatus(dqInstance);
+                    } else {
+                        log.error("scanEvaluatingTask poll instance failed. ex:", e);
+                    }
                 }
             }
-        }
+        });
     }
 
     public void scanAlertingTask() {
-        LinkedBlockingQueue<DQInstance> alerting_task_queue = wc.getALERTING_TASK_QUEUE();
-        while (true) {
-            DQInstance dqInstance = null;
-            try {
-                // Evaluating 来一个处理一个
-                dqInstance = alerting_task_queue.poll(1, TimeUnit.SECONDS);
-                if (dqInstance == null) continue;
-                // 根据状态打回任务
-                // 执行evaluating
-                if (dqInstance.getStatus() == DQInstanceStatus.FAILED_ALERTING || dqInstance.getStatus() == DQInstanceStatus.EVALUATE_ALERTING) {
-                    dqInstance.doAlertTask();
-                } else {
-                    offerToSpecQueueByStatus(dqInstance);
-                }
-            } catch (Exception e) {
-                if (dqInstance != null) {
-                    log.error("scanAlertingTask doAlert failed, id : {}, instance : {}, ex:", dqInstance.getId(), dqInstance, e);
-                    if (dqInstance.isFailed()) {
-                        // 重试很多次了 直接设置为失败
-                        dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.FAILED);
-                        // 没有失败很多次的话,状态不修改 放回队列重试
+        Executors.newCachedThreadPool().execute(() -> {
+            LinkedBlockingQueue<DQInstance> alerting_task_queue = wc.getALERTING_TASK_QUEUE();
+            while (true) {
+                DQInstance dqInstance = null;
+                try {
+                    dqInstance = alerting_task_queue.poll(1, TimeUnit.SECONDS);
+                    if (dqInstance == null) continue;
+                    // do alerting
+                    if (dqInstance.getStatus() == DQInstanceStatus.FAILED_ALERTING || dqInstance.getStatus() == DQInstanceStatus.ALERTING) {
+                        DQStage alertingStage = dqInstance.getAlertingStage();
+                        dqStageService.executeStage(alertingStage);
+                        DQInstanceStatus dqInstanceStatus = alertingStage.hasSuccess()? DQInstanceStatus.SUCCESS : DQInstanceStatus.FAILED;
+                        dqInstanceService.updateStatus(dqInstance, dqInstanceStatus);
                     }
-                    // 放回队列准备重试
                     offerToSpecQueueByStatus(dqInstance);
-                } else {
-                    log.error("scanAlertingTask poll instance failed. ex:", e);
+                } catch (Exception e) {
+                    if (dqInstance != null) {
+                        log.error("scanAlertingTask doAlert failed, id : {}, instance : {}, ex:", dqInstance.getId(), dqInstance, e);
+                        if (dqInstance.isFailed()) {
+                            // retry 5 times, set failed
+                            dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.FAILED);
+                            // retry times less than 5, do not modify status and put task back
+                        }
+                        // put task back
+                        offerToSpecQueueByStatus(dqInstance);
+                    } else {
+                        log.error("scanAlertingTask poll instance failed. ex:", e);
+                    }
                 }
             }
-        }
+        });
     }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java b/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
index a39c17d1..276ebdaa 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
@@ -1,8 +1,11 @@
 package org.apache.griffin.core.worker.service;
 
+import org.apache.griffin.core.api.entity.GriffinDQContentInstanceMap;
+import org.apache.griffin.core.api.dao.DQContentInstanceMapDao;
 import org.apache.griffin.core.worker.dao.DQInstanceDao;
 import org.apache.griffin.core.worker.entity.bo.DQInstance;
 import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.core.worker.factory.DQInstanceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -14,12 +17,16 @@ public class DQInstanceService {
 
     @Autowired
     private DQInstanceDao dqInstanceDao;
+    @Autowired
+    private DQContentInstanceMapDao dqContentInstanceMapDao;
+    @Autowired
+    private DQInstanceFactory dqInstanceFactory;
 
     /**
      * 数据库和内存需要保持同步
-     * @param instance
-     * @param status
-     * @return
+     * @param instance obj
+     * @param status stat
+     * @return true or false
      */
     public boolean updateStatus(DQInstance instance, DQInstanceStatus status) {
         try {
@@ -33,7 +40,32 @@ public class DQInstanceService {
         return false;
     }
 
-    public DQInstance getById(long id) {
-        return null;
+    /**
+     * construct instance
+     * @param id master assign id
+     * @return DQInstance
+     */
+    public DQInstance getById(Long id) {
+        if (id == null) {
+            log.error("Unknown instance id: null");
+        }
+        DQInstance ins = dqInstanceDao.getById(id);
+        if (ins == null) {
+            // the ins id has no task info
+            return constructInstance(id);
+        }
+        return recoveryInstance(ins);
+    }
+
+    private DQInstance constructInstance(Long id) {
+        log.info("constructInstance id: {}", id);
+        GriffinDQContentInstanceMap contentInstanceMap = dqContentInstanceMapDao.getContentInstanceMapByInstanceId(id);
+        Long instanceId = contentInstanceMap.getInstanceId();
+        Long dqcId = contentInstanceMap.getDqcId();
+        return dqInstanceFactory.constructInstance(instanceId, dqcId);
+    }
+
+    private DQInstance recoveryInstance(DQInstance instance) {
+        return dqInstanceFactory.recoveryInstance(instance);
     }
 }
diff --git a/core/src/main/java/org/apache/griffin/core/worker/service/DQStageService.java b/core/src/main/java/org/apache/griffin/core/worker/service/DQStageService.java
new file mode 100644
index 00000000..2a2916d5
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/service/DQStageService.java
@@ -0,0 +1,58 @@
+package org.apache.griffin.core.worker.service;
+
+import org.apache.griffin.core.worker.dao.DQStageDao;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.apache.griffin.core.worker.stage.DQStage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.Executors;
+
+@Service
+public class DQStageService {
+
+    private static final Logger log = LoggerFactory.getLogger(DQStageService.class);
+    private DQStageDao dqStageDao;
+
+    public boolean updateTaskStatus(DQAbstractStage stage, DQStageStatus status) {
+        try {
+            dqStageDao.updateDQStageStatus(stage, status.getCode());
+            stage.setStatus(status);
+            return true;
+        } catch (Exception e) {
+            // todo
+            log.error("stage {} {} => {} failed, ex", stage, stage.getStatus(), status, e);
+        }
+        return false;
+    }
+
+    // async to submit task
+    public boolean submitStage(DQStage dqStage) {
+        log.info("start to submit stage");
+        // todo prepare thread pool
+        try {
+            Executors.newCachedThreadPool().execute(dqStage::start);
+            return true;
+        } catch (Exception e) {
+            // todo
+            log.error("Submit failed. ex:", e);
+        }
+        return false;
+    }
+
+    // sync to execute task
+    public boolean executeStage(DQStage dqStage) {
+        try {
+            dqStage.start();
+            return true;
+        } catch (Exception e) {
+            // todo
+            log.error("Submit failed. ex:", e);
+        }
+        return false;
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
index 8c0f018b..bf5b38b6 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
@@ -28,42 +28,43 @@ public class DQTaskService {
     @Autowired
     private DispatcherClient dispatcherClient;
 
-    public boolean updateTaskStatus(DQBaseTask task, DQTaskStatus status) {
+    public void updateTaskStatus(DQBaseTask task, DQTaskStatus status) {
         try {
-            dqTaskDao.updateDQTaskListStatus(task, status.getCode());
+            dqTaskDao.updateDQTaskStatus(task, status.getCode());
             task.setStatus(status);
-            return true;
         } catch (Exception e) {
             log.error("task {} {} => {} failed, ex", task.getId(), task.getStatus(), status, e);
         }
-        return false;
     }
 
-    public boolean updateTaskStatus(List<DQBaseTask> tasks, DQTaskStatus status) {
-        if (CollectionUtils.isEmpty(tasks)) return false;
+    public void updateTaskStatus(List<DQBaseTask> tasks, DQTaskStatus status) {
+        if (CollectionUtils.isEmpty(tasks)) return;
         DQBaseTask sampleTask = tasks.get(0);
         List<Long> taskIdList = tasks.stream().map(DQBaseTask::getId).collect(Collectors.toList());
         try {
             dqTaskDao.updateDQTaskListStatus(tasks, status.getCode());
             tasks.forEach(x -> x.setStatus(status));
-            return true;
         } catch (Exception e) {
             log.error("task {} {} => {} failed, ex", taskIdList, sampleTask.getStatus(), status, e);
         }
-        return false;
     }
 
     public boolean doSubmitRecordingTask(DQBaseTask task) {
-        // 一个task对应多个dispatcher任务 分别获取所有的任务对应的请求
-        List<Pair<Long, SubmitRequest>> requestList = getSubmitRequest(task);
-        // 提交任务 获取任务对应的job信息
-        List<JobStatus> jobStatusList = requestList.stream()
-                .map(reqPair -> Pair.of(reqPair.getLeft(), dispatcherClient.submitSql(reqPair.getRight())))
-                .map(respPair -> dispatcherClient.wrapperSubmitResponse(respPair))
-                .collect(Collectors.toList());
-        // 设置job信息
-        task.setJobStatusList(jobStatusList);
-        return true;
+        try {
+            // 一个task对应多个dispatcher任务 分别获取所有的任务对应的请求
+            List<Pair<Long, SubmitRequest>> requestList = getSubmitRequest(task);
+            // 提交任务 获取任务对应的job信息
+            List<JobStatus> jobStatusList = requestList.stream()
+                    .map(reqPair -> Pair.of(reqPair.getLeft(), dispatcherClient.submitSql(reqPair.getRight())))
+                    .map(respPair -> dispatcherClient.wrapperSubmitResponse(respPair))
+                    .collect(Collectors.toList());
+            // 设置job信息
+            task.setJobStatusList(jobStatusList);
+            return true;
+        } catch (Exception e) {
+            log.error("ex: ", e);
+        }
+        return false;
     }
 
     public boolean checkJobStatus(DQBaseTask task) {
diff --git a/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java b/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
index 314a328d..2a5f9863 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
@@ -1,6 +1,6 @@
 package org.apache.griffin.core.worker.service;
 
-import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.api.context.WorkerContext;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
diff --git a/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java
new file mode 100644
index 00000000..f866a533
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java
@@ -0,0 +1,67 @@
+package org.apache.griffin.core.worker.stage;
+
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
+import org.apache.griffin.core.worker.service.DQStageService;
+import org.apache.griffin.core.worker.service.DQTaskService;
+
+import java.util.List;
+
+public abstract class DQAbstractStage implements DQStage {
+    protected DQTaskService dqTaskService;
+    protected DQStageService dqStageService;
+
+    protected DQStageStatus status;
+    protected DQInstance instance;
+
+    protected List<DQBaseTask> subTaskList;
+
+    public DQAbstractStage() {
+        this.status = DQStageStatus.INIT;
+    }
+
+    public DQInstance getInstance() {
+        return instance;
+    }
+
+    public void setInstance(DQInstance instance) {
+        this.instance = instance;
+    }
+
+    public void setDqTaskService(DQTaskService dqTaskService) {
+        this.dqTaskService = dqTaskService;
+    }
+
+    public void setDqStageService(DQStageService dqStageService) {
+        this.dqStageService = dqStageService;
+    }
+
+    public List<DQBaseTask> getSubTaskList() {
+        return subTaskList;
+    }
+
+    public void setSubTaskList(List<DQBaseTask> subTaskList) {
+        this.subTaskList = subTaskList;
+    }
+
+    public DQStageStatus getStatus() {
+        return status;
+    }
+
+    public void setStatus(DQStageStatus status) {
+        this.status = status;
+    }
+
+    public abstract void process();
+
+    public void updateStatus(DQStageStatus status) {
+        dqStageService.updateTaskStatus(this, status);
+    }
+
+    public void start() {
+        updateStatus(DQStageStatus.RUNNING);
+        process();
+        updateStatus(DQStageStatus.FINISH);
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/stage/DQAlertStage.java b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAlertStage.java
new file mode 100644
index 00000000..2f980afd
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAlertStage.java
@@ -0,0 +1,33 @@
+package org.apache.griffin.core.worker.stage;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.utils.MsgSender;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class DQAlertStage extends DQAbstractStage {
+
+    @Override
+    public void process() {
+
+        List<String> alertMsgList = subTaskList.stream()
+                .filter(DQBaseTask::isNeedAlert)
+                .map(DQBaseTask::alert)
+                .collect(Collectors.toList());
+        if (CollectionUtils.isEmpty(alertMsgList)) {
+            return;
+        }
+        MsgSender.send(packageAlertMsg(alertMsgList), instance.getDqAlertRule().getReceivers(), instance.getDqAlertRule().getSendType());
+    }
+
+    private String packageAlertMsg(List<String> alertMsgList) {
+        return null;
+    }
+
+    @Override
+    public boolean hasSuccess() {
+        return false;
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/stage/DQEvaluateStage.java b/core/src/main/java/org/apache/griffin/core/worker/stage/DQEvaluateStage.java
new file mode 100644
index 00000000..bcbabc08
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/stage/DQEvaluateStage.java
@@ -0,0 +1,50 @@
+package org.apache.griffin.core.worker.stage;
+
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Component
+public class DQEvaluateStage extends DQAbstractStage {
+
+    private static final Logger log = LoggerFactory.getLogger(DQEvaluateStage.class);
+
+    @Override
+    public void process() {
+        // failed task will not do evaluate
+        List<DQBaseTask> waitToDoEvaluateTaskList = subTaskList.stream()
+                .filter(task -> task.getStatus() == DQTaskStatus.RECORDED)
+                .collect(Collectors.toList());
+        dqTaskService.updateTaskStatus(waitToDoEvaluateTaskList, DQTaskStatus.EVALUATING);
+        waitToDoEvaluateTaskList.forEach(task -> {
+            // retry 3 times
+            for (int i = 0; i < 3; i++) {
+                try {
+                    task.evaluate();
+                    dqTaskService.updateTaskStatus(task, DQTaskStatus.EVALUATED);
+                    break;
+                } catch (Exception e) {
+                    log.error("Evaluate failed {} times", i);
+                }
+            }
+            if (task.getStatus() != DQTaskStatus.EVALUATED) {
+                dqTaskService.updateTaskStatus(task, DQTaskStatus.FAILED);
+            }
+        });
+    }
+
+    @Override
+    public boolean hasSuccess() {
+        for (DQBaseTask task : subTaskList) {
+            if (task.getStatus() == DQTaskStatus.EVALUATED) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/stage/DQRecordStage.java b/core/src/main/java/org/apache/griffin/core/worker/stage/DQRecordStage.java
new file mode 100644
index 00000000..24550c75
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/stage/DQRecordStage.java
@@ -0,0 +1,106 @@
+package org.apache.griffin.core.worker.stage;
+
+import org.apache.griffin.core.worker.client.DispatcherClient;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class DQRecordStage extends DQAbstractStage {
+    private static final Logger log = LoggerFactory.getLogger(DQRecordStage.class);
+
+    private DispatcherClient dispatcherClient;
+
+
+    public DispatcherClient getDispatcherClient() {
+        return dispatcherClient;
+    }
+
+    public void setDispatcherClient(DispatcherClient dispatcherClient) {
+        this.dispatcherClient = dispatcherClient;
+    }
+
+
+    @Override
+    public void process() {
+        while (continueCheckTaskStaus()) {
+            try {
+                submitTaskToDispatcher();
+                TimeUnit.SECONDS.sleep(1);
+            } catch (Exception e) {
+                // todo
+                log.error("ex: ", e);
+            }
+        }
+    }
+
+    /**
+     * one of tasks submitted to dispatcher is success
+     * @return boolean
+     */
+    @Override
+    public boolean hasSuccess() {
+        for (DQBaseTask task : subTaskList) {
+            if (task.getStatus() == DQTaskStatus.RECORDED) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean continueCheckTaskStaus() {
+        boolean doContinue = false;
+        for (DQBaseTask dqBaseTask : subTaskList) {
+            if (dqBaseTask.getStatus() == DQTaskStatus.RECORDING || dqBaseTask.getStatus() == DQTaskStatus.WAITTING) {
+                doContinue = true;
+                break;
+            }
+        }
+        return doContinue;
+    }
+
+    private void submitTaskToDispatcher() {
+        // loop and check status
+        subTaskList.forEach(task -> {
+            DQTaskStatus taskStatus = task.getStatus();
+            switch (taskStatus) {
+                case WAITTING:
+                    // submit task
+                    doSubmitTaskToDispatcher(task);
+                    if (task.isFailed()) {
+                        // failed
+                        dqTaskService.updateTaskStatus(task, DQTaskStatus.FAILED);
+                    }
+                    break;
+                case RECORDING:
+                    // query status
+                    boolean isFinished = dqTaskService.checkJobStatus(task);
+                    if (isFinished) {
+                        // task is finish, set status
+                        dqTaskService.updateTaskStatus(task, DQTaskStatus.RECORDED);
+                    }
+                    break;
+                default:
+                    // no handle
+                    break;
+            }
+        });
+    }
+
+    private void doSubmitTaskToDispatcher(DQBaseTask task) {
+        // 并发度检查
+        if (!dispatcherClient.canSubmitToSpecEngine(task.getEngine())) return;
+        if (dqTaskService.doSubmitRecordingTask(task)) {
+            // 任务提交成功  更新状态为recording
+            dqTaskService.updateTaskStatus(task, DQTaskStatus.RECORDING);
+        } else {
+            // 提交失败 记录一次失败
+            task.incrStatusAge();
+        }
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/stage/DQStage.java b/core/src/main/java/org/apache/griffin/core/worker/stage/DQStage.java
new file mode 100644
index 00000000..4daa54cf
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/stage/DQStage.java
@@ -0,0 +1,9 @@
+package org.apache.griffin.core.worker.stage;
+
+import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
+
+public interface DQStage {
+    void process();
+    void start();
+    boolean hasSuccess();
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java b/core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java
new file mode 100644
index 00000000..8e5a1cf4
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java
@@ -0,0 +1,14 @@
+package org.apache.griffin.core.worker.utils;
+
+import java.util.concurrent.TimeUnit;
+
+public class DQDateUtils {
+
+    public static String praseBussinesstimeToPartitionSQL(Long businessTime, String partition) {
+        return null;
+    }
+
+    public static Long offsetTime(Long businessTime, Integer offset, TimeUnit unit) {
+        return 0L;
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java b/core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java
new file mode 100644
index 00000000..23884c88
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java
@@ -0,0 +1,7 @@
+package org.apache.griffin.core.worker.utils;
+
+public class ExpressionUtils {
+    public static boolean evaluate(String expression, double metricValue) {
+        return false;
+    }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/utils/MsgSender.java b/core/src/main/java/org/apache/griffin/core/worker/utils/MsgSender.java
new file mode 100644
index 00000000..5bc5a6ce
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/utils/MsgSender.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.worker.utils;
+
+import java.util.List;
+
+public class MsgSender {
+
+    public static boolean send(String msg, List<String> receiver, int sendType) {
+        return false;
+    }
+
+}
diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml
new file mode 100644
index 00000000..fddc2639
--- /dev/null
+++ b/core/src/main/resources/application.yml
@@ -0,0 +1,4 @@
+task:
+  assign:
+    strategy:
+      org.apache.griffin.core.master.strategy.LooperAssignStrategy
\ No newline at end of file