You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2023/01/12 11:08:15 UTC
[griffin] branch griffin-1.0.0-dev updated: Core: Process workflow with stage (#623)
This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch griffin-1.0.0-dev
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/griffin-1.0.0-dev by this push:
new 279dcfcb Core: Process workflow with stage (#623)
279dcfcb is described below
commit 279dcfcbd33ed4d1523e96d9072ec0dcf04320c3
Author: dabuliud <wa...@gmail.com>
AuthorDate: Thu Jan 12 19:08:09 2023 +0800
Core: Process workflow with stage (#623)
* stage
* master init
* init
* factory
* Add task assign strategy
Co-authored-by: Warden <wa...@gmail.com>
---
.../core/api/context/DQCApplicationContext.java | 17 ++
.../{worker => api}/context/WorkerContext.java | 9 +-
.../griffin/core/api/dao/DQBusinessRuleDao.java | 11 +
.../apache/griffin/core/api/dao/DQContentDao.java | 7 +
.../core/api/dao/DQContentInstanceMapDao.java | 10 +
.../griffin/core/api/entity/DQResoueceEnum.java | 5 +
.../core/api/entity/GriffinDQBusinessRule.java | 10 +
.../griffin/core/api/entity/GriffinDQContent.java | 20 ++
.../api/entity/GriffinDQContentInstanceMap.java | 10 +
.../griffin/core/api/entity/GriffinDQTable.java | 13 ++
.../apache/griffin/core/api/utils/SpringUtils.java | 23 ++
.../griffin/core/master/GriffinMasterMain.java | 20 ++
.../core/master/service/TaskAssignService.java | 30 +++
.../master/strategy/AbstractAssignStrategy.java | 14 ++
.../master/strategy/AssignStrategyFactory.java | 14 ++
.../core/master/strategy/LooperAssignStrategy.java | 15 ++
.../core/master/transport/DQCConnection.java | 27 +++
.../griffin/core/worker/GriffinWorkerMain.java | 20 ++
.../core/worker/client/DispatcherClient.java | 6 +
.../griffin/core/worker/dao/DQInstanceDao.java | 3 +
.../apache/griffin/core/worker/dao/DQStageDao.java | 11 +
.../apache/griffin/core/worker/dao/DQTaskDao.java | 2 +-
.../core/worker/driver/PrestoTemplateDriver.java | 2 +-
.../core/worker/driver/SparkTemplateDriver.java | 2 +-
.../griffin/core/worker/driver/TemplateDriver.java | 4 +-
.../griffin/core/worker/entity/bo/DQInstance.java | 32 ++-
.../core/worker/entity/bo/task/DQBaseTask.java | 48 +++--
.../core/worker/entity/bo/task/DQHiveTask.java | 14 +-
.../core/worker/entity/bo/task/DQKafkaTask.java | 11 +-
.../core/worker/entity/enums/DQInstanceStatus.java | 7 +-
.../core/worker/entity/enums/DQStageStatus.java | 18 ++
.../core/worker/entity/enums/DQStageTypeEnum.java | 5 +
.../core/worker/entity/enums/DQTaskStatus.java | 4 +-
.../griffin/core/worker/entity/pojo/DQTable.java | 14 ++
.../core/worker/entity/pojo/rule/DQAlertRule.java | 9 +
.../worker/entity/pojo/rule/DQEvaluateRule.java | 17 ++
.../core/worker/entity/pojo/rule/DQRecordRule.java | 40 ++++
.../entity/pojo/template/DQRecordBaseTemplate.java | 2 +-
.../entity/pojo/template/DQRecordTemplate.java | 10 +
.../worker/exception/StageSubmitException.java | 8 +
.../core/worker/factory/DQInstanceFactory.java | 56 +++++
.../core/worker/factory/DQStageFactory.java | 52 +++++
.../griffin/core/worker/factory/DQTaskFactory.java | 55 +++++
.../griffin/core/worker/factory/TaskFactory.java | 9 -
.../core/worker/factory/TemplateDriverFactory.java | 23 +-
.../worker/schedule/TaskDispatcherScheduler.java | 233 ++++++++-------------
.../core/worker/service/DQInstanceService.java | 42 +++-
.../core/worker/service/DQStageService.java | 58 +++++
.../griffin/core/worker/service/DQTaskService.java | 37 ++--
.../core/worker/service/WorkCoreService.java | 2 +-
.../griffin/core/worker/stage/DQAbstractStage.java | 67 ++++++
.../griffin/core/worker/stage/DQAlertStage.java | 33 +++
.../griffin/core/worker/stage/DQEvaluateStage.java | 50 +++++
.../griffin/core/worker/stage/DQRecordStage.java | 106 ++++++++++
.../apache/griffin/core/worker/stage/DQStage.java | 9 +
.../griffin/core/worker/utils/DQDateUtils.java | 14 ++
.../griffin/core/worker/utils/ExpressionUtils.java | 7 +
.../griffin/core/worker/utils/MsgSender.java | 11 +
core/src/main/resources/application.yml | 4 +
59 files changed, 1160 insertions(+), 252 deletions(-)
diff --git a/core/src/main/java/org/apache/griffin/core/api/context/DQCApplicationContext.java b/core/src/main/java/org/apache/griffin/core/api/context/DQCApplicationContext.java
new file mode 100644
index 00000000..d2a2fea3
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/context/DQCApplicationContext.java
@@ -0,0 +1,17 @@
+package org.apache.griffin.core.api.context;
+
+import lombok.Data;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Master Runtime Env
+ * Scope: Singleton
+ */
+@Component
+@Data
+public class DQCApplicationContext {
+ private Map<String, WorkerContext> context = new ConcurrentHashMap<>();
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java b/core/src/main/java/org/apache/griffin/core/api/context/WorkerContext.java
similarity index 89%
rename from core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
rename to core/src/main/java/org/apache/griffin/core/api/context/WorkerContext.java
index 52280f36..48e5be75 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/context/WorkerContext.java
+++ b/core/src/main/java/org/apache/griffin/core/api/context/WorkerContext.java
@@ -1,26 +1,23 @@
-package org.apache.griffin.core.worker.context;
+package org.apache.griffin.core.api.context;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
-import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
/**
- * 上下文信息 全局唯一
+ * Worker Runtime Env
+ * Scope: Singleton
*/
@Component
public class WorkerContext {
private final List<DQInstance> WAITTING_TASK_QUEUE;
-// public static final List<DQBaseTask> runningTaskIdQueue = Lists.newCopyOnWriteArrayList();
- // runningTaskIdList = RECORDING_TASK_LIST + EVALUATING_TASK_LIST + ALERTING_TASK_LIST
private final List<DQInstance> RECORDING_TASK_QUEUE;
private final LinkedBlockingQueue<DQInstance> EVALUATING_TASK_QUEUE;
private final LinkedBlockingQueue<DQInstance> ALERTING_TASK_QUEUE;
diff --git a/core/src/main/java/org/apache/griffin/core/api/dao/DQBusinessRuleDao.java b/core/src/main/java/org/apache/griffin/core/api/dao/DQBusinessRuleDao.java
new file mode 100644
index 00000000..4197235f
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/dao/DQBusinessRuleDao.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.api.dao;
+
+import org.apache.griffin.core.api.entity.GriffinDQBusinessRule;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Component
+public interface DQBusinessRuleDao {
+ List<GriffinDQBusinessRule> getListByDqcId(Long dqcId);
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/dao/DQContentDao.java b/core/src/main/java/org/apache/griffin/core/api/dao/DQContentDao.java
new file mode 100644
index 00000000..73849142
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/dao/DQContentDao.java
@@ -0,0 +1,7 @@
+package org.apache.griffin.core.api.dao;
+
+import org.apache.griffin.core.api.entity.GriffinDQContent;
+
+public interface DQContentDao {
+ GriffinDQContent getById(Long id);
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/dao/DQContentInstanceMapDao.java b/core/src/main/java/org/apache/griffin/core/api/dao/DQContentInstanceMapDao.java
new file mode 100644
index 00000000..2ea57545
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/dao/DQContentInstanceMapDao.java
@@ -0,0 +1,10 @@
+package org.apache.griffin.core.api.dao;
+
+import org.apache.griffin.core.api.entity.GriffinDQContentInstanceMap;
+import org.springframework.stereotype.Component;
+
+@Component
+public interface DQContentInstanceMapDao {
+
+ GriffinDQContentInstanceMap getContentInstanceMapByInstanceId(Long id);
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/entity/DQResoueceEnum.java b/core/src/main/java/org/apache/griffin/core/api/entity/DQResoueceEnum.java
new file mode 100644
index 00000000..ba8fd35a
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/entity/DQResoueceEnum.java
@@ -0,0 +1,5 @@
+package org.apache.griffin.core.api.entity;
+
+public enum DQResoueceEnum {
+ HIVE, KAFKA;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQBusinessRule.java b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQBusinessRule.java
new file mode 100644
index 00000000..7fbd00b5
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQBusinessRule.java
@@ -0,0 +1,10 @@
+package org.apache.griffin.core.api.entity;
+
+import lombok.Data;
+
+@Data
+public class GriffinDQBusinessRule {
+
+ private Long id;
+ private Long dqcId;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContent.java b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContent.java
new file mode 100644
index 00000000..255f4f7b
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContent.java
@@ -0,0 +1,20 @@
+package org.apache.griffin.core.api.entity;
+
+import lombok.Data;
+
+/**
+ * DQContent: one table has only one dqcContent
+ */
+@Data
+public class GriffinDQContent {
+ // id
+ private Long id;
+ private String owner;
+
+
+ // table ID
+ private Long resourceId;
+ // tableName
+ private String tableName;
+ private DQResoueceEnum resoueceEnum;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContentInstanceMap.java b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContentInstanceMap.java
new file mode 100644
index 00000000..a7f7b711
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQContentInstanceMap.java
@@ -0,0 +1,10 @@
+package org.apache.griffin.core.api.entity;
+
+import lombok.Data;
+
+@Data
+public class GriffinDQContentInstanceMap {
+ private Long id;
+ private Long instanceId;
+ private Long dqcId;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQTable.java b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQTable.java
new file mode 100644
index 00000000..4da913b6
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/entity/GriffinDQTable.java
@@ -0,0 +1,13 @@
+package org.apache.griffin.core.api.entity;
+
+import lombok.Data;
+
+/**
+ * Table info
+ */
+@Data
+public class GriffinDQTable {
+ private Long id;
+ private String tableName;
+ private DQResoueceEnum resoueceEnum;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/api/utils/SpringUtils.java b/core/src/main/java/org/apache/griffin/core/api/utils/SpringUtils.java
new file mode 100644
index 00000000..ad4e1059
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/api/utils/SpringUtils.java
@@ -0,0 +1,23 @@
+package org.apache.griffin.core.api.utils;
+
+
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanNotOfRequiredTypeException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+public class SpringUtils implements ApplicationContextAware {
+ // Spring context
+ private static ApplicationContext applicationContext;
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ SpringUtils.applicationContext = applicationContext;
+ }
+
+ public static <T> T getObject(String name, Class<T> clazz) {
+ Object bean = applicationContext.getBean(name);
+ if (clazz.isInstance(bean)) return clazz.cast(bean);
+ throw new BeanNotOfRequiredTypeException(name, clazz, bean.getClass());
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/GriffinMasterMain.java b/core/src/main/java/org/apache/griffin/core/master/GriffinMasterMain.java
new file mode 100644
index 00000000..ed009a96
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/GriffinMasterMain.java
@@ -0,0 +1,20 @@
+package org.apache.griffin.core.master;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@SpringBootApplication
+@EnableScheduling
+public class GriffinMasterMain {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(GriffinMasterMain.class);
+
+ public static void main(String[] args) {
+ SpringApplication.run(GriffinMasterMain.class, args);
+ LOGGER.info("application started");
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java b/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java
new file mode 100644
index 00000000..a324973d
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/service/TaskAssignService.java
@@ -0,0 +1,30 @@
+package org.apache.griffin.core.master.service;
+
+import org.apache.griffin.core.master.strategy.AbstractAssignStrategy;
+import org.apache.griffin.core.master.strategy.AssignStrategyFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import javax.annotation.PostConstruct;
+
+
+@Component
+public class TaskAssignService {
+
+ @Value("${task.assign.strategy}")
+ private String assignTaskStrategtClass;
+
+ private AbstractAssignStrategy strategy;
+
+ @PostConstruct
+ public void init() {
+ strategy = AssignStrategyFactory.getStrategy(assignTaskStrategtClass);
+ Assert.notNull(strategy, "Task Assign Strategy init failed");
+ }
+
+
+ public String assignTask(long instanceId) {
+ return strategy.assignTask(instanceId);
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/strategy/AbstractAssignStrategy.java b/core/src/main/java/org/apache/griffin/core/master/strategy/AbstractAssignStrategy.java
new file mode 100644
index 00000000..4cd844a7
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/strategy/AbstractAssignStrategy.java
@@ -0,0 +1,14 @@
+package org.apache.griffin.core.master.strategy;
+
+import org.apache.griffin.core.api.context.DQCApplicationContext;
+
+public abstract class AbstractAssignStrategy {
+
+ protected DQCApplicationContext dqcApplicationContext;
+
+ public AbstractAssignStrategy(DQCApplicationContext dqcApplicationContext) {
+ this.dqcApplicationContext = dqcApplicationContext;
+ }
+
+ public abstract String assignTask(long instanceId);
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/strategy/AssignStrategyFactory.java b/core/src/main/java/org/apache/griffin/core/master/strategy/AssignStrategyFactory.java
new file mode 100644
index 00000000..37cc8e7f
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/strategy/AssignStrategyFactory.java
@@ -0,0 +1,14 @@
+package org.apache.griffin.core.master.strategy;
+
+public class AssignStrategyFactory {
+
+ public static AbstractAssignStrategy getStrategy(String className) {
+ try {
+ Class clazz = Class.forName(className);
+ return (AbstractAssignStrategy) clazz.newInstance();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/strategy/LooperAssignStrategy.java b/core/src/main/java/org/apache/griffin/core/master/strategy/LooperAssignStrategy.java
new file mode 100644
index 00000000..4d85fc4a
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/strategy/LooperAssignStrategy.java
@@ -0,0 +1,15 @@
+package org.apache.griffin.core.master.strategy;
+
+import org.apache.griffin.core.api.context.DQCApplicationContext;
+
+public class LooperAssignStrategy extends AbstractAssignStrategy {
+
+ public LooperAssignStrategy(DQCApplicationContext dqcApplicationContext) {
+ super(dqcApplicationContext);
+ }
+
+ @Override
+ public String assignTask(long instanceId) {
+ return null;
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
new file mode 100644
index 00000000..0f7b902a
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/master/transport/DQCConnection.java
@@ -0,0 +1,27 @@
+package org.apache.griffin.core.master.transport;
+
+import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.Future;
+
+/**
+ * the obj has a socketChannel to worker node
+ */
+public class DQCConnection {
+ // worker hostName
+ private String hostName;
+ // worker hostIP
+ private String hostIP;
+ // worker hostPort
+ private int hostPort;
+ // todo
+ private ServerSocketChannel channel;
+
+ /**
+ * Send msg async
+ * @param msg message
+ * @return Future
+ */
+ public Future send(byte[] msg) {
+ return null;
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/GriffinWorkerMain.java b/core/src/main/java/org/apache/griffin/core/worker/GriffinWorkerMain.java
new file mode 100644
index 00000000..f19e7a5d
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/GriffinWorkerMain.java
@@ -0,0 +1,20 @@
+package org.apache.griffin.core.worker;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@SpringBootApplication
+@EnableScheduling
+public class GriffinWorkerMain {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(GriffinWorkerMain.class);
+
+ public static void main(String[] args) {
+ SpringApplication.run(GriffinWorkerMain.class, args);
+ LOGGER.info("application started");
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
index 5422f35e..dbd4bd39 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/client/DispatcherClient.java
@@ -3,6 +3,7 @@ package org.apache.griffin.core.worker.client;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.griffin.core.worker.entity.bo.DQInstance;
import org.apache.griffin.core.worker.entity.dispatcher.*;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
import org.apache.griffin.core.worker.entity.enums.DQErrorCode;
import org.springframework.stereotype.Service;
@@ -17,6 +18,11 @@ public class DispatcherClient {
}
+ public boolean canSubmitToSpecEngine(DQEngineEnum engine) {
+ // todo
+ return true;
+ }
+
public JobStatusResponse getJobStatus(JobStatusRequest jobStatusRequest) {
// todo parse job status
return null;
diff --git a/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java b/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
index 11dd93f9..544adb36 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQInstanceDao.java
@@ -6,6 +6,9 @@ import org.springframework.stereotype.Component;
@Component
public interface DQInstanceDao {
+ DQInstance getById(Long id);
+
void updateDQInstanceStatus(DQInstance instance, int status);
+ void insert(DQInstance instance);
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/dao/DQStageDao.java b/core/src/main/java/org/apache/griffin/core/worker/dao/DQStageDao.java
new file mode 100644
index 00000000..0e37c0d8
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQStageDao.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.worker.dao;
+
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.apache.griffin.core.worker.stage.DQStage;
+
+public interface DQStageDao {
+ void updateDQStageStatus(DQStage stage, int status);
+
+ void insert(DQAbstractStage stage);
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
index c94074cb..f87fbc46 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/dao/DQTaskDao.java
@@ -11,6 +11,6 @@ import java.util.List;
public interface DQTaskDao {
void updateDQTaskListStatus(List<DQBaseTask> tasks, int status);
- void updateDQTaskListStatus(DQBaseTask task, int status);
+ void updateDQTaskStatus(DQBaseTask task, int status);
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
index 61ab2bdb..b5d2cf62 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/driver/PrestoTemplateDriver.java
@@ -7,7 +7,7 @@ import java.util.Map;
public class PrestoTemplateDriver extends TemplateDriver{
@Override
- public List<String> getRecordSql(DQRecordTemplate template, Map<String, String> params) {
+ public String getRecordSql(DQRecordTemplate template, Map<String, String> params) {
return null;
}
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
index eaf17b4a..4d500832 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/driver/SparkTemplateDriver.java
@@ -7,7 +7,7 @@ import java.util.Map;
public class SparkTemplateDriver extends TemplateDriver {
@Override
- public List<String> getRecordSql(DQRecordTemplate template, Map<String, String> params) {
+ public String getRecordSql(DQRecordTemplate template, Map<String, String> params) {
return null;
}
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
index 85a23390..9e2aab97 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/driver/TemplateDriver.java
@@ -18,9 +18,9 @@ public abstract class TemplateDriver {
/**
* 拼出的SQL 返回值 必须是 <ruleId, Partition, Metric> 这样可以做SQL合并
*/
- public abstract List<String> getRecordSql(DQRecordTemplate template, Map<String, String> params);
+ public abstract String getRecordSql(DQRecordTemplate template, Map<String, String> params);
- public List<String> getRecordSql(DQEngineEnum engine, DQRecordTemplate template, Map<String, String> params) {
+ public String getRecordSql(DQEngineEnum engine, DQRecordTemplate template, Map<String, String> params) {
TemplateDriver templateDriver = templateDriverFactory.getTemplateDrvier(engine);
return templateDriver.getRecordSql(template, params);
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
index af5ff201..e4608f9c 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/DQInstance.java
@@ -5,6 +5,9 @@ import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQAlertRule;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.apache.griffin.core.worker.stage.DQStage;
import java.util.List;
@@ -15,6 +18,9 @@ import java.util.List;
@Data
public class DQInstance {
private Long id;
+
+ private Long dqcId;
+
// 实例状态
private DQInstanceStatus status;
// 记录状态年龄 状态更新是重置
@@ -24,6 +30,12 @@ public class DQInstance {
//
private long scanTimeStamp = 0L;
+ protected DQAlertRule dqAlertRule;
+
+ private DQAbstractStage recordingStage;
+ private DQAbstractStage evaluatingStage;
+ private DQAbstractStage alertingStage;
+
public void setStatus(DQInstanceStatus status) {
@@ -44,16 +56,7 @@ public class DQInstance {
return statusAge > 5;
}
- public boolean hasTaskToSubmit() {
- boolean hasTaskToSubmit = false;
- for (DQBaseTask dqBaseTask : subTaskList) {
- if (dqBaseTask.getStatus() == DQTaskStatus.WAITTING) {
- hasTaskToSubmit = true;
- break;
- }
- }
- return hasTaskToSubmit;
- }
+
public boolean isFinishRecord() {
boolean isFinishRecord = true;
@@ -65,13 +68,4 @@ public class DQInstance {
}
return isFinishRecord;
}
-
- public void doEvaluteTask() {
- subTaskList.forEach(DQBaseTask::evaluate);
-
- }
-
- public void doAlertTask() {
- // 收敛告警信息 进行告警
- }
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
index fd01f036..fffc684d 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQBaseTask.java
@@ -3,13 +3,11 @@ package org.apache.griffin.core.worker.entity.bo.task;
import com.beust.jcommander.internal.Lists;
import lombok.Data;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.api.context.WorkerContext;
import org.apache.griffin.core.worker.entity.dispatcher.JobStatus;
import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
-import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
import org.apache.griffin.core.worker.entity.pojo.Metric;
-import org.apache.griffin.core.worker.entity.pojo.rule.DQAlertRule;
import org.apache.griffin.core.worker.entity.pojo.rule.DQEvaluateRule;
import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
@@ -20,42 +18,50 @@ import java.util.List;
* 一个完整的任务包含
* record
* evaluate
- * alert
*/
@Data
public abstract class DQBaseTask {
- private long id;
- private String owner;
- private WorkerContext wc;
- private DQEngineEnum engine;
- private DQRecordRule recordRule;
- private DQEvaluateRule dqEvaluateRule;
- private DQAlertRule dqAlertRule;
- private DQTaskStatus status;
- private List<JobStatus> jobStatusList;
- private List<Metric> metricList = Lists.newArrayList();
+ protected long id;
+ protected String owner;
+ protected WorkerContext wc;
+ protected DQEngineEnum engine;
+ protected DQRecordRule recordRule;
+ protected DQEvaluateRule dqEvaluateRule;
+ protected DQTaskStatus status;
+ protected List<JobStatus> jobStatusList;
+ protected List<Metric> metricList = Lists.newArrayList();
+ protected boolean needAlert = false;
+ protected Long businessTime;
// 记录状态年龄 状态更新是重置
private int statusAge;
// 生成recordsql和 模板 + 参数 有关系
// 生成SQL部分希望交给模板来做
public List<Pair<Long, String>> record() {
// before
- List<Pair<Long, String>> partitionTimeAndSqlList = doRecord();
+ List<Pair<Long, String>> partitionTimeAndSqlList = getRecordInfo();
// after
return partitionTimeAndSqlList;
}
+
public void evaluate() {
- doEvaluate();
+ needAlert = doEvaluate();
}
- public void alert() {
- doAlert();
+
+ public String alert() {
+ return getAlertMsg();
}
// partitionTime And sql
- public abstract List<Pair<Long, String>> doRecord();
- public abstract boolean doEvaluate();
- public abstract boolean doAlert();
+ public abstract List<Pair<Long, String>> getRecordInfo();
+
+ public boolean doEvaluate() {
+ return dqEvaluateRule.execute(metricList);
+ }
+
+ public String getAlertMsg() {
+ return "xxx is error";
+ }
public void setStatus(DQTaskStatus status) {
if (this.status != status) resetStatusAge();
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
index 62c6ef3b..be6aee5e 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQHiveTask.java
@@ -1,6 +1,7 @@
package org.apache.griffin.core.worker.entity.bo.task;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.griffin.core.worker.entity.pojo.rule.DQRecordRule;
import java.util.List;
@@ -10,17 +11,8 @@ import java.util.List;
public class DQHiveTask extends DQBaseTask {
@Override
- public List<Pair<Long, String>> doRecord() {
- return null;
+ public List<Pair<Long, String>> getRecordInfo() {
+ return recordRule.getPartitionAndRuleIdList(businessTime, engine);
}
- @Override
- public boolean doEvaluate() {
- return false;
- }
-
- @Override
- public boolean doAlert() {
- return false;
- }
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
index 356c0fac..c92906ce 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/bo/task/DQKafkaTask.java
@@ -6,17 +6,8 @@ import java.util.List;
public class DQKafkaTask extends DQBaseTask {
@Override
- public List<Pair<Long, String>> doRecord() {
+ public List<Pair<Long, String>> getRecordInfo() {
return null;
}
- @Override
- public boolean doEvaluate() {
- return false;
- }
-
- @Override
- public boolean doAlert() {
- return false;
- }
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
index 6e70721b..cd626a7e 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQInstanceStatus.java
@@ -3,11 +3,12 @@ package org.apache.griffin.core.worker.entity.enums;
public enum DQInstanceStatus {
ACCEPTED(0),
WAITTING(1),
- SUBMITTING(2), // 任务提交中
- RUNNING(3),
+// SUBMITTING(2), // 任务提交中
+// RUNNING(3),
RECORDING(4),
EVALUATING(5),
- EVALUATE_ALERTING(6), // Metric 需要告警
+ ALERTING(5),
+// EVALUATE_ALERTING(6), // Metric 需要告警
FAILED_ALERTING(7), // 任务执行失败需要告警
SUCCESS(8),
FAILED(9);
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageStatus.java b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageStatus.java
new file mode 100644
index 00000000..a971c8e5
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageStatus.java
@@ -0,0 +1,18 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQStageStatus {
+ INIT(0),
+ RUNNABLE(1),
+ RUNNING(2),
+ FINISH(3);
+
+ private final int code;
+
+ DQStageStatus(int code) {
+ this.code = code;
+ }
+
+ public int getCode() {
+ return code;
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageTypeEnum.java b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageTypeEnum.java
new file mode 100644
index 00000000..93057a05
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQStageTypeEnum.java
@@ -0,0 +1,5 @@
+package org.apache.griffin.core.worker.entity.enums;
+
+public enum DQStageTypeEnum {
+ RECORD, EVALUATE, ALERT;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
index ab5e0716..11235a11 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/enums/DQTaskStatus.java
@@ -6,8 +6,8 @@ public enum DQTaskStatus {
RECORDED(1),
EVALUATING(2),
EVALUATED(2),
- ALERTING(3),
- ALERTED(3),
+// ALERTING(3),
+// ALERTED(3),
SUCCESS(4),
FAILED(5);
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java
new file mode 100644
index 00000000..7be0c5ac
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/DQTable.java
@@ -0,0 +1,14 @@
+package org.apache.griffin.core.worker.entity.pojo;
+
+import lombok.Data;
+
+import java.util.concurrent.TimeUnit;
+
+@Data
+public class DQTable {
+ private Long id;
+ private String dbName;
+ private String tableName;
+ private String partition;
+ private TimeUnit unit;
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
index 86d4fca8..37dd63f1 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQAlertRule.java
@@ -1,4 +1,13 @@
package org.apache.griffin.core.worker.entity.pojo.rule;
+import java.util.List;
+
public class DQAlertRule {
+ public int getSendType() {
+ return 0;
+ }
+
+ public List<String> getReceivers() {
+ return null;
+ }
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
index 5fc51854..2ee5ec6a 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQEvaluateRule.java
@@ -1,4 +1,21 @@
package org.apache.griffin.core.worker.entity.pojo.rule;
+import org.apache.griffin.core.worker.entity.pojo.Metric;
+import org.apache.griffin.core.worker.utils.ExpressionUtils;
+
+import java.util.List;
+
public class DQEvaluateRule {
+
+ private String expression;
+
+ public boolean execute(List<Metric> metricList) {
+ for (Metric metric : metricList) {
+ double metricValue = metric.getMetric();
+ if (ExpressionUtils.evaluate(expression, metricValue)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
index 94deba4b..3cf5605d 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/rule/DQRecordRule.java
@@ -1,9 +1,49 @@
package org.apache.griffin.core.worker.entity.pojo.rule;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import lombok.Data;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+import org.apache.griffin.core.worker.entity.pojo.DQTable;
+import org.apache.griffin.core.worker.entity.pojo.template.DQRecordBaseTemplate;
import org.apache.griffin.core.worker.entity.pojo.template.DQRecordTemplate;
+import org.apache.griffin.core.worker.utils.DQDateUtils;
+
+import java.util.List;
+import java.util.Map;
@Data
public class DQRecordRule {
+
+ private Long id;
+ private Map<String, String> baseTemplateParams;
private DQRecordTemplate template;
+ private DQTable table;
+
+
+ public List<Pair<Long, String>> getPartitionAndRuleIdList(Long businessTime, DQEngineEnum engine) {
+ // prepare params
+ List<Pair<Long, String>> partitionAndRuleIdList = Lists.newArrayList();
+ if (template instanceof DQRecordBaseTemplate) {
+ // base template
+ baseTemplateParams.put("partition_sql_info", DQDateUtils.praseBussinesstimeToPartitionSQL(businessTime, table.getPartition()));
+ String recordSql = template.getRecordSql(baseTemplateParams, engine);
+ partitionAndRuleIdList.add(Pair.of(businessTime, recordSql));
+ } else {
+ List<Integer> offsetList = getOffsetList();
+ offsetList.forEach(offset -> {
+ Map<String, String> templateParams = Maps.newHashMap(baseTemplateParams);
+ Long offsetBusinessTime = DQDateUtils.offsetTime(businessTime, offset, table.getUnit());
+ baseTemplateParams.put("partition_sql_info", DQDateUtils.praseBussinesstimeToPartitionSQL(offsetBusinessTime, table.getPartition()));
+ String recordSql = template.getRecordSql(templateParams, engine);
+ partitionAndRuleIdList.add(Pair.of(offsetBusinessTime, recordSql));
+ });
+ }
+ return partitionAndRuleIdList;
+ }
+
+ private List<Integer> getOffsetList() {
+ return null;
+ }
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
index fbe51281..44646da8 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordBaseTemplate.java
@@ -1,4 +1,4 @@
package org.apache.griffin.core.worker.entity.pojo.template;
-public class DQRecordBaseTemplate {
+public class DQRecordBaseTemplate extends DQRecordTemplate {
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
index d4b12047..49db5868 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/entity/pojo/template/DQRecordTemplate.java
@@ -1,5 +1,15 @@
package org.apache.griffin.core.worker.entity.pojo.template;
+import org.apache.griffin.core.worker.driver.TemplateDriver;
+import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
+
+import java.util.Map;
+
public class DQRecordTemplate {
+ private TemplateDriver driver;
+
+ public String getRecordSql(Map<String, String> templateParams, DQEngineEnum engine) {
+ return driver.getRecordSql(engine, this, templateParams);
+ }
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/exception/StageSubmitException.java b/core/src/main/java/org/apache/griffin/core/worker/exception/StageSubmitException.java
new file mode 100644
index 00000000..07a3e05e
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/exception/StageSubmitException.java
@@ -0,0 +1,8 @@
+package org.apache.griffin.core.worker.exception;
+
+public class StageSubmitException extends Exception {
+
+ public StageSubmitException(String message) {
+ super(message);
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java b/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
new file mode 100644
index 00000000..386bd8b9
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/factory/DQInstanceFactory.java
@@ -0,0 +1,56 @@
+package org.apache.griffin.core.worker.factory;
+
+import org.apache.griffin.core.api.dao.DQBusinessRuleDao;
+import org.apache.griffin.core.api.dao.DQContentDao;
+import org.apache.griffin.core.api.entity.GriffinDQBusinessRule;
+import org.apache.griffin.core.api.entity.GriffinDQContent;
+import org.apache.griffin.core.worker.dao.DQInstanceDao;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQStageTypeEnum;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+@Component
+public class DQInstanceFactory {
+
+ @Autowired
+ private DQInstanceDao dqInstanceDao;
+ @Autowired
+ private DQContentDao dqContentDao;
+ @Autowired
+ private DQBusinessRuleDao dqBusinessRuleDao;
+ @Autowired
+ private DQStageFactory dqStageFactory;
+ @Autowired
+ private DQTaskFactory dqTaskFactory;
+
+ public DQInstance constructInstance(Long id, Long dqcId) {
+ DQInstance instance = new DQInstance();
+ instance.setId(id);
+ instance.setDqcId(dqcId);
+ GriffinDQContent griffinDQContent = dqContentDao.getById(dqcId);
+ // construct taskL
+ List<GriffinDQBusinessRule> businessRuleList = dqBusinessRuleDao.getListByDqcId(dqcId);
+ List<DQBaseTask> subTaskList = dqTaskFactory.constructTasks(griffinDQContent.getResoueceEnum(), businessRuleList);
+ instance.setSubTaskList(subTaskList);
+ // construct record stage
+ DQAbstractStage reordStage = dqStageFactory.constructStage(DQStageTypeEnum.RECORD, instance);
+ instance.setRecordingStage(reordStage);
+ // construct check stage
+ DQAbstractStage evaluateStage = dqStageFactory.constructStage(DQStageTypeEnum.EVALUATE, instance);
+ instance.setEvaluatingStage(evaluateStage);
+ // construct alert stage
+ DQAbstractStage alertStage = dqStageFactory.constructStage(DQStageTypeEnum.ALERT, instance);
+ instance.setAlertingStage(alertStage);
+ dqInstanceDao.insert(instance);
+ return instance;
+ }
+
+ public DQInstance recoveryInstance(DQInstance instance) {
+ return null;
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java b/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java
new file mode 100644
index 00000000..519c7162
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/factory/DQStageFactory.java
@@ -0,0 +1,52 @@
+package org.apache.griffin.core.worker.factory;
+
+import org.apache.griffin.core.api.utils.SpringUtils;
+import org.apache.griffin.core.worker.dao.DQStageDao;
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.enums.DQStageTypeEnum;
+import org.apache.griffin.core.worker.service.DQStageService;
+import org.apache.griffin.core.worker.service.DQTaskService;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.apache.griffin.core.worker.stage.DQAlertStage;
+import org.apache.griffin.core.worker.stage.DQEvaluateStage;
+import org.apache.griffin.core.worker.stage.DQRecordStage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DQStageFactory {
+
+ private static final Logger log = LoggerFactory.getLogger(DQStageFactory.class);
+
+ @Autowired
+ private DQStageDao dqStageDao;
+
+ public DQAbstractStage constructStage(DQStageTypeEnum stageTypeEnum, DQInstance instance) {
+ DQAbstractStage stage = null;
+ switch (stageTypeEnum) {
+ case RECORD:
+ stage = new DQRecordStage();
+ break;
+ case EVALUATE:
+ stage = new DQEvaluateStage();
+ break;
+ case ALERT:
+ stage = new DQAlertStage();
+ break;
+ default:
+ break;
+ }
+ if (stage == null) {
+ log.error("DQStageFactory - Unknown Stage Type: {}", stageTypeEnum);
+ return stage;
+ }
+ stage.setInstance(instance);
+ stage.setSubTaskList(instance.getSubTaskList());
+ stage.setDqTaskService(SpringUtils.getObject("dqTaskService", DQTaskService.class));
+ stage.setDqStageService(SpringUtils.getObject("dqStageService", DQStageService.class));
+ dqStageDao.insert(stage);
+ return stage;
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java b/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java
new file mode 100644
index 00000000..dcb2ce07
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/factory/DQTaskFactory.java
@@ -0,0 +1,55 @@
+package org.apache.griffin.core.worker.factory;
+
+import org.apache.griffin.core.api.entity.DQResoueceEnum;
+import org.apache.griffin.core.api.entity.GriffinDQBusinessRule;
+import org.apache.griffin.core.worker.dao.DQTaskDao;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Component
+public class DQTaskFactory {
+
+ private static final Logger log = LoggerFactory.getLogger(DQTaskFactory.class);
+
+ @Autowired
+ private DQTaskDao dqTaskDao;
+
+ public List<DQBaseTask> constructTasks(DQResoueceEnum resoueceEnum,
+ List<GriffinDQBusinessRule> businessRuleList) {
+ switch (resoueceEnum) {
+ // construct hive
+ case HIVE: return constructHiveTasks(businessRuleList);
+ // construct kafka
+ case KAFKA: return constructKafkaTasks(businessRuleList);
+ }
+ return null;
+ }
+
+ private List<DQBaseTask> constructHiveTasks(List<GriffinDQBusinessRule> businessRuleList) {
+ return businessRuleList.stream()
+ .map(this::constructHiveTask)
+ .collect(Collectors.toList());
+ }
+
+ private DQBaseTask constructHiveTask(GriffinDQBusinessRule businessRule) {
+ return null;
+ }
+
+ private List<DQBaseTask> constructKafkaTasks(List<GriffinDQBusinessRule> businessRuleList) {
+ return businessRuleList.stream()
+ .map(this::constructKafkaTask)
+ .collect(Collectors.toList());
+ }
+
+ private DQBaseTask constructKafkaTask(GriffinDQBusinessRule businessRule) {
+ // todo
+ return null;
+ }
+
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java b/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java
deleted file mode 100644
index b6b79f32..00000000
--- a/core/src/main/java/org/apache/griffin/core/worker/factory/TaskFactory.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.griffin.core.worker.factory;
-
-public class TaskFactory {
-
-
-
-
-
-}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java b/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
index 32970020..bc1c280a 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/factory/TemplateDriverFactory.java
@@ -1,12 +1,33 @@
package org.apache.griffin.core.worker.factory;
+import com.google.common.collect.Maps;
+import org.apache.griffin.core.worker.driver.PrestoTemplateDriver;
+import org.apache.griffin.core.worker.driver.SparkTemplateDriver;
import org.apache.griffin.core.worker.driver.TemplateDriver;
import org.apache.griffin.core.worker.entity.enums.DQEngineEnum;
import org.springframework.stereotype.Service;
+import java.util.Map;
+
@Service
public class TemplateDriverFactory {
+ Map<DQEngineEnum, TemplateDriver> engineDriverMap = Maps.newConcurrentMap();
+
public TemplateDriver getTemplateDrvier(DQEngineEnum engine) {
- return null;
+ TemplateDriver templateDriver = engineDriverMap.get(engine);
+ if (templateDriver != null) return templateDriver;
+
+ switch (engine) {
+ case PRESTO:
+ templateDriver = new PrestoTemplateDriver();
+ break;
+ case SPARK:
+ templateDriver = new SparkTemplateDriver();
+ break;
+ default:
+ break;
+ }
+ engineDriverMap.put(engine, templateDriver);
+ return templateDriver;
}
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
index 79759a02..d29bcb81 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/schedule/TaskDispatcherScheduler.java
@@ -3,13 +3,15 @@ package org.apache.griffin.core.worker.schedule;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.api.context.WorkerContext;
import org.apache.griffin.core.worker.entity.bo.DQInstance;
-import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
-import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
+import org.apache.griffin.core.worker.exception.StageSubmitException;
import org.apache.griffin.core.worker.service.DQInstanceService;
-import org.apache.griffin.core.worker.service.DQTaskService;
+import org.apache.griffin.core.worker.service.DQStageService;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.apache.griffin.core.worker.stage.DQStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -19,11 +21,12 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
- * 任务执行调度期 和 dispatcher交互
+ * schedule task
*/
@Component
public class TaskDispatcherScheduler {
@@ -31,7 +34,7 @@ public class TaskDispatcherScheduler {
private WorkerContext wc;
private DQInstanceService dqInstanceService;
- private DQTaskService dqTaskService;
+ private DQStageService dqStageService;
@Autowired
public void setWc(WorkerContext wc) {
@@ -42,41 +45,40 @@ public class TaskDispatcherScheduler {
this.dqInstanceService = dqInstanceService;
}
@Autowired
- public void setDqTaskService(DQTaskService dqTaskService) {
- this.dqTaskService = dqTaskService;
+ public void setDqStageService(DQStageService dqStageService) {
+ this.dqStageService = dqStageService;
}
@PostConstruct
public void startEvaluetingAndAlertThread() {
- // todo 用线程池运行
+ // todo submit to thread pool
scanAlertingTask();
scanEvaluatingTask();
}
/**
- * 进行任务调度
+ * schedule task
+ * put task to the queue of recording task
*/
@Scheduled(fixedDelay = 5 * 1000L)
public void doTaskDispatcherScheduler() {
log.info("doTaskDispatcherScheduler start.");
List<DQInstance> waittingToRemoveFromWaitingList = Lists.newArrayList();
Queue<DQInstance> waitingToRecordingDQInstanceQueue = Queues.newPriorityBlockingQueue(wc.getWAITTING_TASK_QUEUE());
- // 从waitting队列获取任务
+
try {
while (true) {
try {
DQInstance dqInstance = waitingToRecordingDQInstanceQueue.poll();
- // 队列中无元素 结束循环
+ // queue is empty, quit
if (dqInstance == null) break;
if (DQInstanceStatus.ACCEPTED != dqInstance.getStatus()) {
- // 非初始状态
- // 同步数据库状态 缓存中的实例状态可能不是最新的 从数据库构建最新的实例 替换缓存中的实例
+ // State is not init, sync from database and reassign it
dqInstance = dqInstanceService.getById(dqInstance.getId());
- // 根据状态放到对应队列
+ // assign task by status
waittingToRemoveFromWaitingList.add(dqInstance);
} else {
- // 提交到recording队列
- // 开始提交任务 放到recording队列中
+ // normal, update status and remove from queue, then put it to the queue of recording task
if (dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.WAITTING)) {
waittingToRemoveFromWaitingList.add(dqInstance);
}
@@ -90,9 +92,9 @@ public class TaskDispatcherScheduler {
// todo
log.error("scanRecordingTask failed, ex:", e);
} finally {
- // 根据状态分发到指定队列
+ // assign task by status
waittingToRemoveFromWaitingList.forEach(this::offerToSpecQueueByStatus);
- // 从 原队列移除 队列移除
+ // remove from queue
if (CollectionUtils.isNotEmpty(waittingToRemoveFromWaitingList)) wc.removeAll(wc.getWAITTING_TASK_QUEUE(), waittingToRemoveFromWaitingList);
}
log.info("doTaskDispatcherScheduler end.");
@@ -102,15 +104,16 @@ public class TaskDispatcherScheduler {
DQInstanceStatus status = instance.getStatus();
switch (status) {
case WAITTING:
- case RUNNING:
- case SUBMITTING:
+// case RUNNING:
+// case SUBMITTING:
case RECORDING:
wc.offerToRecordingTaskQueue(instance);
break;
case EVALUATING:
wc.offerToEvaluatingTaskQueue(instance);
break;
- case EVALUATE_ALERTING:
+// case EVALUATE_ALERTING:
+ case ALERTING:
case FAILED_ALERTING:
wc.offerToAlertingTaskQueue(instance);
break;
@@ -121,7 +124,7 @@ public class TaskDispatcherScheduler {
wc.addSuccessDQInstanceInfo(instance);
break;
default:
- // todo 未知状态 丢弃任务
+ // todo Unknown state Drop
log.warn("Unknown status, id : {}, status : {}, instance: {}", instance.getId(), status, instance);
break;
}
@@ -154,138 +157,88 @@ public class TaskDispatcherScheduler {
}
private void processRecordingInstance(DQInstance dqInstance, List<DQInstance> waittingToRemoveFromRecordingList) {
- // 检查当前环境是否有可以提交任务到dispatcher(并发度限制 需要根据提交的引擎计算)
- DQInstanceStatus instanceStatus = dqInstance.getStatus();
- List<DQBaseTask> subTaskList = dqInstance.getSubTaskList();
- switch (instanceStatus) {
- case ACCEPTED:
- case WAITTING:
- dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.SUBMITTING);
- submitTaskToDispatcher(subTaskList);
- if (!dqInstance.hasTaskToSubmit()) {
- dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.RUNNING);
- }
- break;
- case SUBMITTING:
- submitTaskToDispatcher(subTaskList);
- // 检查是否所有任务都已经提交
- if (!dqInstance.hasTaskToSubmit()) {
- dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.RUNNING);
- }
- break;
- case RUNNING:
- // 检查并更新任务状态
- checkJobStatus(subTaskList);
- if (dqInstance.isFinishRecord()) {
- // record 任务都完成了 准备移除
- waittingToRemoveFromRecordingList.add(dqInstance);
+ try {
+ DQAbstractStage recordingStage = dqInstance.getRecordingStage();
+ DQStageStatus stageStatus = recordingStage.getStatus();
+ if (stageStatus == DQStageStatus.RUNNABLE) {
+ if (!dqStageService.submitStage(recordingStage)) {
+ throw new StageSubmitException("Submit stage failed!, instance id: " + dqInstance.getId());
+ } else {
+ dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.RECORDING);
}
- break;
- default:
- break;
- }
- }
-
- private void checkJobStatus(List<DQBaseTask> subTaskList) {
- subTaskList.forEach(task -> dqTaskService.checkJobStatus(task));
- }
-
- private void submitTaskToDispatcher(List<DQBaseTask> subTaskList) {
- // 遍历 recording tasks 检查状态进行更新
- subTaskList.forEach(task -> {
- DQTaskStatus taskStatus = task.getStatus();
- switch (taskStatus) {
- case WAITTING:
- // 提交任务
- doSubmitTaskToDispatcher(task);
- if (task.isFailed()) {
- // 提交一直失败
- dqTaskService.updateTaskStatus(task, DQTaskStatus.FAILED);
- }
- break;
- case RECORDING:
- // 查询结果
- boolean isFinished = dqTaskService.checkJobStatus(task);
- if (isFinished) {
- // 任务结束, 设置任务状态为record结束
- dqTaskService.updateTaskStatus(task, DQTaskStatus.RECORDED);
- }
- break;
- default:
- // 其余状态不处理
- break;
+ } else if (stageStatus == DQStageStatus.FINISH) {
+ // if there is one task success, the instance should be EVALUATING;
+ // if all tasks are failed, the instance should be FAILED_ALERTING;
+ DQInstanceStatus instanceStatus = recordingStage.hasSuccess()? DQInstanceStatus.EVALUATING: DQInstanceStatus.FAILED_ALERTING;
+ dqInstanceService.updateStatus(dqInstance, instanceStatus);
+ waittingToRemoveFromRecordingList.add(dqInstance);
}
- });
- }
-
- private void doSubmitTaskToDispatcher(DQBaseTask task) {
- // 并发度检查
- if (!wc.canSubmitToSpecEngine(task.getEngine())) return;
- if (dqTaskService.doSubmitRecordingTask(task)) {
- // 任务提交成功 更新状态为recording
- dqTaskService.updateTaskStatus(task, DQTaskStatus.RECORDING);
- } else {
- // 提交失败 记录一次失败
- task.incrStatusAge();
+ } catch (Exception e) {
+ // todo rollback dqInstance status
+ log.error("e: ", e);
}
}
public void scanEvaluatingTask() {
- LinkedBlockingQueue<DQInstance> evaluating_task_queue = wc.getEVALUATING_TASK_QUEUE();
- while (true) {
- DQInstance dqInstance = null;
- try {
- // Evaluating 来一个处理一个
- dqInstance = evaluating_task_queue.poll(5, TimeUnit.SECONDS);
- if (dqInstance == null) continue;
- // 根据状态打回任务
- // 执行evaluating
- if (dqInstance.getStatus() == DQInstanceStatus.EVALUATING) {
- dqInstance.doEvaluteTask();
- } else {
- offerToSpecQueueByStatus(dqInstance);
- }
- } catch (Exception e) {
- if (dqInstance != null) {
- log.error("scanEvaluatingTask doEvalute failed, id : {}, instance : {}, ex:", dqInstance.getId(), dqInstance, e);
- dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.FAILED_ALERTING);
+ Executors.newCachedThreadPool().execute(() -> {
+ LinkedBlockingQueue<DQInstance> evaluating_task_queue = wc.getEVALUATING_TASK_QUEUE();
+ while (true) {
+ DQInstance dqInstance = null;
+ try {
+ dqInstance = evaluating_task_queue.poll(5, TimeUnit.SECONDS);
+ if (dqInstance == null) continue;
+ // do Evaluate
+ DQStage evaluatingStage = dqInstance.getEvaluatingStage();
+ if (dqInstance.getStatus() == DQInstanceStatus.EVALUATING) {
+ dqStageService.executeStage(evaluatingStage);
+ DQInstanceStatus dqInstanceStatus = evaluatingStage.hasSuccess() ? DQInstanceStatus.ALERTING : DQInstanceStatus.FAILED_ALERTING;
+ dqInstanceService.updateStatus(dqInstance, dqInstanceStatus);
+ }
offerToSpecQueueByStatus(dqInstance);
- } else {
- log.error("scanEvaluatingTask poll instance failed. ex:", e);
+ } catch (Exception e) {
+ if (dqInstance != null) {
+ log.error("scanEvaluatingTask doEvalute failed, id : {}, instance : {}, ex:", dqInstance.getId(), dqInstance, e);
+ dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.FAILED_ALERTING);
+ offerToSpecQueueByStatus(dqInstance);
+ } else {
+ log.error("scanEvaluatingTask poll instance failed. ex:", e);
+ }
}
}
- }
+ });
}
public void scanAlertingTask() {
- LinkedBlockingQueue<DQInstance> alerting_task_queue = wc.getALERTING_TASK_QUEUE();
- while (true) {
- DQInstance dqInstance = null;
- try {
- // Evaluating 来一个处理一个
- dqInstance = alerting_task_queue.poll(1, TimeUnit.SECONDS);
- if (dqInstance == null) continue;
- // 根据状态打回任务
- // 执行evaluating
- if (dqInstance.getStatus() == DQInstanceStatus.FAILED_ALERTING || dqInstance.getStatus() == DQInstanceStatus.EVALUATE_ALERTING) {
- dqInstance.doAlertTask();
- } else {
- offerToSpecQueueByStatus(dqInstance);
- }
- } catch (Exception e) {
- if (dqInstance != null) {
- log.error("scanAlertingTask doAlert failed, id : {}, instance : {}, ex:", dqInstance.getId(), dqInstance, e);
- if (dqInstance.isFailed()) {
- // 重试很多次了 直接设置为失败
- dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.FAILED);
- // 没有失败很多次的话,状态不修改 放回队列重试
+ Executors.newCachedThreadPool().execute(() -> {
+ LinkedBlockingQueue<DQInstance> alerting_task_queue = wc.getALERTING_TASK_QUEUE();
+ while (true) {
+ DQInstance dqInstance = null;
+ try {
+ dqInstance = alerting_task_queue.poll(1, TimeUnit.SECONDS);
+ if (dqInstance == null) continue;
+ // do alerting
+ if (dqInstance.getStatus() == DQInstanceStatus.FAILED_ALERTING || dqInstance.getStatus() == DQInstanceStatus.ALERTING) {
+ DQStage alertingStage = dqInstance.getAlertingStage();
+ dqStageService.executeStage(alertingStage);
+ DQInstanceStatus dqInstanceStatus = alertingStage.hasSuccess()? DQInstanceStatus.SUCCESS : DQInstanceStatus.FAILED;
+ dqInstanceService.updateStatus(dqInstance, dqInstanceStatus);
}
- // 放回队列准备重试
offerToSpecQueueByStatus(dqInstance);
- } else {
- log.error("scanAlertingTask poll instance failed. ex:", e);
+ } catch (Exception e) {
+ if (dqInstance != null) {
+ log.error("scanAlertingTask doAlert failed, id : {}, instance : {}, ex:", dqInstance.getId(), dqInstance, e);
+ if (dqInstance.isFailed()) {
+ // retry 5 times, set failed
+ dqInstanceService.updateStatus(dqInstance, DQInstanceStatus.FAILED);
+ // retry times less than 5, do not modify status and put task back
+ }
+ // put task back
+ offerToSpecQueueByStatus(dqInstance);
+ } else {
+ log.error("scanAlertingTask poll instance failed. ex:", e);
+ }
}
}
- }
+ });
}
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java b/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
index a39c17d1..276ebdaa 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/service/DQInstanceService.java
@@ -1,8 +1,11 @@
package org.apache.griffin.core.worker.service;
+import org.apache.griffin.core.api.entity.GriffinDQContentInstanceMap;
+import org.apache.griffin.core.api.dao.DQContentInstanceMapDao;
import org.apache.griffin.core.worker.dao.DQInstanceDao;
import org.apache.griffin.core.worker.entity.bo.DQInstance;
import org.apache.griffin.core.worker.entity.enums.DQInstanceStatus;
+import org.apache.griffin.core.worker.factory.DQInstanceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -14,12 +17,16 @@ public class DQInstanceService {
@Autowired
private DQInstanceDao dqInstanceDao;
+ @Autowired
+ private DQContentInstanceMapDao dqContentInstanceMapDao;
+ @Autowired
+ private DQInstanceFactory dqInstanceFactory;
/**
* 数据库和内存需要保持同步
- * @param instance
- * @param status
- * @return
+ * @param instance obj
+ * @param status stat
+ * @return true or false
*/
public boolean updateStatus(DQInstance instance, DQInstanceStatus status) {
try {
@@ -33,7 +40,32 @@ public class DQInstanceService {
return false;
}
- public DQInstance getById(long id) {
- return null;
+ /**
+ * construct instance
+ * @param id master assign id
+ * @return DQInstance
+ */
+ public DQInstance getById(Long id) {
+ if (id == null) {
+ log.error("Unknown instance id: null");
+ }
+ DQInstance ins = dqInstanceDao.getById(id);
+ if (ins == null) {
+ // the ins id has no task info
+ return constructInstance(id);
+ }
+ return recoveryInstance(ins);
+ }
+
+ private DQInstance constructInstance(Long id) {
+ log.info("constructInstance id: {}", id);
+ GriffinDQContentInstanceMap contentInstanceMap = dqContentInstanceMapDao.getContentInstanceMapByInstanceId(id);
+ Long instanceId = contentInstanceMap.getInstanceId();
+ Long dqcId = contentInstanceMap.getDqcId();
+ return dqInstanceFactory.constructInstance(instanceId, dqcId);
+ }
+
+ private DQInstance recoveryInstance(DQInstance instance) {
+ return dqInstanceFactory.recoveryInstance(instance);
}
}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/service/DQStageService.java b/core/src/main/java/org/apache/griffin/core/worker/service/DQStageService.java
new file mode 100644
index 00000000..2a2916d5
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/service/DQStageService.java
@@ -0,0 +1,58 @@
+package org.apache.griffin.core.worker.service;
+
+import org.apache.griffin.core.worker.dao.DQStageDao;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.apache.griffin.core.worker.stage.DQAbstractStage;
+import org.apache.griffin.core.worker.stage.DQStage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.Executors;
+
+@Service
+public class DQStageService {
+
+ private static final Logger log = LoggerFactory.getLogger(DQStageService.class);
+ private DQStageDao dqStageDao;
+
+ public boolean updateTaskStatus(DQAbstractStage stage, DQStageStatus status) {
+ try {
+ dqStageDao.updateDQStageStatus(stage, status.getCode());
+ stage.setStatus(status);
+ return true;
+ } catch (Exception e) {
+ // todo
+ log.error("stage {} {} => {} failed, ex", stage, stage.getStatus(), status, e);
+ }
+ return false;
+ }
+
+ // async to submit task
+ public boolean submitStage(DQStage dqStage) {
+ log.info("start to submit stage");
+ // todo prepare thread pool
+ try {
+ Executors.newCachedThreadPool().execute(dqStage::start);
+ return true;
+ } catch (Exception e) {
+ // todo
+ log.error("Submit failed. ex:", e);
+ }
+ return false;
+ }
+
+ // sync to execute task
+ public boolean executeStage(DQStage dqStage) {
+ try {
+ dqStage.start();
+ return true;
+ } catch (Exception e) {
+ // todo
+ log.error("Submit failed. ex:", e);
+ }
+ return false;
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
index 8c0f018b..bf5b38b6 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/service/DQTaskService.java
@@ -28,42 +28,43 @@ public class DQTaskService {
@Autowired
private DispatcherClient dispatcherClient;
- public boolean updateTaskStatus(DQBaseTask task, DQTaskStatus status) {
+ public void updateTaskStatus(DQBaseTask task, DQTaskStatus status) {
try {
- dqTaskDao.updateDQTaskListStatus(task, status.getCode());
+ dqTaskDao.updateDQTaskStatus(task, status.getCode());
task.setStatus(status);
- return true;
} catch (Exception e) {
log.error("task {} {} => {} failed, ex", task.getId(), task.getStatus(), status, e);
}
- return false;
}
- public boolean updateTaskStatus(List<DQBaseTask> tasks, DQTaskStatus status) {
- if (CollectionUtils.isEmpty(tasks)) return false;
+ public void updateTaskStatus(List<DQBaseTask> tasks, DQTaskStatus status) {
+ if (CollectionUtils.isEmpty(tasks)) return;
DQBaseTask sampleTask = tasks.get(0);
List<Long> taskIdList = tasks.stream().map(DQBaseTask::getId).collect(Collectors.toList());
try {
dqTaskDao.updateDQTaskListStatus(tasks, status.getCode());
tasks.forEach(x -> x.setStatus(status));
- return true;
} catch (Exception e) {
log.error("task {} {} => {} failed, ex", taskIdList, sampleTask.getStatus(), status, e);
}
- return false;
}
public boolean doSubmitRecordingTask(DQBaseTask task) {
- // 一个task对应多个dispatcher任务 分别获取所有的任务对应的请求
- List<Pair<Long, SubmitRequest>> requestList = getSubmitRequest(task);
- // 提交任务 获取任务对应的job信息
- List<JobStatus> jobStatusList = requestList.stream()
- .map(reqPair -> Pair.of(reqPair.getLeft(), dispatcherClient.submitSql(reqPair.getRight())))
- .map(respPair -> dispatcherClient.wrapperSubmitResponse(respPair))
- .collect(Collectors.toList());
- // 设置job信息
- task.setJobStatusList(jobStatusList);
- return true;
+ try {
+ // 一个task对应多个dispatcher任务 分别获取所有的任务对应的请求
+ List<Pair<Long, SubmitRequest>> requestList = getSubmitRequest(task);
+ // 提交任务 获取任务对应的job信息
+ List<JobStatus> jobStatusList = requestList.stream()
+ .map(reqPair -> Pair.of(reqPair.getLeft(), dispatcherClient.submitSql(reqPair.getRight())))
+ .map(respPair -> dispatcherClient.wrapperSubmitResponse(respPair))
+ .collect(Collectors.toList());
+ // 设置job信息
+ task.setJobStatusList(jobStatusList);
+ return true;
+ } catch (Exception e) {
+ log.error("ex: ", e);
+ }
+ return false;
}
public boolean checkJobStatus(DQBaseTask task) {
diff --git a/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java b/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
index 314a328d..2a5f9863 100644
--- a/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
+++ b/core/src/main/java/org/apache/griffin/core/worker/service/WorkCoreService.java
@@ -1,6 +1,6 @@
package org.apache.griffin.core.worker.service;
-import org.apache.griffin.core.worker.context.WorkerContext;
+import org.apache.griffin.core.api.context.WorkerContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
diff --git a/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java
new file mode 100644
index 00000000..f866a533
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAbstractStage.java
@@ -0,0 +1,67 @@
+package org.apache.griffin.core.worker.stage;
+
+import org.apache.griffin.core.worker.entity.bo.DQInstance;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
+import org.apache.griffin.core.worker.service.DQStageService;
+import org.apache.griffin.core.worker.service.DQTaskService;
+
+import java.util.List;
+
+public abstract class DQAbstractStage implements DQStage {
+ protected DQTaskService dqTaskService;
+ protected DQStageService dqStageService;
+
+ protected DQStageStatus status;
+ protected DQInstance instance;
+
+ protected List<DQBaseTask> subTaskList;
+
+ public DQAbstractStage() {
+ this.status = DQStageStatus.INIT;
+ }
+
+ public DQInstance getInstance() {
+ return instance;
+ }
+
+ public void setInstance(DQInstance instance) {
+ this.instance = instance;
+ }
+
+ public void setDqTaskService(DQTaskService dqTaskService) {
+ this.dqTaskService = dqTaskService;
+ }
+
+ public void setDqStageService(DQStageService dqStageService) {
+ this.dqStageService = dqStageService;
+ }
+
+ public List<DQBaseTask> getSubTaskList() {
+ return subTaskList;
+ }
+
+ public void setSubTaskList(List<DQBaseTask> subTaskList) {
+ this.subTaskList = subTaskList;
+ }
+
+ public DQStageStatus getStatus() {
+ return status;
+ }
+
+ public void setStatus(DQStageStatus status) {
+ this.status = status;
+ }
+
+ public abstract void process();
+
+ public void updateStatus(DQStageStatus status) {
+ dqStageService.updateTaskStatus(this, status);
+ }
+
+ public void start() {
+ updateStatus(DQStageStatus.RUNNING);
+ process();
+ updateStatus(DQStageStatus.FINISH);
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/stage/DQAlertStage.java b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAlertStage.java
new file mode 100644
index 00000000..2f980afd
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/stage/DQAlertStage.java
@@ -0,0 +1,33 @@
+package org.apache.griffin.core.worker.stage;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.utils.MsgSender;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class DQAlertStage extends DQAbstractStage {
+
+ @Override
+ public void process() {
+
+ List<String> alertMsgList = subTaskList.stream()
+ .filter(DQBaseTask::isNeedAlert)
+ .map(DQBaseTask::alert)
+ .collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(alertMsgList)) {
+ return;
+ }
+ MsgSender.send(packageAlertMsg(alertMsgList), instance.getDqAlertRule().getReceivers(), instance.getDqAlertRule().getSendType());
+ }
+
+ private String packageAlertMsg(List<String> alertMsgList) {
+ return null;
+ }
+
+ @Override
+ public boolean hasSuccess() {
+ return false;
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/stage/DQEvaluateStage.java b/core/src/main/java/org/apache/griffin/core/worker/stage/DQEvaluateStage.java
new file mode 100644
index 00000000..bcbabc08
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/stage/DQEvaluateStage.java
@@ -0,0 +1,50 @@
+package org.apache.griffin.core.worker.stage;
+
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Component
+public class DQEvaluateStage extends DQAbstractStage {
+
+ private static final Logger log = LoggerFactory.getLogger(DQEvaluateStage.class);
+
+ @Override
+ public void process() {
+ // failed task will not do evaluate
+ List<DQBaseTask> waitToDoEvaluateTaskList = subTaskList.stream()
+ .filter(task -> task.getStatus() == DQTaskStatus.RECORDED)
+ .collect(Collectors.toList());
+ dqTaskService.updateTaskStatus(waitToDoEvaluateTaskList, DQTaskStatus.EVALUATING);
+ waitToDoEvaluateTaskList.forEach(task -> {
+ // retry 3 times
+ for (int i = 0; i < 3; i++) {
+ try {
+ task.evaluate();
+ dqTaskService.updateTaskStatus(task, DQTaskStatus.EVALUATED);
+ break;
+ } catch (Exception e) {
+ log.error("Evaluate failed {} times", i);
+ }
+ }
+ if (task.getStatus() != DQTaskStatus.EVALUATED) {
+ dqTaskService.updateTaskStatus(task, DQTaskStatus.FAILED);
+ }
+ });
+ }
+
+ @Override
+ public boolean hasSuccess() {
+ for (DQBaseTask task : subTaskList) {
+ if (task.getStatus() == DQTaskStatus.EVALUATED) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/stage/DQRecordStage.java b/core/src/main/java/org/apache/griffin/core/worker/stage/DQRecordStage.java
new file mode 100644
index 00000000..24550c75
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/stage/DQRecordStage.java
@@ -0,0 +1,106 @@
+package org.apache.griffin.core.worker.stage;
+
+import org.apache.griffin.core.worker.client.DispatcherClient;
+import org.apache.griffin.core.worker.entity.bo.task.DQBaseTask;
+import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
+import org.apache.griffin.core.worker.entity.enums.DQTaskStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class DQRecordStage extends DQAbstractStage {
+ private static final Logger log = LoggerFactory.getLogger(DQRecordStage.class);
+
+ private DispatcherClient dispatcherClient;
+
+
+ public DispatcherClient getDispatcherClient() {
+ return dispatcherClient;
+ }
+
+ public void setDispatcherClient(DispatcherClient dispatcherClient) {
+ this.dispatcherClient = dispatcherClient;
+ }
+
+
+ @Override
+ public void process() {
+ while (continueCheckTaskStaus()) {
+ try {
+ submitTaskToDispatcher();
+ TimeUnit.SECONDS.sleep(1);
+ } catch (Exception e) {
+ // todo
+ log.error("ex: ", e);
+ }
+ }
+ }
+
+ /**
+ * one of tasks submitted to dispatcher is success
+ * @return boolean
+ */
+ @Override
+ public boolean hasSuccess() {
+ for (DQBaseTask task : subTaskList) {
+ if (task.getStatus() == DQTaskStatus.RECORDED) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean continueCheckTaskStaus() {
+ boolean doContinue = false;
+ for (DQBaseTask dqBaseTask : subTaskList) {
+ if (dqBaseTask.getStatus() == DQTaskStatus.RECORDING || dqBaseTask.getStatus() == DQTaskStatus.WAITTING) {
+ doContinue = true;
+ break;
+ }
+ }
+ return doContinue;
+ }
+
+ private void submitTaskToDispatcher() {
+ // loop and check status
+ subTaskList.forEach(task -> {
+ DQTaskStatus taskStatus = task.getStatus();
+ switch (taskStatus) {
+ case WAITTING:
+ // submit task
+ doSubmitTaskToDispatcher(task);
+ if (task.isFailed()) {
+ // failed
+ dqTaskService.updateTaskStatus(task, DQTaskStatus.FAILED);
+ }
+ break;
+ case RECORDING:
+ // query status
+ boolean isFinished = dqTaskService.checkJobStatus(task);
+ if (isFinished) {
+ // task is finish, set status
+ dqTaskService.updateTaskStatus(task, DQTaskStatus.RECORDED);
+ }
+ break;
+ default:
+ // no handle
+ break;
+ }
+ });
+ }
+
+ private void doSubmitTaskToDispatcher(DQBaseTask task) {
+ // 并发度检查
+ if (!dispatcherClient.canSubmitToSpecEngine(task.getEngine())) return;
+ if (dqTaskService.doSubmitRecordingTask(task)) {
+ // 任务提交成功 更新状态为recording
+ dqTaskService.updateTaskStatus(task, DQTaskStatus.RECORDING);
+ } else {
+ // 提交失败 记录一次失败
+ task.incrStatusAge();
+ }
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/stage/DQStage.java b/core/src/main/java/org/apache/griffin/core/worker/stage/DQStage.java
new file mode 100644
index 00000000..4daa54cf
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/stage/DQStage.java
@@ -0,0 +1,9 @@
+package org.apache.griffin.core.worker.stage;
+
+import org.apache.griffin.core.worker.entity.enums.DQStageStatus;
+
+public interface DQStage {
+ void process();
+ void start();
+ boolean hasSuccess();
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java b/core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java
new file mode 100644
index 00000000..8e5a1cf4
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/utils/DQDateUtils.java
@@ -0,0 +1,14 @@
+package org.apache.griffin.core.worker.utils;
+
+import java.util.concurrent.TimeUnit;
+
+public class DQDateUtils {
+
+ public static String praseBussinesstimeToPartitionSQL(Long businessTime, String partition) {
+ return null;
+ }
+
+ public static Long offsetTime(Long businessTime, Integer offset, TimeUnit unit) {
+ return 0L;
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java b/core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java
new file mode 100644
index 00000000..23884c88
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/utils/ExpressionUtils.java
@@ -0,0 +1,7 @@
+package org.apache.griffin.core.worker.utils;
+
+public class ExpressionUtils {
+ public static boolean evaluate(String expression, double metricValue) {
+ return false;
+ }
+}
diff --git a/core/src/main/java/org/apache/griffin/core/worker/utils/MsgSender.java b/core/src/main/java/org/apache/griffin/core/worker/utils/MsgSender.java
new file mode 100644
index 00000000..5bc5a6ce
--- /dev/null
+++ b/core/src/main/java/org/apache/griffin/core/worker/utils/MsgSender.java
@@ -0,0 +1,11 @@
+package org.apache.griffin.core.worker.utils;
+
+import java.util.List;
+
+public class MsgSender {
+
+ public static boolean send(String msg, List<String> receiver, int sendType) {
+ return false;
+ }
+
+}
diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml
new file mode 100644
index 00000000..fddc2639
--- /dev/null
+++ b/core/src/main/resources/application.yml
@@ -0,0 +1,4 @@
+task:
+ assign:
+ strategy:
+ org.apache.griffin.core.master.strategy.LooperAssignStrategy
\ No newline at end of file